Visual PySpark Programming
Hi,

I have the following use case and have not found a suitable tool for it.

Use case (Steps 1, 2 and 3 are UI driven):

*Step 1*) A user should be able to choose a data source (for example, HDFS) and configure it so that it points to a file.
*Step 2*) A user should be able to apply filters, transformations and actions on the dataframe loaded in the previous step.
*Step 3*) A user should be able to perform Step 2 any number of times as a chain.
*Step 4*) A user should be able to click a Save button, which would convert the data flow diagram into a pyspark job.

I found tools like https://seahorse.deepsense.ai/ and https://www.streamanalytix.com/product/streamanalytix/ which can do this. However, they produce a Scala/Java Spark job instead of a pyspark job. Moreover, these are paid products.

a) Are there any open source solutions which can serve my need?

If not, I would like to build one. To build one, I would need a workflow UI editor which I can tweak to serve my purpose, but I did not find any free workflow UI editor that I can tweak.

b) Are there any open source workflow UI editors which can help me solve my use case?
c) Are there any other interesting approaches to solve my use case?

Thanks,
Vamsi
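One way to picture Step 4 is as plain code generation: the UI saves the flow as data, and a small function turns that data into the text of a pyspark script. The sketch below is only an illustration under assumptions; the `FLOW` layout, its keys, the paths and the function name `generate_pyspark_job` are all hypothetical, and only two step types are handled.

```python
# Hypothetical flow description, as a UI might save it (all keys are illustrative).
FLOW = {
    "source": {"format": "csv", "path": "hdfs:///data/input.csv", "header": True},
    "steps": [
        {"op": "filter", "condition": "age > 21"},
        {"op": "select", "columns": ["name", "age"]},
    ],
    "sink": {"format": "parquet", "path": "hdfs:///data/output"},
}


def generate_pyspark_job(flow, app_name="generated_job"):
    """Return the source text of a PySpark script for the given flow."""
    src = flow["source"]
    lines = [
        "from pyspark.sql import SparkSession",
        "",
        f"spark = SparkSession.builder.appName({app_name!r}).getOrCreate()",
        f"df = spark.read.format({src['format']!r})"
        f".option('header', {src.get('header', False)!r}).load({src['path']!r})",
    ]
    # Each UI step becomes one chained DataFrame call (Step 2 repeated, as in Step 3).
    for step in flow["steps"]:
        if step["op"] == "filter":
            lines.append(f"df = df.filter({step['condition']!r})")
        elif step["op"] == "select":
            cols = ", ".join(repr(c) for c in step["columns"])
            lines.append(f"df = df.select({cols})")
        else:
            raise ValueError(f"unsupported op: {step['op']}")
    sink = flow["sink"]
    lines.append(f"df.write.format({sink['format']!r}).save({sink['path']!r})")
    lines.append("spark.stop()")
    return "\n".join(lines) + "\n"


if __name__ == "__main__":
    print(generate_pyspark_job(FLOW))
```

The generator itself needs no Spark installation; only the script it emits does. A real editor would need many more step types and some validation of user-supplied expressions.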
Launch a pyspark Job From UI
Hi,

I am looking for applications that can trigger Spark jobs from a UI. Are there any such applications available?

I have checked Spark-jobserver, which exposes an API for submitting a Spark application.

Are there any other alternatives that I can use to submit pyspark jobs from a UI?

Thanks,
Vamsi
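For context, one simple approach (not a product recommendation) is for the UI backend to shell out to spark-submit. The sketch below only builds the command line; the helper name `build_spark_submit` and its defaults are hypothetical, while `--master`, `--deploy-mode`, `--name` and `--py-files` are standard spark-submit options.

```python
import shlex


def build_spark_submit(script, master="yarn", deploy_mode="cluster",
                       name=None, py_files=(), app_args=()):
    """Build the argv list for running a PySpark script via spark-submit."""
    argv = ["spark-submit", "--master", master, "--deploy-mode", deploy_mode]
    if name:
        argv += ["--name", name]
    if py_files:
        argv += ["--py-files", ",".join(py_files)]
    argv.append(script)
    argv += list(app_args)
    return argv


if __name__ == "__main__":
    cmd = build_spark_submit("job.py", name="ui-job", app_args=["2015-10-29"])
    print(" ".join(shlex.quote(a) for a in cmd))
    # A backend would then hand `cmd` to subprocess.run(cmd, check=True).
```

Keeping the command as a list rather than a single string avoids shell quoting problems when arguments come from user input.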
Re: Spark/Kafka Streaming Job Gets Stuck
In addition to Adrian's suggestions, check whether the processing time of each batch is longer than the batch interval.

On Thu, Oct 29, 2015 at 2:23 AM, Adrian Tanase wrote:

> Does it work as expected with smaller batch or smaller load? Could it be
> that it's accumulating too many events over 3 minutes?
>
> You could also try increasing the parallelism via repartition to ensure
> smaller tasks that can safely fit in working memory.
>
> > On 28 Oct 2015, at 17:45, Afshartous, Nick wrote:
> >
> > Hi, we are load testing our Spark 1.3 streaming (reading from Kafka)
> > job and seeing a problem. This is running in AWS/Yarn and the streaming
> > batch interval is set to 3 minutes and this is a ten node cluster.
> >
> > Testing at 30,000 events per second we are seeing the streaming job get
> > stuck (stack trace below) for over an hour.
> >
> > Thanks on any insights or suggestions.
> > --
> > Nick
> >
> > org.apache.spark.streaming.api.java.AbstractJavaDStreamLike.mapPartitionsToPair(JavaDStreamLike.scala:43)
> > com.wb.analytics.spark.services.streaming.drivers.StreamingKafkaConsumerDriver.runStream(StreamingKafkaConsumerDriver.java:125)
> > com.wb.analytics.spark.services.streaming.drivers.StreamingKafkaConsumerDriver.main(StreamingKafkaConsumerDriver.java:71)
> > sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
> > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> > java.lang.reflect.Method.invoke(Method.java:606)
> > org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:480)

--
/Vamsi
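The numbers in this thread can be worked through with a little arithmetic. The sketch below computes how many events land in one 3-minute batch at 30,000 events per second, and how far a job falls behind when each batch takes longer to process than the batch interval. The 240-second processing time is a made-up figure for illustration, and the model assumes a constant processing time per batch.

```python
def events_per_batch(events_per_second, batch_interval_s):
    """Number of events that arrive during one batch interval."""
    return events_per_second * batch_interval_s


def scheduling_delay_s(completed, batch_interval_s, processing_time_s):
    """Delay faced by the next batch after `completed` batches have run.

    Each batch that takes longer than the interval pushes the following
    batch back by the difference; faster batches add no delay.
    """
    return max(0, processing_time_s - batch_interval_s) * completed


if __name__ == "__main__":
    print(events_per_batch(30_000, 180))     # 5400000 events in one 3-minute batch
    print(scheduling_delay_s(10, 180, 240))  # 600 seconds behind after 10 batches
```

If the second figure keeps growing in the streaming UI, the job never catches up, which can look like being stuck.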
reduceByKeyAndWindow confusion
I create a stream from Kafka as below:

    val kafkaDStream = KafkaUtils.createDirectStream[String,KafkaGenericEvent,StringDecoder,KafkaGenericEventsDecoder](ssc, kafkaConf, Set(topics))
      .window(Minutes(WINDOW_DURATION),Minutes(SLIDER_DURATION))

I have a map ("intToStringList") which is a Map[Int,List[String]]. Using this map, I filter the stream and finally convert it into a Map[Int,DStream[KafkaGenericEvent]].

1. On this map, for each value (which is a DStream[KafkaGenericEvent]), I apply the reduceByKeyAndWindow operation. But since we have to give a window duration and a slide duration to reduceByKeyAndWindow as well, does that imply that on every window of the given DStream, reduceByKeyAndWindow can be applied with a different window duration and slide duration?

That is, let's say the windowed DStream is created with window duration -> 16 minutes and slide duration -> 1 minute, so I have one RDD for every window. For reduceByKeyAndWindow, if we have a window duration of 4 minutes and a slide duration of 1 minute, will I get 4 RDDs, since windowDStream_batchDuration / reduceByKeyAndWindow_batchDuration is 4?

2. As suggested in the Spark docs, I am trying to set a checkpointing interval on the kafkaDStream created in the block shown above in the following way:

    kafkaDStream.checkpoint(Minutes(4))

But when I execute this, I get the error:

"WindowedDStream has been marked for checkpointing but the storage level has not been set to enable persisting. Please use DStream.persist() to set the storage level to use memory for better checkpointing performance"

But when I went through the implementation of the checkpoint function in DStream.scala, I see a call to the persist() function. Do I really have to call persist on the WindowedDStream then?
Just to give it a shot, I called persist on the windowed DStream and then called checkpoint(interval). Even then I face the error mentioned above.

How do I solve this?

--
/Vamsi
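The first question can be explored with a toy model in plain Python. The sketch assumes a 1-minute batch interval and models a windowed stream as emitting one merged result per slide interval; the function `window` is a hypothetical stand-in, not Spark's implementation, so treat the output as an aid to reasoning rather than a definitive answer.

```python
def window(stream, length):
    """Model DStream.window with a slide of one interval.

    `stream` is a list with one entry (a list of records) per interval;
    the result has one entry per interval, holding the last `length`
    entries of the input merged together.
    """
    out = []
    for end in range(len(stream)):
        start = max(0, end - length + 1)
        out.append([rec for rdd in stream[start:end + 1] for rec in rdd])
    return out


if __name__ == "__main__":
    batches = [[minute] for minute in range(20)]  # one record per 1-minute batch
    w16 = window(batches, 16)                     # window 16 min, slide 1 min
    w4_of_w16 = window(w16, 4)                    # window 4 min, slide 1 min, stacked on top
    print(len(w16), len(w4_of_w16))               # 20 20
    print(len(w16[-1]))                           # 16
    print(len(w4_of_w16[-1]), len(set(w4_of_w16[-1])))  # 64 19
```

In this model each stage still yields one result per minute rather than four, and stacking a 4-minute window on a 16-minute window makes the same records appear several times (64 records, only 19 distinct).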
Re: Invalid checkpoint url
@Das,
No, I am getting this error in cluster mode.
I think I understand why I am getting this error; please correct me if I am wrong.

The reason is: checkpointing writes RDDs to disk, and this happens on all workers. Whenever Spark has to read an RDD back, the checkpoint directory must be reachable from all the workers, and it must be a common place that workers can write to and read from. This calls for a commonly accessible file system such as NFS, HDFS or S3.
So, if I give ssc.checkpoint("some local directory"), the workers are not able to read RDDs from another worker's checkpoint directory, and I get the error mentioned above.

With this understanding, I am creating a t2 medium, HDFS 2.7.1 node and pointing the checkpoint directory to "hdfs://ip:port/path/to/directory".

Please correct me if I am wrong.

On Wed, Sep 23, 2015 at 4:53 AM, Tathagata Das <t...@databricks.com> wrote:

> Are you getting this error in local mode?
>
> On Tue, Sep 22, 2015 at 7:34 AM, srungarapu vamsi <srungarapu1...@gmail.com> wrote:
>
>> Yes, I tried ssc.checkpoint("checkpoint"), it works for me as long as i
>> don't use reduceByKeyAndWindow.
>>
>> When i start using "reduceByKeyAndWindow" it complains me with the error
>> "Exception in thread "main" org.apache.spark.SparkException: Invalid
>> checkpoint directory: file:/home/ubuntu/checkpoint/342e3171-01f3-48$
>> 2-97be-e3862eb5c944/rdd-8"
>>
>> The stack trace is as below:
>>
>> Exception in thread "main" org.apache.spark.SparkException: Invalid
>> checkpoint directory: file:/home/ubuntu/checkpoint/342e3171[22/9706$
>> 2-97be-e3862eb5c944/rdd-8
>>     at org.apache.spark.rdd.CheckpointRDD.getPartitions(CheckpointRDD.scala:54)
>>     at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
>>     at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
>>     at scala.Option.getOrElse(Option.scala:120)
>>     at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
>>     at org.apache.spark.rdd.RDDCheckpointData.doCheckpoint(RDDCheckpointData.scala:97)
>>     at org.apache.spark.rdd.RDD.doCheckpoint(RDD.scala:1415)
>>     at org.apache.spark.rdd.RDD$$anonfun$doCheckpoint$1.apply(RDD.scala:1417)
>>     at org.apache.spark.rdd.RDD$$anonfun$doCheckpoint$1.apply(RDD.scala:1417)
>>     at scala.collection.immutable.List.foreach(List.scala:318)
>>     at org.apache.spark.rdd.RDD.doCheckpoint(RDD.scala:1417)
>>     at org.apache.spark.rdd.RDD$$anonfun$doCheckpoint$1.apply(RDD.scala:1417)
>>     at org.apache.spark.rdd.RDD$$anonfun$doCheckpoint$1.apply(RDD.scala:1417)
>>     at scala.collection.immutable.List.foreach(List.scala:318)
>>     at org.apache.spark.rdd.RDD.doCheckpoint(RDD.scala:1417)
>>     at org.apache.spark.rdd.RDD$$anonfun$doCheckpoint$1.apply(RDD.scala:1417)
>>     at org.apache.spark.rdd.RDD$$anonfun$doCheckpoint$1.apply(RDD.scala:1417)
>>     at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>>     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>>     at org.apache.spark.rdd.RDD.doCheckpoint(RDD.scala:1417)
>>     at org.apache.spark.rdd.RDD$$anonfun$doCheckpoint$1.apply(RDD.scala:1417)
>>     at org.apache.spark.rdd.RDD$$anonfun$doCheckpoint$1.apply(RDD.scala:1417)
>>     at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>>     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>>     at org.apache.spark.rdd.RDD.doCheckpoint(RDD.scala:1417)
>>     at org.apache.spark.rdd.RDD$$anonfun$doCheckpoint$1.apply(RDD.scala:1417)
>>     at org.apache.spark.rdd.RDD$$anonfun$doCheckpoint$1.apply(RDD.scala:1417)
>>     at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>>     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>>     at org.apache.spark.rdd.RDD.doCheckpoint(RDD.scala:1417)
>>     at org.apache.spark.rdd.RDD$$anonfun$doCheckpoint$1.apply(RDD.scala:1417)
>>     at org.apache.spark.rdd.RDD$$anonfun$doCheckpoint$1.apply(RDD.scala:1417)
>>     at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>>     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>>     at org.apache.spa
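The reasoning at the top of this reply (in cluster mode, every worker must be able to reach the checkpoint directory) can be turned into a small pre-flight check. This is only a sketch: the helper name, the host `namenode:8020` and the set of schemes treated as shared are assumptions for illustration, not anything Spark itself enforces.

```python
from urllib.parse import urlparse

# Schemes assumed here to denote storage shared by all workers (illustrative list).
SHARED_SCHEMES = {"hdfs", "s3", "s3a", "s3n"}


def is_shared_checkpoint_uri(uri, shared_schemes=SHARED_SCHEMES):
    """Return True if the URI scheme suggests a file system visible to every worker."""
    return urlparse(uri).scheme in shared_schemes


if __name__ == "__main__":
    for uri in ("checkpoint", "file:///tmp/checkpoint", "hdfs://namenode:8020/checkpoints"):
        print(uri, is_shared_checkpoint_uri(uri))
```

The check is deliberately conservative: a `file:` path that happens to sit on an NFS mount shared by all nodes would also be reachable from every worker, but a scheme check alone cannot tell.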
Re: Invalid checkpoint url
oolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)

On Tue, Sep 22, 2015 at 6:49 PM, Adrian Tanase <atan...@adobe.com> wrote:

> Have you tried simply ssc.checkpoint("checkpoint")? This should create it
> in the local folder, has always worked for me when in development on local
> mode.
>
> For the others (/tmp/..) make sure you have rights to write there.
>
> -adrian
>
> From: srungarapu vamsi
> Date: Tuesday, September 22, 2015 at 7:59 AM
> To: user
> Subject: Invalid checkpoint url
>
> I am using reduceByKeyAndWindow (with inverse reduce function) in my code.
> In order to use this, it seems the checkpointDirectory which i have to use
> should be hadoop compatible file system.
> Does that mean that, i should setup hadoop on my system.
> I googled about this and i found in a S.O answer that i need not setup
> hdfs but the checkpoint directory should be HDFS copatible.
>
> I am a beginner in this area. I am running my spark streaming application
> on ubuntu 14.04, spark -1.3.1
> If at all i need not setup hdfs and ext4 is hdfs compatible, then how does
> my checkpoint directory look like?
>
> i tried all these:
> ssc.checkpoint("/tmp/checkpoint")
> ssc.checkpoint("hdfs:///tmp/checkpoint")
> ssc.checkpoint("file:///tmp/checkpoint")
>
> But none of them worked for me.
>
> --
> /Vamsi

--
/Vamsi
Invalid checkpoint url
I am using reduceByKeyAndWindow (with an inverse reduce function) in my code. To use this, it seems the checkpoint directory I have to use should be on a Hadoop-compatible file system.
Does that mean that I should set up Hadoop on my system?
I googled this and found in a Stack Overflow answer that I need not set up HDFS, but the checkpoint directory should be HDFS compatible.

I am a beginner in this area. I am running my Spark Streaming application on Ubuntu 14.04 with Spark 1.3.1.
If I need not set up HDFS and ext4 is HDFS compatible, what should my checkpoint directory look like?

I tried all of these:

    ssc.checkpoint("/tmp/checkpoint")
    ssc.checkpoint("hdfs:///tmp/checkpoint")
    ssc.checkpoint("file:///tmp/checkpoint")

But none of them worked for me.

--
/Vamsi
Re: Spark Streaming kafka directStream value decoder issue
@Adrian,
I am calling collect for debugging purposes. But I have to use foreachRDD so that I can operate on the RDD and eventually save to a DB.

But my actual problem here is to properly convert Array[Byte] to my custom object.

On Thu, Sep 17, 2015 at 7:04 PM, Adrian Tanase <atan...@adobe.com> wrote:

> Why are you calling foreachRdd / collect in the first place?
>
> Instead of using a custom decoder, you should simply do the following; this
> is code executed on the workers and allows the computation to continue.
> ForeachRdd and collect are output operations and force the data to be
> collected on the driver (assuming you don’t want that…)
>
>     val events = kafkaDStream.map { case(devId,byteArray)=>
>       KafkaGenericEvent.parseFrom(byteArray) }
>
> From: srungarapu vamsi
> Date: Thursday, September 17, 2015 at 4:03 PM
> To: user
> Subject: Spark Streaming kafka directStream value decoder issue
>
> I am using KafkaUtils.createDirectStream to read the data from kafka bus.
>
> On the producer end, i am generating in the following way:
>
>     props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
>     props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
>       "org.apache.kafka.common.serialization.StringSerializer")
>     props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
>       "org.apache.kafka.common.serialization.StringSerializer")
>     val producer = new KafkaProducer[String, KafkaGenericEvent](props)
>
>     // Send some messages
>     println("Sending message")
>     val kafkaGenericEvent = new KafkaGenericEvent("event-id",EventType.site,"6",144050040L)
>     val message = new ProducerRecord[String, KafkaGenericEvent](topic,"myKey", kafkaGenericEvent)
>     producer.send(message)
>     }
>
> I am connecting to kafka using the console consumer script and am able to
> see proper data. The KafkaGenericEvent used in the above code is the class
> generated using ScalaBuff from a protobuff file.
>
> On the consumer end,
> If i read the value as a normal byte array and the convert it into
> KafkaGenericEvent in the following way, i get proper data:
>
>     val kafkaDStream = KafkaUtils.createDirectStream[String,Array[Byte],StringDecoder,DefaultDecoder](ssc, kafkaConf, Set(topics))
>
>     kafkaDStream.foreachRDD(rdd=>rdd.collect().map{
>       case(devId,byteArray)=>{
>         println(KafkaGenericEvent.parseFrom(byteArray))
>       }
>     })
>
> But if change the value to KafkaGenericEvent and use a custom decoder like
> this:
>
>     class KafkaGenericEventsDecoder(props: VerifiableProperties = null) extends Decoder[KafkaGenericEvent]{
>       override def fromBytes(bytes:Array[Byte]):KafkaGenericEvent = {
>         KafkaGenericEvent.parseFrom(bytes)
>       }
>     }
>
> and in consumer:
>
>     val kafkaDStream = KafkaUtils.createDirectStream[String,KafkaGenericEvent,StringDecoder,KafkaGenericEventsDecoder](ssc, kafkaConf, Set(topics))
>     kafkaDStream foreachRDD(rdd=>rdd.collect().map{
>       case(devId,genericEvent)=>{
>         println(genericEvent)
>       }
>     })
>
> Now, i my value object KafkaGenericEvent is not created based on the
> sent data instead it is creating an empty Object of KafkaGenericEvent with
> default values.
>
> Even if i read the value as array of bytes in the createDirectStream and
> than apply a transformation in the following way i am getting in correct
> values:
>
>     val kafkaDStream = KafkaUtils.createDirectStream[String,Array[Byte],StringDecoder,DefaultDecoder](ssc, kafkaConf, Set(topics))
>
>     kafkaDStream.map{
>       case(devId,byteArray) =>(devId,KafkaGenericEvent.parseFrom(byteArray))
>     } foreachRDD(rdd=>rdd.collect().map{
>       case(devId,genericEvent)=>{
>         println(genericEvent)
>       }
>     })
>
> I get the default KafkaGenericEvent Object in the line println
> (genericEvent)
> Does this mean that I can transform the values only on the driver and not
> on the executors?
>
> I am completely confused here!
> I am using :
> scala-2.10.4
> spark-1.3.1
> kafka_2.10-0.8.2.1
>
> -
> /Vamsi

--
/Vamsi
Spark Streaming kafka directStream value decoder issue
I am using KafkaUtils.createDirectStream to read data from the Kafka bus.

On the producer end, I am generating messages in the following way:

    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")
    val producer = new KafkaProducer[String, KafkaGenericEvent](props)

    // Send some messages
    println("Sending message")
    val kafkaGenericEvent = new KafkaGenericEvent("event-id",EventType.site,"6",144050040L)
    val message = new ProducerRecord[String, KafkaGenericEvent](topic,"myKey", kafkaGenericEvent)
    producer.send(message)
    }

I am connecting to Kafka using the console consumer script and am able to see proper data. The KafkaGenericEvent used in the above code is the class generated using ScalaBuff from a protobuf file.

On the consumer end, if I read the value as a plain byte array and then convert it into KafkaGenericEvent in the following way, I get proper data:

    val kafkaDStream = KafkaUtils.createDirectStream[String,Array[Byte],StringDecoder,DefaultDecoder](ssc, kafkaConf, Set(topics))

    kafkaDStream.foreachRDD(rdd=>rdd.collect().map{
      case(devId,byteArray)=>{
        println(KafkaGenericEvent.parseFrom(byteArray))
      }
    })

But if I change the value type to KafkaGenericEvent and use a custom decoder like this:

    class KafkaGenericEventsDecoder(props: VerifiableProperties = null) extends Decoder[KafkaGenericEvent]{
      override def fromBytes(bytes:Array[Byte]):KafkaGenericEvent = {
        KafkaGenericEvent.parseFrom(bytes)
      }
    }

and in the consumer:

    val kafkaDStream = KafkaUtils.createDirectStream[String,KafkaGenericEvent,StringDecoder,KafkaGenericEventsDecoder](ssc, kafkaConf, Set(topics))
    kafkaDStream foreachRDD(rdd=>rdd.collect().map{
      case(devId,genericEvent)=>{
        println(genericEvent)
      }
    })

then my value object KafkaGenericEvent is not created from the sent data; instead, an empty KafkaGenericEvent object with default values is created.

Even if I read the value as an array of bytes in createDirectStream and then apply a transformation in the following way, I get incorrect values:

    val kafkaDStream = KafkaUtils.createDirectStream[String,Array[Byte],StringDecoder,DefaultDecoder](ssc, kafkaConf, Set(topics))

    kafkaDStream.map{
      case(devId,byteArray) =>(devId,KafkaGenericEvent.parseFrom(byteArray))
    } foreachRDD(rdd=>rdd.collect().map{
      case(devId,genericEvent)=>{
        println(genericEvent)
      }
    })

I get the default KafkaGenericEvent object at the line println(genericEvent).
Does this mean that I can transform the values only on the driver and not on the executors?

I am completely confused here!
I am using:
scala-2.10.4
spark-1.3.1
kafka_2.10-0.8.2.1

-
/Vamsi
Re: Spark Streaming kafka directStream value decoder issue
@Saisai Shao,
Thanks for the pointer. It turned out to be a serialization issue.
I was using ScalaBuff to generate my "KafkaGenericEvent" class, but when I went through the generated class code, I figured out that it is not serializable.
Now I am generating my classes using ScalaPB (https://github.com/trueaccord/ScalaPB) and my problem is solved.

Thanks

On Thu, Sep 17, 2015 at 10:43 PM, Saisai Shao <sai.sai.s...@gmail.com> wrote:

> Is your "KafkaGenericEvent" serializable? Since you call rdd.collect() to
> fetch the data to local driver, so this KafkaGenericEvent need to be
> serialized and deserialized through Java or Kryo (depends on your
> configuration) serializer, not sure if it is your problem to always get a
> default object.
>
> Also would you provide the implementation of `parseFrom`, so we could
> better understand the details of how you do deserialization.
>
> Thanks
> Saisai
>
> On Thu, Sep 17, 2015 at 9:49 AM, srungarapu vamsi <srungarapu1...@gmail.com> wrote:
>
>> If i understand correctly, i guess you are suggesting me to do this :
>>
>>     val kafkaDStream = KafkaUtils.createDirectStream[String,Array[Byte],StringDecoder,DefaultDecoder](ssc, kafkaConf, Set(topics))
>>
>>     kafkaDStream.map{
>>       case(devId,byteArray) =>(devId,KafkaGenericEvent.parseFrom(byteArray))
>>     } foreachRDD(rdd=>rdd.collect().map{
>>       case(devId,genericEvent)=>{
>>         println(genericEvent)
>>       }
>>     })
>>
>> I read from Kafka as a Byte Array => applied a transformation on the
>> byteArray to Custom Class => Printed the custom class for debugging purpose.
>>
>> But this is not helping me. i.e i am getting an empty object with default
>> values when i printed "genericEvent"
>>
>> Please correct me if i did not get what you are suggesting me to try.
Re: Spark Streaming kafka directStream value decoder issue
If I understand correctly, I guess you are suggesting that I do this:

    val kafkaDStream = KafkaUtils.createDirectStream[String,Array[Byte],StringDecoder,DefaultDecoder](ssc, kafkaConf, Set(topics))

    kafkaDStream.map{
      case(devId,byteArray) =>(devId,KafkaGenericEvent.parseFrom(byteArray))
    } foreachRDD(rdd=>rdd.collect().map{
      case(devId,genericEvent)=>{
        println(genericEvent)
      }
    })

I read from Kafka as a byte array => applied a transformation from the byte array to the custom class => printed the custom class for debugging purposes.

But this is not helping me, i.e. I get an empty object with default values when I print "genericEvent".

Please correct me if I did not get what you are suggesting I try.

On Thu, Sep 17, 2015 at 9:30 PM, Adrian Tanase <atan...@adobe.com> wrote:

> I guess what I'm asking is why not start with a Byte array like in the
> example that works (using the DefaultDecoder) then map over it and do the
> decoding manually like I'm suggesting below.
>
> Have you tried this approach? We have the same workflow (kafka => protobuf
> => custom class) and it works.
> If you expect invalid messages, you can use flatMap instead and wrap
> .parseFrom in a Try {} .toOption.
>
> On 17 Sep 2015, at 18:23, srungarapu vamsi <srungarapu1...@gmail.com> wrote:
>
> @Adrian,
> I am doing collect for debugging purpose. But i have to use foreachRDD so
> that i can operate on top of this rdd and eventually save to DB.
>
> But my actual problem here is to properly convert Array[Byte] to my custom
> object.
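Adrian's suggestion in this thread (decode manually, and drop messages that fail to parse instead of letting them break the job) can be illustrated outside Spark. The thread's code is Scala with protobuf; the sketch below is a Python analogue in which `json.loads` is a stand-in for `KafkaGenericEvent.parseFrom`, and both function names are hypothetical.

```python
import json


def parse_event(raw: bytes) -> dict:
    """Stand-in for KafkaGenericEvent.parseFrom; raises ValueError on bad input."""
    return json.loads(raw.decode("utf-8"))


def decode_valid(records):
    """Keep (key, event) pairs that parse; drop the ones that do not.

    This mirrors flatMap over Try(...).toOption: a bad record yields nothing.
    """
    decoded = []
    for key, raw in records:
        try:
            decoded.append((key, parse_event(raw)))
        except ValueError:  # covers JSONDecodeError and UnicodeDecodeError
            continue
    return decoded


if __name__ == "__main__":
    records = [("dev1", b'{"id": "event-id"}'), ("dev2", b"\xff\xfe"), ("dev3", b"not json")]
    print(decode_valid(records))
```

Because `decode_valid` takes an iterable of pairs and returns a list, it has the shape that a per-partition function would need in PySpark. In a real job, silently dropping records is usually paired with a counter or log line so that bad input does not go unnoticed.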
Re: Spark Streaming Suggestion
@David,
I am going through the articles you have shared. I will message you if I need any help.
Thanks

@Ayan,
Yes, it looks like I can get everything done with Spark Streaming. In fact, we already have Storm in the architecture, sanitizing the data and dumping it into Cassandra. Now I have some new requirements for which Spark Streaming is the right tool. I just wanted to see if there can be a smooth marriage between the existing Storm and Spark Streaming.

Thanks for the inputs.

On Wed, Sep 16, 2015 at 2:30 AM, ayan guha <guha.a...@gmail.com> wrote:

> I think you need to make up your mind about storm vs spark. Using both in
> this context does not make much sense to me.
>
> On 15 Sep 2015 22:54, "David Morales" <dmora...@stratio.com> wrote:
>
>> Hi there,
>>
>> This is exactly our goal in Stratio Sparkta, a real-time aggregation
>> engine fully developed with spark streaming (and fully open source).
>>
>> Take a look at:
>>
>>    - the docs: http://docs.stratio.com/modules/sparkta/development/
>>    - the repository: https://github.com/Stratio/sparkta
>>    - and some slides explaining how sparkta was born and what it makes:
>>      http://www.slideshare.net/Stratio/strata-sparkta
>>
>> Feel free to ask us anything about the project.
>>
>> 2015-09-15 8:10 GMT+02:00 srungarapu vamsi <srungarapu1...@gmail.com>:
>>
>>> The batch approach i had implemented takes about 10 minutes to complete
>>> all the pre-computation tasks for the one hour worth of data. When i went
>>> through my code, i figured out that most of the time consuming tasks are
>>> the ones, which read data from cassandra and the places where i perform
>>> sparkContex.union(Array[RDD]).
>>> Now the ask is to get the pre computation tasks near real time. So i am
>>> exploring the streaming approach.
>>>
>>> My pre computation tasks not only include just finding the unique
>>> numbers for a given device every minute, every hour, every day but it also
>>> includes the following tasks:
>>> 1. Find the number of unique numbers across a set of devices every
>>> minute, every hour, every day
>>> 2. Find the number of unique numbers which are commonly occurring across
>>> a set of devices every minute, every hour, every day
>>> 3. Find (total time a number occurred across a set of devices)/(total
>>> unique numbers occurred across the set of devices)
>>> The above mentioned pre computation tasks are just a few of what i will
>>> be needing and there are many more coming towards me :)
>>> I see all these problems need more of data parallel approach and hence i
>>> am interested to do this on the spark streaming end.
Re: Spark Streaming Suggestion
The batch approach I implemented takes about 10 minutes to complete all the pre-computation tasks for one hour's worth of data. Going through my code, I found that the most time-consuming tasks are the ones that read data from Cassandra and the places where I call sparkContext.union(Array[RDD]). The ask now is to run the pre-computation tasks in near real time, so I am exploring the streaming approach.

My pre-computation tasks are not limited to finding the unique numbers for a given device every minute, every hour, and every day. They also include the following:
1. Find the number of unique numbers across a set of devices every minute, every hour, every day.
2. Find the number of unique numbers that occur in common across a set of devices every minute, every hour, every day.
3. Find (total time a number occurred across a set of devices)/(total unique numbers that occurred across the set of devices).

These are just a few of the pre-computation tasks I will need, and many more are on the way :) All of these problems call for a data-parallel approach, which is why I am interested in doing this on the Spark Streaming end.

On Tue, Sep 15, 2015 at 11:04 AM, Jörn Franke <jornfra...@gmail.com> wrote:
> Why did you not stay with the batch approach? For me the architecture
> looks very complex for a simple thing you want to achieve. Why don't you
> process the data already in Storm?

-- /Vamsi
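The first two pre-computation tasks listed in the reply above reduce to set union and set intersection per time window. The following is a minimal plain-Python sketch of that logic, not Spark code and not the poster's implementation; the function names, device ids, and sample numbers are all hypothetical. Task 3 is left out because the post does not define "total time a number occurred" precisely.

```python
def unique_across(devices):
    """Task 1: count distinct numbers seen on ANY device in the window."""
    return len(set().union(*devices.values()))


def common_across(devices):
    """Task 2: count distinct numbers seen on EVERY device in the window."""
    sets = [set(numbers) for numbers in devices.values()]
    return len(set.intersection(*sets)) if sets else 0


def roll_up(windows):
    """Merge smaller windows (e.g. minutes) into a larger one (e.g. an hour).

    Per-device sets are unioned; the counts themselves cannot simply be summed,
    because the same number may appear in several minutes.
    """
    merged = {}
    for window in windows:
        for device, numbers in window.items():
            merged.setdefault(device, set()).update(numbers)
    return merged


# Hypothetical sample data: device id -> numbers emitted in that minute.
minute_1 = {"D1": [1, 2, 3, 3], "D2": [2, 3, 4]}
minute_2 = {"D1": [3, 5], "D2": [5, 6]}
hour = roll_up([minute_1, minute_2])

print(unique_across(minute_1), common_across(minute_1))
print(unique_across(hour), common_across(hour))
```

The roll-up step is the reason a streaming version would likely need to keep per-device sets (or a mergeable summary of them) as state, rather than only the per-minute counts.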
Spark Streaming Suggestion
I am pretty new to Spark. Please suggest a better model for the following use case.

I have a few (about 1500) devices in the field, each of which emits about 100KB of data every minute. The data sent by the devices is just a list of numbers. As of now, we have Storm in the architecture, which receives this data, sanitizes it, and writes it to Cassandra.

Now I have a requirement to process this data. The processing includes finding the unique numbers emitted by one or more devices for every minute, every hour, every day, and every month. I had implemented this processing as a batch job, and now I am interested in making it a streaming application, i.e. computing the processed data as and when the devices emit it.

I have the following two approaches:

1. Storm writes the actual data to Cassandra and writes a message on the Kafka bus saying that the data for device D and minute M has been written to Cassandra. Spark Streaming then reads this message from Kafka, reads the data of device D at minute M from Cassandra, and starts processing it.

2. Storm writes the data to both Cassandra and Kafka. Spark reads the actual data from Kafka, processes it, and writes the results to Cassandra.

The second approach avoids the additional hit of reading from Cassandra every minute after a device has written its data, at the cost of putting the actual heavy messages on Kafka instead of light events.

I am a bit confused between the two approaches. Please suggest which one is better and, if both are bad, how I can handle this use case.

-- /Vamsi
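To put a number on "heavy messages" versus "light events", the figures in the post (about 1500 devices, about 100KB per device per minute) can be turned into a rough Kafka throughput estimate. This is a back-of-the-envelope sketch only: it assumes one message per device per minute, decimal kilobytes, and a hypothetical 200-byte notification message for approach 1, none of which the post specifies.

```python
DEVICES = 1500                 # from the post
PAYLOAD_BYTES = 100 * 1000     # ~100KB per device per minute (decimal KB assumed)
NOTIFICATION_BYTES = 200       # hypothetical size of a "device D, minute M" event
INTERVAL_SECONDS = 60          # each device emits once per minute


def bytes_per_second(message_bytes, devices=DEVICES, interval_s=INTERVAL_SECONDS):
    """Average bytes per second placed on Kafka, assuming one message per device per interval."""
    return devices * message_bytes / interval_s


messages_per_second = DEVICES / INTERVAL_SECONDS
approach_1 = bytes_per_second(NOTIFICATION_BYTES)  # light events on Kafka
approach_2 = bytes_per_second(PAYLOAD_BYTES)       # full payloads on Kafka

print(f"messages/s:  {messages_per_second:.0f}")
print(f"approach 1:  {approach_1 / 1e3:.1f} KB/s on Kafka")
print(f"approach 2:  {approach_2 / 1e6:.1f} MB/s on Kafka")
```

Under these assumptions the two approaches move the same payload volume overall; in approach 1 the bulk of it is read back from Cassandra instead of travelling over Kafka.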
Multiple spark-submits vs akka-actors
Hi,

I am using a Mesos cluster to run my Spark jobs. I have one mesos-master and two mesos-slaves set up on 2 machines: the first machine runs both the master and a slave, and the second machine runs a slave. I run these on m3.large EC2 instances.

1. When I submit two jobs in parallel using spark-submit, one job hangs with the message: "Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources". When I check the Mesos cluster UI, which runs on port 5050, I can see idle memory that the hanging job could use, but the number of idle cores is 1. Does this mean that cores are pinned to a spark-submit, and no other spark-submit can get a core until the running one completes?

2. Assumption: "submitting multiple Spark jobs using spark-submit has the above-mentioned problem". My task now is to run a Spark Streaming job that reads from Kafka and does some pre-computation. Each pre-computation job has a few mutually exclusive tasks to complete, and the tasks have an inherent tree structure: a task initiates a few other tasks, and those initiate further tasks. I already have Spark jobs that run as batch jobs to perform these pre-computations. Is it a good idea to convert these pre-computation jobs into Akka actors?

3. If running multiple spark-submit jobs with shared CPU is possible at all, which approach is better for the scenario explained in point 2: "pre-computation jobs as actors" or "multiple spark-submits"?

Any pointers to clear up my doubts above are highly appreciated.

-- /Vamsi
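One setting that bears on point 1 is `spark.cores.max`, which caps the number of cores a single application requests when Spark runs on Mesos in coarse-grained mode (or on a standalone cluster). The sketch below only builds the spark-submit command lines; the helper name, master URL, and script names are hypothetical, and whether capping cores resolves the hang described above is not confirmed anywhere in this thread.

```python
def spark_submit_args(app, master, max_cores, extra_conf=None):
    """Build a spark-submit argument list that caps the cores one application may claim."""
    conf = {"spark.cores.max": str(max_cores)}
    conf.update(extra_conf or {})
    args = ["spark-submit", "--master", master]
    for key, value in sorted(conf.items()):
        args += ["--conf", f"{key}={value}"]
    args.append(app)
    return args


# Hypothetical master URL and job scripts; each job asks for at most one core.
MASTER = "mesos://master-host:5050"
COARSE = {"spark.mesos.coarse": "true"}  # spark.cores.max applies in coarse-grained mode

job_a = spark_submit_args("precompute_a.py", MASTER, max_cores=1, extra_conf=COARSE)
job_b = spark_submit_args("precompute_b.py", MASTER, max_cores=1, extra_conf=COARSE)

print(" ".join(job_a))
print(" ".join(job_b))
```

Either list could be handed to `subprocess.run` to launch the job. The cap keeps one application from requesting every offered core, which leaves room for a second spark-submit to receive resource offers.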