Hey Nathan,

As the dataset is very large, I am looking for approaches that involve a minimum of joins. I will give your approach a try. Thanks a lot for your help.
On Sat, Sep 14, 2019 at 12:58 AM Nathan Kronenfeld <nkronenfeld@uncharted.software> wrote:

> It's a bit of a pain, but you could just use an outer join (assuming there
> are no duplicates in the input datasets, of course):
>
> import org.apache.spark.sql.test.SharedSparkSession
> import org.scalatest.FunSpec
>
> class QuestionSpec extends FunSpec with SharedSparkSession {
>   describe("spark list question") {
>     it("should join based on id with one row only per id, based on the first dataset") {
>       import testImplicits._
>       import org.apache.spark.sql.functions.when
>
>       val ds1 = spark.createDataFrame(Seq(
>         QuestionRecord(0, "dataset 1 record 1"),
>         QuestionRecord(2, "dataset 1 record 2"),
>         QuestionRecord(4, "dataset 1 record 3"),
>         QuestionRecord(6, "dataset 1 record 4"),
>         QuestionRecord(8, "dataset 1 record 5")
>       ))
>       val ds2 = spark.createDataFrame(Seq(
>         QuestionRecord(0, "dataset 2 record 1"),
>         QuestionRecord(3, "dataset 2 record 2"),
>         QuestionRecord(6, "dataset 2 record 3"),
>         QuestionRecord(9, "dataset 2 record 4"),
>         QuestionRecord(12, "dataset 2 record 5")
>       ))
>
>       val allColumns = ds1.columns
>
>       // Merge the datasets
>       val ds3 = ds1.join(ds2, ds1("id") === ds2("id"), "outer")
>
>       // Form new columns with the required value
>       val ds4 = allColumns.foldLeft(ds3) { case (ds, nextColName) =>
>         ds.withColumn(s"new_$nextColName",
>           when(ds1("id").isNotNull, ds1(nextColName)).otherwise(ds2(nextColName)))
>       }
>
>       // Drop old columns
>       val ds5 = allColumns.foldLeft(ds4) { case (ds, nextColumnName) =>
>         ds.drop(ds1(nextColumnName)).drop(ds2(nextColumnName))
>       }.drop("id")
>
>       // And get rid of our new_ marker
>       val ds6 = allColumns.foldLeft(ds5) { case (ds, nextColumnName) =>
>         ds.withColumnRenamed(s"new_$nextColumnName", nextColumnName)
>       }
>
>       ds6.show()
>     }
>   }
> }
>
> case class QuestionRecord(id: Int, payload: String)
>
> On Fri, Sep 13, 2019 at 11:43 AM Abhinesh Hada <abhinesh...@gmail.com> wrote:
>
>> Hi,
>>
>> I am trying to take union of 2 dataframes and then drop duplicates based
>> on the value of a specific column. But, I want to make sure that while
>> dropping duplicates, the rows from the first data frame are kept.
>>
>> Example:
>> df1 = df1.union(df2).dropDuplicates(['id'])
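[Editor's note: the "keep rows from the first dataset when ids collide" semantics discussed in this thread can be sketched in plain Scala, outside Spark. `Record` and `unionKeepFirst` below are illustrative names invented for this sketch, not Spark API:]

```scala
// Plain-Scala sketch of "union, keeping the first dataset's row per id".
// `Record` and `unionKeepFirst` are hypothetical names for illustration.
case class Record(id: Int, payload: String)

def unionKeepFirst(first: Seq[Record], second: Seq[Record]): Seq[Record] = {
  val firstIds = first.map(_.id).toSet
  // Keep every row of `first`; take a row from `second` only when its id
  // does not already appear in `first`.
  first ++ second.filterNot(r => firstIds.contains(r.id))
}

val ds1 = Seq(Record(0, "dataset 1 record 1"), Record(2, "dataset 1 record 2"))
val ds2 = Seq(Record(0, "dataset 2 record 1"), Record(3, "dataset 2 record 2"))
println(unionKeepFirst(ds1, ds2))
```

[In Spark itself the same effect should be achievable with a single join rather than the outer-join-plus-column-shuffling above, e.g. `df1.union(df2.join(df1, Seq("id"), "left_anti"))`: a left anti join keeps only the rows of `df2` whose `id` has no match in `df1`, which may better suit the "minimum joins" constraint.]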