Sure. These may be unrelated.

On Fri, Feb 19, 2016 at 10:39 AM, Jerry Lam <chiling...@gmail.com> wrote:

> Hi guys,
>
> I have also run into a broadcast DataFrame issue, not in streaming jobs but
> in a regular DataFrame join. In my case the executors died, probably due to
> OOM, although I don't think the join should use that much memory. Anyway,
> I'm going to craft an example and send it here to see if it is a bug or
> something I've misunderstood.
>
> Best Regards,
>
> Jerry
>
> Sent from my iPhone
>
> On 19 Feb, 2016, at 10:20 am, Sebastian Piu <sebastian....@gmail.com>
> wrote:
>
> I don't have the code with me now. I ended up moving everything to RDDs
> and using map operations to do the lookups, i.e. instead of broadcasting
> a DataFrame I ended up broadcasting a Map.
>
>
> On Fri, Feb 19, 2016 at 11:39 AM Srikanth <srikanth...@gmail.com> wrote:
>
>> It didn't fail. It wasn't broadcasting. I just ran the test again and
>> here are the logs.
>> Every batch is reading the metadata file.
>>
>> 16/02/19 06:27:02 INFO HadoopRDD: Input split: file:/shared/data/test-data.txt:0+27
>> 16/02/19 06:27:02 INFO HadoopRDD: Input split: file:/shared/data/test-data.txt:27+28
>>
>> 16/02/19 06:27:40 INFO HadoopRDD: Input split: file:/shared/data/test-data.txt:27+28
>> 16/02/19 06:27:40 INFO HadoopRDD: Input split: file:/shared/data/test-data.txt:0+27
>>
>> If I remember correctly, foreachRDD is executed in the driver's context.
>> Not sure how we'll be able to achieve a broadcast with this approach
>> (unless we use the SQL broadcast hint again).
>>
>> When you say "it worked before", was it with an older version of Spark?
>> I'm trying this on 1.6.
>> If you still have the streaming job running, can you verify in the Spark
>> UI that a broadcast join is being used? Also, can you check whether the
>> file is read and broadcast in every batch?
>>
>> Thanks for the help!
>>
>>
>> On Fri, Feb 19, 2016 at 3:49 AM, Sebastian Piu <sebastian....@gmail.com>
>> wrote:
>>
>>> I don't see anything obviously wrong with your second approach; I've done
>>> it like that before and it worked. When you say that it didn't work, what
>>> do you mean? Did it fail? Did it not broadcast?
>>>
>>> On Thu, Feb 18, 2016 at 11:43 PM Srikanth <srikanth...@gmail.com> wrote:
>>>
>>>> Code with SQL broadcast hint. This worked and I was able to see that
>>>> broadcastjoin was performed.
>>>>
>>>> val testDF = sqlContext.read.format("com.databricks.spark.csv")
>>>>   .schema(schema).load("file:///shared/data/test-data.txt")
>>>>
>>>> val lines = ssc.socketTextStream("DevNode", 9999)
>>>>
>>>> lines.foreachRDD((rdd, timestamp) => {
>>>>   val recordDF = rdd.map(_.split(",")).map(l => Record(l(0).toInt, l(1))).toDF()
>>>>   val resultDF = recordDF.join(testDF, "Age")
>>>>
>>>>   resultDF.write.format("com.databricks.spark.csv")
>>>>     .save("file:///shared/data/output/streaming/" + timestamp)
>>>> })
>>>>
>>>> But for every batch this file was read and the broadcast was performed.
>>>> It is evaluating the entire DAG, I guess.
>>>>
>>>> 16/02/18 12:24:02 INFO HadoopRDD: Input split: file:/shared/data/test-data.txt:27+28
>>>> 16/02/18 12:24:02 INFO HadoopRDD: Input split: file:/shared/data/test-data.txt:0+27
>>>>
>>>> 16/02/18 12:25:00 INFO HadoopRDD: Input split: file:/shared/data/test-data.txt:27+28
>>>> 16/02/18 12:25:00 INFO HadoopRDD: Input split: file:/shared/data/test-data.txt:0+27
>>>>
>>>>
>>>> Then I changed the code to broadcast the DataFrame. This didn't work
>>>> either. Not sure if this is what you meant by broadcasting a DataFrame.
>>>>
>>>> val testDF = sc.broadcast(
>>>>   sqlContext.read.format("com.databricks.spark.csv")
>>>>     .schema(schema).load("file:///shared/data/test-data.txt")
>>>> )
>>>>
>>>> val lines = ssc.socketTextStream("DevNode", 9999)
>>>>
>>>> lines.foreachRDD((rdd, timestamp) => {
>>>>   val recordDF = rdd.map(_.split(",")).map(l => Record(l(0).toInt, l(1))).toDF()
>>>>   val resultDF = recordDF.join(testDF.value, "Age")
>>>>
>>>>   resultDF.write.format("com.databricks.spark.csv")
>>>>     .save("file:///shared/data/output/streaming/" + timestamp)
>>>> })
>>>>
>>>>
>>>> On Thu, Feb 18, 2016 at 12:55 PM, Sebastian Piu <
>>>> sebastian....@gmail.com> wrote:
>>>>
>>>>> Can you paste the code where you use sc.broadcast ?
>>>>>
>>>>> On Thu, Feb 18, 2016 at 5:32 PM Srikanth <srikanth...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Sebastian,
>>>>>>
>>>>>> I was able to broadcast using the SQL broadcast hint. The question is
>>>>>> how to prevent this broadcast from happening for each RDD.
>>>>>> Is there a way to broadcast it once and use it locally for each RDD?
>>>>>> Right now the metadata file is read and the DF is broadcast in every
>>>>>> batch.
>>>>>> I tried sc.broadcast and that did not provide this behavior.
>>>>>>
>>>>>> Srikanth
>>>>>>
>>>>>>
>>>>>> On Wed, Feb 17, 2016 at 4:53 PM, Sebastian Piu <
>>>>>> sebastian....@gmail.com> wrote:
>>>>>>
>>>>>>> You should be able to broadcast that data frame using sc.broadcast
>>>>>>> and join against it.
>>>>>>>
>>>>>>> On Wed, 17 Feb 2016, 21:13 Srikanth <srikanth...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hello,
>>>>>>>>
>>>>>>>> I have a streaming use case where I plan to keep a dataset
>>>>>>>> broadcast and cached on each executor.
>>>>>>>> Every micro-batch in streaming will create a DF out of the RDD and
>>>>>>>> join it with that dataset.
>>>>>>>> The code below performs the broadcast operation for each RDD.
>>>>>>>> Is there a way to broadcast it just once?
>>>>>>>>
>>>>>>>> Alternative approaches are also welcome.
>>>>>>>>
>>>>>>>>     val DF1 = sqlContext.read.format("json").schema(schema1).load(file1)
>>>>>>>>
>>>>>>>>     val metaDF = sqlContext.read.format("json").schema(schema1).load(file2)
>>>>>>>>                            .join(DF1, "id")
>>>>>>>>     metaDF.cache
>>>>>>>>
>>>>>>>>
>>>>>>>>   val lines = streamingcontext.textFileStream(path)
>>>>>>>>
>>>>>>>>   lines.foreachRDD( rdd => {
>>>>>>>>       val recordDF = rdd.flatMap(r => Record(r)).toDF()
>>>>>>>>       val joinedDF = recordDF.join(broadcast(metaDF), "id")
>>>>>>>>
>>>>>>>>       joinedDF.rdd.foreachPartition ( partition => {
>>>>>>>>         partition.foreach( row => {
>>>>>>>>              ...
>>>>>>>>              ...
>>>>>>>>         })
>>>>>>>>       })
>>>>>>>>   })
>>>>>>>>
>>>>>>>>  streamingcontext.start
>>>>>>>>
>>>>>>>> On a similar note, if metaDF is too big to broadcast, can I
>>>>>>>> partition it (df.repartition($"col")) and also partition each
>>>>>>>> streaming RDD?
>>>>>>>> This way I can avoid shuffling metaDF each time.
>>>>>>>>
>>>>>>>> Let me know your thoughts.
>>>>>>>>
>>>>>>>> Thanks
>>>>>>>>
>>>>>>>>
>>>>>>
>>>>
>>
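
For reference, the workaround Sebastian describes in the thread (broadcasting a plain Map instead of a DataFrame and doing the lookup in RDD map operations) could look roughly like the sketch below. It is not his actual code, which was never posted. The "age,label" layout of the metadata file, the helper names parseLookup and enrich, the batch interval, and the output format are all assumptions made for illustration; the paths and host come from the thread.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Hypothetical record type mirroring the thread's Record(l(0).toInt, l(1)).
case class Record(age: Int, name: String)

object BroadcastMapLookup {
  // Assumption: each metadata line is "age,label" with no header row.
  def parseLookup(lines: Seq[String]): Map[Int, String] =
    lines.map(_.split(",", 2))
      .collect { case Array(k, v) => k.trim.toInt -> v.trim }
      .toMap

  // Inner-join semantics: a record with no matching key yields None.
  def enrich(r: Record, lookup: Map[Int, String]): Option[(Int, String, String)] =
    lookup.get(r.age).map(label => (r.age, r.name, label))

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("broadcast-map-lookup"))
    val ssc = new StreamingContext(sc, Seconds(10)) // batch interval is arbitrary

    // Read the small file once on the driver, outside foreachRDD,
    // and broadcast the resulting Map rather than a DataFrame.
    val lookup = parseLookup(sc.textFile("file:///shared/data/test-data.txt").collect().toSeq)
    val lookupBc: Broadcast[Map[Int, String]] = sc.broadcast(lookup)

    val lines = ssc.socketTextStream("DevNode", 9999)
    lines.foreachRDD { (rdd, time) =>
      val joined = rdd.map(_.split(","))
        .filter(_.length >= 2)
        .map(l => Record(l(0).trim.toInt, l(1)))
        .flatMap(r => enrich(r, lookupBc.value)) // map-side lookup, no shuffle
      joined.map { case (age, name, label) => s"$age,$name,$label" }
        .saveAsTextFile("file:///shared/data/output/streaming/" + time.milliseconds)
    }

    ssc.start()
    ssc.awaitTermination()
  }
}
```

Because the Map is built before the streaming job starts, the file should not be re-read per batch, and each executor should fetch the broadcast value once and reuse it. This only makes sense when the lookup data fits comfortably in driver and executor memory.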

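On the last question in the original post (metadata too big to broadcast), one option at the RDD level is to hash-partition and cache the metadata once, then join each batch using the same partitioner so that only the batch side is shuffled. The sketch below is an assumption-laden illustration, not something confirmed in the thread: the "id,payload" line format, the toPair helper, the partition count, and the output path layout are hypothetical, and it uses pair RDDs rather than df.repartition.

```scala
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}

object PrePartitionedJoin {
  // Assumption: lines look like "id,payload"; malformed lines are skipped.
  def toPair(line: String): Option[(Int, String)] =
    line.split(",", 2) match {
      case Array(k, v) if k.trim.nonEmpty && k.trim.forall(_.isDigit) =>
        Some(k.trim.toInt -> v)
      case _ => None
    }

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("pre-partitioned-join"))
    val ssc = new StreamingContext(sc, Seconds(10)) // batch interval is arbitrary
    val partitioner = new HashPartitioner(8)        // partition count is arbitrary

    // Partition and cache the metadata once, before the stream starts.
    val meta: RDD[(Int, String)] = sc.textFile("file:///shared/data/test-data.txt")
      .flatMap(line => toPair(line))
      .partitionBy(partitioner)
      .cache()
    meta.count() // force materialisation so later batches reuse the cached partitions

    ssc.socketTextStream("DevNode", 9999).foreachRDD { (rdd, time) =>
      val batch = rdd.flatMap(line => toPair(line))
      // meta already uses this partitioner, so only the batch side is shuffled.
      val joined = batch.join(meta, partitioner)
      joined.map { case (id, (left, right)) => s"$id,$left,$right" }
        .saveAsTextFile("file:///shared/data/output/streaming/" + time.milliseconds)
    }

    ssc.start()
    ssc.awaitTermination()
  }
}
```

Whether the DataFrame API in 1.6 preserves partitioning across joins in the same way is a separate question, so checking the stage DAG in the Spark UI is still worthwhile.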