How to change akka.remote.startup-timeout in Spark

2016-04-21 Thread yuemeng (A)
When I run a Spark application, I sometimes get the following error:
16/04/21 09:26:45 ERROR SparkContext: Error initializing SparkContext.
java.util.concurrent.TimeoutException: Futures timed out after [1 milliseconds]
 at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
 at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
 at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
 at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
 at scala.concurrent.Await$.result(package.scala:107)
 at akka.remote.Remoting.start(Remoting.scala:180)
 at akka.remote.RemoteActorRefProvider.init(RemoteActorRefProvider.scala:184)
 at akka.actor.ActorSystemImpl.liftedTree2$1(ActorSystem.scala:618)
 at akka.actor.ActorSystemImpl._start$lzycompute(ActorSystem.scala:615)
 at akka.actor.ActorSystemImpl._start(ActorSystem.scala:615)
 at akka.actor.ActorSystemImpl.start(ActorSystem.scala:632)
 at akka.actor.ActorSystem$.apply(ActorSystem.scala:141)
 at akka.actor.ActorSystem$.apply(ActorSystem.scala:118)
 at org.apache.spark.util.AkkaUtils$.org$apache$spark$util$AkkaUtils$$doCreateActorSystem(AkkaUtils.scala:122)
 at org.apache.spark.util.AkkaUtils$$anonfun$1.apply(AkkaUtils.scala:54)
 at org.apache.spark.util.AkkaUtils$$anonfun$1.apply(AkkaUtils.scala:53)
 at org.apache.spark.util.Utils$$anonfun$startServiceOnPort$1.apply$mcVI$sp(Utils.scala:1995)
 at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
 at org.apache.spark.util.Utils$.startServiceOnPort(Utils.scala:1986)
 at org.apache.spark.util.AkkaUtils$.createActorSystem(AkkaUtils.scala:56)
 at org.apache.spark.rpc.akka.AkkaRpcEnvFactory.create(AkkaRpcEnv.scala:245)


I traced the code, and I think increasing akka.remote.startup-timeout might solve this problem, but I can't find any way to change it.
Has anybody met this problem and knows how to change the Akka config in Spark?
Thanks a lot
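
One possible approach (an untested sketch): in the Spark 1.x code path shown in the stack trace, AkkaUtils builds the ActorSystem config from SparkConf and forwards entries whose keys start with "akka.", so setting the property on the conf before creating the SparkContext may work. The app name and the 60s value below are only examples:

import org.apache.spark.{SparkConf, SparkContext}

// Sketch, assuming a Spark 1.x build whose AkkaUtils passes "akka.*"
// SparkConf entries through to the underlying ActorSystem config.
val conf = new SparkConf()
  .setAppName("akka-startup-timeout-demo")    // hypothetical app name
  .set("akka.remote.startup-timeout", "60s")  // raise Akka remoting startup timeout
val sc = new SparkContext(conf)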

岳猛 (Rick) 00277916
Big Data Technology Development Dept.

ChinaSoft Big Data 3ms team: http://3ms.huawei.com/hi/group/2031037





Spark Streaming application stops generating jobs after running for a week; at last it throws an OOM exception

2016-04-14 Thread yuemeng (A)
@All

There is a strange problem. I have been running a Spark Streaming application for a long time. Here is the application info:


1)   Fetch data from Kafka using the direct API

2)   Use SQL to write each RDD of the DStream into Redis

3)   Read data from Redis

Everything seemed OK for one week. After one week, the application doesn't generate jobs any more; it only prints the following info in the driver log after the last jobs were generated:


16/04/14 10:37:49 INFO JobScheduler: Added jobs for time 146060068 ms

16/04/14 10:37:51 INFO MetadataCleaner: Ran metadata cleaner for MAP_OUTPUT_TRACKER
16/04/14 10:37:51 INFO MetadataCleaner: Ran metadata cleaner for SPARK_CONTEXT
16/04/14 10:37:51 INFO BlockManager: Dropping non broadcast blocks older than 1460601351512
16/04/14 10:37:51 INFO BlockManager: Dropping broadcast blocks older than 1460601351512
16/04/14 10:37:51 INFO MetadataCleaner: Ran metadata cleaner for BROADCAST_VARS
16/04/14 10:37:51 INFO MetadataCleaner: Ran metadata cleaner for BLOCK_MANAGER
16/04/14 10:37:51 INFO SparkContext: Starting job: transform at BindCard.scala:44
16/04/14 10:38:03 INFO BlockManager: Dropping non broadcast blocks older than 1460601363512
16/04/14 10:38:03 INFO MetadataCleaner: Ran metadata cleaner for BLOCK_MANAGER
16/04/14 10:38:03 INFO MetadataCleaner: Ran metadata cleaner for MAP_OUTPUT_TRACKER
16/04/14 10:38:03 INFO MetadataCleaner: Ran metadata cleaner for SPARK_CONTEXT
16/04/14 10:38:03 INFO BlockManager: Dropping broadcast blocks older than 1460601363513
16/04/14 10:38:03 INFO MetadataCleaner: Ran metadata cleaner for BROADCAST_VARS
16/04/14 10:38:15 INFO BlockManager: Dropping non broadcast blocks older than 1460601375512
16/04/14 10:38:15 INFO MetadataCleaner: Ran metadata cleaner for BLOCK_MANAGER
16/04/14 10:38:15 INFO MetadataCleaner: Ran metadata cleaner for MAP_OUTPUT_TRACKER
16/04/14 10:38:15 INFO MetadataCleaner: Ran metadata cleaner for SPARK_CONTEXT
16/04/14 10:38:15 INFO BlockManager: Dropping broadcast blocks older than 1460601375513
16/04/14 10:38:15 INFO MetadataCleaner: Ran metadata cleaner for BROADCAST_VARS
16/04/14 10:38:27 INFO BlockManager: Dropping non broadcast blocks older than 1460601387513

Has anyone met this problem?
Can anyone give me some advice on this issue, or any possible reasons?
I suspect the driver CPU is fully occupied by Full GC, leaving no time to generate jobs.
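
One way to check the Full GC suspicion (a sketch; the log path and the "..." placeholders are only examples) is to enable GC logging on the driver JVM at submit time and look for long back-to-back Full GC pauses around the time job generation stops:

spark-submit \
  --driver-java-options "-XX:+PrintGCDetails -XX:+PrintGCDateStamps -Xloggc:/tmp/driver-gc.log" \
  ...

If Full GC does dominate, adding -XX:+HeapDumpOnOutOfMemoryError as well would produce a heap dump showing what is accumulating on the driver.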













Spark Streaming test throws org.apache.spark.SparkException: Task not serializable when executed in spark-shell

2015-06-24 Thread yuemeng (A)
hi, all

Here are two examples: one throws Task not serializable when executed in spark-shell, the other one is OK. I am very puzzled; can anyone explain what's different about these two pieces of code and why the second one is OK?

1. The one which throws Task not serializable:

import org.apache.spark._
import SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.broadcast._

@transient val ssc = new StreamingContext(sc, Seconds(5))
val lines = ssc.textFileStream("/a.log")

// Keep only lines that look relevant: error lines or Spark's own output.
val testFun = (line: String) => {
  if (line.contains(" ERROR") || line.startsWith("Spark")) {
    true
  } else {
    false
  }
}

// Broadcast the two regexes: a timestamp pattern and an ORA error-code pattern.
val p_date_bc = sc.broadcast("^\\w+ \\w+ \\d+ \\d{2}:\\d{2}:\\d{2} \\d{4}".r)
val p_ORA_bc = sc.broadcast("^ORA-\\d+.+".r)

// For each partition: remember the last timestamp seen, and emit
// "lasttime | ORA-code" for every ORA error line.
val A = (iter: Iterator[String],
         data_bc: Broadcast[scala.util.matching.Regex],
         ORA_bc: Broadcast[scala.util.matching.Regex]) => {
  val p_date = data_bc.value
  val p_ORA = ORA_bc.value
  var res = List[String]()
  var lasttime = ""

  while (iter.hasNext) {
    val line = iter.next.toString
    val currentcode = p_ORA findFirstIn line getOrElse null
    if (currentcode != null) {
      res ::= lasttime + " | " + currentcode
    } else {
      val currentdate = p_date findFirstIn line getOrElse null
      if (currentdate != null) {
        lasttime = currentdate
      }
    }
  }
  res.iterator
}

val cdd = lines.filter(testFun).mapPartitions(x => A(x, p_date_bc, p_ORA_bc))
// org.apache.spark.SparkException: Task not serializable



2. The other one is OK:



import org.apache.spark._
import SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.broadcast._

val ssc = new StreamingContext(sc, Seconds(5))
val lines = ssc.textFileStream("/a.log")

val testFun = (line: String) => {
  if (line.contains(" ERROR") || line.startsWith("Spark")) {
    true
  } else {
    false
  }
}

// Same per-partition logic as above, but the regexes are compiled inline
// inside the function instead of being read from broadcast variables.
val A = (iter: Iterator[String]) => {
  var res = List[String]()
  var lasttime = ""
  while (iter.hasNext) {
    val line = iter.next.toString
    val currentcode = "^ORA-\\d+.+".r.findFirstIn(line).getOrElse(null)
    if (currentcode != null) {
      res ::= lasttime + " | " + currentcode
    } else {
      val currentdate =
        "^\\w+ \\w+ \\d+ \\d{2}:\\d{2}:\\d{2} \\d{4}".r.findFirstIn(line).getOrElse(null)
      if (currentdate != null) {
        lasttime = currentdate
      }
    }
  }
  res.iterator
}

val cdd = lines.filter(testFun).mapPartitions(A)
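
For what it's worth, one plausible reading of the difference: in example 2 the value passed to mapPartitions is A itself, whose body touches only its own parameters, so it captures nothing from the shell. In example 1 the extra closure x => A(x, p_date_bc, p_ORA_bc) has to reach A and the broadcasts through the spark-shell wrapper objects, which also hold the non-serializable StreamingContext. An untested workaround sketch (cdd2 is just an illustrative name) is to copy the shell-level vals into block-local vals, so the closure captures only the copies:

// Untested sketch: rebind the shell vals inside a block so the closure
// captures local copies rather than the REPL wrapper chain that also
// holds the StreamingContext.
val cdd2 = {
  val f   = A
  val bc1 = p_date_bc
  val bc2 = p_ORA_bc
  lines.filter(testFun).mapPartitions(x => f(x, bc1, bc2))
}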










How to change the /tmp folder for Spark unit tests run with sbt

2015-06-18 Thread yuemeng (A)
hi, all

If I want to change the /tmp folder to some other folder when running the Spark unit tests with sbt, how can I do it?
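
One possible approach (a sketch; /data/tmp is only an example path): fork the test JVMs in build.sbt and point java.io.tmpdir, and optionally Spark's own scratch dir, somewhere else:

// build.sbt (sbt 0.13-style keys) -- a sketch, assuming tests run in forked JVMs.
fork in Test := true
javaOptions in Test ++= Seq(
  "-Djava.io.tmpdir=/data/tmp",    // JVM temp dir (File.createTempFile, etc.)
  "-Dspark.local.dir=/data/tmp"    // Spark scratch space (shuffle spills, etc.)
)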