Re: Generic Interface between RDD and DStream

2014-07-11 Thread Tathagata Das
I totally agree that if we are able to do this it will be very cool.
However, this requires having a common trait / interface between RDD and
DStream, which we don't have as of now. It would be very cool though. On my
wish list for sure.

TD


On Thu, Jul 10, 2014 at 11:53 AM, mshah shahmaul...@gmail.com wrote:

 I wanted to get a perspective on how to share code between Spark batch
 processing and Spark Streaming.

 For example, I want to get unique tweets stored in a HDFS file then in both
 Spark Batch and Spark Streaming. Currently I will have to do following
 thing:

 case class Tweet(tweetText: String, userId: String)

 Spark Batch:
 val tweets: RDD[Tweet] = sparkContext.newAPIHadoopFile(tweet)

 def getUniqueTweets(tweets: RDD[Tweet]) = {
   tweets.map(tweet => (tweet.tweetText, tweet))
     .groupByKey()
     .map { case (tweetText, _) => tweetText }
 }

 Spark Streaming:

 val tweets: DStream[Tweet] = streamingContext.fileStream(tweet)

 def getUniqueTweets(tweets: DStream[Tweet]) = {
   tweets.map(tweet => (tweet.tweetText, tweet))
     .groupByKey()
     .map { case (tweetText, _) => tweetText }
 }


 The example above shows that I am doing the same thing twice, but I have to
 replicate the code because there is no common abstraction between DStream
 and RDD, or between SparkContext and StreamingContext.

 If there were a common abstraction, it would be much simpler:

 tweets = context.read(tweet, Stream or Batch)

 def getUniqueTweets(tweets: SparkObject[Tweet]) = {
   tweets.map(tweet => (tweet.tweetText, tweet))
     .groupByKey()
     .map { case (tweetText, _) => tweetText }
 }

 I would appreciate thoughts on this. Is it already available? Is there any
 plan to add this support? Or is it intentionally unsupported as a design
 choice?





 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Generic-Interface-between-RDD-and-DStream-tp9331.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.
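
One practical half-measure the current API already allows, for what it's worth: write the logic once against RDDs and lift it into streaming with DStream.transform, which applies an RDD-to-RDD function to every micro-batch. A minimal sketch, assuming the Tweet case class from the mail above and Spark 1.x; note that the streaming version then de-duplicates per batch, not across the whole stream:

```
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.DStream

case class Tweet(tweetText: String, userId: String)

// Written once, against RDDs only.
def getUniqueTweets(tweets: RDD[Tweet]): RDD[String] =
  tweets.map(_.tweetText).distinct()

// Reused for streaming: transform applies the RDD function to each batch.
def getUniqueTweetsStream(tweets: DStream[Tweet]): DStream[String] =
  tweets.transform(rdd => getUniqueTweets(rdd))
```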



Re: Generic Interface between RDD and DStream

2014-07-11 Thread andy petrella
A while ago, I wrote this:
```

package com.virdata.core.compute.common.api

import org.apache.spark.rdd.RDD
import org.apache.spark.SparkContext
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.StreamingContext

sealed trait SparkEnvironment extends Serializable {
  type Context   // the driving context: SparkContext or StreamingContext
  type Wagon[A]  // the distributed collection: RDD[A] or DStream[A]
}
object Batch extends SparkEnvironment {
  type Context = SparkContext
  type Wagon[A] = RDD[A]
}
object Streaming extends SparkEnvironment {
  type Context = StreamingContext
  type Wagon[A] = DStream[A]
}

```
Then I can produce code like this (just an example):

```

package com.virdata.core.compute.common.api

import org.apache.spark.Logging

trait Process[M[_], In, N[_], Out, E <: SparkEnvironment] extends Logging { self =>

  def run(in: M[E#Wagon[In]])(implicit context: E#Context): N[E#Wagon[Out]]

  // Compose two processes: feed the output of this one into `follow`.
  def pipe[Q[_], U](follow: Process[N, Out, Q, U, E]): Process[M, In, Q, U, E] =
    new Process[M, In, Q, U, E] {
      override def run(in: M[E#Wagon[In]])(implicit context: E#Context): Q[E#Wagon[U]] = {
        val run1: N[E#Wagon[Out]] = self.run(in)
        follow.run(run1)
      }
    }
}

```

It doesn't resolve the whole thing, because we still have to duplicate the
code (once for Batch, once for Streaming). However, once the common traits
are there, I'll only have to remove half of the implementations -- without
touching the calling side that uses them, and thus keeping my plain old
backward compat' ^^.

I know it's just an intermediate hack, but still ;-)
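
For illustration, a concrete Process for the Batch environment might look like the following (a sketch, not from the original mail; Id, Tweet and uniqueTexts are made-up names, and it assumes Spark 1.x):

```
package com.virdata.core.compute.common.api

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

object BatchExample {
  // Identity container, so a plain Wagon can stand in for M[_] and N[_].
  type Id[A] = A

  case class Tweet(tweetText: String, userId: String)

  // Batch.type#Wagon[Tweet] resolves to RDD[Tweet], so this run signature
  // is just the batch instantiation of Process#run.
  val uniqueTexts = new Process[Id, Tweet, Id, String, Batch.type] {
    override def run(in: RDD[Tweet])(implicit sc: SparkContext): RDD[String] =
      in.map(_.tweetText).distinct()
  }
}
```

A Streaming counterpart would implement run against DStream/StreamingContext instead, and processes for either environment can then be chained with pipe without the call sites changing.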

greetz,


  aℕdy ℙetrella
about.me/noootsab



Re: Generic Interface between RDD and DStream

2014-07-11 Thread Tathagata Das
Hey Andy,

That's pretty cool!! Is there a GitHub repo where you can share this piece
of code for us to play around with? If we can come up with a simple enough
general pattern, that could be very useful!

TD
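
For what it's worth, one shape such a general pattern could take today is a user-land typeclass over the operations the two APIs already share (a sketch only, not something proposed in the thread; SparkLike and the instance names are made up, and key-based operations such as groupByKey would need extra methods since they live in the Pair*Functions wrappers):

```
import scala.reflect.ClassTag
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.DStream

// A typeclass capturing operations that RDD and DStream have in common.
trait SparkLike[F[_]] {
  def map[A, B: ClassTag](fa: F[A])(f: A => B): F[B]
  def filter[A](fa: F[A])(p: A => Boolean): F[A]
}

object SparkLike {
  implicit object RddIsSparkLike extends SparkLike[RDD] {
    def map[A, B: ClassTag](fa: RDD[A])(f: A => B): RDD[B] = fa.map(f)
    def filter[A](fa: RDD[A])(p: A => Boolean): RDD[A] = fa.filter(p)
  }
  implicit object DStreamIsSparkLike extends SparkLike[DStream] {
    def map[A, B: ClassTag](fa: DStream[A])(f: A => B): DStream[B] = fa.map(f)
    def filter[A](fa: DStream[A])(p: A => Boolean): DStream[A] = fa.filter(p)
  }
}

object SharedLogic {
  // Written once; works for RDD[String] and DStream[String] alike.
  def nonEmptyLengths[F[_]](lines: F[String])(implicit F: SparkLike[F]): F[Int] =
    F.map(F.filter(lines)(_.nonEmpty))(_.length)
}
```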

