Re: SPARK STREAMING PROBLEM

2015-05-28 Thread Sourav Chandra
You must start the StreamingContext by calling ssc.start()
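For example (a minimal sketch; awaitTermination simply keeps the driver alive so batches keep running):

// nothing is computed until the context is started
ssc.start()
ssc.awaitTermination()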

On Thu, May 28, 2015 at 6:57 PM, Animesh Baranawal 
animeshbarana...@gmail.com wrote:

 Hi,

 I am trying to extract the filenames from which a Dstream is generated by
 parsing the toDebugString method on RDD
 I am implementing the following code in spark-shell:

 import org.apache.spark.streaming.{StreamingContext, Seconds}
 val ssc = new StreamingContext(sc, Seconds(10))
 val lines = ssc.textFileStream(/* directory */)

 def g: List[String] = {
   var res = List[String]()
   lines.foreachRDD { rdd =>
     if (rdd.count() > 0) {
       val files = rdd.toDebugString.split("\n").filter(_.contains(":/"))
       files.foreach { ms =>
         res = ms.split(" ")(2) :: res
       }
     }
   }
   res
 }

 g.foreach(x => { println(x); println() })

 However when I run the code, nothing gets printed on the console apart
 from the logs. Am I doing something wrong?
 And is there any better way to extract the file names from DStream ?

 Thanks in advance


 Animesh




-- 

Sourav Chandra

Senior Software Engineer

· · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · ·

sourav.chan...@livestream.com

o: +91 80 4121 8723

m: +91 988 699 3746

skype: sourav.chandra

Livestream

Ajmera Summit, First Floor, #3/D, 68 Ward, 3rd Cross, 7th C Main, 3rd
Block, Koramangala Industrial Area,

Bangalore 560034

www.livestream.com


Re: SPARK STREAMING PROBLEM

2015-05-28 Thread Sourav Chandra
The problem lies in the way you are doing the processing.

Are you calling ssc.start() only after the g.foreach(x => { println(x); println() })? Until the
context is started, all you have done is set up the computation steps; Spark has not started
any real processing. So when you call g.foreach, it iterates over the empty list you are
returning from the g method, and hence nothing gets printed.
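A minimal sketch of one way to restructure it, based on the code in this thread (the ":/" filter is an assumption standing in for the stripped string literal in the original post): do the printing inside the output operation itself, then start the context.

lines.foreachRDD { rdd =>
  if (rdd.count() > 0) {
    // each debug-string line containing a path such as "file:/..."
    val files = rdd.toDebugString.split("\n").filter(_.contains(":/"))
    files.map(_.split(" ")(2)).foreach(println)
  }
}
ssc.start()
ssc.awaitTermination()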

On Thu, May 28, 2015 at 7:02 PM, Animesh Baranawal 
animeshbarana...@gmail.com wrote:

 I also started the streaming context by running ssc.start() but still
 apart from logs nothing of g gets printed.

 -- Forwarded message --
 From: Animesh Baranawal animeshbarana...@gmail.com
 Date: Thu, May 28, 2015 at 6:57 PM
 Subject: SPARK STREAMING PROBLEM
 To: user@spark.apache.org


 Hi,

 I am trying to extract the filenames from which a Dstream is generated by
 parsing the toDebugString method on RDD
 I am implementing the following code in spark-shell:

 import org.apache.spark.streaming.{StreamingContext, Seconds}
 val ssc = new StreamingContext(sc, Seconds(10))
 val lines = ssc.textFileStream(/* directory */)

 def g: List[String] = {
   var res = List[String]()
   lines.foreachRDD { rdd =>
     if (rdd.count() > 0) {
       val files = rdd.toDebugString.split("\n").filter(_.contains(":/"))
       files.foreach { ms =>
         res = ms.split(" ")(2) :: res
       }
     }
   }
   res
 }

 g.foreach(x => { println(x); println() })

 However when I run the code, nothing gets printed on the console apart
 from the logs. Am I doing something wrong?
 And is there any better way to extract the file names from DStream ?

 Thanks in advance


 Animesh







Re: Best practices on testing Spark jobs

2015-04-28 Thread Sourav Chandra
Hi,

Can you point me to some tutorials/examples of how to write test cases based on the
mentioned framework?

Thanks,
Sourav

On Tue, Apr 28, 2015 at 9:22 PM, Silvio Fiorito 
silvio.fior...@granturing.com wrote:

  Sorry that’s correct, I was thinking you were maybe trying to mock
 certain aspects of Spark core to write your tests. This is a library to
 help write unit tests by managing the SparkContext and StreamingContext. So
 you can test your transformations as necessary. More importantly on the
 streaming side it really helps simplify running tests on batch outputs.
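 For example, a minimal batch-test sketch (assuming ScalaTest and the SharedSparkContext trait that the project README documents; adjust to your versions):

 import com.holdenkarau.spark.testing.SharedSparkContext
 import org.scalatest.FunSuite

 class WordCountSpec extends FunSuite with SharedSparkContext {
   test("counts words in a small in-memory dataset") {
     val input = sc.parallelize(Seq("a b", "b c"))   // sc is provided by the trait
     val counts = input
       .flatMap(_.split(" "))
       .map(word => (word, 1))
       .reduceByKey(_ + _)
       .collectAsMap()
     assert(counts("b") === 2)
   }
 }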

  If you’re having serialization issues, you may want to look at using
 transient lazy initializers to see if that helps.
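 A minimal sketch of that pattern (HeavyClient is a hypothetical stand-in for any non-serializable resource, not something from this thread):

 // Stand-in for a non-serializable resource such as a DB or HTTP client.
 class HeavyClient(endpoint: String) {
   def lookup(record: String): String = s"$record@$endpoint"
 }

 class Enricher(endpoint: String) extends Serializable {
   // Excluded from serialization; re-created lazily on each executor.
   @transient lazy val client: HeavyClient = new HeavyClient(endpoint)
   def enrich(record: String): String = client.lookup(record)
 }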

   From: Michal Michalski
 Date: Tuesday, April 28, 2015 at 11:42 AM
 To: Silvio Fiorito
 Cc: user
 Subject: Re: Best practices on testing Spark jobs

   Thanks Silvio. I might be missing something, but it looks like this
 project is a kind of a framework for setting up Spark for a testing, but
 after taking a quick look at the code it doesn't seem like it's solving the
 problem with mocking which is my main concern now - am I wrong?

  Kind regards,
 Michał Michalski,
 michal.michal...@boxever.com

 On 28 April 2015 at 16:35, Silvio Fiorito silvio.fior...@granturing.com
 wrote:

   Hi Michal,

  Please try spark-testing-base by Holden. I’ve used it and it works well
 for unit testing batch and streaming jobs

  https://github.com/holdenk/spark-testing-base

  Thanks,
 Silvio

   From: Michal Michalski
 Date: Tuesday, April 28, 2015 at 11:32 AM
 To: user
 Subject: Best practices on testing Spark jobs

   Hi,

  I have two questions regarding testing Spark jobs:

  1. Is it possible to use Mockito for that purpose? I tried to use it,
 but it looks like there are no interactions with mocks. I didn't dive into
 the details of how Mockito works, but I guess it might be because of the
 serialization and how Spark distributes tasks. I'm not sure about it though
 and I'm looking for confirmation.

  2. If not Mockito, what's the alternative? What's the recommended way
 to test Spark jobs? Should I manually create mocks by e.g. extending all
 the classes I'd normally mock and changing the implementation of some
 methods? I don't like this idea but I can't really see any other options
 now.


  Kind regards,
 Michał Michalski,
 michal.michal...@boxever.com







Re: Spark Streaming updateStateByKey throws OutOfMemory Error

2015-04-23 Thread Sourav Chandra
If I submit the application using spark-submit tool with *cluster as
deploy mode* it *crashes after some time with Out Of Memory error* with
single master and worker (driver, master and worker are running in the same
machine)

*   15/04/23 05:47:49 Executor: Exception in task 0.0 in stage 858.0 (TID
1464)*
*java.lang.OutOfMemoryError: Java heap space*
*at
org.apache.hadoop.io.WritableUtils.readCompressedStringArray(WritableUtils.java:183)*
*at
org.apache.hadoop.conf.Configuration.readFields(Configuration.java:2564)*
*at
org.apache.hadoop.io.ObjectWritable.readObject(ObjectWritable.java:285)*
*at
org.apache.hadoop.io.ObjectWritable.readFields(ObjectWritable.java:77)*
*at
org.apache.spark.SerializableWritable$$anonfun$readObject$1.apply$mcV$sp(SerializableWritable.scala:43)*
*at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:927)*
*at
org.apache.spark.SerializableWritable.readObject(SerializableWritable.scala:39)*
*at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)*
*at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)*
*at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)*
*at java.lang.reflect.Method.invoke(Method.java:606)*
*at
java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)*
*at
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)*
*at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)*
*at
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)*
*at
java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)*
*at
com.esotericsoftware.kryo.serializers.JavaSerializer.read(JavaSerializer.java:40)*
*at
com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)*
*at
org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:133)*
*at
org.apache.spark.broadcast.TorrentBroadcast$.unBlockifyObject(TorrentBroadcast.scala:236)*
*at
org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readObject$1.apply$mcV$sp(TorrentBroadcast.scala:169)*
*at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:927)*
*at
org.apache.spark.broadcast.TorrentBroadcast.readObject(TorrentBroadcast.scala:155)*
*at sun.reflect.GeneratedMethodAccessor15.invoke(Unknown Source)*
*at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)*
*at java.lang.reflect.Method.invoke(Method.java:606)*
*at
java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)*
*at
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)*
*at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)*
*at
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)*
*at
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)*
*at
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)*

Thanks,
Sourav

On Wed, Apr 22, 2015 at 11:40 PM, Tathagata Das t...@databricks.com wrote:

 It could very well be that your executor memory is not enough to store the
 state RDDs AND operate on the data. 1G per executor is quite low.
 Definitely give more memory. And have you tried increasing the number of
 partitions (specify number of partitions in updateStateByKey) ?
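 A minimal sketch of passing an explicit partition count, assuming the keyed stream (keyedStream is a placeholder) and the updateFunc from this thread; both overloads below exist on pair DStreams:

 import org.apache.spark.HashPartitioner

 // spread the state RDD over more partitions
 val stateStream = keyedStream.updateStateByKey(updateFunc, new HashPartitioner(64))
 // or, equivalently, just pass the number of partitions:
 // val stateStream = keyedStream.updateStateByKey(updateFunc, 64)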

 On Wed, Apr 22, 2015 at 2:34 AM, Sourav Chandra 
 sourav.chan...@livestream.com wrote:

 Anyone?

 On Wed, Apr 22, 2015 at 12:29 PM, Sourav Chandra 
 sourav.chan...@livestream.com wrote:

  Hi Olivier,
 
  *the update function is as below*:
 
  val updateFunc = (values: Seq[IConcurrentUsers], state: Option[(Long, Long)]) => {
    val previousCount = state.getOrElse((0L, 0L))._2
    var startValue: IConcurrentUsers = ConcurrentViewers(0)
    var currentCount = 0L
    val lastIndexOfConcurrentUsers =
      values.lastIndexWhere(_.isInstanceOf[ConcurrentViewers])
    val subList = values.slice(0, lastIndexOfConcurrentUsers)
    val currentCountFromSubList = subList.foldLeft(startValue)(_ op _).count + previousCount
    val lastConcurrentViewersCount = values(lastIndexOfConcurrentUsers).count

    if (math.abs(lastConcurrentViewersCount - currentCountFromSubList) >= 1) {
      logger.error(
        s"Count using state updation $currentCountFromSubList, " +
        s"ConcurrentUsers count $lastConcurrentViewersCount" +
        s" resetting to $lastConcurrentViewersCount"
      )
      currentCount = lastConcurrentViewersCount
    }
    val remainingValuesList = values.diff(subList)
    startValue = ConcurrentViewers(currentCount)
    currentCount = remainingValuesList.foldLeft(startValue)(_ op _).count

    if (currentCount < 0) {

      logger.error

Re: Spark Streaming updateStateByKey throws OutOfMemory Error

2015-04-23 Thread Sourav Chandra
*bump*

On Thu, Apr 23, 2015 at 3:46 PM, Sourav Chandra 
sourav.chan...@livestream.com wrote:

 HI TD,

 Some observations:

 1. If I submit the application using spark-submit tool with *client as
 deploy mode* it works fine with single master and worker (driver, master
 and worker are running in the same machine)
 2. If I submit the application using spark-submit tool with client as
 deploy mode it *crashes after some time with StackOverflowError* with *single
 master and 2 workers* (driver, master and 1 worker are running in the same
 machine, the other worker is in a different machine)
  *15/04/23 05:42:04 Executor: Exception in task 0.0 in stage 23153.0
 (TID 5412)*
 *java.lang.StackOverflowError*
 *at
 java.io.ObjectInputStream$BlockDataInputStream.readUTF(ObjectInputStream.java:2864)*
 *at java.io.ObjectInputStream.readUTF(ObjectInputStream.java:1072)*
 *at
 java.io.ObjectStreamClass.readNonProxy(ObjectStreamClass.java:671)*
 *at
 java.io.ObjectInputStream.readClassDescriptor(ObjectInputStream.java:830)*
 *at
 java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1601)*
 *at
 java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)*
 *at
 java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)*
 *at
 java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)*
 *at
 java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)*
 *at
 java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)*
 *at
 java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)*
 *at
 java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)*
 *at
 java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)*
 *at
 java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)*
 *at
 java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)*
 *at
 java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)*
 *at
 java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)*
 *at
 scala.collection.immutable.$colon$colon.readObject(List.scala:362)*
 *at sun.reflect.GeneratedMethodAccessor12.invoke(Unknown Source)*
 *at
 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)*
 *at java.lang.reflect.Method.invoke(Method.java:606)*
 *at
 java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)*
 *at
 java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)*
 *at
 java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)*
 *at
 java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)*
 *at
 java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)*
 *at
 java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)*
 *at
 java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)*
 *at
 java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)*
 *at
 java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)*
 *at
 java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)*
 *at
 java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)*
 *at
 java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)*
 *at
 java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)*
 *at
 scala.collection.immutable.$colon$colon.readObject(List.scala:362)*
 *at sun.reflect.GeneratedMethodAccessor12.invoke(Unknown Source)*
 *at
 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)*
 *at java.lang.reflect.Method.invoke(Method.java:606)*
 *at
 java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)*
 *at
 java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)*
 *at
 java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)*
 *at
 java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)*
 *at
 java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)*
 *at
 java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)*
 *at
 java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)*
 *at
 java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)*
 *at
 java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)*
 *at
 java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)*
 *at
 java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)*
 *at
 java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)*
 *at
 java.io.ObjectInputStream.readObject

Re: Spark Streaming updateStateByKey throws OutOfMemory Error

2015-04-22 Thread Sourav Chandra
Anyone?

On Wed, Apr 22, 2015 at 12:29 PM, Sourav Chandra 
sourav.chan...@livestream.com wrote:

 Hi Olivier,

 *the update function is as below*:

 val updateFunc = (values: Seq[IConcurrentUsers], state: Option[(Long, Long)]) => {
   val previousCount = state.getOrElse((0L, 0L))._2
   var startValue: IConcurrentUsers = ConcurrentViewers(0)
   var currentCount = 0L
   val lastIndexOfConcurrentUsers =
     values.lastIndexWhere(_.isInstanceOf[ConcurrentViewers])
   val subList = values.slice(0, lastIndexOfConcurrentUsers)
   val currentCountFromSubList = subList.foldLeft(startValue)(_ op _).count + previousCount
   val lastConcurrentViewersCount = values(lastIndexOfConcurrentUsers).count

   if (math.abs(lastConcurrentViewersCount - currentCountFromSubList) >= 1) {
     logger.error(
       s"Count using state updation $currentCountFromSubList, " +
       s"ConcurrentUsers count $lastConcurrentViewersCount" +
       s" resetting to $lastConcurrentViewersCount"
     )
     currentCount = lastConcurrentViewersCount
   }
   val remainingValuesList = values.diff(subList)
   startValue = ConcurrentViewers(currentCount)
   currentCount = remainingValuesList.foldLeft(startValue)(_ op _).count

   if (currentCount < 0) {
     logger.error(
       s"ERROR: Got new count $currentCount < 0, value:$values, state:$state, resetting to 0"
     )
     currentCount = 0
   }
   // to stop pushing subsequent 0 after receiving first 0
   if (currentCount == 0 && previousCount == 0) None
   else Some(previousCount, currentCount)
 }

 trait IConcurrentUsers {
   val count: Long
   def op(a: IConcurrentUsers): IConcurrentUsers = IConcurrentUsers.op(this, a)
 }

 object IConcurrentUsers {
   def op(a: IConcurrentUsers, b: IConcurrentUsers): IConcurrentUsers = (a, b) match {
     case (_, _: ConcurrentViewers) =>
       ConcurrentViewers(b.count)
     case (_: ConcurrentViewers, _: IncrementConcurrentViewers) =>
       ConcurrentViewers(a.count + b.count)
     case (_: ConcurrentViewers, _: DecrementConcurrentViewers) =>
       ConcurrentViewers(a.count - b.count)
   }
 }

 case class IncrementConcurrentViewers(count: Long) extends IConcurrentUsers
 case class DecrementConcurrentViewers(count: Long) extends IConcurrentUsers
 case class ConcurrentViewers(count: Long) extends IConcurrentUsers


 *also the error stack trace copied from executor logs is:*

 *java.lang.OutOfMemoryError: Java heap space*
 *at
 org.apache.hadoop.io.WritableUtils.readCompressedStringArray(WritableUtils.java:183)*
 *at
 org.apache.hadoop.conf.Configuration.readFields(Configuration.java:2564)*
 *at
 org.apache.hadoop.io.ObjectWritable.readObject(ObjectWritable.java:285)*
 *at
 org.apache.hadoop.io.ObjectWritable.readFields(ObjectWritable.java:77)*
 *at
 org.apache.spark.SerializableWritable$$anonfun$readObject$1.apply$mcV$sp(SerializableWritable.scala:43)*
 *at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:927)*
 *at
 org.apache.spark.SerializableWritable.readObject(SerializableWritable.scala:39)*
 *at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)*
 *at
 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)*
 *at
 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)*
 *at java.lang.reflect.Method.invoke(Method.java:601)*
 *at
 java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1004)*
 *at
 java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1866)*
 *at
 java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)*
 *at
 java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1347)*
 *at
 java.io.ObjectInputStream.readObject(ObjectInputStream.java:369)*
 *at
 org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62)*
 *at
 org.apache.spark.broadcast.TorrentBroadcast$.unBlockifyObject(TorrentBroadcast.scala:236)*
 *at
 org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readObject$1.apply$mcV$sp(TorrentBroadcast.scala:169)*
 *at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:927)*
 *at
 org.apache.spark.broadcast.TorrentBroadcast.readObject(TorrentBroadcast.scala:155)*
 *at sun.reflect.GeneratedMethodAccessor5.invoke(Unknown Source)*
 *at
 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)*
 *at java.lang.reflect.Method.invoke(Method.java:601)*
 *at
 java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1004)*
 *at
 java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1866

Spark Streaming updateStateByKey throws OutOfMemory Error

2015-04-21 Thread Sourav Chandra
Hi,

We are building a spark streaming application which reads from kafka, does
updateStateByKey based on the received message type and finally stores the
results into redis.

After running for a few seconds the executor process gets killed with an
OutOfMemory error.

The code snippet is below:


val NoOfReceiverInstances = 1

val kafkaStreams = (1 to NoOfReceiverInstances).map(
  _ => KafkaUtils.createStream(ssc, ZKQuorum, ConsumerGroup, TopicsMap)
)
val updateFunc = (values: Seq[IConcurrentUsers], state: Option[(Long, Long)]) => {...}

ssc.union(kafkaStreams).map(KafkaMessageMapper(_)).filter(...)
  .updateStateByKey(updateFunc)
  .foreachRDD(_.foreachPartition(RedisHelper.update(_)))



object RedisHelper {
  private val client = scredis.Redis(
    ConfigFactory.parseProperties(System.getProperties).getConfig(namespace)
  )

  def update(itr: Iterator[(String, (Long, Long))]) {
    // redis save operation
  }

}


*Below is the spark configuration:*


spark.app.name = XXX
spark.jars = .jar
spark.home = /spark-1.1.1-bin-hadoop2.4
spark.executor.memory = 1g
spark.streaming.concurrentJobs = 1000
spark.logConf = true
spark.cleaner.ttl = 3600 //in milliseconds
spark.default.parallelism = 12
spark.executor.extraJavaOptions = -Xloggc:gc.log -XX:+PrintGCDetails
-XX:+UseConcMarkSweepGC -XX:HeapDumpPath=1.hprof
-XX:+HeapDumpOnOutOfMemoryError
spark.executor.logs.rolling.strategy = size
spark.executor.logs.rolling.size.maxBytes = 104857600 // 100 MB
spark.executor.logs.rolling.maxRetainedFiles = 10
spark.serializer = org.apache.spark.serializer.KryoSerializer
spark.kryo.registrator = xxx.NoOpKryoRegistrator


other configurations are below

streaming {
  // All streaming context related configs should come here
  batch-duration = 1 second
  checkpoint-directory = /tmp
  checkpoint-duration = 10 seconds
  slide-duration = 1 second
  window-duration = 1 second
  partitions-for-shuffle-task = 32
}
kafka {
  no-of-receivers = 1
  zookeeper-quorum = :2181
  consumer-group = x
  topic = x:2
}

We tried different combinations like
 - with spark 1.1.0 and 1.1.1.
 - by increasing executor memory
 - by changing the serialization strategy (switching between kryo and
normal java)
 - by changing broadcast strategy (switching between http and torrent
broadcast)


Can anyone give any insight into what we are missing here? How can we fix this?

Due to akka version mismatch with some other libraries we cannot upgrade
the spark version.

Thanks,


spark logging issue

2014-12-11 Thread Sourav Chandra
Hi,

I am using spark 1.1.0 and setting below properties while creating spark
context.


spark.executor.logs.rolling.maxRetainedFiles = 10

spark.executor.logs.rolling.size.maxBytes = 104857600
spark.executor.logs.rolling.strategy = size

Even though I set it to roll over after 100 MB, the log file (i.e.
stderr) is not rolled over.

What am I missing here?



[Spark/ Spark Streaming] Spark 1.1.0 fails working with akka 2.3.6

2014-11-18 Thread Sourav Chandra
Hi,

I have created a spark streaming application based on spark-1.1.0. While
running it, it failed with an akka jar version mismatch. Some other projects are
using akka 2.3.6, so I cannot change the akka version as it will affect
them.

What should I do?

*Caused by: akka.ConfigurationException: Akka JAR version [2.3.6] does not
match the provided config version [2.2.3]*
* at akka.actor.ActorSystem$Settings.init(ActorSystem.scala:209)
~[analytics-engine.jar:1.0.0]*
* at akka.actor.ActorSystemImpl.init(ActorSystem.scala:504)
~[analytics-engine.jar:1.0.0]*
* at akka.actor.ActorSystem$.apply(ActorSystem.scala:141)
~[analytics-engine.jar:1.0.0]*
* at akka.actor.ActorSystem$.apply(ActorSystem.scala:118)
~[analytics-engine.jar:1.0.0]*
* at
org.apache.spark.util.AkkaUtils$.org$apache$spark$util$AkkaUtils$$doCreateActorSystem(AkkaUtils.scala:121)
~[analytics-engine.jar:1.0.0]*
* at org.apache.spark.util.AkkaUtils$$anonfun$1.apply(AkkaUtils.scala:54)
~[analytics-engine.jar:1.0.0]*
* at org.apache.spark.util.AkkaUtils$$anonfun$1.apply(AkkaUtils.scala:53)
~[analytics-engine.jar:1.0.0]*
* at
org.apache.spark.util.Utils$$anonfun$startServiceOnPort$1.apply$mcVI$sp(Utils.scala:1446)
~[analytics-engine.jar:1.0.0]*
* at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
~[analytics-engine.jar:1.0.0]*
* at org.apache.spark.util.Utils$.startServiceOnPort(Utils.scala:1442)
~[analytics-engine.jar:1.0.0]*
* at org.apache.spark.util.AkkaUtils$.createActorSystem(AkkaUtils.scala:56)
~[analytics-engine.jar:1.0.0]*
* at org.apache.spark.SparkEnv$.create(SparkEnv.scala:153)
~[analytics-engine.jar:1.0.0]*
* at org.apache.spark.SparkContext.init(SparkContext.scala:203)
~[analytics-engine.jar:1.0.0]*
* at
com.livestream.analytics.engine.context.ApplicationContextFactory$.createSimpleContext(ApplicationContext.scala:63)
~[analytics-engine.jar:1.0.0]*
* ... 13 common frames omitted*

Thanks,


Spark Streaming with Kafka is failing with Error

2014-11-18 Thread Sourav Chandra
Hi,

While running my spark streaming application built on spark 1.1.0, I am
getting the below error.

*14/11/18 15:35:30 ERROR ReceiverTracker: Deregistered receiver for stream
0: Error starting receiver 0 - java.lang.AbstractMethodError*
* at org.apache.spark.Logging$class.log(Logging.scala:52)*
* at
org.apache.spark.streaming.kafka.KafkaReceiver.log(KafkaInputDStream.scala:66)*
* at org.apache.spark.Logging$class.logInfo(Logging.scala:59)*
* at
org.apache.spark.streaming.kafka.KafkaReceiver.logInfo(KafkaInputDStream.scala:66)*
* at
org.apache.spark.streaming.kafka.KafkaReceiver.onStart(KafkaInputDStream.scala:86)*
* at
org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:121)*
* at
org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:106)*
* at
org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$9.apply(ReceiverTracker.scala:264)*
* at
org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$9.apply(ReceiverTracker.scala:257)*
* at
org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1121)*
* at
org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1121)*
* at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)*
* at org.apache.spark.scheduler.Task.run(Task.scala:54)*
* at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)*
* at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)*
* at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)*
* at java.lang.Thread.run(Thread.java:722)*



Can you guys please help me out here?


Re: What's wrong with my spark filter? I get org.apache.spark.SparkException: Task not serializable

2014-10-17 Thread Sourav Chandra
It might be because the object is nested within some class which is not
serializable.

Also, you can run the application with this JVM parameter to see detailed
information about serialization: -Dsun.io.serialization.extendedDebugInfo=true
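A minimal sketch of wiring that flag in via SparkConf (the driver JVM needs the flag as well, set through its own java options; the app name below is a placeholder):

val conf = new org.apache.spark.SparkConf()
  .setAppName("serialization-debug")
  // print the full object-graph path when a NotSerializableException is thrown on executors
  .set("spark.executor.extraJavaOptions",
       "-Dsun.io.serialization.extendedDebugInfo=true")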

On Fri, Oct 17, 2014 at 4:07 PM, shahab shahab.mok...@gmail.com wrote:

 Hi,

 Probably I am missing a very simple principle, but something is wrong with
 my filter:
 I get an org.apache.spark.SparkException: Task not serializable exception.

 Here is my filter function:
 object OBJ {
   def f1(): Boolean = {
     var i = 1;
     for (j <- 1 to 10) i = i + 1;
     true;
   }
 }

 rdd.filter(row => OBJ.f1())


 And when I run, I get the following exception:

 org.apache.spark.SparkException: Task not serializable
 at
 org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
 at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
 at org.apache.spark.SparkContext.clean(SparkContext.scala:1242)
 at org.apache.spark.rdd.RDD.filter(RDD.scala:282)
 ...
 Caused by: java.io.NotSerializableException: org.apache.spark.SparkConf
 at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
 at
 java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
 at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
 at
 java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
 at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
 ...



 best,
 /Shahab






Questions regarding different spark pre-built packages

2014-06-24 Thread Sourav Chandra
Hi,

I am just curious to know what the differences are between the prebuilt
packages for Hadoop 1, Hadoop 2, CDH etc.

I am using a spark standalone cluster and we don't use hadoop at all.

Can we use any one of the pre-built packages, OR do we have to run the
make-distribution.sh script from the code?

Thanks,


Spark streaming issue

2014-05-27 Thread Sourav Chandra
HI,

I am facing a weird issue. I am using spark 0.9 and running a streaming
application.

In the UI, the duration shows on the order of seconds, but if I dig into that
particular stage's details, it shows the total time taken across all tasks for
the stage is much, much less (in milliseconds).

I am using the Fair scheduling policy and the pool name is counter-metric-persistor.

What could be the reason for this?

*Stage screenshot: Stage 97*

  Stage Id: 97
  Pool Name: counter-metric-persistor
  Description: foreach at RealTimeAnalyticsApplication.scala:33
  Submitted: 2014/05/27 07:22:23
  Duration: 14.5 s
  Tasks (succeeded/total): 6/6

*Stage details screenshot: Stage 97*

Details for Stage 97

   - *Total task time across all tasks: *154 ms

Summary Metrics for 6 Completed Tasks
  Metric                            Min     25th pct  Median  75th pct  Max
  Result serialization time         0 ms    0 ms      0 ms    0 ms      0 ms
  Duration                          12 ms   13 ms     23 ms   30 ms     54 ms
  Time spent fetching task results  0 ms    0 ms      0 ms    0 ms      0 ms
  Scheduler delay                   7 ms    7 ms      8 ms    8 ms      8 ms

Aggregated Metrics by Executor
  Executor ID 0, Address ls230-127-p.nyc0.ls.local:53463, Task Time 199 ms,
  Total Tasks 6, Failed Tasks 0, Succeeded Tasks 6,
  Shuffle Read 0.0 B, Shuffle Write 0.0 B, Shuffle Spill (Memory) 0.0 B,
  Shuffle Spill (Disk) 0.0 B

Tasks (all SUCCESS, PROCESS_LOCAL on ls230-127-p.nyc0.ls.local, launched 2014/05/27 07:22:37)
  Task Index  Task ID  Duration
  0           408      30 ms
  1           411      22 ms
  2           412      23 ms
  3           414      13 ms
  4           415      12 ms
  5           416      54 ms


Thanks,


Spark Streaming Error: SparkException: Error sending message to BlockManagerMaster

2014-05-22 Thread Sourav Chandra
Hi,

I am running Spark streaming application. I have faced some uncaught
exception after which my worker stops processing any further messages.

I am using *spark 0.9.0*

Could you please let me know what could be the cause of this and how to
overcome this issue?

[ERROR] [05/22/2014 03:23:26.250] [spark-akka.actor.default-dispatcher-18]
[ActorSystem(spark)] exception while executing timer task
org.apache.spark.SparkException: Error sending message to
BlockManagerMaster [message = HeartBeat(BlockManagerId(driver,
10.29.29.226, 36655, 0))]
at
org.apache.spark.storage.BlockManagerMaster.askDriverWithReply(BlockManagerMaster.scala:176)
 at
org.apache.spark.storage.BlockManagerMaster.sendHeartBeat(BlockManagerMaster.scala:52)
at org.apache.spark.storage.BlockManager.org
$apache$spark$storage$BlockManager$$heartBeat(BlockManager.scala:97)
 at
org.apache.spark.storage.BlockManager$$anonfun$initialize$1.apply$mcV$sp(BlockManager.scala:135)
at akka.actor.Scheduler$$anon$9.run(Scheduler.scala:80)
 at
akka.actor.LightArrayRevolverScheduler$$anon$3$$anon$2.run(Scheduler.scala:241)
at
akka.actor.LightArrayRevolverScheduler$TaskHolder.run(Scheduler.scala:464)
 at
akka.actor.LightArrayRevolverScheduler$$anonfun$close$1.apply(Scheduler.scala:281)
at
akka.actor.LightArrayRevolverScheduler$$anonfun$close$1.apply(Scheduler.scala:280)
 at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
 at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
 at akka.actor.LightArrayRevolverScheduler.close(Scheduler.scala:279)
at akka.actor.ActorSystemImpl.stopScheduler(ActorSystem.scala:630)
 at
akka.actor.ActorSystemImpl$$anonfun$_start$1.apply$mcV$sp(ActorSystem.scala:582)
at akka.actor.ActorSystemImpl$$anonfun$_start$1.apply(ActorSystem.scala:582)
 at
akka.actor.ActorSystemImpl$$anonfun$_start$1.apply(ActorSystem.scala:582)
at akka.actor.ActorSystemImpl$$anon$3.run(ActorSystem.scala:596)
 at
akka.actor.ActorSystemImpl$TerminationCallbacks$$anonfun$run$1.runNext$1(ActorSystem.scala:750)
at
akka.actor.ActorSystemImpl$TerminationCallbacks$$anonfun$run$1.apply$mcV$sp(ActorSystem.scala:753)
 at
akka.actor.ActorSystemImpl$TerminationCallbacks$$anonfun$run$1.apply(ActorSystem.scala:746)
at
akka.actor.ActorSystemImpl$TerminationCallbacks$$anonfun$run$1.apply(ActorSystem.scala:746)
 at akka.util.ReentrantGuard.withGuard(LockUtil.scala:15)
at
akka.actor.ActorSystemImpl$TerminationCallbacks.run(ActorSystem.scala:746)
 at
akka.actor.ActorSystemImpl$$anonfun$terminationCallbacks$1.apply(ActorSystem.scala:593)
at
akka.actor.ActorSystemImpl$$anonfun$terminationCallbacks$1.apply(ActorSystem.scala:593)
 at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
at
akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.processBatch$1(BatchingExecutor.scala:67)
 at
akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:82)
at
akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59)
 at
akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59)
at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
 at akka.dispatch.BatchingExecutor$Batch.run(BatchingExecutor.scala:58)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:42)
 at
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
 at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
 at
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: akka.pattern.AskTimeoutException:
Recipient[Actor[akka://spark/user/BlockManagerMaster#1305432112]] had
already been terminated.
 at akka.pattern.AskableActorRef$.ask$extension(AskSupport.scala:134)
at
org.apache.spark.storage.BlockManagerMaster.askDriverWithReply(BlockManagerMaster.scala:161)
 ... 39 more

Thanks,


Re: what does broadcast_0 stand for

2014-04-28 Thread Sourav Chandra
Apart from user-defined broadcast variables, there are others which are
created by Spark itself. This could be one of those.



As I had mentioned, you can write a small program where you create a broadcast
variable. Check the broadcast variable id (say it is x). Then go to /tmp
and open the broadcast_x file. You will find that its content is the serialized output of
your variable.
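For example, a minimal sketch (the broadcast id maps to the broadcast_<id> file on disk):

val lookup = sc.broadcast(Map("a" -> 1, "b" -> 2))
println(s"broadcast id = ${lookup.id}")   // e.g. id 0 corresponds to broadcast_0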





On Mon, Apr 28, 2014 at 4:26 PM, wxhsdp wxh...@gmail.com wrote:

 Thank you for your help, Sourav.
 I found the broadcast_0 binary file in the /tmp directory. Its size is 33.4 kB, not
 equal to the estimated size of 135.6 KB.
 I opened it and found that its content has no relation to my input file. I
 guess broadcast_0 is a Spark config
 file, is that right?



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/what-does-broadcast-0-stand-for-tp4934p4936.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.






Re: Access Last Element of RDD

2014-04-24 Thread Sourav Chandra
You can use rdd.takeOrdered(1)(reverseOrdering)

reverseOrdering is your Ordering[T] instance where you define the ordering
logic. You have to pass this to the method.
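A minimal sketch, assuming an RDD[Int] and natural ordering (the reverse ordering makes takeOrdered return the largest, i.e. "last", element):

val rdd = sc.parallelize(Seq(3, 1, 4, 1, 5))
val last: Option[Int] = rdd.takeOrdered(1)(Ordering[Int].reverse).headOption   // Some(5)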



On Thu, Apr 24, 2014 at 11:21 AM, Frank Austin Nothaft 
fnoth...@berkeley.edu wrote:

 If you do this, you could simplify to:

 RDD.collect().last

 However, this has the problem of collecting all data to the driver.

 Is your data sorted? If so, you could reverse the sort and take the first.
 Alternatively, a hacky implementation might involve a
 mapPartitionsWithIndex that returns an empty iterator for all partitions
 except for the last. For the last partition, you would filter all elements
 except for the last element in your iterator. This should leave one
 element, which is your last element.
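 A rough sketch of that idea, assuming an RDD[String] whose last partition is non-empty (otherwise you would have to walk backwards through partitions):

 val lastPartition = rdd.partitions.length - 1
 val lastElement = rdd
   .mapPartitionsWithIndex { (idx, iter) =>
     if (idx == lastPartition)
       iter.foldLeft(Option.empty[String])((_, e) => Some(e)).iterator  // keep only the last element
     else
       Iterator.empty
   }
   .collect()
   .headOption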

 Frank Austin Nothaft
 fnoth...@berkeley.edu
 fnoth...@eecs.berkeley.edu
 202-340-0466

 On Apr 23, 2014, at 10:44 PM, Adnan Yaqoob nsyaq...@gmail.com wrote:

 This function will return scala List, you can use List's last function to
 get the last element.

 For example:

 RDD.take(RDD.count()).last


 On Thu, Apr 24, 2014 at 10:28 AM, Sai Prasanna ansaiprasa...@gmail.comwrote:

 Adnan, but RDD.take(RDD.count()) returns all the elements of the RDD.

 I want only to access the last element.


 On Thu, Apr 24, 2014 at 10:33 AM, Sai Prasanna 
 ansaiprasa...@gmail.comwrote:

 Oh ya, Thanks Adnan.


 On Thu, Apr 24, 2014 at 10:30 AM, Adnan Yaqoob nsyaq...@gmail.comwrote:

 You can use following code:

 RDD.take(RDD.count())


 On Thu, Apr 24, 2014 at 9:51 AM, Sai Prasanna 
 ansaiprasa...@gmail.comwrote:

 Hi All, Some help !
 RDD.first or RDD.take(1) gives the first item; is there a straightforward
 way to access the last element in a similar way?

 I couldn't find a tail/last method for RDD!











Re: Access Last Element of RDD

2014-04-24 Thread Sourav Chandra
Also, the same thing can be done using rdd.top(1)(reverseOrdering).



On Thu, Apr 24, 2014 at 11:28 AM, Sourav Chandra 
sourav.chan...@livestream.com wrote:

 You can use rdd.takeOrdered(1)(reverseOrdering)

 reverseOrdering is your Ordering[T] instance where you define the ordering
 logic. You have to pass this to the method.



 On Thu, Apr 24, 2014 at 11:21 AM, Frank Austin Nothaft 
 fnoth...@berkeley.edu wrote:

 If you do this, you could simplify to:

 RDD.collect().last

 However, this has the problem of collecting all data to the driver.

 Is your data sorted? If so, you could reverse the sort and take the
 first. Alternatively, a hacky implementation might involve a
 mapPartitionsWithIndex that returns an empty iterator for all partitions
 except for the last. For the last partition, you would filter all elements
 except for the last element in your iterator. This should leave one
 element, which is your last element.

 Frank Austin Nothaft
 fnoth...@berkeley.edu
 fnoth...@eecs.berkeley.edu
 202-340-0466

 On Apr 23, 2014, at 10:44 PM, Adnan Yaqoob nsyaq...@gmail.com wrote:

 This function will return scala List, you can use List's last function to
 get the last element.

 For example:

 RDD.take(RDD.count()).last


 On Thu, Apr 24, 2014 at 10:28 AM, Sai Prasanna 
 ansaiprasa...@gmail.comwrote:

 Adnan, but RDD.take(RDD.count()) returns all the elements of the RDD.

 I want only to access the last element.


 On Thu, Apr 24, 2014 at 10:33 AM, Sai Prasanna 
 ansaiprasa...@gmail.comwrote:

 Oh ya, Thanks Adnan.


 On Thu, Apr 24, 2014 at 10:30 AM, Adnan Yaqoob nsyaq...@gmail.comwrote:

 You can use following code:

 RDD.take(RDD.count())


 On Thu, Apr 24, 2014 at 9:51 AM, Sai Prasanna ansaiprasa...@gmail.com
  wrote:

 Hi All, Some help !
 RDD.first or RDD.take(1) gives the first item; is there a straightforward
 way to access the last element in a similar way?

 I couldn't find a tail/last method for RDD!















Re: about rdd.filter()

2014-04-23 Thread Sourav Chandra
This could happen if the variable is defined in such a way that it pulls its
own class reference into the closure. Serialization then tries to
serialize the whole outer class reference, which is not serializable, so the
whole thing fails.
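A minimal sketch of the usual workaround, assuming the RDD[(String, Array[Int])] shape implied by the snippet below: copy the field into a local val so the closure captures only the value, not the enclosing class.

class Job(rdd: org.apache.spark.rdd.RDD[(String, Array[Int])]) {
  var num = 1

  def run(): Array[(String, Array[Int])] = {
    val threshold = num                            // local copy, serializable on its own
    rdd.filter(_._2.length > threshold).collect()  // closure no longer drags in `this`
  }
}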



On Wed, Apr 23, 2014 at 3:15 PM, randylu randyl...@gmail.com wrote:

   my code is like:
 rdd2 = rdd1.filter(_._2.length > 1)
 rdd2.collect()
   it works well, but if I use a variable /num/ instead of 1:
 var num = 1
 rdd2 = rdd1.filter(_._2.length > num)
 rdd2.collect()
   it fails at rdd2.collect()
   so strange?



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/about-rdd-filter-tp4657.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.






Re: Change print() in JavaNetworkWordCount

2014-03-25 Thread Sourav Chandra
You can extend DStream and override the print() method, i.e. create
your own DStream subclass that does what you want.
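An alternative sketch that avoids subclassing (shown in Scala; wordCounts stands in for whatever DStream the example prints): reimplement what print() does with foreachRDD and take as many elements per batch as you like.

val numToShow = 100   // instead of the hard-coded 10
wordCounts.foreachRDD { rdd =>
  println("-------------------------------------------")
  rdd.take(numToShow).foreach(println)
}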


On Tue, Mar 25, 2014 at 6:07 PM, Eduardo Costa Alfaia 
e.costaalf...@unibs.it wrote:

 Hi Guys,
 I think I already asked this question, but I don't remember if anyone
 has answered me. I would like to change, in the print() function, the number
 of words and frequencies that are sent to the driver's screen. The
 default value is 10.

 Anyone could help me with this?

 Best Regards

 --
 Informativa sulla Privacy: http://www.unibs.it/node/8155






spark executor/driver log files management

2014-03-24 Thread Sourav Chandra
Hi,

I have few questions regarding log file management in spark:

1. Currently I did not find any way to modify the log file name for
executors/drivers. It is hardcoded as stdout and stderr. Also there is no log
rotation.

In the case of a streaming application this will grow forever and become
unmanageable. Is there any way to overcome this?

Thanks,


Spark usage patterns and questions

2014-03-11 Thread Sourav Chandra
Hi,

I have some questions regarding usage patterns and debugging in spark/spark
streaming.

1. What are some common design patterns for using broadcast variables? In my
application I created some, and also created a scheduled task which
periodically refreshes the variables. I want to know how people generally
achieve this efficiently and in a modular way.

2. Sometimes an uncaught exception in the driver program/worker does not get
traced anywhere. How can we debug this?

3. In our use case we read from Kafka, do some mapping and finally persist the
data to Cassandra as well as push the data over a remote actor for realtime
updates in a dashboard. I used the below approaches (see the sketch after this list):
 - First tried a very naive way like stream.map(...).foreachRDD(
push to actor)
It does not work and the stage failed with an akka exception
 - Second tried the
akka.serialization.JavaSerializer.withSystem(system){...} approach
 It does not work and the stage failed, BUT without any trace anywhere in
the logs
 - Finally did rdd.collect to collect the output into the driver and then
pushed it to the actor
 It worked.

I would like to know whether there is any efficient way of achieving this sort
of use case.

4. Sometimes I see failed stages, but when I open those stage details it
says the stage did not start. What does this mean?
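A minimal sketch of the collect-then-push approach from point 3 (dashboardActor is a placeholder for the remote actor reference, which stays on the driver):

stream.foreachRDD { rdd =>
  val batch = rdd.collect()   // runs on the driver, so the actor ref is never shipped to executors
  if (batch.nonEmpty) {
    dashboardActor ! batch
  }
}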

Looking forward to some interesting responses :)

Thanks,


Re: [External] Re: no stdout output from worker

2014-03-10 Thread Sourav Chandra
Hi Ranjan,

Whatever code is passed as a closure to Spark operations like map,
flatMap, filter etc. is part of a task and runs on the executors.

Everything else runs in the driver.
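A minimal illustration (lines is a placeholder RDD): the first println runs inside tasks on the workers (executor stdout), the second runs on the driver (console).

val lengths = lines.map { line =>
  println(s"processing: $line")   // executed in a task -> worker stdout
  line.length
}
println(s"total records: ${lengths.count()}")   // executed on the driver -> driver console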

Thanks,
Sourav


On Mon, Mar 10, 2014 at 12:03 PM, Sen, Ranjan [USA] sen_ran...@bah.comwrote:

 Hi Patrick

 How do I know which part of the code is in the driver and which in task?
 The structure of my code is as below-

 Š

 Static boolean done=false;
 Š

 Public static void main(..

 ..

 JavaRDDString lines = ..

 ..

 While (!done) {

 ..
 While (..) {

 JavaPairRDDInteger, ListInteger labs1 = labs.map (new PairFunctionŠ );

 !! Here I have System.out.println (A)

 } // inner while

 !! Here I have System.out.println (B)


 If (Š) {
 Done = true;

 !! Also here some System.out.println  (C)

 Break;
 }

 Else {

 If (Š) {

 !! More System.out.println  (D)


 labs = labs.map(Š) ;

 }
 }

 } // outer while

 !! Even more System.out.println  (E)

 } // main

 } //class

 I get the console outputs on the master for (B) and (E). I do not see any
 stdout in the worker node. I find the stdout and stderr in the
 spark/work/appid/0/. I see output
 in stderr but not in stdout.

 I do get all the outputs on the console when I run it in local mode.

 Sorry I am new and may be asking some naïve question but it is really
 confusing to me. Thanks for your help.

 Ranjan

 On 3/9/14, 10:50 PM, Patrick Wendell pwend...@gmail.com wrote:

 Hey Sen,
 
 Is your code in the driver code or inside one of the tasks?
 
 If it's in the tasks, the place you would expect these to be is in
 stdout file under spark/appid/work/[stdout/stderr]. Are you seeing
 at least stderr logs in that folder? If not then the tasks might not
 be running on the workers machines. If you see stderr but not stdout
 that's a bit of a puzzler since they both go through the same
 mechanism.
 
 - Patrick
 
 On Sun, Mar 9, 2014 at 2:32 PM, Sen, Ranjan [USA] sen_ran...@bah.com
 wrote:
  Hi
  I have some System.out.println in my Java code that is working ok in a
 local
  environment. But when I run the same code on a standalone  mode in a EC2
  cluster I do not see them at the worker stdout (in the worker node under
  spark location/work ) or at the driver console. Could you help me
  understand how do I troubleshoot?
 
  Thanks
  Ranjan



