It didn't fail. It wasn't broadcasting. I just ran the test again and here
are the logs.
Every batch is reading the metadata file.

16/02/19 06:27:02 INFO HadoopRDD: Input split:
file:/shared/data/test-data.txt:0+27
16/02/19 06:27:02 INFO HadoopRDD: Input split:
file:/shared/data/test-data.txt:27+28

16/02/19 06:27:40 INFO HadoopRDD: Input split:
file:/shared/data/test-data.txt:27+28
16/02/19 06:27:40 INFO HadoopRDD: Input split:
file:/shared/data/test-data.txt:0+27

If I remember, foreachRDD is executed in the driver's context. Not sure how
we'll be able to achieve broadcast in this approach(unless we use SQL
broadcast hint again)

When you say "it worked before",  was it with an older version of spark?
I'm trying this on 1.6.
If you still have the streaming job running can you verify in spark UI that
broadcast join is being used. Also, if the files are read and broadcasted
each batch??

Thanks for the help!


On Fri, Feb 19, 2016 at 3:49 AM, Sebastian Piu <sebastian....@gmail.com>
wrote:

> I don't see anything obviously wrong on your second approach, I've done it
> like that before and it worked. When you say that it didn't work what do
> you mean? did it fail? it didnt broadcast?
>
> On Thu, Feb 18, 2016 at 11:43 PM Srikanth <srikanth...@gmail.com> wrote:
>
>> Code with SQL broadcast hint. This worked and I was able to see that
>> broadcastjoin was performed.
>>
>> val testDF = sqlContext.read.format("com.databricks.spark.csv")
>>                .schema(schema).load("file:///shared/data/test-data.txt")
>>
>> val lines = ssc.socketTextStream("DevNode", 9999)
>>
>> lines.foreachRDD((rdd, timestamp) => {
>>    val recordDF = rdd.map(_.split(",")).map(l => Record(l(0).toInt,
>> l(1))).toDF()
>>    val resultDF = recordDF.join(testDF, "Age")
>>
>>  
>> resultDF.write.format("com.databricks.spark.csv").save("file:///shared/data/output/streaming/"+timestamp)
>>         }
>>
>> But for every batch this file was read and broadcast was performed.
>> Evaluating the entire DAG I guess.
>>   16/02/18 12:24:02 INFO HadoopRDD: Input split:
>> file:/shared/data/test-data.txt:27+28
>>     16/02/18 12:24:02 INFO HadoopRDD: Input split:
>> file:/shared/data/test-data.txt:0+27
>>
>>     16/02/18 12:25:00 INFO HadoopRDD: Input split:
>> file:/shared/data/test-data.txt:27+28
>>     16/02/18 12:25:00 INFO HadoopRDD: Input split:
>> file:/shared/data/test-data.txt:0+27
>>
>>
>> Then I changed code to broadcast the dataframe. This didn't work either.
>> Not sure if this is what you meant by broadcasting a dataframe.
>>
>> val testDF =
>> sc.broadcast(sqlContext.read.format("com.databricks.spark.csv")
>>                 .schema(schema).load("file:///shared/data/test-data.txt")
>>              )
>>
>> val lines = ssc.socketTextStream("DevNode", 9999)
>>
>> lines.foreachRDD((rdd, timestamp) => {
>>     val recordDF = rdd.map(_.split(",")).map(l => Record(l(0).toInt,
>> l(1))).toDF()
>>     val resultDF = recordDF.join(testDF.value, "Age")
>>
>>  
>> resultDF.write.format("com.databricks.spark.csv").save("file:///shared/data/output/streaming/"+timestamp)
>>         }
>>
>>
>> On Thu, Feb 18, 2016 at 12:55 PM, Sebastian Piu <sebastian....@gmail.com>
>> wrote:
>>
>>> Can you paste the code where you use sc.broadcast ?
>>>
>>> On Thu, Feb 18, 2016 at 5:32 PM Srikanth <srikanth...@gmail.com> wrote:
>>>
>>>> Sebastian,
>>>>
>>>> I was able to broadcast using sql broadcast hint. Question is how to
>>>> prevent this broadcast for each RDD.
>>>> Is there a way where it can be broadcast once and used locally for each
>>>> RDD?
>>>> Right now every batch the metadata file is read and the DF is
>>>> broadcasted.
>>>> I tried sc.broadcast and that did not provide this behavior.
>>>>
>>>> Srikanth
>>>>
>>>>
>>>> On Wed, Feb 17, 2016 at 4:53 PM, Sebastian Piu <sebastian....@gmail.com
>>>> > wrote:
>>>>
>>>>> You should be able to broadcast that data frame using sc.broadcast and
>>>>> join against it.
>>>>>
>>>>> On Wed, 17 Feb 2016, 21:13 Srikanth <srikanth...@gmail.com> wrote:
>>>>>
>>>>>> Hello,
>>>>>>
>>>>>> I have a streaming use case where I plan to keep a dataset
>>>>>> broadcasted and cached on each executor.
>>>>>> Every micro batch in streaming will create a DF out of the RDD and
>>>>>> join the batch.
>>>>>> The below code will perform the broadcast operation for each RDD. Is
>>>>>> there a way to broadcast it just once?
>>>>>>
>>>>>> Alternate approachs are also welcome.
>>>>>>
>>>>>>     val DF1 =
>>>>>> sqlContext.read.format("json").schema(schema1).load(file1)
>>>>>>
>>>>>>     val metaDF =
>>>>>> sqlContext.read.format("json").schema(schema1).load(file2)
>>>>>>                               .join(DF1, "id")
>>>>>>     metaDF.cache
>>>>>>
>>>>>>
>>>>>>   val lines = streamingcontext.textFileStream(path)
>>>>>>
>>>>>>   lines.foreachRDD( rdd => {
>>>>>>       val recordDF = rdd.flatMap(r => Record(r)).toDF()
>>>>>>       val joinedDF = recordDF.join(broadcast(metaDF), "id")
>>>>>>
>>>>>>       joinedDF.rdd.foreachPartition ( partition => {
>>>>>>         partition.foreach( row => {
>>>>>>              ...
>>>>>>              ...
>>>>>>         })
>>>>>>       })
>>>>>>   })
>>>>>>
>>>>>>  streamingcontext.start
>>>>>>
>>>>>> On a similar note, if the metaDF is too big for broadcast, can I
>>>>>> partition it(df.repartition($"col")) and also partition each streaming 
>>>>>> RDD?
>>>>>> This way I can avoid shuffling metaDF each time.
>>>>>>
>>>>>> Let me know you thoughts.
>>>>>>
>>>>>> Thanks
>>>>>>
>>>>>>
>>>>
>>

Reply via email to