I don't have the code with me now, but in the end I moved everything to RDDs
and used map operations to do the lookups, i.e. instead of broadcasting a
DataFrame I broadcast a Map.
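Here is a rough sketch of that Map-based lookup. It assumes a small metadata set keyed by age, as in the examples further down the thread. The sketch is plain Scala so it runs without a cluster. `Record`, `Enriched` and `MapLookupJoin.enrich` are hypothetical names, not the original code. In a Spark job the Map would be built once on the driver and wrapped with `sc.broadcast(...)`. `enrich` would then be called inside `rdd.mapPartitions` with `bc.value`.

```scala
// Hypothetical record type mirroring Record(age, name) from the thread.
final case class Record(age: Int, name: String)

// Hypothetical output type: the record plus the looked-up metadata value.
final case class Enriched(record: Record, meta: String)

object MapLookupJoin {
  // Inner-join semantics: records whose key is missing from the Map are dropped.
  // In Spark this would run on the executors, e.g.
  //   rdd.mapPartitions(it => MapLookupJoin.enrich(it, bc.value))
  // so only the small Map is shipped, never the streaming data itself.
  def enrich(records: Iterator[Record], lookup: Map[Int, String]): Iterator[Enriched] =
    records.flatMap(r => lookup.get(r.age).map(m => Enriched(r, m)))
}
```

The trade-off is that the metadata has to be collected to the driver. This only suits lookup data that fits comfortably in memory on every executor.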

On Fri, Feb 19, 2016 at 11:39 AM Srikanth <srikanth...@gmail.com> wrote:

> It didn't fail. It wasn't broadcasting. I just ran the test again and here
> are the logs.
> Every batch is reading the metadata file.
> 16/02/19 06:27:02 INFO HadoopRDD: Input split: file:/shared/data/test-data.txt:0+27
> 16/02/19 06:27:02 INFO HadoopRDD: Input split: file:/shared/data/test-data.txt:27+28
> 16/02/19 06:27:40 INFO HadoopRDD: Input split: file:/shared/data/test-data.txt:27+28
> 16/02/19 06:27:40 INFO HadoopRDD: Input split: file:/shared/data/test-data.txt:0+27
> If I remember correctly, foreachRDD is executed in the driver's context. I'm
> not sure how we'll be able to get a broadcast with this approach (unless we
> use the SQL broadcast hint again).
> When you say "it worked before", was it with an older version of Spark?
> I'm trying this on 1.6.
> If you still have the streaming job running, can you verify in the Spark UI
> that a broadcast join is being used? Also, are the files read and
> broadcast in each batch?
> Thanks for the help!
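Because the body of foreachRDD runs on the driver, one way to avoid rebuilding the lookup in every batch is to create it once and reuse it. The Spark Streaming programming guide does this for broadcast variables with a lazily initialized singleton (a `getInstance` method). The sketch below is a Spark-free model of that pattern. `OnceOnDriver` is a hypothetical name. In a real job, `load` would be something like `() => sc.broadcast(lookupMap)`.

```scala
// Thread-safe, lazily initialized holder: `load` runs at most once, on first use.
// Modeled on the singleton-broadcast pattern; here `load` is any function so the
// sketch runs without Spark (in a job it would wrap sc.broadcast(...)).
final class OnceOnDriver[T](load: () => T) {
  @volatile private var instance: Option[T] = None

  def get(): T = {
    if (instance.isEmpty) {
      synchronized {
        // Re-check inside the lock so concurrent callers don't load twice.
        if (instance.isEmpty) instance = Some(load())
      }
    }
    instance.get
  }
}
```

Inside the streaming job, this would look like `val bc = holder.get()` at the top of the foreachRDD body, with `bc.value` used only inside the executor-side closures.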
> On Fri, Feb 19, 2016 at 3:49 AM, Sebastian Piu <sebastian....@gmail.com> wrote:
>> I don't see anything obviously wrong with your second approach; I've done
>> it like that before and it worked. When you say that it didn't work, what
>> do you mean? Did it fail? Did it not broadcast?
>> On Thu, Feb 18, 2016 at 11:43 PM Srikanth <srikanth...@gmail.com> wrote:
>>> Code with the SQL broadcast hint. This worked, and I was able to see that a
>>> broadcast join was performed.
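>>> import org.apache.spark.sql.functions.broadcast
>>> import sqlContext.implicits._  // needed for rdd.toDF()
>>> val testDF = sqlContext.read.format("com.databricks.spark.csv")
>>>                .schema(schema).load("file:///shared/data/test-data.txt")
>>> val lines = ssc.socketTextStream("DevNode", 9999)
>>> lines.foreachRDD((rdd, timestamp) => {
>>>    // Parse "age,name" lines into Records and turn the batch into a DataFrame
>>>    val recordDF = rdd.map(_.split(",")).map(l => Record(l(0).toInt, l(1))).toDF()
>>>    // Hint Spark SQL to broadcast the small metadata side of the join
>>>    val resultDF = recordDF.join(broadcast(testDF), "Age")
>>>    resultDF.write.format("com.databricks.spark.csv")
>>>      .save("file:///shared/data/output/streaming/" + timestamp.milliseconds)
>>> })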
>>> But for every batch, this file was read and the broadcast was performed.
>>> I guess it is re-evaluating the entire DAG.
>>> 16/02/18 12:24:02 INFO HadoopRDD: Input split: file:/shared/data/test-data.txt:27+28
>>> 16/02/18 12:24:02 INFO HadoopRDD: Input split: file:/shared/data/test-data.txt:0+27
>>> 16/02/18 12:25:00 INFO HadoopRDD: Input split: file:/shared/data/test-data.txt:27+28
>>> 16/02/18 12:25:00 INFO HadoopRDD: Input split: file:/shared/data/test-data.txt:0+27
>>> Then I changed the code to broadcast the DataFrame. This didn't work either.
>>> I'm not sure if this is what you meant by broadcasting a DataFrame.
>>> // Note: this broadcasts the DataFrame handle (a lazy query plan), not its rows
>>> val testDF = sc.broadcast(
>>>   sqlContext.read.format("com.databricks.spark.csv")
>>>     .schema(schema).load("file:///shared/data/test-data.txt"))
>>> val lines = ssc.socketTextStream("DevNode", 9999)
>>> lines.foreachRDD((rdd, timestamp) => {
>>>   val recordDF = rdd.map(_.split(",")).map(l => Record(l(0).toInt, l(1))).toDF()
>>>   val resultDF = recordDF.join(testDF.value, "Age")
>>>   resultDF.write.format("com.databricks.spark.csv")
>>>     .save("file:///shared/data/output/streaming/" + timestamp.milliseconds)
>>> })
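As far as I understand it, a DataFrame is a lazy description of a query. Passing it to `sc.broadcast` ships that description, not the rows. Each batch's join therefore still executes the plan and reads the file, which matches the logs above. Broadcasting the data itself means collecting it on the driver first, for example via `testDF.collect()` and mapping each Row to an (age, value) pair. Age is not necessarily unique, so collapsing it into a plain `Map[Int, String]` would silently drop rows that a real join keeps. The plain-Scala sketch below keeps every match. `GroupedLookup` and its methods are hypothetical names.

```scala
object GroupedLookup {
  // Build key -> all matching values, so duplicate keys keep every match,
  // as a relational join on "Age" would. The input pairs are assumed to come
  // from collecting the metadata DataFrame on the driver.
  def build[K, V](rows: Seq[(K, V)]): Map[K, Seq[V]] =
    rows.groupBy(_._1).map { case (k, kvs) => k -> kvs.map(_._2) }

  // Inner join of one record against the lookup: zero, one or many results.
  def joinOne[K, V, R](key: K, rec: R, lookup: Map[K, Seq[V]]): Seq[(R, V)] =
    lookup.getOrElse(key, Seq.empty).map(v => (rec, v))
}
```

The resulting `Map[K, Seq[V]]` is what would be passed to `sc.broadcast`, with `joinOne` applied inside `rdd.flatMap` on the executors.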
>>> On Thu, Feb 18, 2016 at 12:55 PM, Sebastian Piu <sebastian....@gmail.com> wrote:
>>>> Can you paste the code where you use sc.broadcast ?
>>>> On Thu, Feb 18, 2016 at 5:32 PM Srikanth <srikanth...@gmail.com> wrote:
>>>>> Sebastian,
>>>>> I was able to broadcast using the SQL broadcast hint. The question is how
>>>>> to prevent this broadcast for each RDD.
>>>>> Is there a way to broadcast it once and use it locally for each RDD?
>>>>> Right now, in every batch the metadata file is read and the DF is
>>>>> broadcast.
>>>>> I tried sc.broadcast, and that did not provide this behavior.
>>>>> Srikanth
>>>>> On Wed, Feb 17, 2016 at 4:53 PM, Sebastian Piu <sebastian....@gmail.com> wrote:
>>>>>> You should be able to broadcast that data frame using sc.broadcast
>>>>>> and join against it.
>>>>>> On Wed, 17 Feb 2016, 21:13 Srikanth <srikanth...@gmail.com> wrote:
>>>>>>> Hello,
>>>>>>> I have a streaming use case where I plan to keep a dataset
>>>>>>> broadcast and cached on each executor.
>>>>>>> Every micro-batch in streaming will create a DF out of the RDD and
>>>>>>> join it with that dataset.
>>>>>>> The code below performs the broadcast operation for each RDD. Is
>>>>>>> there a way to broadcast it just once?
>>>>>>> Alternative approaches are also welcome.
>>>>>>>     val DF1 = sqlContext.read.format("json").schema(schema1).load(file1)
>>>>>>>     val metaDF = sqlContext.read.format("json").schema(schema1).load(file2)
>>>>>>>       .join(DF1, "id")
>>>>>>>     metaDF.cache
>>>>>>>     val lines = streamingcontext.textFileStream(path)
>>>>>>>     lines.foreachRDD(rdd => {
>>>>>>>       // Turn this micro-batch into a DataFrame of records
>>>>>>>       val recordDF = rdd.flatMap(r => Record(r)).toDF()
>>>>>>>       // Hint a broadcast join against the cached metadata
>>>>>>>       val joinedDF = recordDF.join(broadcast(metaDF), "id")
>>>>>>>       joinedDF.rdd.foreachPartition(partition => {
>>>>>>>         partition.foreach(row => {
>>>>>>>              ...
>>>>>>>              ...
>>>>>>>         })
>>>>>>>       })
>>>>>>>     })
>>>>>>>     streamingcontext.start
>>>>>>> On a similar note, if metaDF is too big to broadcast, can I
>>>>>>> partition it (df.repartition($"col")) and also partition each
>>>>>>> streaming RDD?
>>>>>>> This way I can avoid shuffling metaDF each time.
>>>>>>> Let me know your thoughts.
>>>>>>> Thanks
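On the co-partitioning question, the idea works like this. Suppose both sides are hash-partitioned by the join key with the same number of partitions. Equal keys then land at the same partition index, so partition i of a batch only needs partition i of the metadata. The plain-Scala sketch below models the rule used by Spark's `HashPartitioner` (non-negative `hashCode` modulo the partition count, null keys to 0) to show why this is valid. `CoPartition` and its methods are hypothetical. Whether a given Spark version actually skips the shuffle for a DataFrame join depends on the planner, so treat this as the idea rather than a guarantee. With the RDD API, the usual route is to partition the metadata once with `partitionBy(new HashPartitioner(n))`, cache it, and join each batch against it.

```scala
object CoPartition {
  // Same rule as Spark's HashPartitioner: non-negative (hashCode mod n), null keys to 0.
  def partitionOf(key: Any, numPartitions: Int): Int =
    if (key == null) 0
    else {
      val raw = key.hashCode % numPartitions
      if (raw < 0) raw + numPartitions else raw
    }

  // Split keyed rows into numPartitions buckets using that rule.
  def partition[K, V](rows: Seq[(K, V)], numPartitions: Int): Vector[Seq[(K, V)]] = {
    val byIdx = rows.groupBy { case (k, _) => partitionOf(k, numPartitions) }
    Vector.tabulate(numPartitions)(i => byIdx.getOrElse(i, Seq.empty))
  }

  // Join bucket i only with bucket i: valid because equal keys share an index on both sides.
  def zipJoin[K, A, B](left: Vector[Seq[(K, A)]], right: Vector[Seq[(K, B)]]): Seq[(K, (A, B))] =
    left.zip(right).flatMap { case (l, r) =>
      val idx = r.groupBy(_._1)
      l.flatMap { case (k, a) =>
        idx.getOrElse(k, Seq.empty).map { case (_, b) => (k, (a, b)) }
      }
    }
}
```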
