Then go for the second option I suggested – keep turning your HDFS file 
(Batch RDD) into a stream of messages (outside Spark Streaming); Spark 
Streaming then consumes and aggregates those messages FOR THE RUNTIME 
LIFETIME of your application in one of the following ways:

 

1.       Continuous Union of the DStream RDDs, while you also Persist the 
result (so it doesn't get discarded, which is what happens to DStream RDDs by 
default in Spark Streaming)

2.       Apply one of the Window operations, e.g. an aggregation, on the 
DStream RDDs, with the window spanning the runtime lifetime of the app

 

And at the same time you join the DStream RDDs of your actual streaming data 
with the above continuously updated DStream RDD representing your HDFS file – 
see the sketch below.
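
A minimal sketch of option 1, assuming the HDFS records arrive as a key-value 
DStream named hdfsStream and the live data as liveStream (the names and the 
storage level here are assumptions, not from the thread):

import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel

// Accumulate every micro-batch of the "historical" stream into one
// growing reference RDD
var refRdd: RDD[(String, String)] = ssc.sparkContext.emptyRDD[(String, String)]

hdfsStream.foreachRDD { rdd =>
  // Union the new batch in and persist, so the result is not discarded
  // the way DStream RDDs are by default
  val updated = refRdd.union(rdd).persist(StorageLevel.MEMORY_AND_DISK)
  updated.count()          // materialize before dropping the previous copy
  refRdd.unpersist()
  refRdd = updated
}

// Join each micro-batch of the actual streaming data against the
// continuously updated reference RDD
val joined = liveStream.transform(rdd => rdd.join(refRdd))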

 

From: Ilove Data [mailto:data4...@gmail.com] 
Sent: Monday, June 15, 2015 5:19 AM
To: Tathagata Das
Cc: Evo Eftimov; Akhil Das; user
Subject: Re: Join between DStream and Periodically-Changing-RDD

 

@Akhil Das

Joining two DStreams might not be an option, since I want to join the stream 
with historical data in an HDFS folder.

 

@Tathagata Das & @Evo Eftimov

The Batch RDD to be reloaded is considerably larger than the DStream data, 
since it is historical data. To be more specific, most of the joins from the 
stream to the HDFS folder (~90% of the stream's records) will hit recent data 
(the last 1-2 days) in the HDFS folder, so it is important to get the most 
up-to-date data.

 

Is there a workaround for that specific case? Since RDDs are not mutable, do I 
need a K-V database for this join with historical data?

 

On Fri, Jun 12, 2015 at 8:14 AM, Tathagata Das <t...@databricks.com> wrote:

Another approach, not mentioned so far, is to use a function to get the RDD 
that is to be joined. Something like this:

kvDstream.foreachRDD(rdd => {

      // Re-evaluated every batch interval: returns either the same cached
      // RDD or a freshly loaded one, as decided inside the function
      val refRdd = getOrUpdateRDD(....params...)

      rdd.join(refRdd)

    })

The getOrUpdateRDD() function that you implement will get called every batch 
interval, and you can decide whether to return the same RDD or an updated one. 
Once updated, if the RDD is going to be used across multiple batch intervals, 
you should cache it. Furthermore, if you are going to join it, you should 
partition it with a partitioner, then cache it, and make sure the same 
partitioner is used for the join. That is more efficient, as the RDD will stay 
partitioned in memory, minimizing the cost of the join.
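
A minimal sketch of what such a getOrUpdateRDD() could look like, assuming a 
comma-delimited key-value file and a 10-minute reload interval (the interval, 
the variable names and the file layout are all assumptions):

import org.apache.spark.HashPartitioner
import org.apache.spark.rdd.RDD

val partitioner = new HashPartitioner(ssc.sparkContext.defaultParallelism)
var cachedRdd: RDD[(String, String)] = null
var lastLoadTime = 0L
val reloadIntervalMs = 10 * 60 * 1000L   // assumed: refresh every 10 minutes

def getOrUpdateRDD(path: String): RDD[(String, String)] = {
  val now = System.currentTimeMillis()
  if (cachedRdd == null || now - lastLoadTime > reloadIntervalMs) {
    if (cachedRdd != null) cachedRdd.unpersist()
    cachedRdd = ssc.sparkContext.textFile(path)
      .map(line => (line.split(",")(0), line))
      .partitionBy(partitioner)   // use the same partitioner for the join
      .cache()
    lastLoadTime = now
  }
  cachedRdd
}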

 

 

On Wed, Jun 10, 2015 at 9:08 AM, Evo Eftimov <evo.efti...@isecc.com> wrote:

It depends on how big the Batch RDD requiring reloading is 

 

Reloading it for EVERY single DStream RDD would slow down the stream processing 
in line with the total time required to reload the Batch RDD …..

 

But if the Batch RDD is not that big then that might not be an issue, 
especially in the context of the latency requirements for your streaming app

 

Another, more efficient and real-time approach may be to represent your Batch 
RDD as DStream RDDs, keep aggregating them over the lifetime of the Spark 
Streaming app instance, and keep joining them with the actual DStream RDDs

 

You can feed your HDFS file into a Message Broker topic and consume it from 
there in the form of DStream RDDs, which you keep aggregating over the lifetime 
of the Spark Streaming app instance
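
For example, if the broker is Kafka, the consuming side could look roughly 
like this (the broker address, the topic name and the choice of the Kafka 0.8 
direct-stream API are assumptions):

import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils

// Assumed: an external process publishes the HDFS file's records to the
// "historical" topic as they land
val kafkaParams = Map("metadata.broker.list" -> "broker1:9092")
val hdfsStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, Set("historical"))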

 

From: Akhil Das [mailto:ak...@sigmoidanalytics.com] 
Sent: Wednesday, June 10, 2015 8:36 AM
To: Ilove Data
Cc: user@spark.apache.org
Subject: Re: Join between DStream and Periodically-Changing-RDD

 

RDDs are immutable – why not join two DStreams? 

 

Not sure, but you can try something like this also:

 

kvDstream.foreachRDD(rdd => {

      // Reload the HDFS folder on every batch interval, so newly arrived
      // files are picked up
      val file = ssc.sparkContext.textFile("/sigmoid/")

      // Key each line by its first comma-separated field
      val kvFile = file.map(x => (x.split(",")(0), x))

      rdd.join(kvFile)

    })

 




Thanks

Best Regards

 

On Tue, Jun 9, 2015 at 7:37 PM, Ilove Data <data4...@gmail.com> wrote:

Hi,

 

I'm trying to join a DStream with a 20s batch interval to an RDD loaded from an 
HDFS folder that changes periodically – say a new file arrives in the folder 
every 10 minutes.

 

How should this be done, considering that the HDFS folder is periodically 
changing/gaining new files? Does an RDD automatically detect changes in its 
HDFS source folder and reload itself?

 

Thanks!

Rendy

 

 

 
