There are multiple records for the DF scala> structDF.groupBy($"a").agg(min(struct($"record.*"))).show +---+-----------------------------+ | a|min(struct(unresolvedstar()))| +---+-----------------------------+ | 1| [1,1]| | 3| [3,1]| | 2| [2,1]|
The meaning of .groupBy($"a").agg(min(struct($"record.*"))) is to get the min for all the records with the same $”a” For example: TestData2(1,1) :: TestData2(1,2) The result would be 1, (1, 1), since struct(1, 1) is less than struct(1, 2). Please check how the Ordering is implemented in InterpretedOrdering. The output itself does not have any ordering. I am not sure why the unit test and the real env have different environment. Xiao, I do see the difference between unit test and local cluster run. Do you know the reason? Thanks. Zhan Zhang On Apr 22, 2016, at 11:23 AM, Yong Zhang <java8...@hotmail.com<mailto:java8...@hotmail.com>> wrote: Hi, I was trying to find out why this unit test can pass in Spark code. in https://github.com/apache/spark/blob/master/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala for this unit test: test("Star Expansion - CreateStruct and CreateArray") { val structDf = testData2.select("a", "b").as("record") // CreateStruct and CreateArray in aggregateExpressions assert(structDf.groupBy($"a").agg(min(struct($"record.*"))).first() == Row(3, Row(3, 1))) assert(structDf.groupBy($"a").agg(min(array($"record.*"))).first() == Row(3, Seq(3, 1))) // CreateStruct and CreateArray in project list (unresolved alias) assert(structDf.select(struct($"record.*")).first() == Row(Row(1, 1))) assert(structDf.select(array($"record.*")).first().getAs[Seq[Int]](0) === Seq(1, 1)) // CreateStruct and CreateArray in project list (alias) assert(structDf.select(struct($"record.*").as("a")).first() == Row(Row(1, 1))) assert(structDf.select(array($"record.*").as("a")).first().getAs[Seq[Int]](0) === Seq(1, 1)) } >From my understanding, the data return in this case should be Row(1, Row(1, >1]), as that will be min of struct. In fact, if I run the spark-shell on my laptop, and I got the result I expected: ./bin/spark-shell Welcome to ____ __ / __/__ ___ _____/ /__ _\ \/ _ \/ _ `/ __/ '_/ /___/ .__/\_,_/_/ /_/\_\ version 2.0.0-SNAPSHOT /_/ Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_91) Type in expressions to have them evaluated. Type :help for more information. scala> case class TestData2(a: Int, b: Int) defined class TestData2 scala> val testData2DF = sqlContext.sparkContext.parallelize(TestData2(1,1) :: TestData2(1,2) :: TestData2(2,1) :: TestData2(2,2) :: TestData2(3,1) :: TestData2(3,2) :: Nil, 2).toDF() scala> val structDF = testData2DF.select("a","b").as("record") scala> structDF.groupBy($"a").agg(min(struct($"record.*"))).first() res0: org.apache.spark.sql.Row = [1,[1,1]] scala> structDF.show +---+---+ | a| b| +---+---+ | 1| 1| | 1| 2| | 2| 1| | 2| 2| | 3| 1| | 3| 2| +---+---+ So from my spark, which I built on the master, I cannot get Row[3,[1,1]] back in this case. Why the unit test asserts that Row[3,[1,1]] should be the first, and it will pass? But I cannot reproduce that in my spark-shell? I am trying to understand how to interpret the meaning of "agg(min(struct($"record.*")))" Thanks Yong