Repository: spark Updated Branches: refs/heads/master 692378c01 -> 86c50bf72
[SPARK-9171][SQL] add and improve tests for nondeterministic expressions Author: Wenchen Fan <cloud0...@outlook.com> Closes #7496 from cloud-fan/tests and squashes the following commits: 0958f90 [Wenchen Fan] improve test for nondeterministic expressions Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/86c50bf7 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/86c50bf7 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/86c50bf7 Branch: refs/heads/master Commit: 86c50bf72c41d95107a55c16a6853dcda7f3e143 Parents: 692378c Author: Wenchen Fan <cloud0...@outlook.com> Authored: Sat Jul 18 11:58:53 2015 -0700 Committer: Reynold Xin <r...@databricks.com> Committed: Sat Jul 18 11:58:53 2015 -0700 ---------------------------------------------------------------------- .../scala/org/apache/spark/TaskContext.scala | 2 +- .../expressions/ExpressionEvalHelper.scala | 108 ++++++++++--------- .../expressions/MathFunctionsSuite.scala | 18 +--- .../sql/catalyst/expressions/RandomSuite.scala | 6 +- .../spark/sql/ColumnExpressionSuite.scala | 9 +- .../expression/NondeterministicSuite.scala | 32 ++++++ 6 files changed, 103 insertions(+), 72 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/86c50bf7/core/src/main/scala/org/apache/spark/TaskContext.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/TaskContext.scala b/core/src/main/scala/org/apache/spark/TaskContext.scala index 345bb50..e93eb93 100644 --- a/core/src/main/scala/org/apache/spark/TaskContext.scala +++ b/core/src/main/scala/org/apache/spark/TaskContext.scala @@ -38,7 +38,7 @@ object TaskContext { */ def getPartitionId(): Int = { val tc = taskContext.get() - if (tc == null) { + if (tc eq null) { 0 } else { tc.partitionId() http://git-wip-us.apache.org/repos/asf/spark/blob/86c50bf7/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala index c43486b..7a96044 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala @@ -23,7 +23,7 @@ import org.scalatest.Matchers._ import org.apache.spark.SparkFunSuite import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.CatalystTypeConverters -import org.apache.spark.sql.catalyst.expressions.codegen.{GenerateUnsafeProjection, GenerateProjection, GenerateMutableProjection} +import org.apache.spark.sql.catalyst.expressions.codegen._ import org.apache.spark.sql.catalyst.optimizer.DefaultOptimizer import org.apache.spark.sql.catalyst.plans.logical.{OneRowRelation, Project} @@ -38,7 +38,7 @@ trait ExpressionEvalHelper { } protected def checkEvaluation( - expression: Expression, expected: Any, inputRow: InternalRow = EmptyRow): Unit = { + expression: => Expression, expected: Any, inputRow: InternalRow = EmptyRow): Unit = { val catalystValue = CatalystTypeConverters.convertToCatalyst(expected) checkEvaluationWithoutCodegen(expression, catalystValue, inputRow) checkEvaluationWithGeneratedMutableProjection(expression, catalystValue, inputRow) @@ -51,12 +51,14 @@ trait ExpressionEvalHelper { /** * Check the equality between result of expression and expected value, it will handle - * Array[Byte]. + * Array[Byte] and Spread[Double]. */ protected def checkResult(result: Any, expected: Any): Boolean = { (result, expected) match { case (result: Array[Byte], expected: Array[Byte]) => java.util.Arrays.equals(result, expected) + case (result: Double, expected: Spread[Double]) => + expected.isWithin(result) case _ => result == expected } } @@ -65,10 +67,29 @@ trait ExpressionEvalHelper { expression.eval(inputRow) } + protected def generateProject( + generator: => Projection, + expression: Expression): Projection = { + try { + generator + } catch { + case e: Throwable => + val ctx = new CodeGenContext + val evaluated = expression.gen(ctx) + fail( + s""" + |Code generation of $expression failed: + |${evaluated.code} + |$e + """.stripMargin) + } + } + protected def checkEvaluationWithoutCodegen( expression: Expression, expected: Any, inputRow: InternalRow = EmptyRow): Unit = { + val actual = try evaluate(expression, inputRow) catch { case e: Exception => fail(s"Exception evaluating $expression", e) } @@ -85,21 +106,11 @@ trait ExpressionEvalHelper { expected: Any, inputRow: InternalRow = EmptyRow): Unit = { - val plan = try { - GenerateMutableProjection.generate(Alias(expression, s"Optimized($expression)")() :: Nil)() - } catch { - case e: Throwable => - val ctx = GenerateProjection.newCodeGenContext() - val evaluated = expression.gen(ctx) - fail( - s""" - |Code generation of $expression failed: - |${evaluated.code} - |$e - """.stripMargin) - } + val plan = generateProject( + GenerateMutableProjection.generate(Alias(expression, s"Optimized($expression)")() :: Nil)(), + expression) - val actual = plan(inputRow).apply(0) + val actual = plan(inputRow).get(0) if (!checkResult(actual, expected)) { val input = if (inputRow == EmptyRow) "" else s", input: $inputRow" fail(s"Incorrect Evaluation: $expression, actual: $actual, expected: $expected$input") @@ -110,24 +121,19 @@ trait ExpressionEvalHelper { expression: Expression, expected: Any, inputRow: InternalRow = EmptyRow): Unit = { - val ctx = GenerateProjection.newCodeGenContext() - lazy val evaluated = expression.gen(ctx) - val plan = try { - GenerateProjection.generate(Alias(expression, s"Optimized($expression)")() :: Nil) - } catch { - case e: Throwable => - fail( - s""" - |Code generation of $expression failed: - |${evaluated.code} - |$e - """.stripMargin) - } + val plan = generateProject( + GenerateProjection.generate(Alias(expression, s"Optimized($expression)")() :: Nil), + expression) val actual = plan(inputRow) val expectedRow = InternalRow(expected) + + // We reimplement hashCode in generated `SpecificRow`, make sure it's consistent with our + // interpreted version. if (actual.hashCode() != expectedRow.hashCode()) { + val ctx = new CodeGenContext + val evaluated = expression.gen(ctx) fail( s""" |Mismatched hashCodes for values: $actual, $expectedRow @@ -136,9 +142,10 @@ trait ExpressionEvalHelper { |Code: $evaluated """.stripMargin) } + if (actual != expectedRow) { val input = if (inputRow == EmptyRow) "" else s", input: $inputRow" - fail(s"Incorrect Evaluation: $expression, actual: $actual, expected: $expected$input") + fail(s"Incorrect Evaluation: $expression, actual: $actual, expected: $expectedRow$input") } if (actual.copy() != expectedRow) { fail(s"Copy of generated Row is wrong: actual: ${actual.copy()}, expected: $expectedRow") @@ -149,20 +156,10 @@ trait ExpressionEvalHelper { expression: Expression, expected: Any, inputRow: InternalRow = EmptyRow): Unit = { - val ctx = GenerateUnsafeProjection.newCodeGenContext() - lazy val evaluated = expression.gen(ctx) - val plan = try { - GenerateUnsafeProjection.generate(Alias(expression, s"Optimized($expression)")() :: Nil) - } catch { - case e: Throwable => - fail( - s""" - |Code generation of $expression failed: - |${evaluated.code} - |$e - """.stripMargin) - } + val plan = generateProject( + GenerateUnsafeProjection.generate(Alias(expression, s"Optimized($expression)")() :: Nil), + expression) val unsafeRow = plan(inputRow) // UnsafeRow cannot be compared with GenericInternalRow directly @@ -170,7 +167,7 @@ trait ExpressionEvalHelper { val expectedRow = InternalRow(expected) if (actual != expectedRow) { val input = if (inputRow == EmptyRow) "" else s", input: $inputRow" - fail(s"Incorrect Evaluation: $expression, actual: $actual, expected: $expected$input") + fail(s"Incorrect Evaluation: $expression, actual: $actual, expected: $expectedRow$input") } } @@ -184,12 +181,23 @@ trait ExpressionEvalHelper { } protected def checkDoubleEvaluation( - expression: Expression, + expression: => Expression, expected: Spread[Double], inputRow: InternalRow = EmptyRow): Unit = { - val actual = try evaluate(expression, inputRow) catch { - case e: Exception => fail(s"Exception evaluating $expression", e) - } - actual.asInstanceOf[Double] shouldBe expected + checkEvaluationWithoutCodegen(expression, expected) + checkEvaluationWithGeneratedMutableProjection(expression, expected) + checkEvaluationWithOptimization(expression, expected) + + var plan = generateProject( + GenerateProjection.generate(Alias(expression, s"Optimized($expression)")() :: Nil), + expression) + var actual = plan(inputRow).get(0) + assert(checkResult(actual, expected)) + + plan = generateProject( + GenerateUnsafeProjection.generate(Alias(expression, s"Optimized($expression)")() :: Nil), + expression) + actual = FromUnsafeProjection(expression.dataType :: Nil)(plan(inputRow)).get(0) + assert(checkResult(actual, expected)) } } http://git-wip-us.apache.org/repos/asf/spark/blob/86c50bf7/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/MathFunctionsSuite.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/MathFunctionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/MathFunctionsSuite.scala index df988f5..04acd5b 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/MathFunctionsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/MathFunctionsSuite.scala @@ -143,7 +143,6 @@ class MathFunctionsSuite extends SparkFunSuite with ExpressionEvalHelper { case e: Exception => fail(s"Exception evaluating $expression", e) } if (!actual.asInstanceOf[Double].isNaN) { - val input = if (inputRow == EmptyRow) "" else s", input: $inputRow" fail(s"Incorrect evaluation (codegen off): $expression, " + s"actual: $actual, " + s"expected: NaN") @@ -155,23 +154,12 @@ class MathFunctionsSuite extends SparkFunSuite with ExpressionEvalHelper { expression: Expression, inputRow: InternalRow = EmptyRow): Unit = { - val plan = try { - GenerateMutableProjection.generate(Alias(expression, s"Optimized($expression)")() :: Nil)() - } catch { - case e: Throwable => - val ctx = GenerateProjection.newCodeGenContext() - val evaluated = expression.gen(ctx) - fail( - s""" - |Code generation of $expression failed: - |${evaluated.code} - |$e - """.stripMargin) - } + val plan = generateProject( + GenerateMutableProjection.generate(Alias(expression, s"Optimized($expression)")() :: Nil)(), + expression) val actual = plan(inputRow).apply(0) if (!actual.asInstanceOf[Double].isNaN) { - val input = if (inputRow == EmptyRow) "" else s", input: $inputRow" fail(s"Incorrect Evaluation: $expression, actual: $actual, expected: NaN") } } http://git-wip-us.apache.org/repos/asf/spark/blob/86c50bf7/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/RandomSuite.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/RandomSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/RandomSuite.scala index 9be2b23..698c81b 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/RandomSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/RandomSuite.scala @@ -21,13 +21,13 @@ import org.scalatest.Matchers._ import org.apache.spark.SparkFunSuite import org.apache.spark.sql.catalyst.dsl.expressions._ -import org.apache.spark.sql.types.{DoubleType, IntegerType} +import org.apache.spark.sql.types.DoubleType class RandomSuite extends SparkFunSuite with ExpressionEvalHelper { test("random") { - val row = create_row(1.1, 2.0, 3.1, null) - checkDoubleEvaluation(Rand(30), (0.7363714192755834 +- 0.001), row) + checkDoubleEvaluation(Rand(30), 0.7363714192755834 +- 0.001) + checkDoubleEvaluation(Randn(30), 0.5181478766595276 +- 0.001) } } http://git-wip-us.apache.org/repos/asf/spark/blob/86c50bf7/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala index 8f15479..6bd5804 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala @@ -450,7 +450,7 @@ class ColumnExpressionSuite extends QueryTest { test("monotonicallyIncreasingId") { // Make sure we have 2 partitions, each with 2 records. - val df = ctx.sparkContext.parallelize(1 to 2, 2).mapPartitions { iter => + val df = ctx.sparkContext.parallelize(Seq[Int](), 2).mapPartitions { _ => Iterator(Tuple1(1), Tuple1(2)) }.toDF("a") checkAnswer( @@ -460,10 +460,13 @@ class ColumnExpressionSuite extends QueryTest { } test("sparkPartitionId") { - val df = ctx.sparkContext.parallelize(1 to 1, 1).map(i => (i, i)).toDF("a", "b") + // Make sure we have 2 partitions, each with 2 records. + val df = ctx.sparkContext.parallelize(Seq[Int](), 2).mapPartitions { _ => + Iterator(Tuple1(1), Tuple1(2)) + }.toDF("a") checkAnswer( df.select(sparkPartitionId()), - Row(0) + Row(0) :: Row(0) :: Row(1) :: Row(1) :: Nil ) } http://git-wip-us.apache.org/repos/asf/spark/blob/86c50bf7/sql/core/src/test/scala/org/apache/spark/sql/execution/expression/NondeterministicSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/expression/NondeterministicSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/expression/NondeterministicSuite.scala new file mode 100644 index 0000000..99e11fd --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/expression/NondeterministicSuite.scala @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.expression + +import org.apache.spark.SparkFunSuite +import org.apache.spark.sql.catalyst.expressions. ExpressionEvalHelper +import org.apache.spark.sql.execution.expressions.{SparkPartitionID, MonotonicallyIncreasingID} + +class NondeterministicSuite extends SparkFunSuite with ExpressionEvalHelper { + test("MonotonicallyIncreasingID") { + checkEvaluation(MonotonicallyIncreasingID(), 0) + } + + test("SparkPartitionID") { + checkEvaluation(SparkPartitionID, 0) + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org