Re: SPARK STREAMING PROBLEM
You must start the StreamingContext by calling ssc.start(). On Thu, May 28, 2015 at 6:57 PM, Animesh Baranawal animeshbarana...@gmail.com wrote: Hi, I am trying to extract the filenames from which a DStream is generated by parsing the output of the toDebugString method on the underlying RDD. I am implementing the following code in spark-shell:

import org.apache.spark.streaming.{StreamingContext, Seconds}

val ssc = new StreamingContext(sc, Seconds(10))
val lines = ssc.textFileStream(/* directory */)

def g: List[String] = {
  var res = List[String]()
  lines.foreachRDD { rdd =>
    if (rdd.count > 0) {
      val files = rdd.toDebugString.split("\n").filter(_.contains(":"))
      files.foreach { ms => res = ms.split(" ")(2) :: res }
    }
  }
  res
}

g.foreach(x => { println(x); println() })

However when I run the code, nothing gets printed on the console apart from the logs. Am I doing something wrong? And is there any better way to extract the file names from a DStream? Thanks in advance, Animesh -- Sourav Chandra Senior Software Engineer sourav.chan...@livestream.com www.livestream.com
Re: SPARK STREAMING PROBLEM
The problem lies in the way you are doing the processing. Only after the g.foreach(x => { println(x); println() }) are you doing ssc.start. That means that up to that point all you have done is set up the computation steps; spark has not started any real processing. So when you do g.foreach, what it iterates over is the empty list you are returning from the g method, hence it does not print anything. On Thu, May 28, 2015 at 7:02 PM, Animesh Baranawal animeshbarana...@gmail.com wrote: I also started the streaming context by running ssc.start(), but still, apart from logs, nothing of g gets printed. -- Forwarded message -- From: Animesh Baranawal animeshbarana...@gmail.com Date: Thu, May 28, 2015 at 6:57 PM Subject: SPARK STREAMING PROBLEM To: user@spark.apache.org Hi, I am trying to extract the filenames from which a DStream is generated by parsing the output of the toDebugString method on the underlying RDD. I am implementing the following code in spark-shell:

import org.apache.spark.streaming.{StreamingContext, Seconds}

val ssc = new StreamingContext(sc, Seconds(10))
val lines = ssc.textFileStream(/* directory */)

def g: List[String] = {
  var res = List[String]()
  lines.foreachRDD { rdd =>
    if (rdd.count > 0) {
      val files = rdd.toDebugString.split("\n").filter(_.contains(":"))
      files.foreach { ms => res = ms.split(" ")(2) :: res }
    }
  }
  res
}

g.foreach(x => { println(x); println() })

However when I run the code, nothing gets printed on the console apart from the logs. Am I doing something wrong? And is there any better way to extract the file names from a DStream? Thanks in advance, Animesh -- Sourav Chandra Senior Software Engineer sourav.chan...@livestream.com www.livestream.com
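Putting the two replies together, the extraction has to happen inside foreachRDD, and the context has to be started before anything runs. A rough sketch of that shape follows; the watched directory, the "HadoopRDD" filter string, and the field index used to pull the path out of toDebugString are assumptions for illustration, not verified against Spark's exact debug-string format.

import org.apache.spark.streaming.{Seconds, StreamingContext}
import scala.collection.mutable

// Assumes `sc` is the SparkContext provided by spark-shell and that
// "/tmp/stream-input" is a placeholder directory to watch.
val ssc = new StreamingContext(sc, Seconds(10))
val lines = ssc.textFileStream("/tmp/stream-input")

// Thread-safe buffer the driver can inspect later; foreachRDD runs on the
// driver once per batch, but only after ssc.start() has been called.
val seenFiles = mutable.ArrayBuffer.empty[String]

lines.foreachRDD { rdd =>
  if (rdd.count() > 0) {
    val files = rdd.toDebugString
      .split("\n")
      .filter(_.contains("HadoopRDD"))   // assumption: file paths appear on the HadoopRDD lines
      .map(_.trim.split(" ")(1))         // assumption: the path is the second whitespace-separated field
    seenFiles.synchronized { seenFiles ++= files }
    files.foreach(println)               // visible on the driver console for each batch
  }
}

ssc.start()                  // nothing above runs until the context is started
// ssc.awaitTermination()    // block if running outside an interactive shell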
Re: Best practices on testing Spark jobs
Hi, Can you give some tutorials/examples how to write test case based on the mentioned framework? Thanks, Sourav On Tue, Apr 28, 2015 at 9:22 PM, Silvio Fiorito silvio.fior...@granturing.com wrote: Sorry that’s correct, I was thinking you were maybe trying to mock certain aspects of Spark core to write your tests. This is a library to help write unit tests by managing the SparkContext and StreamingContext. So you can test your transformations as necessary. More importantly on the streaming side it really helps simplify running tests on batch outputs. If you’re having serialization issues you may need to look at using transient lazy initializers, to see if that helps? From: Michal Michalski Date: Tuesday, April 28, 2015 at 11:42 AM To: Silvio Fiorito Cc: user Subject: Re: Best practices on testing Spark jobs Thanks Silvio. I might be missing something, but it looks like this project is a kind of a framework for setting up Spark for a testing, but after taking a quick look at the code it doesn't seem like it's solving the problem with mocking which is my main concern now - am I wrong? Kind regards, Michał Michalski, michal.michal...@boxever.com On 28 April 2015 at 16:35, Silvio Fiorito silvio.fior...@granturing.com wrote: Hi Michal, Please try spark-testing-base by Holden. I’ve used it and it works well for unit testing batch and streaming jobs https://github.com/holdenk/spark-testing-base Thanks, Silvio From: Michal Michalski Date: Tuesday, April 28, 2015 at 11:32 AM To: user Subject: Best practices on testing Spark jobs Hi, I have two questions regarding testing Spark jobs: 1. Is it possible to use Mockito for that purpose? I tried to use it, but it looks like there are no interactions with mocks. I didn't dive into the details of how Mockito works, but I guess it might be because of the serialization and how Spark distributes tasks. I'm not sure about it though and I'm looking for confirmation. 2. If not mockito, what's the alternative? What's the recommended way to test Spark jobs? Should I manually create mocks by e.g. extending all the classes I'd normally mock and changing the implementation of some methods? I don't like this idea but I can't really see any other options now. Kind regards, Michał Michalski, michal.michal...@boxever.com -- Sourav Chandra Senior Software Engineer · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · sourav.chan...@livestream.com o: +91 80 4121 8723 m: +91 988 699 3746 skype: sourav.chandra Livestream Ajmera Summit, First Floor, #3/D, 68 Ward, 3rd Cross, 7th C Main, 3rd Block, Koramangala Industrial Area, Bangalore 560034 www.livestream.com
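As a pointer for the tutorial/example request above, a minimal test built on spark-testing-base might look like the following sketch; the class and function under test are hypothetical, and it assumes spark-testing-base and ScalaTest are on the test classpath.

import com.holdenkarau.spark.testing.SharedSparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD
import org.scalatest.FunSuite

// SharedSparkContext manages a local SparkContext (`sc`) for the whole suite,
// so each test can focus on the transformation under test.
class WordCountSpec extends FunSuite with SharedSparkContext {

  // Hypothetical function under test: the logic you would normally bury inside a job.
  def countWords(lines: RDD[String]): RDD[(String, Int)] =
    lines.flatMap(_.split("\\s+")).map((_, 1)).reduceByKey(_ + _)

  test("counts words across lines") {
    val input = sc.parallelize(Seq("a b a", "b c"))
    val result = countWords(input).collect().toMap
    assert(result("a") === 2)
    assert(result("b") === 2)
    assert(result("c") === 1)
  }
}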
Re: Spark Streaming updateStateByKey throws OutOfMemory Error
the application using spark-submit tool with *cluster as deploy mode* it *crashes after some time with Out Of Memory error *with single master and worker (driver, master and worker are running in same machine) * 15/04/23 05:47:49 Executor: Exception in task 0.0 in stage 858.0 (TID 1464)* *java.lang.OutOfMemoryError: Java heap space* *at org.apache.hadoop.io.WritableUtils.readCompressedStringArray(WritableUtils.java:183)* *at org.apache.hadoop.conf.Configuration.readFields(Configuration.java:2564)* *at org.apache.hadoop.io.ObjectWritable.readObject(ObjectWritable.java:285)* *at org.apache.hadoop.io.ObjectWritable.readFields(ObjectWritable.java:77)* *at org.apache.spark.SerializableWritable$$anonfun$readObject$1.apply$mcV$sp(SerializableWritable.scala:43)* *at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:927)* *at org.apache.spark.SerializableWritable.readObject(SerializableWritable.scala:39)* *at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)* *at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)* *at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)* *at java.lang.reflect.Method.invoke(Method.java:606)* *at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)* *at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)* *at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)* *at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)* *at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)* *at com.esotericsoftware.kryo.serializers.JavaSerializer.read(JavaSerializer.java:40)* *at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)* *at org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:133)* *at org.apache.spark.broadcast.TorrentBroadcast$.unBlockifyObject(TorrentBroadcast.scala:236)* *at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readObject$1.apply$mcV$sp(TorrentBroadcast.scala:169)* *at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:927)* *at org.apache.spark.broadcast.TorrentBroadcast.readObject(TorrentBroadcast.scala:155)* *at sun.reflect.GeneratedMethodAccessor15.invoke(Unknown Source)* *at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)* *at java.lang.reflect.Method.invoke(Method.java:606)* *at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)* *at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)* *at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)* *at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)* *at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)* *at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)* Thanks, Sourav On Wed, Apr 22, 2015 at 11:40 PM, Tathagata Das t...@databricks.com wrote: It could very well be that your executor memory is not enough to store the state RDDs AND operate on the data. 1G per executor is quite low. Definitely give more memory. And have you tried increasing the number of partitions (specify number of partitions in updateStateByKey) ? On Wed, Apr 22, 2015 at 2:34 AM, Sourav Chandra sourav.chan...@livestream.com wrote: Anyone? 
On Wed, Apr 22, 2015 at 12:29 PM, Sourav Chandra sourav.chan...@livestream.com wrote: Hi Olivier, *the update function is as below*: *val updateFunc = (values: Seq[IConcurrentUsers], state: Option[(Long, Long)]) = {* * val previousCount = state.getOrElse((0L, 0L))._2* * var startValue: IConcurrentUsers = ConcurrentViewers(0)* * var currentCount = 0L* * val lastIndexOfConcurrentUsers =* *values.lastIndexWhere(_.isInstanceOf[ConcurrentViewers])* * val subList = values.slice(0, lastIndexOfConcurrentUsers)* * val currentCountFromSubList = subList.foldLeft(startValue)(_ op _).count + previousCount* * val lastConcurrentViewersCount = values(lastIndexOfConcurrentUsers).count* * if (math.abs(lastConcurrentViewersCount - currentCountFromSubList) = 1) {* *logger.error(* * sCount using state updation $currentCountFromSubList, +* *sConcurrentUsers count $lastConcurrentViewersCount +* *s resetting to $lastConcurrentViewersCount* *)* *currentCount = lastConcurrentViewersCount* * }* * val remainingValuesList = values.diff(subList)* * startValue = ConcurrentViewers(currentCount)* * currentCount = remainingValuesList.foldLeft(startValue)(_ op _).count* * if (currentCount 0) {* *logger.error
Re: Spark Streaming updateStateByKey throws OutOfMemory Error
*bump* On Thu, Apr 23, 2015 at 3:46 PM, Sourav Chandra sourav.chan...@livestream.com wrote: HI TD, Some observations: 1. If I submit the application using spark-submit tool with *client as deploy mode* it works fine with single master and worker (driver, master and worker are running in same machine) 2. If I submit the application using spark-submit tool with client as deploy mode it *crashes after some time with StackOverflowError* *single master and 2 workers* (driver, master and 1 worker is running in same machine, other worker is in different machine) *15/04/23 05:42:04 Executor: Exception in task 0.0 in stage 23153.0 (TID 5412)* *java.lang.StackOverflowError* *at java.io.ObjectInputStream$BlockDataInputStream.readUTF(ObjectInputStream.java:2864)* *at java.io.ObjectInputStream.readUTF(ObjectInputStream.java:1072)* *at java.io.ObjectStreamClass.readNonProxy(ObjectStreamClass.java:671)* *at java.io.ObjectInputStream.readClassDescriptor(ObjectInputStream.java:830)* *at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1601)* *at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)* *at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)* *at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)* *at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)* *at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)* *at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)* *at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)* *at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)* *at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)* *at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)* *at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)* *at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)* *at scala.collection.immutable.$colon$colon.readObject(List.scala:362)* *at sun.reflect.GeneratedMethodAccessor12.invoke(Unknown Source)* *at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)* *at java.lang.reflect.Method.invoke(Method.java:606)* *at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)* *at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)* *at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)* *at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)* *at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)* *at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)* *at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)* *at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)* *at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)* *at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)* *at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)* *at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)* *at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)* *at scala.collection.immutable.$colon$colon.readObject(List.scala:362)* *at sun.reflect.GeneratedMethodAccessor12.invoke(Unknown Source)* *at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)* *at java.lang.reflect.Method.invoke(Method.java:606)* *at 
java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)* *at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)* *at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)* *at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)* *at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)* *at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)* *at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)* *at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)* *at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)* *at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)* *at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)* *at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)* *at java.io.ObjectInputStream.readObject
Re: Spark Streaming updateStateByKey throws OutOfMemory Error
Anyone? On Wed, Apr 22, 2015 at 12:29 PM, Sourav Chandra sourav.chan...@livestream.com wrote: Hi Olivier, the update function is as below:

val updateFunc = (values: Seq[IConcurrentUsers], state: Option[(Long, Long)]) => {
  val previousCount = state.getOrElse((0L, 0L))._2
  var startValue: IConcurrentUsers = ConcurrentViewers(0)
  var currentCount = 0L
  val lastIndexOfConcurrentUsers =
    values.lastIndexWhere(_.isInstanceOf[ConcurrentViewers])
  val subList = values.slice(0, lastIndexOfConcurrentUsers)
  val currentCountFromSubList = subList.foldLeft(startValue)(_ op _).count + previousCount
  val lastConcurrentViewersCount = values(lastIndexOfConcurrentUsers).count

  if (math.abs(lastConcurrentViewersCount - currentCountFromSubList) >= 1) {
    logger.error(
      s"Count using state updation $currentCountFromSubList, " +
      s"ConcurrentUsers count $lastConcurrentViewersCount" +
      s" resetting to $lastConcurrentViewersCount"
    )
    currentCount = lastConcurrentViewersCount
  }

  val remainingValuesList = values.diff(subList)
  startValue = ConcurrentViewers(currentCount)
  currentCount = remainingValuesList.foldLeft(startValue)(_ op _).count

  if (currentCount < 0) {
    logger.error(
      s"ERROR: Got new count $currentCount < 0, value:$values, state:$state, resetting to 0"
    )
    currentCount = 0
  }

  // to stop pushing subsequent 0 after receiving first 0
  if (currentCount == 0 && previousCount == 0) None
  else Some(previousCount, currentCount)
}

trait IConcurrentUsers {
  val count: Long
  def op(a: IConcurrentUsers): IConcurrentUsers = IConcurrentUsers.op(this, a)
}

object IConcurrentUsers {
  def op(a: IConcurrentUsers, b: IConcurrentUsers): IConcurrentUsers = (a, b) match {
    case (_, _: ConcurrentViewers) =>
      ConcurrentViewers(b.count)
    case (_: ConcurrentViewers, _: IncrementConcurrentViewers) =>
      ConcurrentViewers(a.count + b.count)
    case (_: ConcurrentViewers, _: DecrementConcurrentViewers) =>
      ConcurrentViewers(a.count - b.count)
  }
}

case class IncrementConcurrentViewers(count: Long) extends IConcurrentUsers
case class DecrementConcurrentViewers(count: Long) extends IConcurrentUsers
case class ConcurrentViewers(count: Long) extends IConcurrentUsers

also the error stack trace copied from executor logs is:

java.lang.OutOfMemoryError: Java heap space
at org.apache.hadoop.io.WritableUtils.readCompressedStringArray(WritableUtils.java:183)
at org.apache.hadoop.conf.Configuration.readFields(Configuration.java:2564)
at org.apache.hadoop.io.ObjectWritable.readObject(ObjectWritable.java:285)
at org.apache.hadoop.io.ObjectWritable.readFields(ObjectWritable.java:77)
at org.apache.spark.SerializableWritable$$anonfun$readObject$1.apply$mcV$sp(SerializableWritable.scala:43)
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:927)
at org.apache.spark.SerializableWritable.readObject(SerializableWritable.scala:39)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:601)
at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1004)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1866)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1347)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:369)
at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62)
at org.apache.spark.broadcast.TorrentBroadcast$.unBlockifyObject(TorrentBroadcast.scala:236)
at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readObject$1.apply$mcV$sp(TorrentBroadcast.scala:169)
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:927)
at org.apache.spark.broadcast.TorrentBroadcast.readObject(TorrentBroadcast.scala:155)
at sun.reflect.GeneratedMethodAccessor5.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:601)
at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1004)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1866
Spark Streaming updateStateByKey throws OutOfMemory Error
Hi, We are building a spark streaming application which reads from Kafka, does updateStateByKey based on the received message type, and finally stores into Redis. After running for a few seconds the executor process gets killed by throwing an OutOfMemory error. The code snippet is below:

NoOfReceiverInstances = 1

val kafkaStreams = (1 to NoOfReceiverInstances).map(
  _ => KafkaUtils.createStream(ssc, ZKQuorum, ConsumerGroup, TopicsMap)
)

val updateFunc = (values: Seq[IConcurrentUsers], state: Option[(Long, Long)]) => { ... }

ssc.union(kafkaStreams)
  .map(KafkaMessageMapper(_))
  .filter(...)
  .updateStateByKey(updateFunc)
  .foreachRDD(_.foreachPartition(RedisHelper.update(_)))

object RedisHelper {
  private val client = scredis.Redis(
    ConfigFactory.parseProperties(System.getProperties).getConfig(namespace)
  )

  def update(itr: Iterator[(String, (Long, Long))]) {
    // redis save operation
  }
}

Below is the spark configuration:

spark.app.name = XXX
spark.jars = .jar
spark.home = /spark-1.1.1-bin-hadoop2.4
spark.executor.memory = 1g
spark.streaming.concurrentJobs = 1000
spark.logConf = true
spark.cleaner.ttl = 3600 //in milliseconds
spark.default.parallelism = 12
spark.executor.extraJavaOptions = -Xloggc:gc.log -XX:+PrintGCDetails -XX:+UseConcMarkSweepGC -XX:HeapDumpPath=1.hprof -XX:+HeapDumpOnOutOfMemoryError
spark.executor.logs.rolling.strategy = size
spark.executor.logs.rolling.size.maxBytes = 104857600 // 100 MB
spark.executor.logs.rolling.maxRetainedFiles = 10
spark.serializer = org.apache.spark.serializer.KryoSerializer
spark.kryo.registrator = xxx.NoOpKryoRegistrator

Other configurations are below:

streaming {
  // All streaming context related configs should come here
  batch-duration = 1 second
  checkpoint-directory = /tmp
  checkpoint-duration = 10 seconds
  slide-duration = 1 second
  window-duration = 1 second
  partitions-for-shuffle-task = 32
}
kafka {
  no-of-receivers = 1
  zookeeper-quorum = :2181
  consumer-group = x
  topic = x:2
}

We tried different combinations like:
- with spark 1.1.0 and 1.1.1
- increasing executor memory
- changing the serialization strategy (switching between kryo and normal java)
- changing the broadcast strategy (switching between http and torrent broadcast)

Can anyone give any insight into what we are missing here? How can we fix this? Due to an akka version mismatch with some other libraries we cannot upgrade the spark version. Thanks, -- Sourav Chandra Senior Software Engineer sourav.chan...@livestream.com www.livestream.com
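The advice quoted earlier in this thread amounts to giving executors more memory and overriding the partitioning of the state RDD in updateStateByKey. A self-contained toy sketch of the second knob is below; the key/value types, the checkpoint path, and the partition count of 32 are illustrative assumptions, not taken from the thread.

import org.apache.spark.HashPartitioner
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.dstream.DStream

// `events` stands in for the Kafka-derived (key, count) stream in the pipeline above.
def countByKeyWithState(ssc: StreamingContext, events: DStream[(String, Long)]): DStream[(String, Long)] = {
  ssc.checkpoint("/tmp/streaming-checkpoint")          // required for stateful operations

  val update = (values: Seq[Long], state: Option[Long]) =>
    Some(state.getOrElse(0L) + values.sum)

  // Spreading the state RDD over more partitions keeps each partition's share of
  // state (and the memory needed to update it in one task) smaller.
  events.updateStateByKey(update, new HashPartitioner(32))
}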
spark logging issue
Hi, I am using spark 1.1.0 and setting the below properties while creating the spark context.

spark.executor.logs.rolling.maxRetainedFiles = 10
spark.executor.logs.rolling.size.maxBytes = 104857600
spark.executor.logs.rolling.strategy = size

Even though I am setting it to roll over after 100 MB, the log file (i.e. stderr) is not rolled over. What am I missing here? -- Sourav Chandra Senior Software Engineer sourav.chan...@livestream.com www.livestream.com
[Spark/ Spark Streaming] Spark 1.1.0 fails working with akka 2.3.6
Hi, I have created a spark streaming application based on spark-1.1.0. While running, it failed saying the akka jar version does not match. Some projects are using akka 2.3.6, so I cannot change the akka version as it will affect others. What should I do?

Caused by: akka.ConfigurationException: Akka JAR version [2.3.6] does not match the provided config version [2.2.3]
at akka.actor.ActorSystem$Settings.<init>(ActorSystem.scala:209) ~[analytics-engine.jar:1.0.0]
at akka.actor.ActorSystemImpl.<init>(ActorSystem.scala:504) ~[analytics-engine.jar:1.0.0]
at akka.actor.ActorSystem$.apply(ActorSystem.scala:141) ~[analytics-engine.jar:1.0.0]
at akka.actor.ActorSystem$.apply(ActorSystem.scala:118) ~[analytics-engine.jar:1.0.0]
at org.apache.spark.util.AkkaUtils$.org$apache$spark$util$AkkaUtils$$doCreateActorSystem(AkkaUtils.scala:121) ~[analytics-engine.jar:1.0.0]
at org.apache.spark.util.AkkaUtils$$anonfun$1.apply(AkkaUtils.scala:54) ~[analytics-engine.jar:1.0.0]
at org.apache.spark.util.AkkaUtils$$anonfun$1.apply(AkkaUtils.scala:53) ~[analytics-engine.jar:1.0.0]
at org.apache.spark.util.Utils$$anonfun$startServiceOnPort$1.apply$mcVI$sp(Utils.scala:1446) ~[analytics-engine.jar:1.0.0]
at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141) ~[analytics-engine.jar:1.0.0]
at org.apache.spark.util.Utils$.startServiceOnPort(Utils.scala:1442) ~[analytics-engine.jar:1.0.0]
at org.apache.spark.util.AkkaUtils$.createActorSystem(AkkaUtils.scala:56) ~[analytics-engine.jar:1.0.0]
at org.apache.spark.SparkEnv$.create(SparkEnv.scala:153) ~[analytics-engine.jar:1.0.0]
at org.apache.spark.SparkContext.<init>(SparkContext.scala:203) ~[analytics-engine.jar:1.0.0]
at com.livestream.analytics.engine.context.ApplicationContextFactory$.createSimpleContext(ApplicationContext.scala:63) ~[analytics-engine.jar:1.0.0]
... 13 common frames omitted

Thanks, -- Sourav Chandra Senior Software Engineer sourav.chan...@livestream.com www.livestream.com
Spark Streaming with Kafka is failing with Error
Hi, While running my spark streaming application built on spark 1.1.0 I am getting below error. *14/11/18 15:35:30 ERROR ReceiverTracker: Deregistered receiver for stream 0: Error starting receiver 0 - java.lang.AbstractMethodError* * at org.apache.spark.Logging$class.log(Logging.scala:52)* * at org.apache.spark.streaming.kafka.KafkaReceiver.log(KafkaInputDStream.scala:66)* * at org.apache.spark.Logging$class.logInfo(Logging.scala:59)* * at org.apache.spark.streaming.kafka.KafkaReceiver.logInfo(KafkaInputDStream.scala:66)* * at org.apache.spark.streaming.kafka.KafkaReceiver.onStart(KafkaInputDStream.scala:86)* * at org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:121)* * at org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:106)* * at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$9.apply(ReceiverTracker.scala:264)* * at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$9.apply(ReceiverTracker.scala:257)* * at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1121)* * at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1121)* * at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)* * at org.apache.spark.scheduler.Task.run(Task.scala:54)* * at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)* * at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)* * at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)* * at java.lang.Thread.run(Thread.java:722)* Can you guys please help me out here? -- Sourav Chandra Senior Software Engineer · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · sourav.chan...@livestream.com o: +91 80 4121 8723 m: +91 988 699 3746 skype: sourav.chandra Livestream Ajmera Summit, First Floor, #3/D, 68 Ward, 3rd Cross, 7th C Main, 3rd Block, Koramangala Industrial Area, Bangalore 560034 www.livestream.com
Re: What's wrong with my spark filter? I get org.apache.spark.SparkException: Task not serializable
It might be because the object is nested within some class which may not be serializable. Also, you can run the application with this JVM parameter to see detailed info about serialization: -Dsun.io.serialization.extendedDebugInfo=true On Fri, Oct 17, 2014 at 4:07 PM, shahab shahab.mok...@gmail.com wrote: Hi, Probably I am missing a very simple principle, but something is wrong with my filter; I get an org.apache.spark.SparkException: Task not serializable exception. Here is my filter function:

object OBJ {
  def f1(): Boolean = {
    var i = 1
    for (j <- 1 to 10) i = i + 1
    true
  }
}

rdd.filter(row => OBJ.f1())

And when I run it, I get the following exception:

org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
at org.apache.spark.SparkContext.clean(SparkContext.scala:1242)
at org.apache.spark.rdd.RDD.filter(RDD.scala:282)
...
Caused by: java.io.NotSerializableException: org.apache.spark.SparkConf
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
...

best, /Shahab -- Sourav Chandra Senior Software Engineer sourav.chan...@livestream.com www.livestream.com
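A minimal sketch of the kind of nesting being described: if OBJ is not a top-level object but a member of an enclosing class that holds something non-serializable (here a hypothetical class holding a SparkConf, mirroring the NotSerializableException above), the closure drags the whole outer instance in. Defining the helper as a top-level object avoids that. The class names are illustrative, not from the thread.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

// Hypothetical reconstruction of the trap: OBJ lives inside a class that also holds
// a SparkConf, so the closure captures the whole (non-serializable) outer instance.
class Job(conf: SparkConf) {
  object OBJ {                                    // inner object -> keeps a reference to the enclosing Job
    def f1(): Boolean = true
  }
  def broken(rdd: RDD[String]): Long =
    rdd.filter(row => OBJ.f1()).count()           // throws Task not serializable
}

// Fix: make the helper a top-level object so nothing from the driver class is captured.
object Helpers {
  def f1(): Boolean = true
}

object FixedJob {
  def run(sc: SparkContext): Long = {
    val rdd = sc.parallelize(Seq("a", "b", "c"))
    rdd.filter(row => Helpers.f1()).count()       // closure references only the top-level object
  }
}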
Questions regarding different spark pre-built packages
Hi, I am just curious to know what the differences are between the prebuilt packages for Hadoop 1, Hadoop 2, CDH etc. I am using a spark standalone cluster and we don't use hadoop at all. Can we use any one of the pre-built packages, OR do we have to run the make-distribution.sh script from the code? Thanks, -- Sourav Chandra Senior Software Engineer sourav.chan...@livestream.com www.livestream.com
Spark streaming issue
Hi, I am facing a weird issue. I am using spark 0.9 and running a streaming application. In the UI, the duration shows on the order of seconds, but if I dig into that particular stage's details, it shows the total time taken across all tasks for the stage is much, much less (in milliseconds). I am using the Fair scheduling policy and the pool name is counter-metric-persistor. What could be the reason for this?

Stage screenshot: Stage 97
Stage Id: 97 | Pool: counter-metric-persistor | Description: foreach at RealTimeAnalyticsApplication.scala:33 | Submitted: 2014/05/27 07:22:23 | Duration: 14.5 s | Tasks (succeeded/total): 6/6

Stage details screenshot: Stage 97
Total task time across all tasks: 154 ms

Summary Metrics for 6 Completed Tasks (Min / 25th percentile / Median / 75th percentile / Max):
Result serialization time: 0 ms / 0 ms / 0 ms / 0 ms / 0 ms
Duration: 12 ms / 13 ms / 23 ms / 30 ms / 54 ms
Time spent fetching task results: 0 ms / 0 ms / 0 ms / 0 ms / 0 ms
Scheduler delay: 7 ms / 7 ms / 8 ms / 8 ms / 8 ms

Aggregated Metrics by Executor:
Executor ID 0, address ls230-127-p.nyc0.ls.local:53463, task time 199 ms, 6 total tasks, 0 failed, 6 succeeded, shuffle read/write/spill all 0.0 B

Tasks (all SUCCESS, PROCESS_LOCAL on ls230-127-p.nyc0.ls.local, launched 2014/05/27 07:22:37):
Index 0, Task ID 408, duration 30 ms
Index 1, Task ID 411, duration 22 ms
Index 2, Task ID 412, duration 23 ms
Index 3, Task ID 414, duration 13 ms
Index 4, Task ID 415, duration 12 ms
Index 5, Task ID 416, duration 54 ms

Thanks, -- Sourav Chandra Senior Software Engineer sourav.chan...@livestream.com www.livestream.com
Spark Streaming Error: SparkException: Error sending message to BlockManagerMaster
Hi, I am running Spark streaming application. I have faced some uncaught exception after which my worker stops processing any further messages. I am using *spark 0.9.0* Could you please let me know what could be the cause of this and how to overcome this issue? [ERROR] [05/22/2014 03:23:26.250] [spark-akka.actor.default-dispatcher-18] [ActorSystem(spark)] exception while executing timer task org.apache.spark.SparkException: Error sending message to BlockManagerMaster [message = HeartBeat(BlockManagerId(driver, 10.29.29.226, 36655, 0))] at org.apache.spark.storage.BlockManagerMaster.askDriverWithReply(BlockManagerMaster.scala:176) at org.apache.spark.storage.BlockManagerMaster.sendHeartBeat(BlockManagerMaster.scala:52) at org.apache.spark.storage.BlockManager.org $apache$spark$storage$BlockManager$$heartBeat(BlockManager.scala:97) at org.apache.spark.storage.BlockManager$$anonfun$initialize$1.apply$mcV$sp(BlockManager.scala:135) at akka.actor.Scheduler$$anon$9.run(Scheduler.scala:80) at akka.actor.LightArrayRevolverScheduler$$anon$3$$anon$2.run(Scheduler.scala:241) at akka.actor.LightArrayRevolverScheduler$TaskHolder.run(Scheduler.scala:464) at akka.actor.LightArrayRevolverScheduler$$anonfun$close$1.apply(Scheduler.scala:281) at akka.actor.LightArrayRevolverScheduler$$anonfun$close$1.apply(Scheduler.scala:280) at scala.collection.Iterator$class.foreach(Iterator.scala:727) at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) at scala.collection.AbstractIterable.foreach(Iterable.scala:54) at akka.actor.LightArrayRevolverScheduler.close(Scheduler.scala:279) at akka.actor.ActorSystemImpl.stopScheduler(ActorSystem.scala:630) at akka.actor.ActorSystemImpl$$anonfun$_start$1.apply$mcV$sp(ActorSystem.scala:582) at akka.actor.ActorSystemImpl$$anonfun$_start$1.apply(ActorSystem.scala:582) at akka.actor.ActorSystemImpl$$anonfun$_start$1.apply(ActorSystem.scala:582) at akka.actor.ActorSystemImpl$$anon$3.run(ActorSystem.scala:596) at akka.actor.ActorSystemImpl$TerminationCallbacks$$anonfun$run$1.runNext$1(ActorSystem.scala:750) at akka.actor.ActorSystemImpl$TerminationCallbacks$$anonfun$run$1.apply$mcV$sp(ActorSystem.scala:753) at akka.actor.ActorSystemImpl$TerminationCallbacks$$anonfun$run$1.apply(ActorSystem.scala:746) at akka.actor.ActorSystemImpl$TerminationCallbacks$$anonfun$run$1.apply(ActorSystem.scala:746) at akka.util.ReentrantGuard.withGuard(LockUtil.scala:15) at akka.actor.ActorSystemImpl$TerminationCallbacks.run(ActorSystem.scala:746) at akka.actor.ActorSystemImpl$$anonfun$terminationCallbacks$1.apply(ActorSystem.scala:593) at akka.actor.ActorSystemImpl$$anonfun$terminationCallbacks$1.apply(ActorSystem.scala:593) at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32) at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.processBatch$1(BatchingExecutor.scala:67) at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:82) at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59) at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59) at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72) at akka.dispatch.BatchingExecutor$Batch.run(BatchingExecutor.scala:58) at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:42) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) 
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) Caused by: akka.pattern.AskTimeoutException: Recipient[Actor[akka://spark/user/BlockManagerMaster#1305432112]] had already been terminated. at akka.pattern.AskableActorRef$.ask$extension(AskSupport.scala:134) at org.apache.spark.storage.BlockManagerMaster.askDriverWithReply(BlockManagerMaster.scala:161) ... 39 more Thanks, -- Sourav Chandra Senior Software Engineer · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · sourav.chan...@livestream.com o: +91 80 4121 8723 m: +91 988 699 3746 skype: sourav.chandra Livestream Ajmera Summit, First Floor, #3/D, 68 Ward, 3rd Cross, 7th C Main, 3rd Block, Koramangala Industrial Area, Bangalore 560034 www.livestream.com
Re: what does broadcast_0 stand for
Apart from user-defined broadcast variables, there are others which are created by Spark itself. This could be one of those. As I had mentioned, you can write a small program where you create a broadcast variable, check the broadcast variable's id (say it is x), and then go to /tmp and open the broadcast_x file. You will find that its content is the serialized output of your variable. On Mon, Apr 28, 2014 at 4:26 PM, wxhsdp wxh...@gmail.com wrote: thank you for your help, Sourav. i found the broadcast_0 binary file in the /tmp directory. its size is 33.4 kB, not equal to the estimated size of 135.6 KB. i opened it and found its content has no relation to my read-in file. i guess broadcast_0 is a config file about spark, is that right? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/what-does-broadcast-0-stand-for-tp4934p4936.html Sent from the Apache Spark User List mailing list archive at Nabble.com. -- Sourav Chandra Senior Software Engineer sourav.chan...@livestream.com www.livestream.com
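A small program along the lines suggested above might look like the sketch below; the data and the local master setting are assumptions, and it relies on the broadcast files landing under the temp/local directory as broadcast_<id>, as observed in this thread.

import org.apache.spark.{SparkConf, SparkContext}

object BroadcastIdCheck {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("broadcast-id-check"))

    // Create a broadcast variable with known content so the on-disk file is recognizable.
    val lookup = sc.broadcast(Map("a" -> 1, "b" -> 2, "c" -> 3))

    // The id tells you which broadcast_<id> file to look for.
    println(s"broadcast id = ${lookup.id}")

    // Use it in a job so the broadcast blocks are actually materialized.
    val matched = sc.parallelize(Seq("a", "b", "z")).filter(lookup.value.contains).count()
    println(s"matched = $matched")

    sc.stop()
  }
}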
Re: Access Last Element of RDD
You can use rdd.takeOrdered(1)(reverseOrdering). reverseOrdering is your Ordering[T] instance where you define the ordering logic; this you have to pass in to the method. On Thu, Apr 24, 2014 at 11:21 AM, Frank Austin Nothaft fnoth...@berkeley.edu wrote: If you do this, you could simplify to: RDD.collect().last However, this has the problem of collecting all data to the driver. Is your data sorted? If so, you could reverse the sort and take the first. Alternatively, a hacky implementation might involve a mapPartitionsWithIndex that returns an empty iterator for all partitions except for the last. For the last partition, you would filter all elements except for the last element in your iterator. This should leave one element, which is your last element. Frank Austin Nothaft fnoth...@berkeley.edu fnoth...@eecs.berkeley.edu 202-340-0466 On Apr 23, 2014, at 10:44 PM, Adnan Yaqoob nsyaq...@gmail.com wrote: This function will return a scala List; you can use the List's last function to get the last element. For example: RDD.take(RDD.count()).last On Thu, Apr 24, 2014 at 10:28 AM, Sai Prasanna ansaiprasa...@gmail.com wrote: Adnan, but RDD.take(RDD.count()) returns all the elements of the RDD. I want only to access the last element. On Thu, Apr 24, 2014 at 10:33 AM, Sai Prasanna ansaiprasa...@gmail.com wrote: Oh ya, Thanks Adnan. On Thu, Apr 24, 2014 at 10:30 AM, Adnan Yaqoob nsyaq...@gmail.com wrote: You can use the following code: RDD.take(RDD.count()) On Thu, Apr 24, 2014 at 9:51 AM, Sai Prasanna ansaiprasa...@gmail.com wrote: Hi All, Some help! RDD.first or RDD.take(1) gives the first item; is there a straightforward way to access the last element in a similar way? I couldn't find a tail/last method for RDD. -- Sourav Chandra Senior Software Engineer sourav.chan...@livestream.com www.livestream.com
Re: Access Last Element of RDD
Also same thing can be done using rdd.top(1)(reverseOrdering) On Thu, Apr 24, 2014 at 11:28 AM, Sourav Chandra sourav.chan...@livestream.com wrote: You can use rdd.takeOrdered(1)(reverseOrdrering) reverseOrdering is you Ordering[T] instance where you define the ordering logic. This you have to pass in the method On Thu, Apr 24, 2014 at 11:21 AM, Frank Austin Nothaft fnoth...@berkeley.edu wrote: If you do this, you could simplify to: RDD.collect().last However, this has the problem of collecting all data to the driver. Is your data sorted? If so, you could reverse the sort and take the first. Alternatively, a hackey implementation might involve a mapPartitionsWithIndex that returns an empty iterator for all partitions except for the last. For the last partition, you would filter all elements except for the last element in your iterator. This should leave one element, which is your last element. Frank Austin Nothaft fnoth...@berkeley.edu fnoth...@eecs.berkeley.edu 202-340-0466 On Apr 23, 2014, at 10:44 PM, Adnan Yaqoob nsyaq...@gmail.com wrote: This function will return scala List, you can use List's last function to get the last element. For example: RDD.take(RDD.count()).last On Thu, Apr 24, 2014 at 10:28 AM, Sai Prasanna ansaiprasa...@gmail.comwrote: Adnan, but RDD.take(RDD.count()) returns all the elements of the RDD. I want only to access the last element. On Thu, Apr 24, 2014 at 10:33 AM, Sai Prasanna ansaiprasa...@gmail.comwrote: Oh ya, Thanks Adnan. On Thu, Apr 24, 2014 at 10:30 AM, Adnan Yaqoob nsyaq...@gmail.comwrote: You can use following code: RDD.take(RDD.count()) On Thu, Apr 24, 2014 at 9:51 AM, Sai Prasanna ansaiprasa...@gmail.com wrote: Hi All, Some help ! RDD.first or RDD.take(1) gives the first item, is there a straight forward way to access the last element in a similar way ? I coudnt fine a tail/last method for RDD. !! -- Sourav Chandra Senior Software Engineer · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · sourav.chan...@livestream.com o: +91 80 4121 8723 m: +91 988 699 3746 skype: sourav.chandra Livestream Ajmera Summit, First Floor, #3/D, 68 Ward, 3rd Cross, 7th C Main, 3rd Block, Koramangala Industrial Area, Bangalore 560034 www.livestream.com -- Sourav Chandra Senior Software Engineer · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · sourav.chan...@livestream.com o: +91 80 4121 8723 m: +91 988 699 3746 skype: sourav.chandra Livestream Ajmera Summit, First Floor, #3/D, 68 Ward, 3rd Cross, 7th C Main, 3rd Block, Koramangala Industrial Area, Bangalore 560034 www.livestream.com
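A sketch of the two non-collecting approaches mentioned in this thread, written for an RDD[Int]; the element type and the assumption that the last partition is non-empty are illustrative.

import org.apache.spark.rdd.RDD

// takeOrdered(1) with a reversed ordering returns the largest element;
// this only equals the "last" element when the data is sorted by that ordering.
def lastByOrdering(rdd: RDD[Int]): Option[Int] =
  rdd.takeOrdered(1)(Ordering[Int].reverse).headOption

// mapPartitionsWithIndex keeps only the final element of the final partition,
// so only one element travels back to the driver. Assumes the last partition is non-empty.
def lastByPartition(rdd: RDD[Int]): Option[Int] = {
  val lastPartition = rdd.partitions.length - 1
  rdd.mapPartitionsWithIndex { (idx, it) =>
    if (idx == lastPartition && it.nonEmpty) Iterator(it.toSeq.last) else Iterator.empty
  }.collect().headOption
}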
Re: about rdd.filter()
This could happen if the variable is defined in such a way that it pulls its own class reference into the closure. In that case serialization tries to serialize the whole outer class reference, which is not serializable, and the whole thing fails. On Wed, Apr 23, 2014 at 3:15 PM, randylu randyl...@gmail.com wrote: my code is like:

rdd2 = rdd1.filter(_._2.length > 1)
rdd2.collect()

it works well, but if i use a variable num instead of 1:

var num = 1
rdd2 = rdd1.filter(_._2.length > num)
rdd2.collect()

it fails at rdd2.collect(). so strange? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/about-rdd-filter-tp4657.html Sent from the Apache Spark User List mailing list archive at Nabble.com. -- Sourav Chandra Senior Software Engineer sourav.chan...@livestream.com www.livestream.com
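A hedged reconstruction of what randylu is likely hitting: when num is a field of an enclosing, non-serializable class (for example a driver class holding a SparkContext), referencing it in the filter captures the whole instance; copying it into a local val first keeps the closure clean. The class name is hypothetical.

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

class Analysis(sc: SparkContext) {             // not Serializable: holds a SparkContext
  var num = 1

  // Fails: `num` is really `this.num`, so the closure drags the whole Analysis instance along.
  def broken(rdd: RDD[(String, String)]): Array[(String, String)] =
    rdd.filter(_._2.length > num).collect()

  // Works: copy the field into a local val so only an Int is captured by the closure.
  def fixed(rdd: RDD[(String, String)]): Array[(String, String)] = {
    val threshold = num
    rdd.filter(_._2.length > threshold).collect()
  }
}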
Re: Change print() in JavaNetworkWordCount
You can extend DStream and override the print() method. Then you can create your own DStream extending from this. On Tue, Mar 25, 2014 at 6:07 PM, Eduardo Costa Alfaia e.costaalf...@unibs.it wrote: Hi Guys, I think I already asked this question, but I don't remember if anyone has answered me. I would like to change, in the print() function, the quantity of words and the frequency number that are sent to the driver's screen. The default value is 10. Could anyone help me with this? Best Regards -- Privacy notice: http://www.unibs.it/node/8155 -- Sourav Chandra Senior Software Engineer sourav.chan...@livestream.com www.livestream.com
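Overriding print() means subclassing DStream; an often simpler route, sketched below and not taken from the thread, is to reproduce what print() does with foreachRDD and take(n), choosing your own n. The (String, Int) element type mirrors a word-count stream but is an assumption.

import org.apache.spark.streaming.dstream.DStream

// Prints the first `n` (word, count) pairs of every batch, instead of print()'s fixed 10.
def printFirstN(counts: DStream[(String, Int)], n: Int): Unit =
  counts.foreachRDD { (rdd, time) =>
    val first = rdd.take(n)            // take(n) pulls only n elements to the driver
    println("-------------------------------------------")
    println(s"Time: $time")
    first.foreach(println)
    println()
  }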
spark executor/driver log files management
Hi, I have a few questions regarding log file management in spark: 1. Currently I did not find any way to modify the log file names for executors/drivers; they are hardcoded as stdout and stderr. Also there is no log rotation. In the case of a streaming application these will grow forever and become unmanageable. Is there any way to overcome this? Thanks, -- Sourav Chandra Senior Software Engineer sourav.chan...@livestream.com www.livestream.com
Spark usage patterns and questions
Hi, I have some questions regarding usage patterns and debugging in spark/spark streaming. 1. What are some common design patterns for using broadcast variables? In my application I created some and also created a scheduled task which periodically refreshes the variables. I want to know how people generally achieve this efficiently and in a modular way (see the sketch after this message for one shape of it). 2. Sometimes an uncaught exception in the driver program/worker does not get traced anywhere. How can we debug this? 3. In our use case we read from Kafka, do some mapping, and lastly persist data to cassandra as well as push the data over a remote actor for realtime updates in a dashboard. I used the below approaches: - First tried the very naive way, stream.map(...).foreachRDD( pushes to actor ). It does not work and the stage failed with an akka exception. - Second tried the akka.serialization.JavaSerializer.withSystem(system){...} approach. It does not work and the stage failed, BUT without any trace anywhere in the logs. - Finally did rdd.collect to collect the output into the driver and then pushed it to the actor. It worked. I would like to know whether there is an efficient way of achieving this sort of use case. 4. Sometimes I see failed stages, but when I opened those stage details it said the stage did not start. What does this mean? Looking forward to some interesting responses :) Thanks, -- Sourav Chandra Senior Software Engineer sourav.chan...@livestream.com www.livestream.com
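On question 1, one commonly used shape for a periodically refreshed broadcast is to keep the current Broadcast behind a volatile reference, rebroadcast on a schedule, and unpersist the old copy. The sketch below uses assumed names; the refresh interval, the loader function, and the wrapper class are illustrative, not from the thread.

import java.util.concurrent.{Executors, TimeUnit}
import org.apache.spark.SparkContext
import org.apache.spark.broadcast.Broadcast

// Wraps a broadcast value and rebuilds it on a fixed schedule.
class RefreshingBroadcast[T: scala.reflect.ClassTag](
    sc: SparkContext,
    load: () => T,
    intervalSeconds: Long) {

  @volatile private var current: Broadcast[T] = sc.broadcast(load())

  private val scheduler = Executors.newSingleThreadScheduledExecutor()
  scheduler.scheduleAtFixedRate(new Runnable {
    def run(): Unit = {
      val fresh = sc.broadcast(load())    // publish the new snapshot
      val old = current
      current = fresh
      old.unpersist(blocking = false)     // release executor memory held by the stale copy
    }
  }, intervalSeconds, intervalSeconds, TimeUnit.SECONDS)

  def value: Broadcast[T] = current
}

Inside a streaming job you would typically read `value` once per batch on the driver (for example at the top of a transform or foreachRDD) so that all tasks of a batch see one consistent snapshot.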
Re: [External] Re: no stdout output from worker
Hi Ranjan, Whatever code is passed as a closure to spark operations like map, flatMap, filter etc. is part of the task. All other code runs in the driver. Thanks, Sourav On Mon, Mar 10, 2014 at 12:03 PM, Sen, Ranjan [USA] sen_ran...@bah.com wrote: Hi Patrick, How do I know which part of the code is in the driver and which is in a task? The structure of my code is as below:

static boolean done = false;

public static void main(...) {
  ...
  JavaRDD<String> lines = ...
  ...
  while (!done) {
    ...
    while (...) {
      JavaPairRDD<Integer, List<Integer>> labs1 = labs.map(new PairFunction...);
      // Here I have System.out.println (A)
    } // inner while
    // Here I have System.out.println (B)
    if (...) {
      done = true;
      // Also here some System.out.println (C)
      break;
    } else {
      if (...) {
        // More System.out.println (D)
        labs = labs.map(...);
      }
    }
  } // outer while
  // Even more System.out.println (E)
} // main
} // class

I get the console outputs on the master for (B) and (E). I do not see any stdout on the worker node. I find the stdout and stderr under spark/work/appid/0/; I see output in stderr but not in stdout. I do get all the outputs on the console when I run it in local mode. Sorry, I am new and may be asking a naïve question, but it is really confusing to me. Thanks for your help. Ranjan On 3/9/14, 10:50 PM, Patrick Wendell pwend...@gmail.com wrote: Hey Sen, Is your code in the driver code or inside one of the tasks? If it's in the tasks, the place you would expect these to be is in the stdout file under spark/appid/work/[stdout/stderr]. Are you seeing at least stderr logs in that folder? If not, then the tasks might not be running on the worker machines. If you see stderr but not stdout, that's a bit of a puzzler since they both go through the same mechanism. - Patrick On Sun, Mar 9, 2014 at 2:32 PM, Sen, Ranjan [USA] sen_ran...@bah.com wrote: Hi, I have some System.out.println in my Java code that works ok in a local environment. But when I run the same code in standalone mode on an EC2 cluster, I do not see them in the worker stdout (on the worker node under spark location/work) or at the driver console. Could you help me understand how I can troubleshoot this? Thanks, Ranjan -- Sourav Chandra Senior Software Engineer sourav.chan...@livestream.com www.livestream.com
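To make the driver/task split concrete, a small Scala sketch (names and messages are illustrative) showing where each println lands on a standalone cluster:

import org.apache.spark.{SparkConf, SparkContext}

object WherePrintlnGoes {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("where-println-goes"))

    println("driver: shows up on the driver console / driver log")   // driver-side code

    val lengths = sc.parallelize(Seq("spark", "streaming"), 2).map { word =>
      // Task-side code: this print goes to the executor's stdout file under the
      // worker's work/<app-id>/<executor-id>/ directory, not to the driver console.
      println(s"task: processing $word")
      word.length
    }

    // The action runs the tasks; the result is printed on the driver.
    println(s"driver: total length = ${lengths.reduce(_ + _)}")
    sc.stop()
  }
}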