A while ago, I wrote this:
```

package com.virdata.core.compute.common.api

import org.apache.spark.rdd.RDD
import org.apache.spark.SparkContext
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.StreamingContext

sealed trait SparkEnvironment extends Serializable {
  type Context

  type Wagon[A]
}
object Batch extends SparkEnvironment {
  type Context = SparkContext
  type Wagon[A] = RDD[A]
}
object Streaming extends SparkEnvironment{
  type Context = StreamingContext
  type Wagon[A] = DStream[A]
}

```
With that in place, I can write code like this (just an example):

```

package com.virdata.core.compute.common.api

import org.apache.spark.Logging

trait Process[M[_], In, N[_], Out, E <: SparkEnvironment] extends Logging { self =>

  def run(in: M[E#Wagon[In]])(implicit context: E#Context): N[E#Wagon[Out]]

  def pipe[Q[_], U](follow: Process[N, Out, Q, U, E]): Process[M, In, Q, U, E] =
    new Process[M, In, Q, U, E] {
      override def run(in: M[E#Wagon[In]])(implicit context: E#Context): Q[E#Wagon[U]] = {
        val run1: N[E#Wagon[Out]] = self.run(in)
        follow.run(run1)
      }
    }
}

```
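
To see `pipe` in action without Spark on the classpath, here is a small dependency-free sketch of the same pattern. Everything in it is a stand-in made up for illustration: `FakeContext` replaces `SparkContext`, `Vector` replaces `RDD`, `Logging` is dropped, and `Id`, `lengths` and `onlyEven` are hypothetical names. It relies on Scala 2 type projections (`E#Wagon`), like the code above.

```scala
object PipeSketch {
  // Assumption: a made-up context type standing in for SparkContext.
  final case class FakeContext(appName: String)

  sealed trait Env {
    type Context
    type Wagon[A]
  }
  // Assumption: Vector stands in for RDD so the sketch runs without Spark.
  object BatchLike extends Env {
    type Context = FakeContext
    type Wagon[A] = Vector[A]
  }

  // Identity container, for processes that take and return a single wagon.
  type Id[A] = A

  trait Process[M[_], In, N[_], Out, E <: Env] { self =>
    def run(in: M[E#Wagon[In]])(implicit context: E#Context): N[E#Wagon[Out]]

    // Feed the output of this process into `follow`.
    def pipe[Q[_], U](follow: Process[N, Out, Q, U, E]): Process[M, In, Q, U, E] =
      new Process[M, In, Q, U, E] {
        override def run(in: M[E#Wagon[In]])(implicit context: E#Context): Q[E#Wagon[U]] =
          follow.run(self.run(in))
      }
  }

  // Step 1: map every string to its length.
  val lengths: Process[Id, String, Id, Int, BatchLike.type] =
    new Process[Id, String, Id, Int, BatchLike.type] {
      def run(in: Vector[String])(implicit context: FakeContext): Vector[Int] =
        in.map(_.length)
    }

  // Step 2: keep only the even numbers.
  val onlyEven: Process[Id, Int, Id, Int, BatchLike.type] =
    new Process[Id, Int, Id, Int, BatchLike.type] {
      def run(in: Vector[Int])(implicit context: FakeContext): Vector[Int] =
        in.filter(_ % 2 == 0)
    }

  implicit val context: FakeContext = FakeContext("pipe-sketch")

  // The caller only sees the composed Process, not the individual steps.
  val pipeline: Process[Id, String, Id, Int, BatchLike.type] =
    lengths.pipe[Id, Int](onlyEven)

  val result: Vector[Int] = pipeline.run(Vector("spark", "rdd", "stream"))
}
```

The calling side depends only on `Process` and the environment type, which is what should let the implementations change later without breaking callers.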

This doesn't solve the whole problem, because we still have to write every
implementation twice (once for Batch, once for Streaming).
However, once the common traits exist I'll only have to remove half of
the implementations, without touching the calling side (the code using them),
and thus keep my plain old backward compat' ^^.

I know it's just an intermediate hack, but still ;-)
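
For illustration only, here is a rough idea of what a shared interface could buy, written as a type class so that one function serves both modes. None of this is Spark API: `WagonOps`, `FakeRDD`, `FakeDStream` and `uniqueTweetTexts` are hypothetical names, and the two case classes are plain in-memory stand-ins so the sketch runs without Spark.

```scala
object WagonOpsSketch {
  // Assumption: in-memory stand-ins for RDD and DStream (not the real classes).
  final case class FakeRDD[A](items: Vector[A])
  final case class FakeDStream[A](batches: Vector[FakeRDD[A]])

  // Hypothetical type class: the operations a pipeline needs from any wagon.
  trait WagonOps[W[_]] {
    def map[A, B](wagon: W[A])(f: A => B): W[B]
    def distinct[A](wagon: W[A]): W[A]
  }

  object WagonOps {
    implicit val batchOps: WagonOps[FakeRDD] = new WagonOps[FakeRDD] {
      def map[A, B](wagon: FakeRDD[A])(f: A => B): FakeRDD[B] =
        FakeRDD(wagon.items.map(f))
      def distinct[A](wagon: FakeRDD[A]): FakeRDD[A] =
        FakeRDD(wagon.items.distinct)
    }

    // The streaming instance applies the batch operation to each micro-batch.
    implicit val streamingOps: WagonOps[FakeDStream] = new WagonOps[FakeDStream] {
      def map[A, B](wagon: FakeDStream[A])(f: A => B): FakeDStream[B] =
        FakeDStream(wagon.batches.map(rdd => batchOps.map(rdd)(f)))
      def distinct[A](wagon: FakeDStream[A]): FakeDStream[A] =
        FakeDStream(wagon.batches.map(rdd => batchOps.distinct(rdd)))
    }
  }

  final case class Tweet(tweetText: String, userId: String)

  // Written once, usable with any wagon that has a WagonOps instance.
  def uniqueTweetTexts[W[_]](tweets: W[Tweet])(implicit ops: WagonOps[W]): W[String] =
    ops.distinct(ops.map(tweets)(_.tweetText))
}
```

With something like this, only the two small instances are mode-specific; the business logic is written once. Note that in the streaming stand-in, uniqueness holds per micro-batch, not across the whole stream.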

greetz,


  aℕdy ℙetrella
about.me/noootsab


On Sat, Jul 12, 2014 at 12:57 AM, Tathagata Das <tathagata.das1...@gmail.com> wrote:

> I totally agree that it would be very cool if we were able to do this.
> However, it requires a common trait / interface between RDD and
> DStream, which we don't have as of now. It's on my wish list for sure.
>
> TD
>
>
> On Thu, Jul 10, 2014 at 11:53 AM, mshah <shahmaul...@gmail.com> wrote:
>
>> I wanted to get a perspective on how to share code between Spark batch
>> processing and Spark Streaming.
>>
>> For example, I want to get the unique tweets stored in an HDFS file, in
>> both Spark Batch and Spark Streaming. Currently I have to do the
>> following:
>>
>> Tweet {
>> String tweetText;
>> String userId;
>> }
>>
>> Spark Batch:
>> tweets = sparkContext.newHadoopApiAsFile("tweet");
>>
>> def getUniqueTweets(tweets: RDD[Tweet]) = {
>>      tweets.map(tweet => (tweet.tweetText, tweet))
>>        .groupByKey()
>>        .map { case (tweetText, _) => tweetText }
>> }
>>
>> Spark Streaming:
>>
>> tweets = streamingContext.fileStream("tweet");
>>
>> def getUniqueTweets(tweets: DStream[Tweet]) = {
>>      tweets.map(tweet => (tweet.tweetText, tweet))
>>        .groupByKey()
>>        .map { case (tweetText, _) => tweetText }
>> }
>>
>>
>> The example above shows that I am doing the same thing twice, but I have
>> to replicate the code because there is no common abstraction between
>> DStream and RDD, or between SparkContext and StreamingContext.
>>
>> If there were a common abstraction, it would be much simpler:
>>
>> tweets = context.read("tweet", Stream or Batch)
>>
>> def getUniqueTweets(tweets: SparkObject[Tweet]) = {
>>      tweets.map(tweet => (tweet.tweetText, tweet))
>>        .groupByKey()
>>        .map { case (tweetText, _) => tweetText }
>> }
>>
>> I would appreciate thoughts on this. Is it already available? Is there any
>> plan to add this support? Or is it intentionally not supported as a
>> design choice?
>>
>>
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Generic-Interface-between-RDD-and-DStream-tp9331.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>
>
