Russell Spitzer created SPARK-22316:
---------------------------------------

             Summary: Cannot Select ReduceAggregator Column
                 Key: SPARK-22316
                 URL: https://issues.apache.org/jira/browse/SPARK-22316
             Project: Spark
          Issue Type: Bug
          Components: Spark Core
    Affects Versions: 2.2.0
            Reporter: Russell Spitzer
            Priority: Minor


Given a Dataset that has been run through reduceGroups like this:
{code}
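import spark.implicits._  // encoders for the case classes below

case class Person(name: String, age: Int)
case class Customer(id: Int, person: Person)
// createDataset (not createDataFrame) keeps the Customer type that groupByKey needs
val ds = spark.createDataset(Seq(Customer(1, Person("russ", 85))))
val grouped = ds.groupByKey(c => c.id).reduceGroups((x, y) => x)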
{code}

We end up with a Dataset with the schema

{code}
 org.apache.spark.sql.types.StructType = 
StructType(
  StructField(value,IntegerType,false), 
  StructField(ReduceAggregator(Customer),
    StructType(StructField(id,IntegerType,false),
    StructField(person,
      StructType(StructField(name,StringType,true),
      StructField(age,IntegerType,false))
   ,true))
,true))
{code}

The column names are 
{code}
Array(value, ReduceAggregator(Customer))
{code}

But you cannot select the "ReduceAggregator(Customer)" column:
{code}
grouped.select(grouped.columns(1))
org.apache.spark.sql.AnalysisException: cannot resolve 
'`ReduceAggregator(Customer)`' given input columns: [value, 
ReduceAggregator(Customer)];;
'Project ['ReduceAggregator(Customer)]
+- Aggregate [value#338], [value#338, 
reduceaggregator(org.apache.spark.sql.expressions.ReduceAggregator@5ada573, 
Some(newInstance(class Customer)), Some(class Customer), 
Some(StructType(StructField(id,IntegerType,false), 
StructField(person,StructType(StructField(name,StringType,true), 
StructField(age,IntegerType,false)),true))), input[0, scala.Tuple2, true]._1 AS 
value#340, if ((isnull(input[0, scala.Tuple2, true]._2) || None.equals)) null 
else named_struct(id, assertnotnull(assertnotnull(input[0, scala.Tuple2, 
true]._2)).id AS id#195, person, if 
(isnull(assertnotnull(assertnotnull(input[0, scala.Tuple2, true]._2)).person)) 
null else named_struct(name, staticinvoke(class 
org.apache.spark.unsafe.types.UTF8String, StringType, fromString, 
assertnotnull(assertnotnull(assertnotnull(input[0, scala.Tuple2, 
true]._2)).person).name, true), age, 
assertnotnull(assertnotnull(assertnotnull(input[0, scala.Tuple2, 
true]._2)).person).age) AS person#196) AS _2#341, newInstance(class 
scala.Tuple2), assertnotnull(assertnotnull(input[0, Customer, true])).id AS 
id#195, if (isnull(assertnotnull(assertnotnull(input[0, Customer, 
true])).person)) null else named_struct(name, staticinvoke(class 
org.apache.spark.unsafe.types.UTF8String, StringType, fromString, 
assertnotnull(assertnotnull(assertnotnull(input[0, Customer, 
true])).person).name, true), age, 
assertnotnull(assertnotnull(assertnotnull(input[0, Customer, 
true])).person).age) AS person#196, StructField(id,IntegerType,false), 
StructField(person,StructType(StructField(name,StringType,true), 
StructField(age,IntegerType,false)),true), true, 0, 0) AS 
ReduceAggregator(Customer)#346]
   +- AppendColumns <function1>, class Customer, 
[StructField(id,IntegerType,false), 
StructField(person,StructType(StructField(name,StringType,true), 
StructField(age,IntegerType,false)),true)], newInstance(class Customer), 
[input[0, int, false] AS value#338]
      +- LocalRelation [id#197, person#198]
{code}

You can work around this by using "toDF" to rename the columns:

{code}
scala> grouped.toDF("key", "reduced").select("reduced")
res55: org.apache.spark.sql.DataFrame = [reduced: struct<id: int, person: 
struct<name: string, age: int>>]
{code}
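The same workaround can be applied without hard-coding the new names. The following is only a sketch: safeName and withSafeColumnNames are hypothetical helpers, not part of Spark, and the choice to replace every character outside [A-Za-z0-9_] with an underscore is an assumption made for illustration. The report does not establish which characters cause the resolution failure.

```scala
import org.apache.spark.sql.{DataFrame, Dataset}

// Hypothetical helper: map a generated column name to a plain identifier
// by replacing every character outside [A-Za-z0-9_] with an underscore.
def safeName(name: String): String =
  name.replaceAll("[^A-Za-z0-9_]", "_")

// Hypothetical helper: rename all columns through toDF, as in the manual
// workaround above. Note that two distinct names could sanitize to the
// same string; this sketch does not guard against that.
def withSafeColumnNames(ds: Dataset[_]): DataFrame =
  ds.toDF(ds.columns.map(safeName): _*)
```

With these definitions, withSafeColumnNames(grouped) would rename the second column to "ReduceAggregator_Customer_", which can then be passed to select like the "reduced" column above.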

I think that every invocation of
{code}
ds.select(ds.columns(i))
{code}
should work for all valid i, that is, for 0 <= i < ds.columns.length.
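The expectation that every reported column name can be selected could be checked with something like the sketch below. unselectableColumns is a hypothetical helper name; it relies only on Dataset.columns and Dataset.select(String), and on the fact that select analyzes the plan eagerly, so an unresolvable name throws right away.

```scala
import org.apache.spark.sql.Dataset
import scala.util.Try

// Hypothetical helper: return the names from ds.columns that cannot be
// selected by name. An empty result means the expectation holds.
def unselectableColumns(ds: Dataset[_]): Seq[String] =
  ds.columns.toSeq.filter(name => Try(ds.select(name)).isFailure)
```

Going by the error shown in this report, unselectableColumns(grouped) would be expected to contain "ReduceAggregator(Customer)" on the affected version, and to be empty once the issue is fixed.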


