I don't have the code with me now. In the end I moved everything to RDDs and used map operations to do the lookups, i.e. instead of broadcasting a DataFrame I broadcast a Map.
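Roughly, the Map version looks something like the sketch below. The path, schema, and key column are placeholders borrowed from the snippets further down in the thread, not the actual code:

// Build a plain Scala Map from the metadata file once on the driver,
// then broadcast it so every executor keeps a local copy.
val metaMap: Map[Int, String] = sqlContext.read
  .format("com.databricks.spark.csv")
  .schema(schema)
  .load("file:///shared/data/test-data.txt")
  .rdd
  .map(row => row.getInt(0) -> row.getString(1))
  .collectAsMap()
  .toMap
val metaBc = sc.broadcast(metaMap)

val lines = ssc.socketTextStream("DevNode", 9999)

lines.foreachRDD((rdd, timestamp) => {
  // The lookup runs on the executors against the broadcast value,
  // so nothing is re-read or re-broadcast per batch.
  rdd.map(_.split(","))
    .map(l => (l(0).toInt, l(1)))
    .map { case (age, name) => (age, name, metaBc.value.getOrElse(age, "")) }
    .saveAsTextFile("file:///shared/data/output/streaming/" + timestamp)
})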
On Fri, Feb 19, 2016 at 11:39 AM Srikanth <srikanth...@gmail.com> wrote:

> It didn't fail. It wasn't broadcasting. I just ran the test again and here
> are the logs. Every batch is reading the metadata file.
>
> 16/02/19 06:27:02 INFO HadoopRDD: Input split: file:/shared/data/test-data.txt:0+27
> 16/02/19 06:27:02 INFO HadoopRDD: Input split: file:/shared/data/test-data.txt:27+28
>
> 16/02/19 06:27:40 INFO HadoopRDD: Input split: file:/shared/data/test-data.txt:27+28
> 16/02/19 06:27:40 INFO HadoopRDD: Input split: file:/shared/data/test-data.txt:0+27
>
> If I remember correctly, foreachRDD is executed in the driver's context. Not sure
> how we'll be able to achieve broadcast in this approach (unless we use the SQL
> broadcast hint again).
>
> When you say "it worked before", was it with an older version of Spark?
> I'm trying this on 1.6.
> If you still have the streaming job running, can you verify in the Spark UI
> that a broadcast join is being used? Also, whether the files are read and
> broadcast each batch?
>
> Thanks for the help!
>
>
> On Fri, Feb 19, 2016 at 3:49 AM, Sebastian Piu <sebastian....@gmail.com> wrote:
>
>> I don't see anything obviously wrong with your second approach. I've done
>> it like that before and it worked. When you say that it didn't work, what do
>> you mean? Did it fail? Did it not broadcast?
>>
>> On Thu, Feb 18, 2016 at 11:43 PM Srikanth <srikanth...@gmail.com> wrote:
>>
>>> Code with the SQL broadcast hint. This worked and I was able to see that
>>> a broadcast join was performed.
>>>
>>> val testDF = sqlContext.read.format("com.databricks.spark.csv")
>>>   .schema(schema).load("file:///shared/data/test-data.txt")
>>>
>>> val lines = ssc.socketTextStream("DevNode", 9999)
>>>
>>> lines.foreachRDD((rdd, timestamp) => {
>>>   val recordDF = rdd.map(_.split(",")).map(l => Record(l(0).toInt, l(1))).toDF()
>>>   val resultDF = recordDF.join(broadcast(testDF), "Age")
>>>
>>>   resultDF.write.format("com.databricks.spark.csv")
>>>     .save("file:///shared/data/output/streaming/" + timestamp)
>>> })
>>>
>>> But for every batch this file was read and the broadcast was performed.
>>> Evaluating the entire DAG, I guess.
>>>
>>> 16/02/18 12:24:02 INFO HadoopRDD: Input split: file:/shared/data/test-data.txt:27+28
>>> 16/02/18 12:24:02 INFO HadoopRDD: Input split: file:/shared/data/test-data.txt:0+27
>>>
>>> 16/02/18 12:25:00 INFO HadoopRDD: Input split: file:/shared/data/test-data.txt:27+28
>>> 16/02/18 12:25:00 INFO HadoopRDD: Input split: file:/shared/data/test-data.txt:0+27
>>>
>>> Then I changed the code to broadcast the DataFrame. This didn't work either.
>>> Not sure if this is what you meant by broadcasting a DataFrame.
>>>
>>> val testDF = sc.broadcast(
>>>   sqlContext.read.format("com.databricks.spark.csv")
>>>     .schema(schema).load("file:///shared/data/test-data.txt")
>>> )
>>>
>>> val lines = ssc.socketTextStream("DevNode", 9999)
>>>
>>> lines.foreachRDD((rdd, timestamp) => {
>>>   val recordDF = rdd.map(_.split(",")).map(l => Record(l(0).toInt, l(1))).toDF()
>>>   val resultDF = recordDF.join(testDF.value, "Age")
>>>
>>>   resultDF.write.format("com.databricks.spark.csv")
>>>     .save("file:///shared/data/output/streaming/" + timestamp)
>>> })
>>>
>>> On Thu, Feb 18, 2016 at 12:55 PM, Sebastian Piu <sebastian....@gmail.com> wrote:
>>>
>>>> Can you paste the code where you use sc.broadcast?
>>>>
>>>> On Thu, Feb 18, 2016 at 5:32 PM Srikanth <srikanth...@gmail.com> wrote:
>>>>
>>>>> Sebastian,
>>>>>
>>>>> I was able to broadcast using the SQL broadcast hint.
>>>>> The question is how to prevent this broadcast for each RDD.
>>>>> Is there a way where it can be broadcast once and used locally for each RDD?
>>>>> Right now the metadata file is read and the DF is broadcast every batch.
>>>>> I tried sc.broadcast and that did not provide this behavior.
>>>>>
>>>>> Srikanth
>>>>>
>>>>> On Wed, Feb 17, 2016 at 4:53 PM, Sebastian Piu <sebastian....@gmail.com> wrote:
>>>>>
>>>>>> You should be able to broadcast that data frame using sc.broadcast
>>>>>> and join against it.
>>>>>>
>>>>>> On Wed, 17 Feb 2016, 21:13 Srikanth <srikanth...@gmail.com> wrote:
>>>>>>
>>>>>>> Hello,
>>>>>>>
>>>>>>> I have a streaming use case where I plan to keep a dataset
>>>>>>> broadcast and cached on each executor.
>>>>>>> Every micro batch in streaming will create a DF out of the RDD and
>>>>>>> join the batch.
>>>>>>> The code below will perform the broadcast operation for each RDD. Is
>>>>>>> there a way to broadcast it just once?
>>>>>>>
>>>>>>> Alternate approaches are also welcome.
>>>>>>>
>>>>>>> val DF1 = sqlContext.read.format("json").schema(schema1).load(file1)
>>>>>>>
>>>>>>> val metaDF = sqlContext.read.format("json").schema(schema1).load(file2)
>>>>>>>   .join(DF1, "id")
>>>>>>> metaDF.cache
>>>>>>>
>>>>>>> val lines = streamingcontext.textFileStream(path)
>>>>>>>
>>>>>>> lines.foreachRDD( rdd => {
>>>>>>>   val recordDF = rdd.flatMap(r => Record(r)).toDF()
>>>>>>>   val joinedDF = recordDF.join(broadcast(metaDF), "id")
>>>>>>>
>>>>>>>   joinedDF.rdd.foreachPartition( partition => {
>>>>>>>     partition.foreach( row => {
>>>>>>>       ...
>>>>>>>     })
>>>>>>>   })
>>>>>>> })
>>>>>>>
>>>>>>> streamingcontext.start
>>>>>>>
>>>>>>> On a similar note, if the metaDF is too big for broadcast, can I
>>>>>>> partition it (df.repartition($"col")) and also partition each streaming
>>>>>>> RDD? This way I can avoid shuffling metaDF each time.
>>>>>>>
>>>>>>> Let me know your thoughts.
>>>>>>>
>>>>>>> Thanks
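For the "broadcast once and reuse for each RDD" question above, one pattern worth trying is a lazily initialized singleton that creates the broadcast on the first batch and hands back the same instance on every later batch. The thread never settled on this, so treat it as a sketch; loadMetaMap() is a hypothetical helper that reads the metadata file into a Map[Int, String]:

import org.apache.spark.SparkContext
import org.apache.spark.broadcast.Broadcast

// Lazily initialized singleton: the broadcast is created the first time
// a batch asks for it and the same instance is reused afterwards.
object MetaBroadcast {
  @volatile private var instance: Broadcast[Map[Int, String]] = _

  def getInstance(sc: SparkContext, load: () => Map[Int, String]): Broadcast[Map[Int, String]] = {
    if (instance == null) {
      synchronized {
        if (instance == null) {
          instance = sc.broadcast(load())
        }
      }
    }
    instance
  }
}

lines.foreachRDD((rdd, timestamp) => {
  // loadMetaMap() is a hypothetical helper; it only actually runs for the
  // first batch, after which the existing broadcast is returned.
  val metaBc = MetaBroadcast.getInstance(rdd.sparkContext, () => loadMetaMap())
  rdd.map(_.split(","))
    .map(l => (l(0).toInt, l(1), metaBc.value.get(l(0).toInt)))
    .saveAsTextFile("file:///shared/data/output/streaming/" + timestamp)
})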