Re: Generic Interface between RDD and DStream
I totally agree that if we are able to do this it will be very cool. However, it requires a common trait / interface between RDD and DStream, which we don't have as of now. It would be very cool though. On my wish list for sure.

TD

On Thu, Jul 10, 2014 at 11:53 AM, mshah <shahmaul...@gmail.com> wrote:

I wanted to get a perspective on how to share code between Spark batch processing and Spark Streaming. For example, say I want to get the unique tweets stored in an HDFS file, in both Spark batch and Spark Streaming. Currently I have to do the following:

```
case class Tweet(tweetText: String, userId: String)
```

Spark batch:

```
val tweets = sparkContext.newAPIHadoopFile(tweet)

def getUniqueTweets(tweets: RDD[Tweet]): RDD[String] =
  tweets.map(tweet => (tweet.tweetText, tweet))
    .groupByKey()
    .map { case (tweetText, _) => tweetText }
```

Spark Streaming:

```
val tweets = streamingContext.fileStream(tweet)

def getUniqueTweets(tweets: DStream[Tweet]): DStream[String] =
  tweets.map(tweet => (tweet.tweetText, tweet))
    .groupByKey()
    .map { case (tweetText, _) => tweetText }
```

The example above shows I am doing the same thing twice, but I have to replicate the code because there is no common abstraction between DStream and RDD, or between SparkContext and StreamingContext. If there were a common abstraction it would be much simpler:

```
val tweets = context.read(tweet, Stream or Batch)

def getUniqueTweets(tweets: SparkObject[Tweet]): SparkObject[String] =
  tweets.map(tweet => (tweet.tweetText, tweet))
    .groupByKey()
    .map { case (tweetText, _) => tweetText }
```

I would appreciate thoughts on it. Is it already available? Is there any plan to add this support? Is it intentionally not supported because of a design choice?

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Generic-Interface-between-RDD-and-DStream-tp9331.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
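[Editor's note: one workaround available today, not mentioned in the thread, is to write the logic once against RDD and lift it into streaming with DStream.transform, which applies an RDD => RDD function to every batch. A minimal sketch, assuming the Tweet case class from the question; note it gives per-batch uniqueness, the same semantics as the DStream version above:]

```
import org.apache.spark.SparkContext._          // pair-RDD implicits (Spark 1.x)
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.DStream

case class Tweet(tweetText: String, userId: String)

// The core logic is written once, against RDD only.
def getUniqueTweets(tweets: RDD[Tweet]): RDD[String] =
  tweets.map(t => (t.tweetText, t))
    .groupByKey()
    .map { case (text, _) => text }

// transform lifts the RDD-level function into streaming by applying it
// to the RDD behind each batch of the DStream.
def getUniqueTweetsStream(tweets: DStream[Tweet]): DStream[String] =
  tweets.transform(getUniqueTweets _)
```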
Re: Generic Interface between RDD and DStream
A while ago, I wrote this:

```
package com.virdata.core.compute.common.api

import org.apache.spark.rdd.RDD
import org.apache.spark.SparkContext
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.StreamingContext

sealed trait SparkEnvironment extends Serializable {
  type Context
  type Wagon[A]
}

object Batch extends SparkEnvironment {
  type Context = SparkContext
  type Wagon[A] = RDD[A]
}

object Streaming extends SparkEnvironment {
  type Context = StreamingContext
  type Wagon[A] = DStream[A]
}
```

Then I can produce code like this (just an example):

```
package com.virdata.core.compute.common.api

import org.apache.spark.Logging

trait Process[M[_], In, N[_], Out, E <: SparkEnvironment] extends Logging { self =>

  def run(in: M[E#Wagon[In]])(implicit context: E#Context): N[E#Wagon[Out]]

  def pipe[Q[_], U](follow: Process[N, Out, Q, U, E]): Process[M, In, Q, U, E] =
    new Process[M, In, Q, U, E] {
      override def run(in: M[E#Wagon[In]])(implicit context: E#Context): Q[E#Wagon[U]] = {
        val run1: N[E#Wagon[Out]] = self.run(in)
        follow.run(run1)
      }
    }
}
```

It doesn't solve the whole thing, because we still have to duplicate the code for both Batch and Streaming. However, once the common traits are there, I'll only have to remove half of the implementations, without touching the calling side (the code using them), and thus keep my plain old backward compat' ^^.

I know it's just an intermediate hack, but still ;-)

greetz,
aℕdy ℙetrella
about.me/noootsab
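[Editor's note: a hypothetical usage sketch, not from Andy's code base, of how getUniqueTweets from the original question might be written against this Process abstraction. The Id alias, the Tweet case class, and both anonymous implementations are assumptions; it presumes the SparkEnvironment and Process definitions above compile as given. The duplication Andy mentions is visible in the two run bodies:]

```
package com.virdata.core.compute.common.api

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._                 // pair-RDD implicits (Spark 1.x)
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.StreamingContext._   // pair-DStream implicits
import org.apache.spark.streaming.dstream.DStream

object UniqueTweetsExample {
  case class Tweet(tweetText: String, userId: String)

  // Trivial container so that M[E#Wagon[In]] is simply E#Wagon[In].
  type Id[A] = A

  // Batch side: E = Batch, so Wagon[A] = RDD[A] and Context = SparkContext.
  val uniqueTweetsBatch = new Process[Id, Tweet, Id, String, Batch.type] {
    override def run(in: RDD[Tweet])(implicit sc: SparkContext): RDD[String] =
      in.map(t => (t.tweetText, t)).groupByKey().map { case (text, _) => text }
  }

  // Streaming side: the same logic against DStream -- still duplicated,
  // which is exactly the half a common RDD/DStream trait would let us delete.
  val uniqueTweetsStreaming = new Process[Id, Tweet, Id, String, Streaming.type] {
    override def run(in: DStream[Tweet])(implicit ssc: StreamingContext): DStream[String] =
      in.map(t => (t.tweetText, t)).groupByKey().map { case (text, _) => text }
  }
}
```

Either value can then be chained into a following stage with pipe, which composes two Processes without running anything until run is given a concrete SparkContext or StreamingContext.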
Re: Generic Interface between RDD and DStream
Hey Andy,

That's pretty cool!! Is there a github repo where you can share this piece of code for us to play around with? If we can come up with a simple enough general pattern, that could be very useful!

TD