Sebastian,

I was able to broadcast using the SQL broadcast hint. The question is how
to avoid repeating this broadcast for each RDD.
Is there a way to broadcast it once and reuse it locally for each RDD?
Right now the metadata file is read and the DF is broadcast for every batch.
I tried sc.broadcast, and that did not provide this behavior.
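
To make the question concrete, here is the broadcast-once behavior I am
after, sketched with sc.broadcast over a collected map. This is a sketch
only: it assumes metaDF is small enough to collect to the driver, and
Record (returning an Option) and its id field are placeholders.

    // Collect the small metadata DF once on the driver and broadcast the
    // resulting map, so each micro batch looks it up locally instead of
    // re-broadcasting a DataFrame.
    val metaMap = metaDF.collect()
                        .map(row => row.getAs[String]("id") -> row)
                        .toMap
    val metaB = sc.broadcast(metaMap)

    lines.foreachRDD( rdd => {
      rdd.foreachPartition( partition => {
        val meta = metaB.value  // deserialized at most once per executor
        partition.foreach( line => {
          for (record <- Record(line); metaRow <- meta.get(record.id)) {
            // combine record with metaRow here
          }
        })
      })
    })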

Srikanth


On Wed, Feb 17, 2016 at 4:53 PM, Sebastian Piu <sebastian....@gmail.com>
wrote:

> You should be able to broadcast that data frame using sc.broadcast and
> join against it.
>
> On Wed, 17 Feb 2016, 21:13 Srikanth <srikanth...@gmail.com> wrote:
>
>> Hello,
>>
>> I have a streaming use case where I plan to keep a dataset broadcast
>> and cached on each executor.
>> Every micro batch in streaming will create a DF out of the RDD and join
>> it against this cached dataset.
>> The code below performs the broadcast operation for each RDD. Is
>> there a way to broadcast it just once?
>>
>> Alternative approaches are also welcome.
>>
>>     // Needed for rdd.toDF() and the broadcast() hint further below.
>>     import sqlContext.implicits._
>>     import org.apache.spark.sql.functions.broadcast
>>
>>     // Build the metadata DF once and mark it for caching.
>>     val DF1 = sqlContext.read.format("json").schema(schema1).load(file1)
>>
>>     val metaDF = sqlContext.read.format("json").schema(schema1).load(file2)
>>                            .join(DF1, "id")
>>     metaDF.cache()
>>
>>
>>   val lines = streamingcontext.textFileStream(path)
>>
>>   lines.foreachRDD( rdd => {
>>       // Assumes Record(r) yields a TraversableOnce (e.g. Option[Record]),
>>       // so flatMap drops lines that fail to parse.
>>       val recordDF = rdd.flatMap(r => Record(r)).toDF()
>>       // The broadcast() hint here is what re-broadcasts metaDF on
>>       // every micro batch.
>>       val joinedDF = recordDF.join(broadcast(metaDF), "id")
>>
>>       joinedDF.rdd.foreachPartition( partition => {
>>         partition.foreach( row => {
>>              ...
>>              ...
>>         })
>>       })
>>   })
>>
>>  streamingcontext.start()
>>  streamingcontext.awaitTermination()
>>
>> On a similar note, if metaDF is too big to broadcast, can I partition it
>> (df.repartition($"col")) and partition each streaming RDD the same way?
>> That way I could avoid shuffling metaDF for every batch.
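>>
>> Something like this is what I have in mind, sketched with pair RDDs and a
>> shared HashPartitioner so the cached metadata side is not shuffled on each
>> batch. A sketch only: the partition count, Record (returning an Option),
>> and its id field are placeholders.
>>
>>     import org.apache.spark.HashPartitioner
>>
>>     val partitioner = new HashPartitioner(32)  // placeholder count
>>
>>     // Key the big metadata by id, partition it once, and cache it.
>>     val metaRDD = metaDF.rdd
>>       .map(row => row.getAs[String]("id") -> row)
>>       .partitionBy(partitioner)
>>       .cache()
>>
>>     lines.foreachRDD( rdd => {
>>       val records = rdd.flatMap(r => Record(r))
>>                        .map(rec => rec.id -> rec)
>>                        .partitionBy(partitioner)
>>       // Both sides share the same partitioner, so the join is narrow:
>>       // only the small per-batch side is shuffled; metaRDD stays put.
>>       val joined = records.join(metaRDD)
>>       // ... process joined records ...
>>     })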
>>
>> Let me know your thoughts.
>>
>> Thanks
>>
>>
