[ https://issues.apache.org/jira/browse/SPARK-22316?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16211528#comment-16211528 ]
Russell Spitzer edited comment on SPARK-22316 at 10/19/17 6:49 PM:
-------------------------------------------------------------------

I can imagine many reasons I might want to access a column in a DataFrame. Let's say I want to add a new column based on a property of my reduced group

{code}
scala> grouped.withColumn("newId", grouped(grouped.columns(1)))
org.apache.spark.sql.AnalysisException: Cannot resolve column name "ReduceAggregator(Customer)" among (value, ReduceAggregator(Customer));
{code}

Or filter based on the results

{code}
scala> grouped.filter(grouped(grouped.columns(1)) < 5)
org.apache.spark.sql.AnalysisException: Cannot resolve column name "ReduceAggregator(Customer)" among (value, ReduceAggregator(Customer));
{code}

Or use the literal select function to expand the struct, which gives a slightly different error

{code}
scala> grouped.select("ReduceAggregator(Customer).*")
org.apache.spark.sql.AnalysisException: cannot resolve 'ReduceAggregator(Customer).*' give input columns 'id, person, value';
{code}

I should clarify that when I say "select" I mean accessing the column by its name for any purpose.

> Cannot Select ReducedAggregator Column
> --------------------------------------
>
> Key: SPARK-22316
> URL: https://issues.apache.org/jira/browse/SPARK-22316
> Project: Spark
> Issue Type: Bug
> Components: Spark Core
> Affects Versions: 2.2.0
> Reporter: Russell Spitzer
> Priority: Minor
>
> Given a Dataset which has been run through reduceGroups like this
> {code}
> case class Person(name: String, age: Int)
> case class Customer(id: Int, person: Person)
> val ds = spark.createDataset(Seq(Customer(1, Person("russ", 85))))
> val grouped = ds.groupByKey(c => c.id).reduceGroups( (x,y) => x )
> {code}
> We end up with a Dataset with the schema
> {code}
> org.apache.spark.sql.types.StructType =
> StructType(
>   StructField(value,IntegerType,false),
>   StructField(ReduceAggregator(Customer),
>     StructType(StructField(id,IntegerType,false),
>       StructField(person,
>         StructType(StructField(name,StringType,true),
>         StructField(age,IntegerType,false))
>       ,true))
>   ,true))
> {code}
> The column names are
> {code}
> Array(value, ReduceAggregator(Customer))
> {code}
> But you cannot select the "ReduceAggregator(Customer)" column
> {code}
> grouped.select(grouped.columns(1))
> org.apache.spark.sql.AnalysisException: cannot resolve
> '`ReduceAggregator(Customer)`' given input columns: [value,
> ReduceAggregator(Customer)];;
> 'Project ['ReduceAggregator(Customer)]
> +- Aggregate [value#338], [value#338,
> reduceaggregator(org.apache.spark.sql.expressions.ReduceAggregator@5ada573,
> Some(newInstance(class Customer)), Some(class Customer),
> Some(StructType(StructField(id,IntegerType,false),
> StructField(person,StructType(StructField(name,StringType,true),
> StructField(age,IntegerType,false)),true))), input[0, scala.Tuple2, true]._1
> AS value#340, if ((isnull(input[0, scala.Tuple2, true]._2) || None.equals))
> null else named_struct(id, assertnotnull(assertnotnull(input[0, scala.Tuple2,
> true]._2)).id AS id#195, person, if
> (isnull(assertnotnull(assertnotnull(input[0, scala.Tuple2,
> true]._2)).person)) null else named_struct(name, staticinvoke(class
> org.apache.spark.unsafe.types.UTF8String, StringType, fromString,
> assertnotnull(assertnotnull(assertnotnull(input[0, scala.Tuple2,
> true]._2)).person).name, true), age,
> assertnotnull(assertnotnull(assertnotnull(input[0, scala.Tuple2,
> true]._2)).person).age) AS person#196) AS _2#341, newInstance(class
> scala.Tuple2), assertnotnull(assertnotnull(input[0, Customer, true])).id AS
> id#195, if (isnull(assertnotnull(assertnotnull(input[0, Customer,
> true])).person)) null else named_struct(name, staticinvoke(class
> org.apache.spark.unsafe.types.UTF8String, StringType, fromString,
> assertnotnull(assertnotnull(assertnotnull(input[0, Customer,
> true])).person).name, true), age,
> assertnotnull(assertnotnull(assertnotnull(input[0, Customer,
> true])).person).age) AS person#196, StructField(id,IntegerType,false),
> StructField(person,StructType(StructField(name,StringType,true),
> StructField(age,IntegerType,false)),true), true, 0, 0) AS
> ReduceAggregator(Customer)#346]
>    +- AppendColumns <function1>, class Customer,
> [StructField(id,IntegerType,false),
> StructField(person,StructType(StructField(name,StringType,true),
> StructField(age,IntegerType,false)),true)], newInstance(class Customer),
> [input[0, int, false] AS value#338]
>       +- LocalRelation [id#197, person#198]
> {code}
> You can work around this by using "toDF" to rename the column
> {code}
> scala> grouped.toDF("key", "reduced").select("reduced")
> res55: org.apache.spark.sql.DataFrame = [reduced: struct<id: int, person:
> struct<name: string, age: int>>]
> {code}
> I think that every invocation of
> {code}
> ds.select(ds.columns(i))
> {code}
> should work for all valid i < ds.columns.size.

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)