Code with the SQL broadcast hint. This worked, and I was able to see that a broadcast join was performed.
val testDF = sqlContext.read.format("com.databricks.spark.csv")
    .schema(schema)
    .load("file:///shared/data/test-data.txt")

val lines = ssc.socketTextStream("DevNode", 9999)

lines.foreachRDD((rdd, timestamp) => {
    val recordDF = rdd.map(_.split(",")).map(l => Record(l(0).toInt, l(1))).toDF()
    val resultDF = recordDF.join(testDF, "Age")
    resultDF.write.format("com.databricks.spark.csv")
        .save("file:///shared/data/output/streaming/" + timestamp)
})

But for every batch this file was read and the broadcast was performed. The entire DAG is being re-evaluated each time, I guess.

16/02/18 12:24:02 INFO HadoopRDD: Input split: file:/shared/data/test-data.txt:27+28
16/02/18 12:24:02 INFO HadoopRDD: Input split: file:/shared/data/test-data.txt:0+27
16/02/18 12:25:00 INFO HadoopRDD: Input split: file:/shared/data/test-data.txt:27+28
16/02/18 12:25:00 INFO HadoopRDD: Input split: file:/shared/data/test-data.txt:0+27

Then I changed the code to broadcast the DataFrame. This didn't work either. Not sure if this is what you meant by broadcasting a DataFrame.

val testDF = sc.broadcast(
    sqlContext.read.format("com.databricks.spark.csv")
        .schema(schema)
        .load("file:///shared/data/test-data.txt")
)

val lines = ssc.socketTextStream("DevNode", 9999)

lines.foreachRDD((rdd, timestamp) => {
    val recordDF = rdd.map(_.split(",")).map(l => Record(l(0).toInt, l(1))).toDF()
    val resultDF = recordDF.join(testDF.value, "Age")
    resultDF.write.format("com.databricks.spark.csv")
        .save("file:///shared/data/output/streaming/" + timestamp)
})

On Thu, Feb 18, 2016 at 12:55 PM, Sebastian Piu <sebastian....@gmail.com> wrote:

> Can you paste the code where you use sc.broadcast?
>
> On Thu, Feb 18, 2016 at 5:32 PM Srikanth <srikanth...@gmail.com> wrote:
>
>> Sebastian,
>>
>> I was able to broadcast using the SQL broadcast hint. The question is how to
>> prevent this broadcast for each RDD.
>> Is there a way it can be broadcast once and used locally for each RDD?
>> Right now, every batch the metadata file is read and the DF is broadcast.
>> I tried sc.broadcast and that did not provide this behavior.
>>
>> Srikanth
>>
>> On Wed, Feb 17, 2016 at 4:53 PM, Sebastian Piu <sebastian....@gmail.com> wrote:
>>
>>> You should be able to broadcast that data frame using sc.broadcast and
>>> join against it.
>>>
>>> On Wed, 17 Feb 2016, 21:13 Srikanth <srikanth...@gmail.com> wrote:
>>>
>>>> Hello,
>>>>
>>>> I have a streaming use case where I plan to keep a dataset broadcast
>>>> and cached on each executor.
>>>> Every micro-batch in streaming will create a DF out of the RDD and join
>>>> it with the batch.
>>>> The code below will perform the broadcast operation for each RDD. Is
>>>> there a way to broadcast it just once?
>>>>
>>>> Alternate approaches are also welcome.
>>>>
>>>> val DF1 = sqlContext.read.format("json").schema(schema1).load(file1)
>>>>
>>>> val metaDF = sqlContext.read.format("json").schema(schema1).load(file2)
>>>>     .join(DF1, "id")
>>>> metaDF.cache()
>>>>
>>>> val lines = streamingcontext.textFileStream(path)
>>>>
>>>> lines.foreachRDD( rdd => {
>>>>     val recordDF = rdd.map(r => Record(r)).toDF()
>>>>     val joinedDF = recordDF.join(broadcast(metaDF), "id")
>>>>
>>>>     joinedDF.rdd.foreachPartition( partition => {
>>>>         partition.foreach( row => {
>>>>             ...
>>>>         })
>>>>     })
>>>> })
>>>>
>>>> streamingcontext.start()
>>>>
>>>> On a similar note, if metaDF is too big to broadcast, can I
>>>> partition it (df.repartition($"col")) and also partition each streaming RDD?
>>>> This way I can avoid shuffling metaDF each time.
>>>>
>>>> Let me know your thoughts.
>>>>
>>>> Thanks
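[Editor's note] The repeated HadoopRDD "Input split" lines above suggest the static DataFrame's full lineage, including the file read, is re-evaluated on every micro-batch. Calling `.cache()` on the static DataFrame and materializing it once with an action (e.g. `testDF.count()`) before starting the stream should at least stop the per-batch file reads, even if the broadcast itself still happens per join. The idea can be sketched without Spark; every name and value below is illustrative, not from the thread:

```scala
// Counts how many times the "file" is actually read.
var loads = 0

// Stands in for sqlContext.read...load("file:///shared/data/test-data.txt")
def expensiveLoad(): Map[Int, String] = {
  loads += 1
  Map(25 -> "alice", 30 -> "bob")
}

// Analogous to testDF.cache() followed by a materializing action:
// the load runs once, on first access.
lazy val cachedTest: Map[Int, String] = expensiveLoad()

// Three "micro-batches" all reuse the materialized copy.
(1 to 3).foreach { _ => require(cachedTest.contains(25)) }
```

With caching in place, only the (cheap, already-materialized) broadcast remains per batch; without it, each batch pays for the load as well.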
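[Editor's note] When the static side is small, an alternative to joining DataFrames is the classic map-side join: collect the metadata to the driver once, wrap it in a single `sc.broadcast` outside `foreachRDD`, and enrich each record with a local lookup via `.value` inside the batch, so nothing is re-broadcast or shuffled per batch. A minimal Spark-free sketch of the lookup step (all names here are hypothetical):

```scala
// Stand-in for the thread's streaming records.
case class Record(id: Int, value: String)

// Stand-in for metaDF collected to the driver as a lookup table;
// in Spark this Map would be broadcast once and read via .value.
val metaById: Map[Int, String] = Map(1 -> "alpha", 2 -> "beta")

// Map-side join: enrich each record by local lookup, dropping
// records with no matching metadata (inner-join semantics).
def enrich(batch: Seq[Record]): Seq[(Int, String, String)] =
  batch.flatMap { r =>
    metaById.get(r.id).map(meta => (r.id, r.value, meta))
  }

// id 3 has no metadata, so only the first record survives.
val out = enrich(Seq(Record(1, "x"), Record(3, "y")))
```

This only works while the metadata fits in driver and executor memory; for a metaDF too large to broadcast, pre-partitioning both sides on the join key, as asked at the end of the thread, is the usual fallback.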