The Spark warning about using Row instead of a dict is not the culprit. The problem persists even after I use Row instead of a dict to generate the DataFrame.
Here is the explain() output for the reassignment of df that Gourav suggested checking. The two plans look the same except that the serial numbers attached to the columns differ (e.g. ID#7273 vs. ID#7344). This is the output of df.explain() after df = df.join(df_t.select("ID"),["ID"]):

== Physical Plan ==
*(6) Project [ID#7273, score#7276, LABEL#7274, kk#7281L]
+- *(6) SortMergeJoin [ID#7273], [ID#7303], Inner
   :- *(2) Sort [ID#7273 ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(ID#7273, 200)
   :     +- *(1) Project [ID#7273, score#7276, LABEL#7274, k#7275L AS kk#7281L]
   :        +- *(1) Filter isnotnull(ID#7273)
   :           +- *(1) Scan ExistingRDD[ID#7273,LABEL#7274,k#7275L,score#7276]
   +- *(5) Sort [ID#7303 ASC NULLS FIRST], false, 0
      +- *(5) Project [ID#7303]
         +- *(5) Filter (nL#7295L > 1)
            +- *(5) HashAggregate(keys=[ID#7303], functions=[finalmerge_count(distinct merge count#7314L) AS count(LABEL#7304)#7294L])
               +- Exchange hashpartitioning(ID#7303, 200)
                  +- *(4) HashAggregate(keys=[ID#7303], functions=[partial_count(distinct LABEL#7304) AS count#7314L])
                     +- *(4) HashAggregate(keys=[ID#7303, LABEL#7304], functions=[])
                        +- Exchange hashpartitioning(ID#7303, LABEL#7304, 200)
                           +- *(3) HashAggregate(keys=[ID#7303, LABEL#7304], functions=[])
                              +- *(3) Project [ID#7303, LABEL#7304]
                                 +- *(3) Filter isnotnull(ID#7303)
                                    +- *(3) Scan ExistingRDD[ID#7303,LABEL#7304,k#7305L,score#7306]

For comparison, this is the output of df1.explain() after df1 = df.join(df_t.select("ID"),["ID"]):
== Physical Plan ==
*(6) Project [ID#7344, score#7347, LABEL#7345, kk#7352L]
+- *(6) SortMergeJoin [ID#7344], [ID#7374], Inner
   :- *(2) Sort [ID#7344 ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(ID#7344, 200)
   :     +- *(1) Project [ID#7344, score#7347, LABEL#7345, k#7346L AS kk#7352L]
   :        +- *(1) Filter isnotnull(ID#7344)
   :           +- *(1) Scan ExistingRDD[ID#7344,LABEL#7345,k#7346L,score#7347]
   +- *(5) Sort [ID#7374 ASC NULLS FIRST], false, 0
      +- *(5) Project [ID#7374]
         +- *(5) Filter (nL#7366L > 1)
            +- *(5) HashAggregate(keys=[ID#7374], functions=[finalmerge_count(distinct merge count#7385L) AS count(LABEL#7375)#7365L])
               +- Exchange hashpartitioning(ID#7374, 200)
                  +- *(4) HashAggregate(keys=[ID#7374], functions=[partial_count(distinct LABEL#7375) AS count#7385L])
                     +- *(4) HashAggregate(keys=[ID#7374, LABEL#7375], functions=[])
                        +- Exchange hashpartitioning(ID#7374, LABEL#7375, 200)
                           +- *(3) HashAggregate(keys=[ID#7374, LABEL#7375], functions=[])
                              +- *(3) Project [ID#7374, LABEL#7375]
                                 +- *(3) Filter isnotnull(ID#7374)
                                    +- *(3) Scan ExistingRDD[ID#7374,LABEL#7375,k#7376L,score#7377]

Here is the code I run and the error I get in Spark 2.3.0. Judging from the error, Spark seems to look up a column not by its name but by an internal serial number, and that serial number somehow gets mixed up.

import pyspark.sql.functions as F
from pyspark.sql import Row

df = spark.createDataFrame([Row(score=1.0, ID='abc', LABEL=True,  k=2),
                            Row(score=1.0, ID='abc', LABEL=False, k=3)])
df = df.withColumnRenamed("k", "kk").select("ID", "score", "LABEL", "kk")  # line B
df_t = df.groupby("ID").agg(F.countDistinct("LABEL").alias("nL")).filter(F.col("nL") > 1)
df = df.join(df_t.select("ID"), ["ID"])
df_sw = df.groupby(["ID", "kk"]).count().withColumnRenamed("count", "cnt1")
df = df.join(df_sw, ["ID", "kk"])

This is the error:

'Resolved attribute(s) kk#144L missing from ID#118,LABEL#119,kk#96L,score#121 in operator !Project [ID#118, score#121, LABEL#119, kk#144L].
Attribute(s) with the same name appear in the operation: kk. Please check if the right attribute(s) are used.;;
Project [ID#88, kk#96L, score#91, LABEL#89, cnt1#140L]
+- Join Inner, ((ID#88 = ID#150) && (kk#96L = kk#144L))
   :- Project [ID#88, score#91, LABEL#89, kk#96L]
   :  +- Join Inner, (ID#88 = ID#118)
   :     :- Project [ID#88, score#91, LABEL#89, kk#96L]
   :     :  +- Project [ID#88, LABEL#89, k#90L AS kk#96L, score#91]
   :     :     +- LogicalRDD [ID#88, LABEL#89, k#90L, score#91], false
   :     +- Project [ID#118]
   :        +- Filter (nL#110L > cast(1 as bigint))
   :           +- Aggregate [ID#118], [ID#118, count(distinct LABEL#119) AS nL#110L]
   :              +- Project [ID#118, score#121, LABEL#119, kk#96L]
   :                 +- Project [ID#118, LABEL#119, k#120L AS kk#96L, score#121]
   :                    +- LogicalRDD [ID#118, LABEL#119, k#120L, score#121], false
   +- Project [ID#150, kk#144L, count#134L AS cnt1#140L]
      +- Aggregate [ID#150, kk#144L], [ID#150, kk#144L, count(1) AS count#134L]
         +- Project [ID#150, score#153, LABEL#151, kk#144L]
            +- Join Inner, (ID#150 = ID#118)
               :- Project [ID#150, score#153, LABEL#151, kk#144L]
               :  +- Project [ID#150, LABEL#151, k#152L AS kk#144L, score#153]
               :     +- LogicalRDD [ID#150, LABEL#151, k#152L, score#153], false
               +- Project [ID#118]
                  +- Filter (nL#110L > cast(1 as bigint))
                     +- Aggregate [ID#118], [ID#118, count(distinct LABEL#119) AS nL#110L]
                        +- !Project [ID#118, score#121, LABEL#119, kk#144L]
                           +- Project [ID#118, LABEL#119, k#120L AS kk#96L, score#121]
                              +- LogicalRDD [ID#118, LABEL#119, k#120L, score#121], false'

On Mon, Apr 9, 2018 at 3:21 PM, Gourav Sengupta <gourav.sengu...@gmail.com> wrote:

> Hi,
>
> what I am curious about is the reassignment of df.
>
> Can you please look into the explain plan of df after the statement
> df = df.join(df_t.select("ID"),["ID"])? And then compare with the explain
> plan of df1 after the statement df1 = df.join(df_t.select("ID"),["ID"])?
>
> Its late here, but I am yet to go through this completely.
> But I think that Spark does throw a warning telling us to use Row instead
> of a dictionary.
>
> It will be of help if you could kindly try using the statement below and
> go through your use case once again (I am yet to go through all the lines):
>
> from pyspark.sql import Row
>
> df = spark.createDataFrame([Row(score=1.0, ID="abc", LABEL=True, k=2),
>                             Row(score=1.0, ID="abc", LABEL=True, k=3)])
>
> Regards,
> Gourav Sengupta
>
>
> On Mon, Apr 9, 2018 at 6:50 PM, Shiyuan <gshy2...@gmail.com> wrote:
>
>> Hi Spark Users,
>> The following code snippet has an "attribute missing" error while the
>> attribute exists. This bug is triggered by a particular sequence of
>> "select", "groupby" and "join". Note that if I take away the "select" in
>> #line B, the code runs without error. However, the "select" in #line B
>> includes all columns in the dataframe and hence should not affect the
>> final result.
>>
>> import pyspark.sql.functions as F
>> df = spark.createDataFrame([{'score':1.0,'ID':'abc','LABEL':True,'k':2},
>>                             {'score':1.0,'ID':'abc','LABEL':False,'k':3}])
>>
>> df = df.withColumnRenamed("k","kk")\
>>        .select("ID","score","LABEL","kk")  # line B
>>
>> df_t = df.groupby("ID").agg(F.countDistinct("LABEL").alias("nL"))\
>>          .filter(F.col("nL")>1)
>> df = df.join(df_t.select("ID"),["ID"])
>> df_sw = df.groupby(["ID","kk"]).count().withColumnRenamed("count", "cnt1")
>> df = df.join(df_sw, ["ID","kk"])