Here is a pretty print of the plan contained in the error message, which reveals some details about what causes the bug (see the lines marked with asterisks): withColumnRenamed() fails to update the dependency graph correctly:
Resolved attribute(s) kk#144L missing from ID#118,LABEL#119,kk#96L,score#121 in operator !Project [ID#118, score#121, LABEL#119, kk#144L]. Attribute(s) with the same name appear in the operation: kk. Please check if the right attribute(s) are used

Project [ID#64, kk#73L, score#67, LABEL#65, cnt1#123L]
+- Join Inner, ((ID#64 = ID#135) && (kk#73L = kk#128L))
   :- Project [ID#64, score#67, LABEL#65, kk#73L]
   :  +- Join Inner, (ID#64 = ID#99)
   :     :- Project [ID#64, score#67, LABEL#65, kk#73L]
   :     :  +- Project [ID#64, LABEL#65, k#66L AS kk#73L, score#67]
   :     :     +- LogicalRDD [ID#64, LABEL#65, k#66L, score#67]
   :     +- Project [ID#99]
   :        +- Filter (nL#90L > cast(1 as bigint))
   :           +- Aggregate [ID#99], [ID#99, count(distinct LABEL#100) AS nL#90L]
   :              +- Project [ID#99, score#102, LABEL#100, kk#73L]
   :                 +- Project [ID#99, LABEL#100, k#101L AS kk#73L, score#102]
   :                    +- LogicalRDD [ID#99, LABEL#100, k#101L, score#102]
   +- Project [ID#135, kk#128L, count#118L AS cnt1#123L]
      +- Aggregate [ID#135, kk#128L], [ID#135, kk#128L, count(1) AS count#118L]
         +- Project [ID#135, score#138, LABEL#136, kk#128L]
            +- Join Inner, (ID#135 = ID#99)
               :- Project [ID#135, score#138, LABEL#136, kk#128L]
               :  +- *Project [ID#135, LABEL#136, k#137L AS kk#128L, score#138]*
               :     +- LogicalRDD [ID#135, LABEL#136, k#137L, score#138]
               +- Project [ID#99]
                  +- Filter (nL#90L > cast(1 as bigint))
                     +- Aggregate [ID#99], [ID#99, count(distinct LABEL#100) AS nL#90L]
                        +- *!Project [ID#99, score#102, LABEL#100, kk#128L]*
                           +- *Project [ID#99, LABEL#100, k#101L AS kk#73L, score#102]*
                              +- LogicalRDD [ID#99, LABEL#100, k#101L, score#102]

Here is the code which generates the error:

import pyspark.sql.functions as F
from pyspark.sql import Row

df = spark.createDataFrame([Row(score=1.0, ID='abc', LABEL=True, k=2),
                            Row(score=1.0, ID='abc', LABEL=False, k=3)]) \
       .withColumnRenamed("k", "kk").select("ID", "score", "LABEL", "kk")
df_t = df.groupby("ID").agg(F.countDistinct("LABEL").alias("nL")).filter(F.col("nL") > 1)
df = df.join(df_t.select("ID"), ["ID"])
df_sw = df.groupby(["ID", "kk"]).count().withColumnRenamed("count", "cnt1")
df = df.join(df_sw, ["ID", "kk"])
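A workaround often suggested for this family of "resolved attribute(s) missing" errors (not verified against this exact snippet) is to break the lineage before the self-referencing join, so that the analyzer assigns fresh attribute IDs. A minimal sketch, assuming the same spark session as above:

import pyspark.sql.functions as F
from pyspark.sql import Row

df = spark.createDataFrame([Row(score=1.0, ID='abc', LABEL=True, k=2),
                            Row(score=1.0, ID='abc', LABEL=False, k=3)]) \
       .withColumnRenamed("k", "kk").select("ID", "score", "LABEL", "kk")
df_t = df.groupby("ID").agg(F.countDistinct("LABEL").alias("nL")).filter(F.col("nL") > 1)
df = df.join(df_t.select("ID"), ["ID"])

# Rebuilding the DataFrame from its RDD discards the shared plan history,
# so every column gets a fresh attribute ID and the later join no longer
# sees two different kk#... references carrying the same name.
df = spark.createDataFrame(df.rdd, df.schema)

df_sw = df.groupby(["ID", "kk"]).count().withColumnRenamed("count", "cnt1")
df = df.join(df_sw, ["ID", "kk"])

The cost of the rdd round-trip is that Catalyst cannot optimize across that boundary, but it severs exactly the lineage that confuses the analyzer here.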
On Tue, Apr 10, 2018 at 1:37 PM, Shiyuan <gshy2...@gmail.com> wrote:

> The Spark warning about using Row instead of Dict is not the culprit. The
> problem still persists after I use Row instead of Dict to generate the
> dataframe.
>
> Here are the explain() outputs regarding the reassignment of df that
> Gourav suggested checking. They look the same except that the serial
> numbers following the columns are different (e.g. ID#7273 vs. ID#7344).
>
> This is the output of df.explain() after df = df.join(df_t.select("ID"),["ID"]):
>
> == Physical Plan ==
> *(6) Project [ID#7273, score#7276, LABEL#7274, kk#7281L]
> +- *(6) SortMergeJoin [ID#7273], [ID#7303], Inner
>    :- *(2) Sort [ID#7273 ASC NULLS FIRST], false, 0
>    :  +- Exchange hashpartitioning(ID#7273, 200)
>    :     +- *(1) Project [ID#7273, score#7276, LABEL#7274, k#7275L AS kk#7281L]
>    :        +- *(1) Filter isnotnull(ID#7273)
>    :           +- *(1) Scan ExistingRDD[ID#7273,LABEL#7274,k#7275L,score#7276]
>    +- *(5) Sort [ID#7303 ASC NULLS FIRST], false, 0
>       +- *(5) Project [ID#7303]
>          +- *(5) Filter (nL#7295L > 1)
>             +- *(5) HashAggregate(keys=[ID#7303], functions=[finalmerge_count(distinct merge count#7314L) AS count(LABEL#7304)#7294L])
>                +- Exchange hashpartitioning(ID#7303, 200)
>                   +- *(4) HashAggregate(keys=[ID#7303], functions=[partial_count(distinct LABEL#7304) AS count#7314L])
>                      +- *(4) HashAggregate(keys=[ID#7303, LABEL#7304], functions=[])
>                         +- Exchange hashpartitioning(ID#7303, LABEL#7304, 200)
>                            +- *(3) HashAggregate(keys=[ID#7303, LABEL#7304], functions=[])
>                               +- *(3) Project [ID#7303, LABEL#7304]
>                                  +- *(3) Filter isnotnull(ID#7303)
>                                     +- *(3) Scan ExistingRDD[ID#7303,LABEL#7304,k#7305L,score#7306]
>
> In comparison, this is the output of df1.explain() after df1 = df.join(df_t.select("ID"),["ID"]):
>
> == Physical Plan ==
> *(6) Project [ID#7344, score#7347, LABEL#7345, kk#7352L]
> +- *(6) SortMergeJoin [ID#7344], [ID#7374], Inner
>    :- *(2) Sort [ID#7344 ASC NULLS FIRST], false, 0
>    :  +- Exchange hashpartitioning(ID#7344, 200)
>    :     +- *(1) Project [ID#7344, score#7347, LABEL#7345, k#7346L AS kk#7352L]
>    :        +- *(1) Filter isnotnull(ID#7344)
>    :           +- *(1) Scan ExistingRDD[ID#7344,LABEL#7345,k#7346L,score#7347]
>    +- *(5) Sort [ID#7374 ASC NULLS FIRST], false, 0
>       +- *(5) Project [ID#7374]
>          +- *(5) Filter (nL#7366L > 1)
>             +- *(5) HashAggregate(keys=[ID#7374], functions=[finalmerge_count(distinct merge count#7385L) AS count(LABEL#7375)#7365L])
>                +- Exchange hashpartitioning(ID#7374, 200)
>                   +- *(4) HashAggregate(keys=[ID#7374], functions=[partial_count(distinct LABEL#7375) AS count#7385L])
>                      +- *(4) HashAggregate(keys=[ID#7374, LABEL#7375], functions=[])
>                         +- Exchange hashpartitioning(ID#7374, LABEL#7375, 200)
>                            +- *(3) HashAggregate(keys=[ID#7374, LABEL#7375], functions=[])
>                               +- *(3) Project [ID#7374, LABEL#7375]
>                                  +- *(3) Filter isnotnull(ID#7374)
>                                     +- *(3) Scan ExistingRDD[ID#7374,LABEL#7375,k#7376L,score#7377]
>
> Here is the code I run and the error I get in Spark 2.3.0. By looking at
> the error, the cause seems to be that Spark doesn't look up a column by
> its name but by a serial number, and that serial number somehow gets
> messed up.
>
> import pyspark.sql.functions as F
> from pyspark.sql import Row
>
> df = spark.createDataFrame([Row(score=1.0, ID='abc', LABEL=True, k=2),
>                             Row(score=1.0, ID='abc', LABEL=False, k=3)])
>
> df = df.withColumnRenamed("k", "kk").select("ID", "score", "LABEL", "kk")  #line B
> df_t = df.groupby("ID").agg(F.countDistinct("LABEL").alias("nL")).filter(F.col("nL") > 1)
> df = df.join(df_t.select("ID"), ["ID"])
> df_sw = df.groupby(["ID", "kk"]).count().withColumnRenamed("count", "cnt1")
> df = df.join(df_sw, ["ID", "kk"])
>
> This is the error:
>
> 'Resolved attribute(s) kk#144L missing from ID#118,LABEL#119,kk#96L,score#121
> in operator !Project [ID#118, score#121, LABEL#119, kk#144L]. Attribute(s)
> with the same name appear in the operation: kk. Please check if the right
> attribute(s) are used.;;
> Project [ID#88, kk#96L, score#91, LABEL#89, cnt1#140L]
> +- Join Inner, ((ID#88 = ID#150) && (kk#96L = kk#144L))
>    :- Project [ID#88, score#91, LABEL#89, kk#96L]
>    :  +- Join Inner, (ID#88 = ID#118)
>    :     :- Project [ID#88, score#91, LABEL#89, kk#96L]
>    :     :  +- Project [ID#88, LABEL#89, k#90L AS kk#96L, score#91]
>    :     :     +- LogicalRDD [ID#88, LABEL#89, k#90L, score#91], false
>    :     +- Project [ID#118]
>    :        +- Filter (nL#110L > cast(1 as bigint))
>    :           +- Aggregate [ID#118], [ID#118, count(distinct LABEL#119) AS nL#110L]
>    :              +- Project [ID#118, score#121, LABEL#119, kk#96L]
>    :                 +- Project [ID#118, LABEL#119, k#120L AS kk#96L, score#121]
>    :                    +- LogicalRDD [ID#118, LABEL#119, k#120L, score#121], false
>    +- Project [ID#150, kk#144L, count#134L AS cnt1#140L]
>       +- Aggregate [ID#150, kk#144L], [ID#150, kk#144L, count(1) AS count#134L]
>          +- Project [ID#150, score#153, LABEL#151, kk#144L]
>             +- Join Inner, (ID#150 = ID#118)
>                :- Project [ID#150, score#153, LABEL#151, kk#144L]
>                :  +- Project [ID#150, LABEL#151, k#152L AS kk#144L, score#153]
>                :     +- LogicalRDD [ID#150, LABEL#151, k#152L, score#153], false
>                +- Project [ID#118]
>                   +- Filter (nL#110L > cast(1 as bigint))
>                      +- Aggregate [ID#118], [ID#118, count(distinct LABEL#119) AS nL#110L]
>                         +- !Project [ID#118, score#121, LABEL#119, kk#144L]
>                            +- Project [ID#118, LABEL#119, k#120L AS kk#96L, score#121]
>                               +- LogicalRDD [ID#118, LABEL#119, k#120L, score#121], false'
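The "serial numbers" mentioned above are Catalyst expression IDs: internally, every column reference in a plan is an attribute identified by its name plus a unique ID (e.g. kk#144L), and the analyzer matches columns by ID rather than by name. One way to watch the IDs at each step, as a minimal sketch assuming the same spark session:

import pyspark.sql.functions as F
from pyspark.sql import Row

df = spark.createDataFrame([Row(score=1.0, ID='abc', LABEL=True, k=2),
                            Row(score=1.0, ID='abc', LABEL=False, k=3)])
df = df.withColumnRenamed("k", "kk").select("ID", "score", "LABEL", "kk")

# extended=True prints the parsed, analyzed, optimized, and physical plans;
# the analyzed plan shows each column reference as name#id, so conflicting
# IDs can be spotted before the failing join is even attempted.
df.explain(extended=True)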
On Mon, Apr 9, 2018 at 3:21 PM, Gourav Sengupta <gourav.sengu...@gmail.com> wrote:

>> Hi,
>>
>> What I am curious about is the reassignment of df.
>>
>> Can you please look into the explain plan of df after the statement
>> df = df.join(df_t.select("ID"),["ID"]), and then compare it with the explain
>> plan of df1 after the statement df1 = df.join(df_t.select("ID"),["ID"])?
>>
>> It's late here and I have yet to go through this completely, but I think
>> that Spark does throw a warning telling us to use Row instead of a
>> dictionary.
>>
>> It would help if you could kindly try the statement below and go through
>> your use case once again (I have yet to go through all the lines):
>>
>> from pyspark.sql import Row
>>
>> df = spark.createDataFrame([Row(score=1.0, ID="abc", LABEL=True, k=2),
>>                             Row(score=1.0, ID="abc", LABEL=True, k=3)])
>>
>> Regards,
>> Gourav Sengupta
>>
>> On Mon, Apr 9, 2018 at 6:50 PM, Shiyuan <gshy2...@gmail.com> wrote:
>>
>>> Hi Spark Users,
>>>
>>> The following code snippet raises an "attribute missing" error even
>>> though the attribute exists. The bug is triggered by a particular
>>> sequence of "select", "groupby" and "join" calls. Note that if I take
>>> away the "select" at #line B, the code runs without error. However, the
>>> "select" at #line B includes all columns in the dataframe and hence
>>> should not affect the final result.
>>>
>>> import pyspark.sql.functions as F
>>> df = spark.createDataFrame([{'score':1.0,'ID':'abc','LABEL':True,'k':2},
>>>                             {'score':1.0,'ID':'abc','LABEL':False,'k':3}])
>>>
>>> df = df.withColumnRenamed("k","kk")\
>>>        .select("ID","score","LABEL","kk")  #line B
>>>
>>> df_t = df.groupby("ID").agg(F.countDistinct("LABEL").alias("nL")).filter(F.col("nL")>1)
>>> df = df.join(df_t.select("ID"),["ID"])
>>> df_sw = df.groupby(["ID","kk"]).count().withColumnRenamed("count", "cnt1")
>>> df = df.join(df_sw, ["ID","kk"])
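For reference, the working variant described in the original message — the same pipeline with the select at #line B removed — would look like the sketch below. Per the report, only the presence of that select triggers the error; this version keeps the columns in their original order but is otherwise equivalent:

import pyspark.sql.functions as F
from pyspark.sql import Row

df = spark.createDataFrame([Row(score=1.0, ID='abc', LABEL=True, k=2),
                            Row(score=1.0, ID='abc', LABEL=False, k=3)])

# Rename only, without the redundant select at #line B; per the report
# above, this version runs without the missing-attribute error.
df = df.withColumnRenamed("k", "kk")

df_t = df.groupby("ID").agg(F.countDistinct("LABEL").alias("nL")).filter(F.col("nL") > 1)
df = df.join(df_t.select("ID"), ["ID"])
df_sw = df.groupby(["ID", "kk"]).count().withColumnRenamed("count", "cnt1")
df = df.join(df_sw, ["ID", "kk"])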