Repository: spark
Updated Branches:
  refs/heads/master b2329fb1f -> d9ca1c906


[SPARK-23593][SQL] Add interpreted execution for InitializeJavaBean expression

## What changes were proposed in this pull request?

Add interpreted execution for `InitializeJavaBean` expression.

## How was this patch tested?

Added unit test.

Author: Liang-Chi Hsieh <vii...@gmail.com>

Closes #20985 from viirya/SPARK-23593-2.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d9ca1c90
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d9ca1c90
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d9ca1c90

Branch: refs/heads/master
Commit: d9ca1c906bd0571802f2297c36b407e660fcdb64
Parents: b2329fb
Author: Liang-Chi Hsieh <vii...@gmail.com>
Authored: Thu Apr 5 20:43:05 2018 +0200
Committer: Herman van Hovell <hvanhov...@databricks.com>
Committed: Thu Apr 5 20:43:05 2018 +0200

----------------------------------------------------------------------
 .../catalyst/expressions/objects/objects.scala  | 45 ++++++++++++++++--
 .../expressions/ExpressionEvalHelper.scala      |  9 ++--
 .../expressions/ObjectExpressionsSuite.scala    | 48 ++++++++++++++++++++
 3 files changed, 96 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/d9ca1c90/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
index 3fa91bd..9252425 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
@@ -1420,8 +1420,45 @@ case class InitializeJavaBean(beanInstance: Expression, setters: Map[String, Exp
   override def children: Seq[Expression] = beanInstance +: setters.values.toSeq
   override def dataType: DataType = beanInstance.dataType
 
-  override def eval(input: InternalRow): Any =
-    throw new UnsupportedOperationException("Only code-generated evaluation is supported.")
+  private lazy val resolvedSetters = {
+    assert(beanInstance.dataType.isInstanceOf[ObjectType])
+
+    val ObjectType(beanClass) = beanInstance.dataType
+    setters.map {
+      case (name, expr) =>
+        // Looking for known type mapping.
+        // But also looking for general `Object`-type parameter for generic methods.
+        val paramTypes = ScalaReflection.expressionJavaClasses(Seq(expr)) ++ Seq(classOf[Object])
+        val methods = paramTypes.flatMap { fieldClass =>
+          try {
+            Some(beanClass.getDeclaredMethod(name, fieldClass))
+          } catch {
+            case e: NoSuchMethodException => None
+          }
+        }
+        if (methods.isEmpty) {
+          throw new NoSuchMethodException(s"""A method named "$name" is not declared """ +
+            "in any enclosing class nor any supertype")
+        }
+        methods.head -> expr
+    }
+  }
+
+  override def eval(input: InternalRow): Any = {
+    val instance = beanInstance.eval(input)
+    if (instance != null) {
+      val bean = instance.asInstanceOf[Object]
+      resolvedSetters.foreach {
+        case (setter, expr) =>
+          val paramVal = expr.eval(input)
+          // We don't call setter if input value is null.
+          if (paramVal != null) {
+            setter.invoke(bean, paramVal.asInstanceOf[AnyRef])
+          }
+      }
+    }
+    instance
+  }
 
   override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
     val instanceGen = beanInstance.genCode(ctx)
@@ -1434,7 +1471,9 @@ case class InitializeJavaBean(beanInstance: Expression, setters: Map[String, Exp
         val fieldGen = fieldValue.genCode(ctx)
         s"""
            |${fieldGen.code}
-           |$javaBeanInstance.$setterMethod(${fieldGen.value});
+           |if (!${fieldGen.isNull}) {
+           |  $javaBeanInstance.$setterMethod(${fieldGen.value});
+           |}
          """.stripMargin
     }
     val initializeCode = ctx.splitExpressionsWithCurrentInputs(

http://git-wip-us.apache.org/repos/asf/spark/blob/d9ca1c90/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala
index 3828f17..a5ecd1b 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala
@@ -55,7 +55,8 @@ trait ExpressionEvalHelper extends GeneratorDrivenPropertyChecks {
 
   protected def checkEvaluation(
       expression: => Expression, expected: Any, inputRow: InternalRow = EmptyRow): Unit = {
-    val expr = prepareEvaluation(expression)
+    // Make it as method to obtain fresh expression everytime.
+    def expr = prepareEvaluation(expression)
     val catalystValue = CatalystTypeConverters.convertToCatalyst(expected)
     checkEvaluationWithoutCodegen(expr, catalystValue, inputRow)
     checkEvaluationWithGeneratedMutableProjection(expr, catalystValue, inputRow)
@@ -111,12 +112,14 @@ trait ExpressionEvalHelper extends GeneratorDrivenPropertyChecks {
         val errMsg = intercept[T] {
           eval
         }.getMessage
-        if (errMsg != expectedErrMsg) {
+        if (!errMsg.contains(expectedErrMsg)) {
           fail(s"Expected error message is `$expectedErrMsg`, but `$errMsg` found")
         }
       }
     }
-    val expr = prepareEvaluation(expression)
+
+    // Make it as method to obtain fresh expression everytime.
+    def expr = prepareEvaluation(expression)
     checkException(evaluateWithoutCodegen(expr, inputRow), "non-codegen mode")
     checkException(evaluateWithGeneratedMutableProjection(expr, inputRow), "codegen mode")
     if (GenerateUnsafeProjection.canSupport(expr.dataType)) {

http://git-wip-us.apache.org/repos/asf/spark/blob/d9ca1c90/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ObjectExpressionsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ObjectExpressionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ObjectExpressionsSuite.scala
index 1d59b20..b1bc67d 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ObjectExpressionsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ObjectExpressionsSuite.scala
@@ -192,6 +192,46 @@ class ObjectExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
       Invoke(funcSubObj, "binOp", DoubleType, inputSum), 0.75, InternalRow.apply(1, 0.25))
   }
 
+  test("SPARK-23593: InitializeJavaBean should support interpreted execution") {
+    val list = new java.util.LinkedList[Int]()
+    list.add(1)
+
+    val initializeBean = InitializeJavaBean(Literal.fromObject(new java.util.LinkedList[Int]),
+      Map("add" -> Literal(1)))
+    checkEvaluation(initializeBean, list, InternalRow.fromSeq(Seq()))
+
+    val initializeWithNonexistingMethod = InitializeJavaBean(
+      Literal.fromObject(new java.util.LinkedList[Int]),
+      Map("nonexisting" -> Literal(1)))
+    checkExceptionInExpression[Exception](initializeWithNonexistingMethod,
+      InternalRow.fromSeq(Seq()),
+      """A method named "nonexisting" is not declared in any enclosing class """ +
+        "nor any supertype")
+
+    val initializeWithWrongParamType = InitializeJavaBean(
+      Literal.fromObject(new TestBean),
+      Map("setX" -> Literal("1")))
+    intercept[Exception] {
+      evaluateWithoutCodegen(initializeWithWrongParamType, InternalRow.fromSeq(Seq()))
+    }.getMessage.contains(
+      """A method named "setX" is not declared in any enclosing class """ +
+        "nor any supertype")
+  }
+
+  test("InitializeJavaBean doesn't call setters if input in null") {
+    val initializeBean = InitializeJavaBean(
+      Literal.fromObject(new TestBean),
+      Map("setNonPrimitive" -> Literal(null)))
+    evaluateWithoutCodegen(initializeBean, InternalRow.fromSeq(Seq()))
+    evaluateWithGeneratedMutableProjection(initializeBean, InternalRow.fromSeq(Seq()))
+
+    val initializeBean2 = InitializeJavaBean(
+      Literal.fromObject(new TestBean),
+      Map("setNonPrimitive" -> Literal("string")))
+    evaluateWithoutCodegen(initializeBean2, InternalRow.fromSeq(Seq()))
+    evaluateWithGeneratedMutableProjection(initializeBean2, InternalRow.fromSeq(Seq()))
+  }
+
   test("SPARK-23585: UnwrapOption should support interpreted execution") {
     val cls = classOf[Option[Int]]
     val inputObject = BoundReference(0, ObjectType(cls), nullable = true)
@@ -342,3 +382,11 @@ class ObjectExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
     }
   }
 }
+
+class TestBean extends Serializable {
+  private var x: Int = 0
+
+  def setX(i: Int): Unit = x = i
+  def setNonPrimitive(i: AnyRef): Unit =
+    assert(i != null, "this setter should not be called with null.")
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to