I don't have the code with me now, and I ended moving everything to RDD in
the end and using map operations to do some lookups, i.e. instead of
broadcasting a Dataframe I ended broadcasting a Map


On Fri, Feb 19, 2016 at 11:39 AM Srikanth <srikanth...@gmail.com> wrote:

> It didn't fail. It wasn't broadcasting. I just ran the test again and here
> are the logs.
> Every batch is reading the metadata file.
>
> 16/02/19 06:27:02 INFO HadoopRDD: Input split:
> file:/shared/data/test-data.txt:0+27
> 16/02/19 06:27:02 INFO HadoopRDD: Input split:
> file:/shared/data/test-data.txt:27+28
>
> 16/02/19 06:27:40 INFO HadoopRDD: Input split:
> file:/shared/data/test-data.txt:27+28
> 16/02/19 06:27:40 INFO HadoopRDD: Input split:
> file:/shared/data/test-data.txt:0+27
>
> If I remember, foreachRDD is executed in the driver's context. Not sure
> how we'll be able to achieve broadcast in this approach(unless we use SQL
> broadcast hint again)
>
> When you say "it worked before",  was it with an older version of spark?
> I'm trying this on 1.6.
> If you still have the streaming job running can you verify in spark UI
> that broadcast join is being used. Also, if the files are read and
> broadcasted each batch??
>
> Thanks for the help!
>
>
> On Fri, Feb 19, 2016 at 3:49 AM, Sebastian Piu <sebastian....@gmail.com>
> wrote:
>
>> I don't see anything obviously wrong on your second approach, I've done
>> it like that before and it worked. When you say that it didn't work what do
>> you mean? did it fail? it didnt broadcast?
>>
>> On Thu, Feb 18, 2016 at 11:43 PM Srikanth <srikanth...@gmail.com> wrote:
>>
>>> Code with SQL broadcast hint. This worked and I was able to see that
>>> broadcastjoin was performed.
>>>
>>> val testDF = sqlContext.read.format("com.databricks.spark.csv")
>>>                .schema(schema).load("file:///shared/data/test-data.txt")
>>>
>>> val lines = ssc.socketTextStream("DevNode", 9999)
>>>
>>> lines.foreachRDD((rdd, timestamp) => {
>>>    val recordDF = rdd.map(_.split(",")).map(l => Record(l(0).toInt,
>>> l(1))).toDF()
>>>    val resultDF = recordDF.join(testDF, "Age")
>>>
>>>  
>>> resultDF.write.format("com.databricks.spark.csv").save("file:///shared/data/output/streaming/"+timestamp)
>>>         }
>>>
>>> But for every batch this file was read and broadcast was performed.
>>> Evaluating the entire DAG I guess.
>>>   16/02/18 12:24:02 INFO HadoopRDD: Input split:
>>> file:/shared/data/test-data.txt:27+28
>>>     16/02/18 12:24:02 INFO HadoopRDD: Input split:
>>> file:/shared/data/test-data.txt:0+27
>>>
>>>     16/02/18 12:25:00 INFO HadoopRDD: Input split:
>>> file:/shared/data/test-data.txt:27+28
>>>     16/02/18 12:25:00 INFO HadoopRDD: Input split:
>>> file:/shared/data/test-data.txt:0+27
>>>
>>>
>>> Then I changed code to broadcast the dataframe. This didn't work either.
>>> Not sure if this is what you meant by broadcasting a dataframe.
>>>
>>> val testDF =
>>> sc.broadcast(sqlContext.read.format("com.databricks.spark.csv")
>>>
>>>  .schema(schema).load("file:///shared/data/test-data.txt")
>>>              )
>>>
>>> val lines = ssc.socketTextStream("DevNode", 9999)
>>>
>>> lines.foreachRDD((rdd, timestamp) => {
>>>     val recordDF = rdd.map(_.split(",")).map(l => Record(l(0).toInt,
>>> l(1))).toDF()
>>>     val resultDF = recordDF.join(testDF.value, "Age")
>>>
>>>  
>>> resultDF.write.format("com.databricks.spark.csv").save("file:///shared/data/output/streaming/"+timestamp)
>>>         }
>>>
>>>
>>> On Thu, Feb 18, 2016 at 12:55 PM, Sebastian Piu <sebastian....@gmail.com
>>> > wrote:
>>>
>>>> Can you paste the code where you use sc.broadcast ?
>>>>
>>>> On Thu, Feb 18, 2016 at 5:32 PM Srikanth <srikanth...@gmail.com> wrote:
>>>>
>>>>> Sebastian,
>>>>>
>>>>> I was able to broadcast using sql broadcast hint. Question is how to
>>>>> prevent this broadcast for each RDD.
>>>>> Is there a way where it can be broadcast once and used locally for
>>>>> each RDD?
>>>>> Right now every batch the metadata file is read and the DF is
>>>>> broadcasted.
>>>>> I tried sc.broadcast and that did not provide this behavior.
>>>>>
>>>>> Srikanth
>>>>>
>>>>>
>>>>> On Wed, Feb 17, 2016 at 4:53 PM, Sebastian Piu <
>>>>> sebastian....@gmail.com> wrote:
>>>>>
>>>>>> You should be able to broadcast that data frame using sc.broadcast
>>>>>> and join against it.
>>>>>>
>>>>>> On Wed, 17 Feb 2016, 21:13 Srikanth <srikanth...@gmail.com> wrote:
>>>>>>
>>>>>>> Hello,
>>>>>>>
>>>>>>> I have a streaming use case where I plan to keep a dataset
>>>>>>> broadcasted and cached on each executor.
>>>>>>> Every micro batch in streaming will create a DF out of the RDD and
>>>>>>> join the batch.
>>>>>>> The below code will perform the broadcast operation for each RDD. Is
>>>>>>> there a way to broadcast it just once?
>>>>>>>
>>>>>>> Alternate approachs are also welcome.
>>>>>>>
>>>>>>>     val DF1 =
>>>>>>> sqlContext.read.format("json").schema(schema1).load(file1)
>>>>>>>
>>>>>>>     val metaDF =
>>>>>>> sqlContext.read.format("json").schema(schema1).load(file2)
>>>>>>>                               .join(DF1, "id")
>>>>>>>     metaDF.cache
>>>>>>>
>>>>>>>
>>>>>>>   val lines = streamingcontext.textFileStream(path)
>>>>>>>
>>>>>>>   lines.foreachRDD( rdd => {
>>>>>>>       val recordDF = rdd.flatMap(r => Record(r)).toDF()
>>>>>>>       val joinedDF = recordDF.join(broadcast(metaDF), "id")
>>>>>>>
>>>>>>>       joinedDF.rdd.foreachPartition ( partition => {
>>>>>>>         partition.foreach( row => {
>>>>>>>              ...
>>>>>>>              ...
>>>>>>>         })
>>>>>>>       })
>>>>>>>   })
>>>>>>>
>>>>>>>  streamingcontext.start
>>>>>>>
>>>>>>> On a similar note, if the metaDF is too big for broadcast, can I
>>>>>>> partition it(df.repartition($"col")) and also partition each streaming 
>>>>>>> RDD?
>>>>>>> This way I can avoid shuffling metaDF each time.
>>>>>>>
>>>>>>> Let me know you thoughts.
>>>>>>>
>>>>>>> Thanks
>>>>>>>
>>>>>>>
>>>>>
>>>
>

Reply via email to