Re: How to minimize shuffling on Spark dataframe Join?
If you create a PairRDD from the DataFrame, using dataFrame.toRDD().mapToPair(), you can call partitionBy(someCustomPartitioner), which partitions the RDD by the key of each pair. Subsequent operations on it (such as joining with another RDD) will then take this partitioning into account. I'm not sure whether DataFrames support this directly yet.
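A rough PySpark sketch of this idea (the question's example uses the Python API, so this uses rdd.map to build key-value pairs instead of Java's mapToPair; the toy data and partition count are made up for illustration):

    from pyspark import SparkContext
    from pyspark.sql import SQLContext

    sc = SparkContext(appName="copartitioned-join")
    sqlContext = SQLContext(sc)

    # Toy stand-ins for the two dataframes from the question.
    student_df = sqlContext.createDataFrame(
        [(1, "alice"), (2, "bob")], ["studentid", "name"])
    result_df = sqlContext.createDataFrame(
        [(1, 3.8), (2, 3.5)], ["studentid", "gpa"])

    NUM_PARTITIONS = 8  # tune to the cluster

    # Key both RDDs by studentid and partition them identically.
    # Note: partitionBy itself shuffles once; the payoff comes when the
    # partitioned RDDs are cached and reused across several joins.
    student_pairs = student_df.rdd.map(lambda r: (r.studentid, r)) \
                                  .partitionBy(NUM_PARTITIONS).cache()
    result_pairs = result_df.rdd.map(lambda r: (r.studentid, r)) \
                                .partitionBy(NUM_PARTITIONS).cache()

    # Both sides now share the same partitioner, so this join should not
    # re-shuffle either RDD.
    joined = student_pairs.join(result_pairs)
    print(joined.collect())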
Re: How to minimize shuffling on Spark dataframe Join?
Hi Hemant,

Thank you for your reply.

I think the source of my dataframe is not partitioned on the key; it's an Avro file where 'id' is a field, but I don't know how to read a file and configure the partition key at the same time. I couldn't find anything on SQLContext.read.load that lets you set a partition key, nor anything on DataFrame itself. If it could partition on the specified key, would Spark place the same partition ranges on the same machines for two different dataframes?

What are the overall tips for joining faster?

Best Regards,
Abdullah
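For what it's worth, a sketch of the read-then-partition pattern (this assumes the Databricks spark-avro package is on the classpath, e.g. via --packages with coordinates matching your Spark version; the file path is hypothetical, and the 'id' field comes from the message above):

    from pyspark import SparkContext
    from pyspark.sql import SQLContext

    sc = SparkContext(appName="avro-partition-by-key")
    sqlContext = SQLContext(sc)

    # read.load has no option to pick a partition key at read time,
    # so the keying has to happen after loading.
    df = sqlContext.read.format("com.databricks.spark.avro") \
                        .load("hdfs:///data/students.avro")

    # Build a pair RDD keyed by the 'id' field and partition explicitly.
    keyed = df.rdd.map(lambda r: (r.id, r)).partitionBy(8)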
Re: How to minimize shuffling on Spark dataframe Join?
Is the source of your dataframe partitioned on the key? From your mail, it looks like it is not. If that is the case, you will have to shuffle the data anyway in order to partition it.

The other part of your question is how to co-group data from two dataframes based on a key. For RDDs, cogroup in PairRDDFunctions is one way; I am not sure whether something similar is available for DataFrames.

Hemant
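For the RDD route, cogroup is available directly on PySpark pair RDDs (PairRDDFunctions is the Scala-side home of the operation); a minimal sketch with made-up data:

    from pyspark import SparkContext

    sc = SparkContext(appName="cogroup-example")

    students = sc.parallelize([(1, "alice"), (2, "bob")])
    results = sc.parallelize([(1, 3.8), (2, 3.5)])

    # cogroup groups the values of both RDDs by key in a single pass;
    # each element is (key, (values_from_students, values_from_results)).
    for key, (names, gpas) in students.cogroup(results).collect():
        print(key, list(names), list(gpas))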
Fwd: How to minimize shuffling on Spark dataframe Join?
I have two dataframes like this:

student_rdf = (studentid, name, ...)
student_result_rdf = (studentid, gpa, ...)

We need to join these two dataframes, and we currently do it like this:

student_rdf.join(student_result_rdf, student_result_rdf["studentid"] == student_rdf["studentid"])

This is simple, but it causes a lot of data shuffling across worker nodes. Since the join key is the same on both sides, if both dataframes could be partitioned on that key (studentid), there should be no shuffling at all: rows with the same partition key would already reside on the same node. Is it possible to hint Spark to do this?

So I am looking for a way to partition data on a column while reading a dataframe from input, and for a way to make Spark understand that the partition keys of the two dataframes match. If that is possible, how?

--
Abdullah
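A self-contained, runnable version of the join described above (Spark 1.x PySpark; the schemas are trimmed to the columns named in the message, with made-up rows):

    from pyspark import SparkContext
    from pyspark.sql import SQLContext

    sc = SparkContext(appName="student-join")
    sqlContext = SQLContext(sc)

    student_rdf = sqlContext.createDataFrame(
        [(1, "alice"), (2, "bob")], ["studentid", "name"])
    student_result_rdf = sqlContext.createDataFrame(
        [(1, 3.8), (2, 3.5)], ["studentid", "gpa"])

    # The join as written in the question: without co-located
    # partitioning, Spark shuffles both sides by the join key.
    joined = student_rdf.join(
        student_result_rdf,
        student_result_rdf["studentid"] == student_rdf["studentid"])
    joined.show()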