Hi Shiyuan,

I may be wrong, but I would prefer to avoid expressions in Spark such as:
df = <<some transformation on df>>

Regards,
Gourav Sengupta

On Tue, Apr 10, 2018 at 10:42 PM, Shiyuan <gshy2...@gmail.com> wrote:

> Here is the pretty print of the physical plan, which reveals some details
> about what causes the bug (see the lines highlighted in bold):
> withColumnRenamed() fails to update the dependency graph correctly:
>
> 'Resolved attribute(s) kk#144L missing from ID#118,LABEL#119,kk#96L,score#121
> in operator !Project [ID#118, score#121, LABEL#119, kk#144L]. Attribute(s)
> with the same name appear in the operation: kk. Please check if the right
> attribute(s) are used
>
> Project [ID#64, kk#73L, score#67, LABEL#65, cnt1#123L]
> +- Join Inner, ((ID#64 = ID#135) && (kk#73L = kk#128L))
>    :- Project [ID#64, score#67, LABEL#65, kk#73L]
>    :  +- Join Inner, (ID#64 = ID#99)
>    :     :- Project [ID#64, score#67, LABEL#65, kk#73L]
>    :     :  +- Project [ID#64, LABEL#65, k#66L AS kk#73L, score#67]
>    :     :     +- LogicalRDD [ID#64, LABEL#65, k#66L, score#67]
>    :     +- Project [ID#99]
>    :        +- Filter (nL#90L > cast(1 as bigint))
>    :           +- Aggregate [ID#99], [ID#99, count(distinct LABEL#100) AS nL#90L]
>    :              +- Project [ID#99, score#102, LABEL#100, kk#73L]
>    :                 +- Project [ID#99, LABEL#100, k#101L AS kk#73L, score#102]
>    :                    +- LogicalRDD [ID#99, LABEL#100, k#101L, score#102]
>    +- Project [ID#135, kk#128L, count#118L AS cnt1#123L]
>       +- Aggregate [ID#135, kk#128L], [ID#135, kk#128L, count(1) AS count#118L]
>          +- Project [ID#135, score#138, LABEL#136, kk#128L]
>             +- Join Inner, (ID#135 = ID#99)
>                :- Project [ID#135, score#138, LABEL#136, kk#128L]
>                :  +- *Project [ID#135, LABEL#136, k#137L AS kk#128L, score#138]*
>                :     +- LogicalRDD [ID#135, LABEL#136, k#137L, score#138]
>                +- Project [ID#99]
>                   +- Filter (nL#90L > cast(1 as bigint))
>                      +- Aggregate [ID#99], [ID#99, count(distinct LABEL#100) AS nL#90L]
>                         +- *!Project [ID#99, score#102, LABEL#100, kk#128L]*
>                            +- *Project [ID#99, LABEL#100, k#101L AS kk#73L, score#102]*
>                               +- LogicalRDD [ID#99, LABEL#100, k#101L, score#102]
>
> Here is the code which generates the error:
>
> import pyspark.sql.functions as F
> from pyspark.sql import Row
> df = spark.createDataFrame([Row(score=1.0, ID='abc', LABEL=True, k=2),
>                             Row(score=1.0, ID='abc', LABEL=False, k=3)]) \
>     .withColumnRenamed("k", "kk").select("ID", "score", "LABEL", "kk")
> df_t = df.groupby("ID").agg(F.countDistinct("LABEL").alias("nL")).filter(F.col("nL") > 1)
> df = df.join(df_t.select("ID"), ["ID"])
> df_sw = df.groupby(["ID", "kk"]).count().withColumnRenamed("count", "cnt1")
> df = df.join(df_sw, ["ID", "kk"])
>
> On Tue, Apr 10, 2018 at 1:37 PM, Shiyuan <gshy2...@gmail.com> wrote:
>
>> The Spark warning about Row instead of dict is not the culprit. The
>> problem still persists after I use Row instead of dict to generate the
>> dataframe.
>>
>> Here is the explain() output for the reassignment of df, as Gourav
>> suggested. The two plans look the same except that the serial numbers
>> following the columns are different (e.g. ID#7273 vs. ID#7344).
>>
>> This is the output of df.explain() after df = df.join(df_t.select("ID"), ["ID"]):
>>
>> == Physical Plan ==
>> *(6) Project [ID#7273, score#7276, LABEL#7274, kk#7281L]
>> +- *(6) SortMergeJoin [ID#7273], [ID#7303], Inner
>>    :- *(2) Sort [ID#7273 ASC NULLS FIRST], false, 0
>>    :  +- Exchange hashpartitioning(ID#7273, 200)
>>    :     +- *(1) Project [ID#7273, score#7276, LABEL#7274, k#7275L AS kk#7281L]
>>    :        +- *(1) Filter isnotnull(ID#7273)
>>    :           +- *(1) Scan ExistingRDD[ID#7273,LABEL#7274,k#7275L,score#7276]
>>    +- *(5) Sort [ID#7303 ASC NULLS FIRST], false, 0
>>       +- *(5) Project [ID#7303]
>>          +- *(5) Filter (nL#7295L > 1)
>>             +- *(5) HashAggregate(keys=[ID#7303], functions=[finalmerge_count(distinct merge count#7314L) AS count(LABEL#7304)#7294L])
>>                +- Exchange hashpartitioning(ID#7303, 200)
>>                   +- *(4) HashAggregate(keys=[ID#7303], functions=[partial_count(distinct LABEL#7304) AS count#7314L])
>>                      +- *(4) HashAggregate(keys=[ID#7303, LABEL#7304], functions=[])
>>                         +- Exchange hashpartitioning(ID#7303, LABEL#7304, 200)
>>                            +- *(3) HashAggregate(keys=[ID#7303, LABEL#7304], functions=[])
>>                               +- *(3) Project [ID#7303, LABEL#7304]
>>                                  +- *(3) Filter isnotnull(ID#7303)
>>                                     +- *(3) Scan ExistingRDD[ID#7303,LABEL#7304,k#7305L,score#7306]
>>
>> In comparison, this is the output of df1.explain() after df1 = df.join(df_t.select("ID"), ["ID"]):
>>
>> == Physical Plan ==
>> *(6) Project [ID#7344, score#7347, LABEL#7345, kk#7352L]
>> +- *(6) SortMergeJoin [ID#7344], [ID#7374], Inner
>>    :- *(2) Sort [ID#7344 ASC NULLS FIRST], false, 0
>>    :  +- Exchange hashpartitioning(ID#7344, 200)
>>    :     +- *(1) Project [ID#7344, score#7347, LABEL#7345, k#7346L AS kk#7352L]
>>    :        +- *(1) Filter isnotnull(ID#7344)
>>    :           +- *(1) Scan ExistingRDD[ID#7344,LABEL#7345,k#7346L,score#7347]
>>    +- *(5) Sort [ID#7374 ASC NULLS FIRST], false, 0
>>       +- *(5) Project [ID#7374]
>>          +- *(5) Filter (nL#7366L > 1)
>>             +- *(5) HashAggregate(keys=[ID#7374], functions=[finalmerge_count(distinct merge count#7385L) AS count(LABEL#7375)#7365L])
>>                +- Exchange hashpartitioning(ID#7374, 200)
>>                   +- *(4) HashAggregate(keys=[ID#7374], functions=[partial_count(distinct LABEL#7375) AS count#7385L])
>>                      +- *(4) HashAggregate(keys=[ID#7374, LABEL#7375], functions=[])
>>                         +- Exchange hashpartitioning(ID#7374, LABEL#7375, 200)
>>                            +- *(3) HashAggregate(keys=[ID#7374, LABEL#7375], functions=[])
>>                               +- *(3) Project [ID#7374, LABEL#7375]
>>                                  +- *(3) Filter isnotnull(ID#7374)
>>                                     +- *(3) Scan ExistingRDD[ID#7374,LABEL#7375,k#7376L,score#7377]
>>
>> Here is the code I run and the error I get in Spark 2.3.0. By looking at
>> the error, the cause seems to be that Spark looks up a column not by its
>> name but by a serial number, and the serial number somehow gets mixed up.
>>
>> import pyspark.sql.functions as F
>> from pyspark.sql import Row
>> df = spark.createDataFrame([Row(score=1.0, ID='abc', LABEL=True, k=2),
>>                             Row(score=1.0, ID='abc', LABEL=False, k=3)])
>>
>> df = df.withColumnRenamed("k", "kk").select("ID", "score", "LABEL", "kk")  #line B
>> df_t = df.groupby("ID").agg(F.countDistinct("LABEL").alias("nL")).filter(F.col("nL") > 1)
>> df = df.join(df_t.select("ID"), ["ID"])
>> df_sw = df.groupby(["ID", "kk"]).count().withColumnRenamed("count", "cnt1")
>> df = df.join(df_sw, ["ID", "kk"])
>>
>> This is the error:
>>
>> 'Resolved attribute(s) kk#144L missing from ID#118,LABEL#119,kk#96L,score#121
>> in operator !Project [ID#118, score#121, LABEL#119, kk#144L]. Attribute(s)
>> with the same name appear in the operation: kk. Please check if the right
>> attribute(s) are used.;;
>> Project [ID#88, kk#96L, score#91, LABEL#89, cnt1#140L]
>> +- Join Inner, ((ID#88 = ID#150) && (kk#96L = kk#144L))
>>    :- Project [ID#88, score#91, LABEL#89, kk#96L]
>>    :  +- Join Inner, (ID#88 = ID#118)
>>    :     :- Project [ID#88, score#91, LABEL#89, kk#96L]
>>    :     :  +- Project [ID#88, LABEL#89, k#90L AS kk#96L, score#91]
>>    :     :     +- LogicalRDD [ID#88, LABEL#89, k#90L, score#91], false
>>    :     +- Project [ID#118]
>>    :        +- Filter (nL#110L > cast(1 as bigint))
>>    :           +- Aggregate [ID#118], [ID#118, count(distinct LABEL#119) AS nL#110L]
>>    :              +- Project [ID#118, score#121, LABEL#119, kk#96L]
>>    :                 +- Project [ID#118, LABEL#119, k#120L AS kk#96L, score#121]
>>    :                    +- LogicalRDD [ID#118, LABEL#119, k#120L, score#121], false
>>    +- Project [ID#150, kk#144L, count#134L AS cnt1#140L]
>>       +- Aggregate [ID#150, kk#144L], [ID#150, kk#144L, count(1) AS count#134L]
>>          +- Project [ID#150, score#153, LABEL#151, kk#144L]
>>             +- Join Inner, (ID#150 = ID#118)
>>                :- Project [ID#150, score#153, LABEL#151, kk#144L]
>>                :  +- Project [ID#150, LABEL#151, k#152L AS kk#144L, score#153]
>>                :     +- LogicalRDD [ID#150, LABEL#151, k#152L, score#153], false
>>                +- Project [ID#118]
>>                   +- Filter (nL#110L > cast(1 as bigint))
>>                      +- Aggregate [ID#118], [ID#118, count(distinct LABEL#119) AS nL#110L]
>>                         +- !Project [ID#118, score#121, LABEL#119, kk#144L]
>>                            +- Project [ID#118, LABEL#119, k#120L AS kk#96L, score#121]
>>                               +- LogicalRDD [ID#118, LABEL#119, k#120L, score#121], false'
>>
>> On Mon, Apr 9, 2018 at 3:21 PM, Gourav Sengupta <gourav.sengu...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> What I am curious about is the reassignment of df.
>>>
>>> Can you please look at the explain plan of df after the statement
>>> df = df.join(df_t.select("ID"), ["ID"]), and then compare it with the
>>> explain plan of df1 after the statement df1 = df.join(df_t.select("ID"), ["ID"])?
>>>
>>> It's late here, and I am yet to go through this completely. But I think
>>> that Spark does throw a warning telling us to use Row instead of a
>>> dictionary.
>>>
>>> It would help if you could kindly try the statements below and then go
>>> through your use case once again (I am yet to go through all the lines):
>>>
>>> from pyspark.sql import Row
>>>
>>> df = spark.createDataFrame([Row(score=1.0, ID="abc", LABEL=True, k=2),
>>>                             Row(score=1.0, ID="abc", LABEL=True, k=3)])
>>>
>>> Regards,
>>> Gourav Sengupta
>>>
>>> On Mon, Apr 9, 2018 at 6:50 PM, Shiyuan <gshy2...@gmail.com> wrote:
>>>
>>>> Hi Spark Users,
>>>>
>>>> The following code snippet triggers an "attribute missing" error even
>>>> though the attribute exists. The bug is triggered by a particular sequence
>>>> of "select", "groupby" and "join". Note that if I take away the "select"
>>>> in #line B, the code runs without error. However, the "select" in #line B
>>>> includes all columns in the dataframe and hence should not affect the
>>>> final result.
>>>>
>>>> import pyspark.sql.functions as F
>>>> df = spark.createDataFrame([{'score': 1.0, 'ID': 'abc', 'LABEL': True, 'k': 2},
>>>>                             {'score': 1.0, 'ID': 'abc', 'LABEL': False, 'k': 3}])
>>>>
>>>> df = df.withColumnRenamed("k","kk")\
>>>>        .select("ID","score","LABEL","kk") #line B
>>>>
>>>> df_t = df.groupby("ID").agg(F.countDistinct("LABEL").alias("nL")).filter(F.col("nL") > 1)
>>>> df = df.join(df_t.select("ID"), ["ID"])
>>>> df_sw = df.groupby(["ID","kk"]).count().withColumnRenamed("count", "cnt1")
>>>> df = df.join(df_sw, ["ID","kk"])