Github user liancheng commented on a diff in the pull request: https://github.com/apache/spark/pull/15976#discussion_r89178617 --- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ObjectHashAggregateSuite.scala --- @@ -325,70 +320,67 @@ class ObjectHashAggregateSuite // Currently Spark SQL doesn't support evaluating distinct aggregate function together // with aggregate functions without partial aggregation support. - if (!(aggs.contains(withoutPartial) && aggs.contains(withDistinct))) { - // TODO Re-enables them after fixing SPARK-18403 - ignore( - s"randomized aggregation test - " + - s"${names.mkString("[", ", ", "]")} - " + - s"${if (withGroupingKeys) "with" else "without"} grouping keys - " + - s"with ${if (emptyInput) "empty" else "non-empty"} input" - ) { - var expected: Seq[Row] = null - var actual1: Seq[Row] = null - var actual2: Seq[Row] = null - - // Disables `ObjectHashAggregateExec` to obtain a standard answer - withSQLConf(SQLConf.USE_OBJECT_HASH_AGG.key -> "false") { - val aggDf = doAggregation(df) - - if (aggs.intersect(Seq(withoutPartial, withPartialSafe, typed)).nonEmpty) { - assert(containsSortAggregateExec(aggDf)) - assert(!containsObjectHashAggregateExec(aggDf)) - assert(!containsHashAggregateExec(aggDf)) - } else { - assert(!containsSortAggregateExec(aggDf)) - assert(!containsObjectHashAggregateExec(aggDf)) - assert(containsHashAggregateExec(aggDf)) - } - - expected = aggDf.collect().toSeq + test( + s"randomized aggregation test - " + + s"${names.mkString("[", ", ", "]")} - " + + s"${if (withGroupingKeys) "with" else "without"} grouping keys - " + + s"with ${if (emptyInput) "empty" else "non-empty"} input" + ) { + var expected: Seq[Row] = null + var actual1: Seq[Row] = null + var actual2: Seq[Row] = null + + // Disables `ObjectHashAggregateExec` to obtain a standard answer + withSQLConf(SQLConf.USE_OBJECT_HASH_AGG.key -> "false") { + val aggDf = doAggregation(df) + + if (aggs.intersect(Seq(withPartialSafe, typed)).nonEmpty) { 
+ assert(containsSortAggregateExec(aggDf)) + assert(!containsObjectHashAggregateExec(aggDf)) + assert(!containsHashAggregateExec(aggDf)) + } else { + assert(!containsSortAggregateExec(aggDf)) + assert(!containsObjectHashAggregateExec(aggDf)) + assert(containsHashAggregateExec(aggDf)) } - // Enables `ObjectHashAggregateExec` - withSQLConf(SQLConf.USE_OBJECT_HASH_AGG.key -> "true") { - val aggDf = doAggregation(df) - - if (aggs.contains(typed) && !aggs.contains(withoutPartial)) { - assert(!containsSortAggregateExec(aggDf)) - assert(containsObjectHashAggregateExec(aggDf)) - assert(!containsHashAggregateExec(aggDf)) - } else if (aggs.intersect(Seq(withoutPartial, withPartialSafe)).nonEmpty) { - assert(containsSortAggregateExec(aggDf)) - assert(!containsObjectHashAggregateExec(aggDf)) - assert(!containsHashAggregateExec(aggDf)) - } else { - assert(!containsSortAggregateExec(aggDf)) - assert(!containsObjectHashAggregateExec(aggDf)) - assert(containsHashAggregateExec(aggDf)) - } - - // Disables sort-based aggregation fallback (we only generate 50 rows, so 100 is - // big enough) to obtain a result to be checked. - withSQLConf(SQLConf.OBJECT_AGG_SORT_BASED_FALLBACK_THRESHOLD.key -> "100") { - actual1 = aggDf.collect().toSeq - } - - // Enables sort-based aggregation fallback to obtain another result to be checked. - withSQLConf(SQLConf.OBJECT_AGG_SORT_BASED_FALLBACK_THRESHOLD.key -> "3") { - // Here we are not reusing `aggDf` because the physical plan in `aggDf` is - // cached and won't be re-planned using the new fallback threshold. 
- actual2 = doAggregation(df).collect().toSeq - } + expected = aggDf.collect().toSeq + } + + // Enables `ObjectHashAggregateExec` + withSQLConf(SQLConf.USE_OBJECT_HASH_AGG.key -> "true") { + val aggDf = doAggregation(df) + + if (aggs.contains(typed)) { + assert(!containsSortAggregateExec(aggDf)) + assert(containsObjectHashAggregateExec(aggDf)) + assert(!containsHashAggregateExec(aggDf)) + } else if (aggs.contains(withPartialSafe)) { + assert(containsSortAggregateExec(aggDf)) + assert(!containsObjectHashAggregateExec(aggDf)) + assert(!containsHashAggregateExec(aggDf)) + } else { + assert(!containsSortAggregateExec(aggDf)) + assert(!containsObjectHashAggregateExec(aggDf)) + assert(containsHashAggregateExec(aggDf)) } - doubleSafeCheckRows(actual1, expected, 1e-4) - doubleSafeCheckRows(actual2, expected, 1e-4) + // Disables sort-based aggregation fallback (we only generate 50 rows, so 100 is + // big enough) to obtain a result to be checked. + withSQLConf(SQLConf.OBJECT_AGG_SORT_BASED_FALLBACK_THRESHOLD.key -> "100") { + actual1 = aggDf.collect().toSeq + } + + // Enables sort-based aggregation fallback to obtain another result to be checked. + withSQLConf(SQLConf.OBJECT_AGG_SORT_BASED_FALLBACK_THRESHOLD.key -> "3") { + // Here we are not reusing `aggDf` because the physical plan in `aggDf` is + // cached and won't be re-planned using the new fallback threshold. + actual2 = doAggregation(df).collect().toSeq + } } + + doubleSafeCheckRows(actual1, expected, 1e-4) + doubleSafeCheckRows(actual2, expected, 1e-4) --- End diff -- All the changes made above in this file are used to resolve a logical conflict with PR #15703. We don't really have any aggregate functions that don't support partial aggregation now after merging #15703, must update the tests to reflect that.
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes to do so, or if the feature is enabled but not working, please contact infrastructure at infrastructure@apache.org or file a JIRA ticket with INFRA. --- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org For additional commands, e-mail: reviews-help@spark.apache.org