Visual PySpark Programming

2018-06-11 Thread srungarapu vamsi
Hi,

I have the following use case and have not found a suitable tool that can
serve my purpose.

Use case:
Steps 1-3 are UI-driven.
*Step 1*)  A user should be able to choose a data source (e.g. HDFS) and
configure it so that it points to a file.
*Step 2*)  A user should be able to apply filters, transformations and
actions on the DataFrame loaded in the previous step.
*Step 3*)  A user should be able to perform Step 2 any number of times, as a
chain.
*Step 4*)  A user should be able to click a Save button, which converts the
data-flow diagram into a PySpark job.

I found tools such as https://seahorse.deepsense.ai/ and
https://www.streamanalytix.com/product/streamanalytix/ which can do this.
However, they generate a Scala/Java Spark job instead of a PySpark job.
Moreover, they are paid products.

a) Are there any open-source solutions that can serve my need?

If not, I would like to build one. To do so, I would need a workflow UI
editor that I can tweak for my purpose, but I have not found a free one.

b) Are there any open-source workflow UI editors that could help me solve my
use case?

c) Are there any other interesting approaches to solve my use case?


Thanks,
Vamsi


Launch a pyspark Job From UI

2018-06-11 Thread srungarapu vamsi
Hi,

I am looking for applications that can trigger Spark jobs from a UI.
Are there any such applications available?

I have looked at Spark JobServer, which can expose an API to submit a Spark
application.

Are there any other alternatives I can use to submit PySpark jobs from a UI?

Thanks,
Vamsi


Re: Spark/Kafka Streaming Job Gets Stuck

2015-10-29 Thread srungarapu vamsi
In addition to @Adrian's suggestions, check whether the batch processing time
is exceeding the batch interval.

On Thu, Oct 29, 2015 at 2:23 AM, Adrian Tanase  wrote:

> Does it work as expected with smaller batch or smaller load? Could it be
> that it's accumulating too many events over 3 minutes?
>
> You could also try increasing the parallelism via repartition to ensure
> smaller tasks that can safely fit in working memory.
>
> Sent from my iPhone
>
> > On 28 Oct 2015, at 17:45, Afshartous, Nick 
> wrote:
> >
> >
> > Hi, we are load testing our Spark 1.3 streaming (reading from Kafka)
> job and seeing a problem.  This is running in AWS/Yarn and the streaming
> batch interval is set to 3 minutes and this is a ten node cluster.
> >
> > Testing at 30,000 events per second we are seeing the streaming job get
> stuck (stack trace below) for over an hour.
> >
> > Thanks on any insights or suggestions.
> > --
> >  Nick
> >
> >
> org.apache.spark.streaming.api.java.AbstractJavaDStreamLike.mapPartitionsToPair(JavaDStreamLike.scala:43)
> >
> com.wb.analytics.spark.services.streaming.drivers.StreamingKafkaConsumerDriver.runStream(StreamingKafkaConsumerDriver.java:125)
> >
> com.wb.analytics.spark.services.streaming.drivers.StreamingKafkaConsumerDriver.main(StreamingKafkaConsumerDriver.java:71)
> > sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> >
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
> >
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> > java.lang.reflect.Method.invoke(Method.java:606)
> >
> org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:480)
> >


-- 
/Vamsi


reduceByKeyAndWindow confusion

2015-09-23 Thread srungarapu vamsi
I create a stream from Kafka as below:

val kafkaDStream = KafkaUtils
  .createDirectStream[String, KafkaGenericEvent, StringDecoder, KafkaGenericEventsDecoder](ssc, kafkaConf, Set(topics))
  .window(Minutes(WINDOW_DURATION), Minutes(SLIDER_DURATION))

I have a map ("intToStringList") which is a Map[Int,List[String]]
using this map i am filtering the stream and finally converting it into
Map[Int,DStream[KafkaGenericEvent]]]

1.
On each value of this map (which is a DStream[KafkaGenericEvent]) I am
applying a reduceByKeyAndWindow operation.
Since reduceByKeyAndWindow also takes a window duration and a slide duration,
does that imply that reduceByKeyAndWindow can be applied on every window of
the given DStream with a different window duration and slide duration?
For example, say the windowed DStream is created with a window duration of 16
minutes and a slide duration of 1 minute, so I have one RDD for every window.
If reduceByKeyAndWindow then uses a window duration of 4 minutes and a slide
duration of 1 minute, will I get 4 RDDs, since the windowed DStream's window
duration divided by reduceByKeyAndWindow's window duration is 4? (A sketch of
what I mean follows below.)
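To make the question concrete, here is a minimal sketch of what I mean (the
pair stream name, durations and reduce function are illustrative):

// outer window on the stream vs. the window passed to reduceByKeyAndWindow
val windowed = pairDStream.window(Minutes(16), Minutes(1))  // one RDD per 16-minute window
val reduced = windowed.reduceByKeyAndWindow(
  (a: Long, b: Long) => a + b,  // illustrative reduce function
  Minutes(4),                   // reduceByKeyAndWindow's own window duration
  Minutes(1)                    // reduceByKeyAndWindow's own slide duration
)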

2.
As suggested in the Spark docs, I am trying to set a checkpointing interval
on the kafkaDStream created in the block shown above, in the following way:
kafkaDStream.checkpoint(Minutes(4))

But when I execute this, I get the error:
"WindowedDStream has been marked for checkpointing but the storage level
has not been set to enable persisting. Please use DStream.persist() to set
the storage level to use memory for better checkpointing performance"
However, when I went through the implementation of the checkpoint function in
DStream.scala, I saw a call to persist().
Do I really have to call persist on the WindowedDStream myself?
Just to give it a shot, I called persist on the windowedDStream and then
called checkpoint(interval), but I still face the error mentioned above.
How do I solve this? (A sketch of what I tried is below.)
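For reference, this is roughly what I tried (a sketch; the storage level
shown is just the one I picked):

import org.apache.spark.storage.StorageLevel

windowedDStream.persist(StorageLevel.MEMORY_ONLY_SER)  // persist before enabling checkpointing
windowedDStream.checkpoint(Minutes(4))                 // checkpoint interval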
-- 
/Vamsi


Re: Invalid checkpoint url

2015-09-22 Thread srungarapu vamsi
@Das,
No, I am getting it in cluster mode.
I think I understand why I am getting this error; please correct me if I am
wrong.
Reason: checkpointing writes RDDs to disk, so it happens on all the workers.
Whenever Spark has to read an RDD back, the checkpoint directory must be
reachable by all the workers and must be a common place they can all write to
and read from. That calls for a commonly accessible file system such as NFS,
HDFS or S3.
So if I give ssc.checkpoint("some local directory"), the workers cannot read
the RDDs from other workers' checkpoint directories, and I hit the error
mentioned above.
With this understanding, I am creating a t2.medium node with HDFS 2.7.1 and
pointing the checkpoint directory to "hdfs://ip:port/path/to/directory".

Please correct me if I am wrong.
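For reference, this is the shape of what I am setting up now (a sketch; the
host, port and path are placeholders):

// checkpoint directory on a file system that every worker can reach
ssc.checkpoint("hdfs://namenode-host:8020/user/spark/checkpoints/my-streaming-app")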

On Wed, Sep 23, 2015 at 4:53 AM, Tathagata Das <t...@databricks.com> wrote:

> Are you getting this error in local mode?
>
>
> On Tue, Sep 22, 2015 at 7:34 AM, srungarapu vamsi <
> srungarapu1...@gmail.com> wrote:
>
>> Yes, I tried ssc.checkpoint("checkpoint"), it works for me as long as i
>> don't use reduceByKeyAndWindow.
>>
>> When i start using "reduceByKeyAndWindow" it complains me with the error
>> "Exception in thread "main" org.apache.spark.SparkException: Invalid
>> checkpoint directory: file:/home/ubuntu/checkpoint/342e3171-01f3-48$
>> 2-97be-e3862eb5c944/rdd-8"
>>
>> The stack trace is as below:
>>
>> Exception in thread "main" org.apache.spark.SparkException: Invalid
>> checkpoint directory: file:/home/ubuntu/checkpoint/342e3171[22/9706$
>> 2-97be-e3862eb5c944/rdd-8
>> at
>> org.apache.spark.rdd.CheckpointRDD.getPartitions(CheckpointRDD.scala:54)
>> at
>> org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
>> at
>> org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
>> at scala.Option.getOrElse(Option.scala:120)
>> at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
>> at
>> org.apache.spark.rdd.RDDCheckpointData.doCheckpoint(RDDCheckpointData.scala:97)
>> at org.apache.spark.rdd.RDD.doCheckpoint(RDD.scala:1415)
>> at
>> org.apache.spark.rdd.RDD$$anonfun$doCheckpoint$1.apply(RDD.scala:1417)
>> at
>> org.apache.spark.rdd.RDD$$anonfun$doCheckpoint$1.apply(RDD.scala:1417)
>> at scala.collection.immutable.List.foreach(List.scala:318)
>> at org.apache.spark.rdd.RDD.doCheckpoint(RDD.scala:1417)
>> at
>> org.apache.spark.rdd.RDD$$anonfun$doCheckpoint$1.apply(RDD.scala:1417)
>> at
>> org.apache.spark.rdd.RDD$$anonfun$doCheckpoint$1.apply(RDD.scala:1417)
>> at scala.collection.immutable.List.foreach(List.scala:318)
>> at org.apache.spark.rdd.RDD.doCheckpoint(RDD.scala:1417)
>> at
>> org.apache.spark.rdd.RDD$$anonfun$doCheckpoint$1.apply(RDD.scala:1417)
>> at
>> org.apache.spark.rdd.RDD$$anonfun$doCheckpoint$1.apply(RDD.scala:1417)
>> at
>> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>> at
>> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>> at org.apache.spark.rdd.RDD.doCheckpoint(RDD.scala:1417)
>> at
>> org.apache.spark.rdd.RDD$$anonfun$doCheckpoint$1.apply(RDD.scala:1417)
>> at
>> org.apache.spark.rdd.RDD$$anonfun$doCheckpoint$1.apply(RDD.scala:1417)
>> at
>> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>> at
>> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>> at org.apache.spark.rdd.RDD.doCheckpoint(RDD.scala:1417)
>> at
>> org.apache.spark.rdd.RDD$$anonfun$doCheckpoint$1.apply(RDD.scala:1417)
>> at
>> org.apache.spark.rdd.RDD$$anonfun$doCheckpoint$1.apply(RDD.scala:1417)
>> at
>> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>> at
>> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>> at org.apache.spark.rdd.RDD.doCheckpoint(RDD.scala:1417)
>> at
>> org.apache.spark.rdd.RDD$$anonfun$doCheckpoint$1.apply(RDD.scala:1417)
>> at
>> org.apache.spark.rdd.RDD$$anonfun$doCheckpoint$1.apply(RDD.scala:1417)
>> at
>> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>> at
>> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>> at org.apache.spa

Re: Invalid checkpoint url

2015-09-22 Thread srungarapu vamsi
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)

On Tue, Sep 22, 2015 at 6:49 PM, Adrian Tanase <atan...@adobe.com> wrote:

> Have you tried simply ssc.checkpoint("checkpoint")? This should create it
> in the local folder, has always worked for me when in development on local
> mode.
>
> For the others (/tmp/..) make sure you have rights to write there.
>
> -adrian
>
> From: srungarapu vamsi
> Date: Tuesday, September 22, 2015 at 7:59 AM
> To: user
> Subject: Invalid checkpoint url
>
> I am using reduceByKeyAndWindow (with an inverse reduce function) in my code.
> To use it, it seems the checkpoint directory has to be on a Hadoop-compatible
> file system.
> Does that mean I should set up Hadoop on my system?
> I googled this and found a Stack Overflow answer saying that I need not set
> up HDFS, but the checkpoint directory should be HDFS-compatible.
>
> I am a beginner in this area. I am running my Spark Streaming application
> on Ubuntu 14.04 with Spark 1.3.1.
> If I really don't need to set up HDFS and ext4 is HDFS-compatible, then what
> should my checkpoint directory look like?
>
> I tried all of these:
> ssc.checkpoint("/tmp/checkpoint")
> ssc.checkpoint("hdfs:///tmp/checkpoint")
> ssc.checkpoint("file:///tmp/checkpoint")
>
> But none of them worked for me.
>
> --
> /Vamsi
>



-- 
/Vamsi


Invalid checkpoint url

2015-09-21 Thread srungarapu vamsi
I am using reduceByKeyAndWindow (with an inverse reduce function) in my code.
To use it, it seems the checkpoint directory has to be on a Hadoop-compatible
file system.
Does that mean I should set up Hadoop on my system?
I googled this and found a Stack Overflow answer saying that I need not set up
HDFS, but the checkpoint directory should be HDFS-compatible.

I am a beginner in this area. I am running my Spark Streaming application on
Ubuntu 14.04 with Spark 1.3.1.
If I really don't need to set up HDFS and ext4 is HDFS-compatible, then what
should my checkpoint directory look like?

I tried all of these:
ssc.checkpoint("/tmp/checkpoint")
ssc.checkpoint("hdfs:///tmp/checkpoint")
ssc.checkpoint("file:///tmp/checkpoint")

But none of them worked for me.
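For context, a minimal sketch of the combination that needs the checkpoint
directory (the pair stream, durations and functions are illustrative):

// reduceByKeyAndWindow with an inverse reduce function requires checkpointing,
// and the checkpoint directory must be reachable by all workers
ssc.checkpoint("hdfs://namenode:8020/spark/checkpoints")  // placeholder path
val counts = pairs.reduceByKeyAndWindow(
  (a: Long, b: Long) => a + b,  // reduce
  (a: Long, b: Long) => a - b,  // inverse reduce
  Minutes(10),                  // window duration (illustrative)
  Minutes(1)                    // slide duration (illustrative)
)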

-- 
/Vamsi


Re: Spark Streaming kafka directStream value decoder issue

2015-09-17 Thread srungarapu vamsi
@Adrian,
I am calling collect only for debugging purposes. But I have to use foreachRDD
so that I can operate on the RDD and eventually save it to the DB.

My actual problem here is how to properly convert the Array[Byte] into my
custom object.

On Thu, Sep 17, 2015 at 7:04 PM, Adrian Tanase <atan...@adobe.com> wrote:

> Why are you calling foreachRdd / collect in the first place?
>
> Instead of using a custom decoder, you should simply do – this is code
> executed on the workers and allows the computation to continue. ForeachRdd
> and collect are output operations and force the data to be collected on the
> driver (assuming you don’t want that…)
>
> val events = kafkaDStream.map { case(devId,byteArray)=> 
> KafkaGenericEvent.parseFrom(byteArray) }
>
> From: srungarapu vamsi
> Date: Thursday, September 17, 2015 at 4:03 PM
> To: user
> Subject: Spark Streaming kafka directStream value decoder issue
>
> I am using KafkaUtils.createDirectStream to read the data from kafka bus.
>
> On the producer end, i am generating in the following way:
>
> props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
> props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
>   "org.apache.kafka.common.serialization.StringSerializer")
> props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
>   "org.apache.kafka.common.serialization.StringSerializer")
> val producer = new KafkaProducer[String, KafkaGenericEvent](props)
>
> // Send some messages
> println("Sending message")
> val kafkaGenericEvent = new 
> KafkaGenericEvent("event-id",EventType.site,"6",144050040L)
> val message = new ProducerRecord[String, 
> KafkaGenericEvent](topic,"myKey", kafkaGenericEvent)
> producer.send(message)
>   }
>
> I am connecting to kafka using the console consumer script and am able to
> see proper data. The KafkaGenericEvent used in the above code is  the class
> generated using ScalaBuff from a protobuff file.
>
> On the consumer end,
> If i read the value as a normal byte array and the convert it into
> KafkaGenericEvent in the following way, i get proper data:
>
>  val kafkaDStream  = 
> KafkaUtils.createDirectStream[String,Array[Byte],StringDecoder,DefaultDecoder](ssc,
>  kafkaConf, Set(topics))
>
> kafkaDStream.foreachRDD(rdd=>rdd.collect().map{
>   case(devId,byteArray)=>{
> println(KafkaGenericEvent.parseFrom(byteArray))
>   }
> })
>
> But if change the value to KafkaGenericEvent and use a custom decoder like
> this:
>
> class KafkaGenericEventsDecoder(props: VerifiableProperties = null) extends 
> Decoder[KafkaGenericEvent]{
>  override def fromBytes(bytes:Array[Byte]):KafkaGenericEvent = {
>KafkaGenericEvent.parseFrom(bytes)
>  }
> }
>
> and in consumer:
>
> val kafkaDStream  = 
> KafkaUtils.createDirectStream[String,KafkaGenericEvent,StringDecoder,KafkaGenericEventsDecoder](ssc,
>  kafkaConf, Set(topics))
> kafkaDStream foreachRDD(rdd=>rdd.collect().map{
>   case(devId,genericEvent)=>{
> println(genericEvent)
>   }
> })
>
> Now, i my value object KafkaGenericEvent   is not created based on the
> sent data instead it is creating an empty Object of KafkaGenericEvent with
> default values.
>
> Even if i read the value as array of bytes in the createDirectStream and
> than apply a transformation in the following way i am getting in correct
> values:
>
> val kafkaDStream  = 
> KafkaUtils.createDirectStream[String,Array[Byte],StringDecoder,DefaultDecoder](ssc,
>  kafkaConf, Set(topics))
>
> kafkaDStream.map{
>   case(devId,byteArray) =>(devId,KafkaGenericEvent.parseFrom(byteArray))
> } foreachRDD(rdd=>rdd.collect().map{
>   case(devId,genericEvent)=>{
> println(genericEvent)
>   }
> })
>
> I get the default KafkaGenericEvent Object in the line println
> (genericEvent)
> Does this mean that I can transform the values only on the driver and not
> on the executors?
>
> I am completely confused here!
> I am using :
>  scala-2.10.4
>  spark-1.3.1
>  kafka_2.10-0.8.2.1
>
> -
> /Vamsi
>



-- 
/Vamsi


Spark Streaming kafka directStream value decoder issue

2015-09-17 Thread srungarapu vamsi
I am using KafkaUtils.createDirectStream to read data from the Kafka bus.

On the producer end, I am generating messages in the following way:

props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
  "org.apache.kafka.common.serialization.StringSerializer")
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
  "org.apache.kafka.common.serialization.StringSerializer")
val producer = new KafkaProducer[String, KafkaGenericEvent](props)

// Send some messages
println("Sending message")
val kafkaGenericEvent = new KafkaGenericEvent("event-id", EventType.site, "6", 144050040L)
val message = new ProducerRecord[String, KafkaGenericEvent](topic, "myKey", kafkaGenericEvent)
producer.send(message)

I am connecting to Kafka using the console consumer script and can see proper
data. The KafkaGenericEvent used in the above code is the class generated
using ScalaBuff from a protobuf file.

On the consumer end, if I read the value as a plain byte array and then
convert it into a KafkaGenericEvent in the following way, I get proper data:

val kafkaDStream = KafkaUtils.createDirectStream[String, Array[Byte], StringDecoder, DefaultDecoder](
  ssc, kafkaConf, Set(topics))

kafkaDStream.foreachRDD(rdd => rdd.collect().map {
  case (devId, byteArray) =>
    println(KafkaGenericEvent.parseFrom(byteArray))
})

But if I change the value type to KafkaGenericEvent and use a custom decoder
like this:

class KafkaGenericEventsDecoder(props: VerifiableProperties = null) extends Decoder[KafkaGenericEvent] {
  override def fromBytes(bytes: Array[Byte]): KafkaGenericEvent = {
    KafkaGenericEvent.parseFrom(bytes)
  }
}

and in the consumer:

val kafkaDStream = KafkaUtils.createDirectStream[String, KafkaGenericEvent, StringDecoder, KafkaGenericEventsDecoder](
  ssc, kafkaConf, Set(topics))

kafkaDStream.foreachRDD(rdd => rdd.collect().map {
  case (devId, genericEvent) =>
    println(genericEvent)
})

Now my value object KafkaGenericEvent is not built from the sent data;
instead I get an empty KafkaGenericEvent object with default values.

Even if I read the value as an array of bytes in createDirectStream and then
apply a transformation in the following way, I still get incorrect values:

val kafkaDStream = KafkaUtils.createDirectStream[String, Array[Byte], StringDecoder, DefaultDecoder](
  ssc, kafkaConf, Set(topics))

kafkaDStream.map {
  case (devId, byteArray) => (devId, KafkaGenericEvent.parseFrom(byteArray))
}.foreachRDD(rdd => rdd.collect().map {
  case (devId, genericEvent) =>
    println(genericEvent)
})

I get the default KafkaGenericEvent object at the line println(genericEvent).
Does this mean that I can transform the values only on the driver and not on
the executors?

I am completely confused here!
I am using:
 scala-2.10.4
 spark-1.3.1
 kafka_2.10-0.8.2.1

-
/Vamsi


Re: Spark Streaming kafka directStream value decoder issue

2015-09-17 Thread srungarapu vamsi
@Saisai Shao, thanks for the pointer. It turned out to be a serialization
issue. I was using ScalaBuff to generate my "KafkaGenericEvent" class, but
when I went through the generated class code I found that it is not
serializable.
Now I am generating my classes using ScalaPB
(https://github.com/trueaccord/ScalaPB) and my problem is solved.
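For anyone who hits the same thing, here is a minimal sketch of the
worker-side decoding pattern that works for me now (KafkaGenericEvent is my
ScalaPB-generated class; saveToDb is a hypothetical placeholder for the DB
write):

import scala.util.Try

val kafkaDStream = KafkaUtils.createDirectStream[String, Array[Byte], StringDecoder, DefaultDecoder](
  ssc, kafkaConf, Set(topics))

// decode on the executors; drop any message that fails to parse
val events = kafkaDStream.flatMap { case (devId, bytes) =>
  Try(KafkaGenericEvent.parseFrom(bytes)).toOption.map(devId -> _)
}

events.foreachRDD { rdd =>
  rdd.foreachPartition { partition =>
    partition.foreach { case (devId, event) =>
      saveToDb(devId, event)  // hypothetical DB write, executed on the executors
    }
  }
}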

Thanks

On Thu, Sep 17, 2015 at 10:43 PM, Saisai Shao <sai.sai.s...@gmail.com>
wrote:

> Is your "KafkaGenericEvent" serializable? Since you call rdd.collect() to
> fetch the data to local driver, so this KafkaGenericEvent need to be
> serialized and deserialized through Java or Kryo (depends on your
> configuration) serializer, not sure if it is your problem to always get a
> default object.
>
> Also would you provide the implementation of `parseFrom`, so we could
> better understand the details of how you do deserialization.
>
> Thanks
> Saisai
>
> On Thu, Sep 17, 2015 at 9:49 AM, srungarapu vamsi <
> srungarapu1...@gmail.com> wrote:
>
>> If i understand correctly, i guess you are suggesting me to do this  :
>>
>> val kafkaDStream  = 
>> KafkaUtils.createDirectStream[String,Array[Byte],StringDecoder,DefaultDecoder](ssc,
>>  kafkaConf, Set(topics))
>>
>> kafkaDStream.map{
>>   case(devId,byteArray) =>(devId,KafkaGenericEvent.parseFrom(byteArray))
>> } foreachRDD(rdd=>rdd.collect().map{
>>   case(devId,genericEvent)=>{
>> println(genericEvent)
>>   }
>> })
>>
>> I read from Kafka as a Byte Array => applied a transformation on the
>> byteArray to Custom Class => Printed the custom class for debugging purpose.
>>
>> But this is not helping me. i.e i am getting an empty object with default
>> values when i printed "genericEvent"
>>
>> Please correct me if i did not get what you are suggesting me to try.
>>
>>
>> On Thu, Sep 17, 2015 at 9:30 PM, Adrian Tanase <atan...@adobe.com> wrote:
>>
>>> I guess what I'm asking is why not start with a Byte array like in the
>>> example that works (using the DefaultDecoder) then map over it and do the
>>> decoding manually like I'm suggesting below.
>>>
>>> Have you tried this approach? We have the same workflow (kafka =>
>>> protobuf => custom class) and it works.
>>> If you expect invalid messages, you can use flatMap instead and wrap
>>> .parseFrom in a Try {} .toOption.
>>>
>>> Sent from my iPhone
>>>
>>> On 17 Sep 2015, at 18:23, srungarapu vamsi <srungarapu1...@gmail.com>
>>> wrote:
>>>
>>> @Adrian,
>>> I am doing collect for debugging purpose. But i have to use foreachRDD
>>> so that i can operate on top of this rdd and eventually save to DB.
>>>
>>> But my actual problem here is to properly convert Array[Byte] to my
>>> custom object.
>>>
>>> On Thu, Sep 17, 2015 at 7:04 PM, Adrian Tanase <atan...@adobe.com>
>>> wrote:
>>>
>>>> Why are you calling foreachRdd / collect in the first place?
>>>>
>>>> Instead of using a custom decoder, you should simply do – this is code
>>>> executed on the workers and allows the computation to continue. ForeachRdd
>>>> and collect are output operations and force the data to be collected on the
>>>> driver (assuming you don’t want that…)
>>>>
>>>> val events = kafkaDStream.map { case(devId,byteArray)=> 
>>>> KafkaGenericEvent.parseFrom(byteArray) }
>>>>
>>>> From: srungarapu vamsi
>>>> Date: Thursday, September 17, 2015 at 4:03 PM
>>>> To: user
>>>> Subject: Spark Streaming kafka directStream value decoder issue
>>>>
>>>> I am using KafkaUtils.createDirectStream to read the data from kafka
>>>> bus.
>>>>
>>>> On the producer end, i am generating in the following way:
>>>>
>>>> props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
>>>> props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
>>>>   "org.apache.kafka.common.serialization.StringSerializer")
>>>> props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
>>>>   "org.apache.kafka.common.serialization.StringSerializer")
>>>> val producer = new KafkaProducer[String, KafkaGenericEvent](props)
>>>>
>>>> // Send some messages
>>>> println("Sending message")
>>>> val k

Re: Spark Streaming kafka directStream value decoder issue

2015-09-17 Thread srungarapu vamsi
If I understand correctly, you are suggesting that I do this:

val kafkaDStream = KafkaUtils.createDirectStream[String, Array[Byte], StringDecoder, DefaultDecoder](
  ssc, kafkaConf, Set(topics))

kafkaDStream.map {
  case (devId, byteArray) => (devId, KafkaGenericEvent.parseFrom(byteArray))
}.foreachRDD(rdd => rdd.collect().map {
  case (devId, genericEvent) =>
    println(genericEvent)
})

I read from Kafka as a byte array => applied a transformation from the byte
array to my custom class => printed the custom class for debugging purposes.

But this is not helping me, i.e. I still get an empty object with default
values when I print "genericEvent".

Please correct me if I did not understand what you are suggesting I try.


On Thu, Sep 17, 2015 at 9:30 PM, Adrian Tanase <atan...@adobe.com> wrote:

> I guess what I'm asking is why not start with a Byte array like in the
> example that works (using the DefaultDecoder) then map over it and do the
> decoding manually like I'm suggesting below.
>
> Have you tried this approach? We have the same workflow (kafka => protobuf
> => custom class) and it works.
> If you expect invalid messages, you can use flatMap instead and wrap
> .parseFrom in a Try {} .toOption.
>
> Sent from my iPhone
>
> On 17 Sep 2015, at 18:23, srungarapu vamsi <srungarapu1...@gmail.com>
> wrote:
>
> @Adrian,
> I am doing collect for debugging purpose. But i have to use foreachRDD so
> that i can operate on top of this rdd and eventually save to DB.
>
> But my actual problem here is to properly convert Array[Byte] to my custom
> object.
>
> On Thu, Sep 17, 2015 at 7:04 PM, Adrian Tanase <atan...@adobe.com> wrote:
>
>> Why are you calling foreachRdd / collect in the first place?
>>
>> Instead of using a custom decoder, you should simply do – this is code
>> executed on the workers and allows the computation to continue. ForeachRdd
>> and collect are output operations and force the data to be collected on the
>> driver (assuming you don’t want that…)
>>
>> val events = kafkaDStream.map { case(devId,byteArray)=> 
>> KafkaGenericEvent.parseFrom(byteArray) }
>>
>> From: srungarapu vamsi
>> Date: Thursday, September 17, 2015 at 4:03 PM
>> To: user
>> Subject: Spark Streaming kafka directStream value decoder issue
>>
>> I am using KafkaUtils.createDirectStream to read the data from kafka bus.
>>
>> On the producer end, i am generating in the following way:
>>
>> props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
>> props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
>>   "org.apache.kafka.common.serialization.StringSerializer")
>> props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
>>   "org.apache.kafka.common.serialization.StringSerializer")
>> val producer = new KafkaProducer[String, KafkaGenericEvent](props)
>>
>> // Send some messages
>> println("Sending message")
>> val kafkaGenericEvent = new 
>> KafkaGenericEvent("event-id",EventType.site,"6",144050040L)
>> val message = new ProducerRecord[String, 
>> KafkaGenericEvent](topic,"myKey", kafkaGenericEvent)
>> producer.send(message)
>>   }
>>
>> I am connecting to kafka using the console consumer script and am able to
>> see proper data. The KafkaGenericEvent used in the above code is  the class
>> generated using ScalaBuff from a protobuff file.
>>
>> On the consumer end,
>> If i read the value as a normal byte array and the convert it into
>> KafkaGenericEvent in the following way, i get proper data:
>>
>>  val kafkaDStream  = 
>> KafkaUtils.createDirectStream[String,Array[Byte],StringDecoder,DefaultDecoder](ssc,
>>  kafkaConf, Set(topics))
>>
>> kafkaDStream.foreachRDD(rdd=>rdd.collect().map{
>>   case(devId,byteArray)=>{
>> println(KafkaGenericEvent.parseFrom(byteArray))
>>   }
>> })
>>
>> But if change the value to KafkaGenericEvent and use a custom decoder
>> like this:
>>
>> class KafkaGenericEventsDecoder(props: VerifiableProperties = null) extends 
>> Decoder[KafkaGenericEvent]{
>>  override def fromBytes(bytes:Array[Byte]):KafkaGenericEvent = {
>>KafkaGenericEvent.parseFrom(bytes)
>>  }
>> }
>>
>> and in consumer:
>>
>> val kafkaDStream  = 
>> KafkaUtils.createDirectStream[String,KafkaGenericEvent,StringDecoder,KafkaGenericEventsDecoder](ssc,
>>  kafkaConf, Set(topics))
>> kafkaDStream fore

Re: Spark Streaming Suggestion

2015-09-16 Thread srungarapu vamsi
@David, I am going through the articles you shared. I will message you if I
need any help. Thanks.

@Ayan, yes, it looks like I can get everything done with Spark Streaming.
In fact we already have Storm in the architecture, sanitizing the data and
dumping it into Cassandra. Now I have some new requirements for which Spark
Streaming is the right tool. I just wanted to see whether there can be a
smooth marriage between the existing Storm and Spark Streaming.

Thanks for the inputs.

On Wed, Sep 16, 2015 at 2:30 AM, ayan guha <guha.a...@gmail.com> wrote:

> I think you need to make up your mind about storm vs spark. Using both in
> this context does not make much sense to me.
> On 15 Sep 2015 22:54, "David Morales" <dmora...@stratio.com> wrote:
>
>> Hi there,
>>
>> This is exactly our goal in Stratio Sparkta, a real-time aggregation
>> engine fully developed with spark streaming (and fully open source).
>>
>> Take a look at:
>>
>>
>>- the docs: http://docs.stratio.com/modules/sparkta/development/
>>- the repository: https://github.com/Stratio/sparkta
>>- and some slides explaining how sparkta was born and what it makes:
>>http://www.slideshare.net/Stratio/strata-sparkta
>>
>>
>> Feel free to ask us anything about the project.
>>
>>
>>
>>
>>
>>
>>
>>
>> 2015-09-15 8:10 GMT+02:00 srungarapu vamsi <srungarapu1...@gmail.com>:
>>
>>> The batch approach i had implemented takes about 10 minutes to complete
>>> all the pre-computation tasks for the one hour worth of data. When i went
>>> through my code, i figured out that most of the time consuming tasks are
>>> the ones, which read data from cassandra and the places where i perform
>>> sparkContex.union(Array[RDD]).
>>> Now the ask is to get the pre computation tasks near real time. So i am
>>> exploring the streaming approach.
>>>
>>> My pre computation tasks not only include just finding the unique
>>> numbers for a given device every minute, every hour, every day but it also
>>> includes the following tasks:
>>> 1. Find the number of unique numbers across a set of devices every
>>> minute, every hour, every day
>>> 2. Find the number of unique numbers which are commonly occurring across
>>> a set of devices every minute, every hour, every day
>>> 3. Find (total time a number occurred across a set of devices)/(total
>>> unique numbers occurred across the set of devices)
>>> The above mentioned pre computation tasks are just a few of what i will
>>> be needing and there are many more coming towards me :)
>>> I see all these problems need more of data parallel approach and hence i
>>> am interested to do this on the spark streaming end.
>>>
>>>
>>> On Tue, Sep 15, 2015 at 11:04 AM, Jörn Franke <jornfra...@gmail.com>
>>> wrote:
>>>
>>>> Why did you not stay with the batch approach? For me the architecture
>>>> looks very complex for a simple thing you want to achieve. Why don't you
>>>> process the data already in storm ?
>>>>
>>>> Le mar. 15 sept. 2015 à 6:20, srungarapu vamsi <
>>>> srungarapu1...@gmail.com> a écrit :
>>>>
>>>>> I am pretty new to spark. Please suggest a better model for the
>>>>> following use case.
>>>>>
>>>>> I have few (about 1500) devices in field which keep emitting about
>>>>> 100KB of data every minute. The nature of data sent by the devices is just
>>>>> a list of numbers.
>>>>> As of now, we have Storm is in the architecture which receives this
>>>>> data, sanitizes it and writes to cassandra.
>>>>> Now, i have a requirement to process this data. The processing
>>>>> includes finding unique numbers emitted by one or more devices for every
>>>>> minute, every hour, every day, every month.
>>>>> I had implemented this processing part as a batch job execution and
>>>>> now i am interested in making it a streaming application. i.e calculating
>>>>> the processed data as and when devices emit the data.
>>>>>
>>>>> I have the following two approaches:
>>>>> 1. Storm writes the actual data to cassandra and writes a message on
>>>>> Kafka bus that data corresponding to device D and minute M has been 
>>>>> written
>>>>> to cassandra
>>>>>
>>>>> Then Spark streaming reads this message from k

Re: Spark Streaming Suggestion

2015-09-15 Thread srungarapu vamsi
The batch approach I had implemented takes about 10 minutes to complete all
the pre-computation tasks for one hour's worth of data. When I went through
my code, I found that the most time-consuming tasks are the ones that read
data from Cassandra and the places where I call sparkContext.union(Array[RDD]).
Now the ask is to get the pre-computation tasks near real time, so I am
exploring the streaming approach.

My pre-computation tasks are not limited to finding the unique numbers for a
given device every minute, every hour and every day; they also include the
following (a sketch of the first one is below):
1. Find the number of unique numbers across a set of devices every minute,
every hour, every day
2. Find the number of unique numbers that occur commonly across a set of
devices every minute, every hour, every day
3. Find (total time a number occurred across a set of devices) / (total
unique numbers that occurred across the set of devices)
The pre-computation tasks mentioned above are just a few of what I will need,
and there are many more coming towards me :)
I see that all these problems need more of a data-parallel approach, and
hence I am interested in doing this on the Spark Streaming end.
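As an illustration of the first task, this is roughly the shape I have in
mind (a minimal sketch, assuming a one-minute streaming batch interval and a
DStream[(String, Long)] of (deviceId, number) pairs; the names are
illustrative):

// with a one-minute batch interval, each RDD holds one minute of data
val uniquePerDevicePerMinute = deviceNumbers       // DStream[(String, Long)]
  .transform(_.distinct())                         // unique (deviceId, number) pairs in the minute
  .map { case (deviceId, _) => (deviceId, 1L) }
  .reduceByKey(_ + _)                              // unique-number count per device for that minute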


On Tue, Sep 15, 2015 at 11:04 AM, Jörn Franke <jornfra...@gmail.com> wrote:

> Why did you not stay with the batch approach? For me the architecture
> looks very complex for a simple thing you want to achieve. Why don't you
> process the data already in storm ?
>
> Le mar. 15 sept. 2015 à 6:20, srungarapu vamsi <srungarapu1...@gmail.com>
> a écrit :
>
>> I am pretty new to spark. Please suggest a better model for the following
>> use case.
>>
>> I have few (about 1500) devices in field which keep emitting about 100KB
>> of data every minute. The nature of data sent by the devices is just a list
>> of numbers.
>> As of now, we have Storm is in the architecture which receives this data,
>> sanitizes it and writes to cassandra.
>> Now, i have a requirement to process this data. The processing includes
>> finding unique numbers emitted by one or more devices for every minute,
>> every hour, every day, every month.
>> I had implemented this processing part as a batch job execution and now i
>> am interested in making it a streaming application. i.e calculating the
>> processed data as and when devices emit the data.
>>
>> I have the following two approaches:
>> 1. Storm writes the actual data to cassandra and writes a message on
>> Kafka bus that data corresponding to device D and minute M has been written
>> to cassandra
>>
>> Then Spark streaming reads this message from kafka , then reads the data
>> of Device D at minute M from cassandra and starts processing the data.
>>
>> 2. Storm writes the data to both cassandra and  kafka, spark reads the
>> actual data from kafka , processes the data and writes to cassandra.
>> The second approach avoids additional hit of reading from cassandra every
>> minute , a device has written data to cassandra at the cost of putting the
>> actual heavy messages instead of light events on  kafka.
>>
>> I am a bit confused among the two approaches. Please suggest which one is
>> better and if both are bad, how can i handle this use case?
>>
>>
>> --
>> /Vamsi
>>
>


-- 
/Vamsi


Spark Streaming Suggestion

2015-09-14 Thread srungarapu vamsi
I am pretty new to Spark. Please suggest a better model for the following
use case.

I have a few (about 1500) devices in the field that each emit about 100KB of
data every minute. The data sent by the devices is just a list of numbers.
As of now, we have Storm in the architecture, which receives this data,
sanitizes it and writes it to Cassandra.
Now I have a requirement to process this data. The processing includes
finding the unique numbers emitted by one or more devices for every minute,
every hour, every day and every month.
I had implemented this processing as a batch job, and now I am interested in
making it a streaming application, i.e. calculating the processed data as and
when the devices emit it.

I have the following two approaches:
1. Storm writes the actual data to Cassandra and writes a message on the
Kafka bus saying that the data corresponding to device D and minute M has
been written to Cassandra.

Then Spark Streaming reads this message from Kafka, reads the data of device
D at minute M from Cassandra, and starts processing it.

2. Storm writes the data to both Cassandra and Kafka; Spark reads the actual
data from Kafka, processes it and writes the results to Cassandra.
The second approach avoids the additional hit of reading from Cassandra every
minute a device has written data, at the cost of putting the actual heavy
messages on Kafka instead of light events.

I am a bit confused between the two approaches. Please suggest which one is
better, and if both are bad, how I can handle this use case.


-- 
/Vamsi


Multiple spark-submits vs akka-actors

2015-09-02 Thread srungarapu vamsi
Hi,

I am using a Mesos cluster to run my Spark jobs.
I have one mesos-master and two mesos-slaves set up across 2 machines: on one
machine both the master and a slave are set up, and on the second machine
only a mesos-slave is set up.
I run these on m3.large EC2 instances.

1. When I try to submit two jobs using spark-submit in parallel, one job
hangs with the message: "Initial job has not accepted any resources; check
your cluster UI to ensure that workers are registered and have sufficient
resources". But when I check the Mesos cluster UI, which runs on port 5050,
I can see idle memory that the hanging job could use; however, the number of
idle cores is 1.
So does this mean that cores are pinned to a spark-submit, and no other
spark-submit can get a core until the running spark-submit completes?
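For what it's worth, one knob I have come across while framing this question
is capping the cores each application takes (a sketch; spark.cores.max limits
the total cores an application grabs in coarse-grained mode, and the value
and app name below are illustrative):

import org.apache.spark.{SparkConf, SparkContext}

// cap the cores one application takes so two spark-submits can share the cluster
val conf = new SparkConf()
  .setAppName("precompute-job")   // illustrative app name
  .set("spark.cores.max", "1")
val sc = new SparkContext(conf)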

2. Assumption : "submitting multiple spark-jobs using spark-submit has the
above mentioned problem ".
Now my task is to run a spark-streaming job which reads from kafka and does
some precomputation.
The nature of my pre-computation jobs are in such a way that, each
pre-compute jobs has few mutually exclusive tasks to complete where all the
tasks have inherent tree structure in them. i.e A task initiates few other
tasks and they initiate further more tasks.
I already have spark jobs which run as a batch job to perform the
pre-computations  mentioned above. Now, is it a good idea to convert these
precompuations jobs into akka actors ?

3. If running multiple spark-submit jobs with shared CPUs is possible at all,
then for the scenario explained in point 2, which approach is better:
"pre-computation jobs as actors" or "multiple spark-submits"?

Any pointers to clear up the above doubts are highly appreciated.
-- 
/Vamsi