Hi, I have two input datasets. The first input dataset looks like this:

> year,make,model,comment,blank
> "2012","Tesla","S","No comment",
> 1997,Ford,E350,"Go get one now they are going fast",
> 2015,Chevy,Volt
The second input dataset:

> TagId,condition
> 1997_cars,year = 1997 and model = 'E350'
> 2012_cars,year=2012 and model ='S'
> 2015_cars,year=2015 and model = 'Volt'

My requirement is to read the first dataset and, based on the filtering conditions in the second dataset, tag the rows of the first dataset by adding a new column, TagId. The expected output should look like this:

> year,make,model,comment,blank,TagId
> "2012","Tesla","S","No comment",2012_cars
> 1997,Ford,E350,"Go get one now they are going fast",1997_cars
> 2015,Chevy,Volt, ,2015_cars

I tried the following:

> val sqlContext = new SQLContext(sc)
> val carsSchema = StructType(Seq(
>   StructField("year", IntegerType, true),
>   StructField("make", StringType, true),
>   StructField("model", StringType, true),
>   StructField("comment", StringType, true),
>   StructField("blank", StringType, true)))
>
> val carTagsSchema = StructType(Seq(
>   StructField("TagId", StringType, true),
>   StructField("condition", StringType, true)))
>
> val dfcars =
>   sqlContext.read.format("com.databricks.spark.csv").option("header", "true")
>   .schema(carsSchema).load("/TestDivya/Spark/cars.csv")
> val dftags =
>   sqlContext.read.format("com.databricks.spark.csv").option("header", "true")
>   .schema(carTagsSchema).load("/TestDivya/Spark/CarTags.csv")
>
> val Amendeddf = dfcars.withColumn("TagId", dfcars("blank"))
> val cdtnval = dftags.select("condition")
> val df2=dfcars.filter(cdtnval)

which fails with:

> <console>:35: error: overloaded method value filter with alternatives:
>   (conditionExpr: String)org.apache.spark.sql.DataFrame <and>
>   (condition: org.apache.spark.sql.Column)org.apache.spark.sql.DataFrame
>  cannot be applied to (org.apache.spark.sql.DataFrame)
>        val df2=dfcars.filter(cdtnval)

Another way I tried:

> val col = dftags.col("TagId")
> val finaldf = dfcars.withColumn("TagId", col)
> org.apache.spark.sql.AnalysisException: resolved attribute(s) TagId#5 missing from comment#3,blank#4,model#2,make#1,year#0 in operator !Project
> [year#0,make#1,model#2,comment#3,blank#4,TagId#5 AS TagId#8];
>
> finaldf.write.format("com.databricks.spark.csv").option("header", "true").save("/TestDivya/Spark/carswithtags.csv")

I would really appreciate it if somebody could give me pointers on how to pass the filter condition (from the second DataFrame) to the filter function of the first DataFrame, or suggest another solution. My apologies for such a naive question; I am new to Scala and Spark.

Thanks
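The first error happens because `filter` accepts either a SQL expression `String` or a `Column`, never a whole `DataFrame`. The second happens because `withColumn` can only use columns that belong to the DataFrame being modified, which is why Spark reports `TagId#5` as missing from `dfcars`. One possible workaround, sketched here under the assumption that the tag table is small enough to collect to the driver, is to pull the `(TagId, condition)` pairs into plain Scala values and hand each condition string back to Spark as a SQL expression. The sketch assumes Spark 2.x or later (`SparkSession`, `Dataset.union`) and uses in-memory copies of the sample data instead of the CSV files; the names `CarTagging`, `sampleInputs`, `tagByUnion` and `tagByWhen` are hypothetical.

```scala
import org.apache.spark.sql.{Column, DataFrame, SparkSession}
import org.apache.spark.sql.functions.{expr, lit, when}

// Hypothetical helper object; names are illustrative, not from the original post.
object CarTagging {

  // In-memory stand-ins for cars.csv and CarTags.csv (assumed to mirror the samples above)
  def sampleInputs(spark: SparkSession): (DataFrame, DataFrame) = {
    import spark.implicits._
    val cars = Seq(
      (2012, "Tesla", "S", "No comment", null: String),
      (1997, "Ford", "E350", "Go get one now they are going fast", null: String),
      (2015, "Chevy", "Volt", null: String, null: String)
    ).toDF("year", "make", "model", "comment", "blank")
    val tags = Seq(
      ("1997_cars", "year = 1997 and model = 'E350'"),
      ("2012_cars", "year=2012 and model ='S'"),
      ("2015_cars", "year=2015 and model = 'Volt'")
    ).toDF("TagId", "condition")
    (cars, tags)
  }

  // Assumes the tag table is small: collect it to the driver as (TagId, condition) pairs
  private def collectTags(tags: DataFrame): Seq[(String, String)] =
    tags.select("TagId", "condition").collect().toSeq
      .map(r => (r.getString(0), r.getString(1)))

  // Approach 1: filter(String) parses each condition as a SQL expression.
  // Matching rows get a literal TagId, then the pieces are unioned.
  // Rows matching no condition are dropped; rows matching several appear once per match.
  def tagByUnion(cars: DataFrame, tags: DataFrame): DataFrame =
    collectTags(tags)
      .map { case (tagId, condition) =>
        cars.filter(condition).withColumn("TagId", lit(tagId))
      }
      .reduce(_ union _)

  // Approach 2: fold all conditions into one CASE WHEN column and add it in a single pass.
  // foldRight makes the first listed condition the outermost WHEN, so the first match wins;
  // rows matching nothing get a null TagId.
  def tagByWhen(cars: DataFrame, tags: DataFrame): DataFrame = {
    val tagCol: Column = collectTags(tags).foldRight(lit(null).cast("string")) {
      case ((tagId, condition), acc) => when(expr(condition), lit(tagId)).otherwise(acc)
    }
    cars.withColumn("TagId", tagCol)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("car-tagging").getOrCreate()
    val (cars, tags) = sampleInputs(spark)
    tagByUnion(cars, tags).show(false)
    tagByWhen(cars, tags).show(false)
    spark.stop()
  }
}
```

`tagByUnion` is closest to the per-condition `filter` the question asks about, but it scans `cars` once per tag and drops untagged rows. `tagByWhen` evaluates every condition inside a single `withColumn`, keeps all rows, and leaves `TagId` null where nothing matches. On Spark 1.x the same idea should carry over with `SQLContext` and `unionAll` in place of `union`, though this sketch targets the 2.x API.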