Sebastian, I was able to broadcast using the SQL broadcast hint. The question is how to prevent this broadcast from happening for each RDD. Is there a way to broadcast it once and reuse it locally for each RDD? Right now, every batch re-reads the metadata file and re-broadcasts the DF. I tried sc.broadcast and that did not give me this behavior.
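One pattern that gives broadcast-once behavior in streaming jobs is a lazily initialized singleton on the driver: the expensive load runs the first time a batch asks for it, and every later micro batch reuses the same instance. Below is a minimal plain-Scala sketch of that pattern. It has no Spark dependency so it stays runnable; the names `MetaCache` and `loadMetadata` are illustrative, and in real code `instance` would hold a `Broadcast[...]` created once with `sc.broadcast(...)` instead of a plain `Map`.

```scala
object MetaCache {
  // In real Spark code this would be a Broadcast[Map[String, Row]] built via
  // sc.broadcast(...); a plain Map stands in here so the sketch runs anywhere.
  @volatile private var instance: Map[String, String] = null
  var loadCount = 0 // counts how many times the expensive load actually runs

  private def loadMetadata(): Map[String, String] = {
    loadCount += 1
    // Stand-in for reading/joining the metadata files and collecting them.
    Map("id1" -> "meta1", "id2" -> "meta2")
  }

  // Double-checked locking: the load runs once per driver JVM; every
  // subsequent micro batch gets the already-built instance back.
  def getInstance(): Map[String, String] = {
    if (instance == null) {
      synchronized {
        if (instance == null) {
          instance = loadMetadata()
        }
      }
    }
    instance
  }
}
```

Inside foreachRDD you would then call something like `MetaCache.getInstance()` each batch; only the first call pays the load cost.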
Srikanth

On Wed, Feb 17, 2016 at 4:53 PM, Sebastian Piu <sebastian....@gmail.com> wrote:

> You should be able to broadcast that data frame using sc.broadcast and
> join against it.
>
> On Wed, 17 Feb 2016, 21:13 Srikanth <srikanth...@gmail.com> wrote:
>
>> Hello,
>>
>> I have a streaming use case where I plan to keep a dataset broadcasted
>> and cached on each executor.
>> Every micro batch in streaming will create a DF out of the RDD and join
>> it with the batch.
>> The code below performs the broadcast operation for each RDD. Is
>> there a way to broadcast it just once?
>>
>> Alternate approaches are also welcome.
>>
>> val DF1 = sqlContext.read.format("json").schema(schema1).load(file1)
>>
>> val metaDF = sqlContext.read.format("json").schema(schema1).load(file2)
>>   .join(DF1, "id")
>> metaDF.cache
>>
>> val lines = streamingcontext.textFileStream(path)
>>
>> lines.foreachRDD( rdd => {
>>   val recordDF = rdd.flatMap(r => Record(r)).toDF()
>>   val joinedDF = recordDF.join(broadcast(metaDF), "id")
>>
>>   joinedDF.rdd.foreachPartition( partition => {
>>     partition.foreach( row => {
>>       ...
>>     })
>>   })
>> })
>>
>> streamingcontext.start
>>
>> On a similar note, if metaDF is too big to broadcast, can I
>> partition it (df.repartition($"col")) and also partition each streaming RDD
>> the same way? That way I can avoid shuffling metaDF each time.
>>
>> Let me know your thoughts.
>>
>> Thanks
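The repartitioning idea at the end of the quoted message can be illustrated without a cluster: if the cached metadata and each streaming batch are keyed with the same partitioner, equal ids land in the same partition index on both sides, so the join can run partition-locally and the metadata side never needs to be shuffled again. The sketch below is plain Scala under that assumption; `partitionIndex` and `copartition` are illustrative names, not Spark API, though `partitionIndex` mirrors the convention of Spark's `HashPartitioner`.

```scala
val numPartitions = 4

// Same convention as Spark's HashPartitioner: non-negative hashCode modulo.
def partitionIndex(key: String): Int = {
  val h = key.hashCode % numPartitions
  if (h < 0) h + numPartitions else h
}

// Group a keyed dataset by partition index, simulating partitionBy.
def copartition[V](data: Seq[(String, V)]): Map[Int, Seq[(String, V)]] =
  data.groupBy { case (k, _) => partitionIndex(k) }

val meta  = copartition(Seq("id1" -> "m1", "id2" -> "m2"))           // done once
val batch = copartition(Seq("id1" -> "r1", "id1" -> "r2", "id2" -> "r3"))

// Partition-local join: each batch partition is matched only against the
// metadata partition with the same index — no cross-partition movement.
val joined = batch.toSeq.flatMap { case (p, recs) =>
  val metaMap = meta.getOrElse(p, Seq.empty).toMap
  recs.collect { case (k, v) if metaMap.contains(k) => (k, v, metaMap(k)) }
}
```

In real Spark code the analogous move would be RDD-level: key both sides, `partitionBy` them with one shared `HashPartitioner`, cache the metadata side, and join, so that only the incoming batch is shuffled.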