Here is the code with the SQL broadcast hint. This worked, and I could see
that a broadcast join was performed.

val testDF = sqlContext.read.format("com.databricks.spark.csv")
               .schema(schema).load("file:///shared/data/test-data.txt")

val lines = ssc.socketTextStream("DevNode", 9999)

lines.foreachRDD((rdd, timestamp) => {
    val recordDF = rdd.map(_.split(","))
                      .map(l => Record(l(0).toInt, l(1))).toDF()
    val resultDF = recordDF.join(testDF, "Age")

    resultDF.write.format("com.databricks.spark.csv")
            .save("file:///shared/data/output/streaming/" + timestamp)
})

However, the file was read and the broadcast was performed again for every
batch. I guess the entire DAG is re-evaluated each time.
    16/02/18 12:24:02 INFO HadoopRDD: Input split: file:/shared/data/test-data.txt:27+28
    16/02/18 12:24:02 INFO HadoopRDD: Input split: file:/shared/data/test-data.txt:0+27

    16/02/18 12:25:00 INFO HadoopRDD: Input split: file:/shared/data/test-data.txt:27+28
    16/02/18 12:25:00 INFO HadoopRDD: Input split: file:/shared/data/test-data.txt:0+27
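
The repeated "Input split" lines suggest that the CSV file is scanned again for every micro-batch. One possible way to avoid that, sketched here, is to cache the lookup DataFrame and materialize it once before the stream starts, so that later batches should be served from memory instead of from disk. The broadcast itself is probably still rebuilt per batch, but from the cached data. The `Record` field names, the schema, the 60-second batch interval and the `parseRecord` helper are assumptions and do not come from the thread; the spark-csv package is assumed to be on the classpath (Spark 1.x API).

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.functions.broadcast
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Hypothetical record type; the field names are assumptions.
case class Record(Age: Int, Name: String)

object CachedLookupJoin {
  // Hypothetical helper: parse one "age,name" line from the socket.
  def parseRecord(line: String): Record = {
    val fields = line.split(",")
    Record(fields(0).toInt, fields(1))
  }

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("CachedLookupJoin"))
    val sqlContext = new SQLContext(sc)
    val ssc = new StreamingContext(sc, Seconds(60)) // batch interval is an assumption
    import sqlContext.implicits._

    // Assumed schema for the lookup file.
    val schema = StructType(Seq(
      StructField("Age", IntegerType),
      StructField("Label", StringType)))

    val testDF = sqlContext.read.format("com.databricks.spark.csv")
      .schema(schema)
      .load("file:///shared/data/test-data.txt")
      .cache()     // mark the lookup table for in-memory caching
    testDF.count() // run one action up front so the cache is populated

    val lines = ssc.socketTextStream("DevNode", 9999)

    lines.foreachRDD { (rdd, timestamp) =>
      val recordDF = rdd.map(line => parseRecord(line)).toDF()
      // The hint asks the planner to broadcast the (cached) lookup side.
      val resultDF = recordDF.join(broadcast(testDF), "Age")
      resultDF.write.format("com.databricks.spark.csv")
        .save("file:///shared/data/output/streaming/" + timestamp.milliseconds)
    }

    ssc.start()
    ssc.awaitTermination()
  }
}
```

If the approach works as expected, the HadoopRDD "Input split" lines should appear only once, when `testDF.count()` runs, and not again for later batches.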


Then I changed the code to broadcast the DataFrame with sc.broadcast. This
didn't work either. I'm not sure if this is what you meant by broadcasting a
DataFrame.

val testDF = sc.broadcast(sqlContext.read.format("com.databricks.spark.csv")
                .schema(schema).load("file:///shared/data/test-data.txt")
             )

val lines = ssc.socketTextStream("DevNode", 9999)

lines.foreachRDD((rdd, timestamp) => {
    val recordDF = rdd.map(_.split(","))
                      .map(l => Record(l(0).toInt, l(1))).toDF()
    val resultDF = recordDF.join(testDF.value, "Age")

    resultDF.write.format("com.databricks.spark.csv")
            .save("file:///shared/data/output/streaming/" + timestamp)
})
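
A likely reason the sc.broadcast version behaves the same is that a DataFrame is only a driver-side description of a query, not the rows themselves, so broadcasting it does not ship any data and the join still re-plans against the file. A minimal sketch of an alternative, assuming the lookup file is small and has at most one row per Age, is to collect the rows into a plain Map once, broadcast that Map, and do the join by hand inside each partition. The `Record` field names, the schema, the batch interval and the `joinWithLookup` helper are assumptions for illustration only (Spark 1.x API, spark-csv on the classpath).

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Hypothetical record type; the field names are assumptions.
case class Record(Age: Int, Name: String)

object BroadcastMapJoin {
  // Hypothetical helper: inner-join semantics, records whose Age is not in
  // the lookup are dropped.
  def joinWithLookup(records: Iterator[Record],
                     lookup: Map[Int, String]): Iterator[(Int, String, String)] =
    records.flatMap(r => lookup.get(r.Age).map(label => (r.Age, r.Name, label)))

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("BroadcastMapJoin"))
    val sqlContext = new SQLContext(sc)
    val ssc = new StreamingContext(sc, Seconds(60)) // batch interval is an assumption
    import sqlContext.implicits._

    // Assumed schema for the lookup file; values are assumed to be non-null.
    val schema = StructType(Seq(
      StructField("Age", IntegerType),
      StructField("Label", StringType)))

    // Read the file once on the driver and turn the rows into a Map.
    val lookup: Map[Int, String] = sqlContext.read
      .format("com.databricks.spark.csv")
      .schema(schema)
      .load("file:///shared/data/test-data.txt")
      .collect()
      .map(row => row.getInt(0) -> row.getString(1))
      .toMap

    // Broadcast the data itself (not a DataFrame) once, before the stream starts.
    val lookupBc = sc.broadcast(lookup)

    val lines = ssc.socketTextStream("DevNode", 9999)

    lines.foreachRDD { (rdd, timestamp) =>
      val joined = rdd
        .map(_.split(","))
        .map(l => Record(l(0).toInt, l(1)))
        .mapPartitions(part => joinWithLookup(part, lookupBc.value))

      joined.toDF("Age", "Name", "Label")
        .write.format("com.databricks.spark.csv")
        .save("file:///shared/data/output/streaming/" + timestamp.milliseconds)
    }

    ssc.start()
    ssc.awaitTermination()
  }
}
```

If several lookup rows can share the same Age, a `Map[Int, Seq[String]]` would be needed to reproduce what the DataFrame join returns.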


On Thu, Feb 18, 2016 at 12:55 PM, Sebastian Piu <sebastian....@gmail.com>
wrote:

> Can you paste the code where you use sc.broadcast ?
>
> On Thu, Feb 18, 2016 at 5:32 PM Srikanth <srikanth...@gmail.com> wrote:
>
>> Sebastian,
>>
>> I was able to broadcast using the SQL broadcast hint. The question is how
>> to prevent this broadcast from happening for each RDD.
>> Is there a way to broadcast it once and use it locally for each
>> RDD?
>> Right now the metadata file is read and the DF is broadcast for every batch.
>> I tried sc.broadcast and that did not provide this behavior.
>>
>> Srikanth
>>
>>
>> On Wed, Feb 17, 2016 at 4:53 PM, Sebastian Piu <sebastian....@gmail.com>
>> wrote:
>>
>>> You should be able to broadcast that data frame using sc.broadcast and
>>> join against it.
>>>
>>> On Wed, 17 Feb 2016, 21:13 Srikanth <srikanth...@gmail.com> wrote:
>>>
>>>> Hello,
>>>>
>>>> I have a streaming use case where I plan to keep a dataset broadcasted
>>>> and cached on each executor.
>>>> Every micro batch in streaming will create a DF out of the RDD and join
>>>> the batch.
>>>> The below code will perform the broadcast operation for each RDD. Is
>>>> there a way to broadcast it just once?
>>>>
>>>> Alternate approaches are also welcome.
>>>>
>>>>     val DF1 = sqlContext.read.format("json").schema(schema1).load(file1)
>>>>
>>>>     val metaDF = sqlContext.read.format("json").schema(schema1).load(file2)
>>>>                            .join(DF1, "id")
>>>>     metaDF.cache
>>>>
>>>>
>>>>   val lines = streamingcontext.textFileStream(path)
>>>>
>>>>   lines.foreachRDD( rdd => {
>>>>       val recordDF = rdd.flatMap(r => Record(r)).toDF()
>>>>       val joinedDF = recordDF.join(broadcast(metaDF), "id")
>>>>
>>>>       joinedDF.rdd.foreachPartition ( partition => {
>>>>         partition.foreach( row => {
>>>>              ...
>>>>              ...
>>>>         })
>>>>       })
>>>>   })
>>>>
>>>>  streamingcontext.start
>>>>
>>>> On a similar note, if the metaDF is too big for broadcast, can I
>>>> partition it (df.repartition($"col")) and also partition each streaming RDD?
>>>> This way I can avoid shuffling metaDF each time.
>>>>
>>>> Let me know your thoughts.
>>>>
>>>> Thanks
>>>>
>>>>
>>
