Can you paste the code where you use sc.broadcast?

On Thu, Feb 18, 2016 at 5:32 PM Srikanth <srikanth...@gmail.com> wrote:
> Sebastian,
>
> I was able to broadcast using the sql broadcast hint. The question is how to
> prevent this broadcast for each RDD.
> Is there a way for it to be broadcast once and used locally for each RDD?
> Right now the metadata file is read and the DF is broadcasted every batch.
> I tried sc.broadcast and that did not provide this behavior.
>
> Srikanth
>
> On Wed, Feb 17, 2016 at 4:53 PM, Sebastian Piu <sebastian....@gmail.com>
> wrote:
>
>> You should be able to broadcast that data frame using sc.broadcast and
>> join against it.
>>
>> On Wed, 17 Feb 2016, 21:13 Srikanth <srikanth...@gmail.com> wrote:
>>
>>> Hello,
>>>
>>> I have a streaming use case where I plan to keep a dataset broadcasted
>>> and cached on each executor.
>>> Every micro batch in streaming will create a DF out of the RDD and join
>>> it with the batch.
>>> The code below performs the broadcast operation for each RDD. Is
>>> there a way to broadcast it just once?
>>>
>>> Alternate approaches are also welcome.
>>>
>>> val DF1 = sqlContext.read.format("json").schema(schema1).load(file1)
>>>
>>> val metaDF = sqlContext.read.format("json").schema(schema1).load(file2)
>>>   .join(DF1, "id")
>>> metaDF.cache
>>>
>>> val lines = streamingcontext.textFileStream(path)
>>>
>>> lines.foreachRDD( rdd => {
>>>   val recordDF = rdd.flatMap(r => Record(r)).toDF()
>>>   val joinedDF = recordDF.join(broadcast(metaDF), "id")
>>>
>>>   joinedDF.rdd.foreachPartition( partition => {
>>>     partition.foreach( row => {
>>>       ...
>>>     })
>>>   })
>>> })
>>>
>>> streamingcontext.start
>>>
>>> On a similar note, if metaDF is too big to broadcast, can I
>>> partition it (df.repartition($"col")) and also partition each streaming
>>> RDD? That way I can avoid shuffling metaDF each time.
>>>
>>> Let me know your thoughts.
>>>
>>> Thanks
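[For readers of the archive: the "broadcast once" behavior Srikanth asks about is usually achieved by collecting the small metadata set to the driver and wrapping it in an sc.broadcast variable created outside foreachRDD, then doing a local map-side lookup instead of a per-batch broadcast join. A minimal sketch, assuming the Spark 1.6-era APIs used in the thread, that metaDF fits in driver memory, and a hypothetical Record case class with an id field; it is not code from the thread itself:]

```scala
// Sketch only: assumes metaDF is small enough to collect to the driver,
// and a hypothetical `case class Record(id: String, ...)` with a parser.
// `sc` is the SparkContext, `lines` the DStream from the original mail.

// Build the lookup table once, outside the streaming loop, and broadcast it.
// The broadcast value is shipped to each executor a single time and cached
// there, so later batches read it locally.
val metaById = metaDF.collect()
  .map(row => row.getAs[String]("id") -> row)
  .toMap
val metaBc = sc.broadcast(metaById)

lines.foreachRDD { rdd =>
  rdd.flatMap(r => Record(r)).foreachPartition { partition =>
    // No per-batch broadcast join: the executor-local broadcast value is
    // consulted directly, with no shuffle and no re-broadcast per batch.
    val lookup = metaBc.value
    partition.foreach { rec =>
      lookup.get(rec.id).foreach { metaRow =>
        // ... process the enriched (rec, metaRow) pair here ...
      }
    }
  }
}
```

[This trades the DataFrame join for a hand-rolled map lookup; if the metadata changes, the broadcast must be recreated (unpersist the old one and broadcast again), since broadcast variables are immutable.]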