Hi guys,

I also encounter broadcast dataframe issue not for steaming jobs but regular 
dataframe join. In my case, the executors died probably due to OOM which I 
don't think it should use that much memory. Anyway, I'm going to craft an 
example and send it here to see if it is a bug or something I've misunderstood.

Best Regards,

Jerry

Sent from my iPhone

> On 19 Feb, 2016, at 10:20 am, Sebastian Piu <sebastian....@gmail.com> wrote:
> 
> I don't have the code with me now, and I ended moving everything to RDD in 
> the end and using map operations to do some lookups, i.e. instead of 
> broadcasting a Dataframe I ended broadcasting a Map 
> 
> 
>> On Fri, Feb 19, 2016 at 11:39 AM Srikanth <srikanth...@gmail.com> wrote:
>> It didn't fail. It wasn't broadcasting. I just ran the test again and here 
>> are the logs.
>> Every batch is reading the metadata file.
>> 
>>      16/02/19 06:27:02 INFO HadoopRDD: Input split: 
>> file:/shared/data/test-data.txt:0+27
>>      16/02/19 06:27:02 INFO HadoopRDD: Input split: 
>> file:/shared/data/test-data.txt:27+28
>> 
>>      16/02/19 06:27:40 INFO HadoopRDD: Input split: 
>> file:/shared/data/test-data.txt:27+28
>>      16/02/19 06:27:40 INFO HadoopRDD: Input split: 
>> file:/shared/data/test-data.txt:0+27
>> 
>> If I remember, foreachRDD is executed in the driver's context. Not sure how 
>> we'll be able to achieve broadcast in this approach(unless we use SQL 
>> broadcast hint again)
>> 
>> When you say "it worked before",  was it with an older version of spark? I'm 
>> trying this on 1.6.
>> If you still have the streaming job running can you verify in spark UI that 
>> broadcast join is being used. Also, if the files are read and broadcasted 
>> each batch??
>> 
>> Thanks for the help!
>> 
>> 
>>> On Fri, Feb 19, 2016 at 3:49 AM, Sebastian Piu <sebastian....@gmail.com> 
>>> wrote:
>>> I don't see anything obviously wrong on your second approach, I've done it 
>>> like that before and it worked. When you say that it didn't work what do 
>>> you mean? did it fail? it didnt broadcast? 
>>> 
>>>> On Thu, Feb 18, 2016 at 11:43 PM Srikanth <srikanth...@gmail.com> wrote:
>>>> Code with SQL broadcast hint. This worked and I was able to see that 
>>>> broadcastjoin was performed.
>>>> 
>>>>    val testDF = sqlContext.read.format("com.databricks.spark.csv")
>>>>                    
>>>> .schema(schema).load("file:///shared/data/test-data.txt") 
>>>> 
>>>>    val lines = ssc.socketTextStream("DevNode", 9999)
>>>> 
>>>>    lines.foreachRDD((rdd, timestamp) => {
>>>>        val recordDF = rdd.map(_.split(",")).map(l => Record(l(0).toInt, 
>>>> l(1))).toDF()
>>>>        val resultDF = recordDF.join(testDF, "Age")
>>>>        
>>>> resultDF.write.format("com.databricks.spark.csv").save("file:///shared/data/output/streaming/"+timestamp)
>>>>         }
>>>> 
>>>> But for every batch this file was read and broadcast was performed. 
>>>> Evaluating the entire DAG I guess.
>>>>     16/02/18 12:24:02 INFO HadoopRDD: Input split: 
>>>> file:/shared/data/test-data.txt:27+28
>>>>     16/02/18 12:24:02 INFO HadoopRDD: Input split: 
>>>> file:/shared/data/test-data.txt:0+27
>>>> 
>>>>     16/02/18 12:25:00 INFO HadoopRDD: Input split: 
>>>> file:/shared/data/test-data.txt:27+28
>>>>     16/02/18 12:25:00 INFO HadoopRDD: Input split: 
>>>> file:/shared/data/test-data.txt:0+27
>>>> 
>>>> 
>>>> Then I changed code to broadcast the dataframe. This didn't work either. 
>>>> Not sure if this is what you meant by broadcasting a dataframe.
>>>> 
>>>>    val testDF = 
>>>> sc.broadcast(sqlContext.read.format("com.databricks.spark.csv")
>>>>                    
>>>> .schema(schema).load("file:///shared/data/test-data.txt") 
>>>>                 )
>>>> 
>>>>    val lines = ssc.socketTextStream("DevNode", 9999)
>>>> 
>>>>    lines.foreachRDD((rdd, timestamp) => {
>>>>        val recordDF = rdd.map(_.split(",")).map(l => Record(l(0).toInt, 
>>>> l(1))).toDF()
>>>>        val resultDF = recordDF.join(testDF.value, "Age")
>>>>        
>>>> resultDF.write.format("com.databricks.spark.csv").save("file:///shared/data/output/streaming/"+timestamp)
>>>>         }
>>>> 
>>>> 
>>>>> On Thu, Feb 18, 2016 at 12:55 PM, Sebastian Piu <sebastian....@gmail.com> 
>>>>> wrote:
>>>>> Can you paste the code where you use sc.broadcast ?
>>>>> 
>>>>>> On Thu, Feb 18, 2016 at 5:32 PM Srikanth <srikanth...@gmail.com> wrote:
>>>>>> Sebastian,
>>>>>> 
>>>>>> I was able to broadcast using sql broadcast hint. Question is how to 
>>>>>> prevent this broadcast for each RDD.
>>>>>> Is there a way where it can be broadcast once and used locally for each 
>>>>>> RDD?
>>>>>> Right now every batch the metadata file is read and the DF is 
>>>>>> broadcasted.
>>>>>> I tried sc.broadcast and that did not provide this behavior.
>>>>>> 
>>>>>> Srikanth
>>>>>> 
>>>>>> 
>>>>>>> On Wed, Feb 17, 2016 at 4:53 PM, Sebastian Piu 
>>>>>>> <sebastian....@gmail.com> wrote:
>>>>>>> You should be able to broadcast that data frame using sc.broadcast and 
>>>>>>> join against it.
>>>>>>> 
>>>>>>>> On Wed, 17 Feb 2016, 21:13 Srikanth <srikanth...@gmail.com> wrote:
>>>>>>>> Hello,
>>>>>>>> 
>>>>>>>> I have a streaming use case where I plan to keep a dataset broadcasted 
>>>>>>>> and cached on each executor.
>>>>>>>> Every micro batch in streaming will create a DF out of the RDD and 
>>>>>>>> join the batch.
>>>>>>>> The below code will perform the broadcast operation for each RDD. Is 
>>>>>>>> there a way to broadcast it just once?
>>>>>>>> 
>>>>>>>> Alternate approachs are also welcome.
>>>>>>>> 
>>>>>>>>     val DF1 = 
>>>>>>>> sqlContext.read.format("json").schema(schema1).load(file1)
>>>>>>>> 
>>>>>>>>     val metaDF = 
>>>>>>>> sqlContext.read.format("json").schema(schema1).load(file2)
>>>>>>>>                               .join(DF1, "id")
>>>>>>>>     metaDF.cache
>>>>>>>> 
>>>>>>>> 
>>>>>>>>     val lines = streamingcontext.textFileStream(path)
>>>>>>>> 
>>>>>>>>     lines.foreachRDD( rdd => {
>>>>>>>>       val recordDF = rdd.flatMap(r => Record(r)).toDF()
>>>>>>>>       val joinedDF = recordDF.join(broadcast(metaDF), "id")
>>>>>>>> 
>>>>>>>>       joinedDF.rdd.foreachPartition ( partition => {
>>>>>>>>         partition.foreach( row => {
>>>>>>>>              ...
>>>>>>>>              ...
>>>>>>>>         })
>>>>>>>>       })
>>>>>>>>     })
>>>>>>>> 
>>>>>>>>     streamingcontext.start
>>>>>>>> 
>>>>>>>> On a similar note, if the metaDF is too big for broadcast, can I 
>>>>>>>> partition it(df.repartition($"col")) and also partition each streaming 
>>>>>>>> RDD?
>>>>>>>> This way I can avoid shuffling metaDF each time.
>>>>>>>> 
>>>>>>>> Let me know you thoughts.
>>>>>>>> 
>>>>>>>> Thanks

Reply via email to