This is an automated email from the ASF dual-hosted git repository.

hvanhovell pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.5 by this push:
     new e3b031276e4 [SPARK-44634][SQL] Encoders.bean no longer supports nested beans with type arguments
e3b031276e4 is described below

commit e3b031276e4ab626f7db7b8d95f01a598e25a6b1
Author: Giambattista Bloisi <giambattista.blo...@openaire.eu>
AuthorDate: Sun Aug 6 21:47:57 2023 +0200

    [SPARK-44634][SQL] Encoders.bean no longer supports nested beans with type arguments
    
    ### What changes were proposed in this pull request?
    This PR fixes a regression introduced in Spark 3.4.x where Encoders.bean is no longer able to process nested beans having type arguments. For example:
    
    ```
    class A<T> {
       T value;
       // value getter and setter
    }
    
    class B {
       A<String> stringHolder;
       // stringHolder getter and setter
    }
    
    Encoders.bean(B.class); // throws "SparkUnsupportedOperationException: [ENCODER_NOT_FOUND]..."
    ```
    
    ### Why are the changes needed?
    The main match in JavaTypeInference.encoderFor does not handle the ParameterizedType and TypeVariable cases. I think this is a regression introduced after getting rid of the usage of Guava's TypeToken: [SPARK-42093 SQL Move JavaTypeInference to AgnosticEncoders](https://github.com/apache/spark/commit/18672003513d5a4aa610b6b94dbbc15c33185d3#diff-1191737b908340a2f4c22b71b1c40ebaa0da9d8b40c958089c346a3bda26943b) hvanhovell cloud-fan
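    
    As a minimal reflection sketch of why those cases matter (this is not Spark code; it assumes the A/B beans from the example above with conventional getters): the generic return type of B's getter is a ParameterizedType rather than a Class, and A's getter returns a bare TypeVariable, so neither is matched by the `case c: Class[_]` arms of the main match:
    
    ```
    import java.lang.reflect.{ParameterizedType, TypeVariable}
    
    // A<String> is reported as a ParameterizedType, not a Class.
    val holderType = classOf[B].getMethod("getStringHolder").getGenericReturnType
    assert(holderType.isInstanceOf[ParameterizedType])
    
    // Inside A, the return type of getValue is the unresolved type variable T.
    val valueType = classOf[A[_]].getMethod("getValue").getGenericReturnType
    assert(valueType.isInstanceOf[TypeVariable[_]])
    ```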
    
    In this PR I leverage the commons-lang3 TypeUtils functionality to resolve ParameterizedType type arguments for classes.
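    
    As a hedged sketch of that resolution step (mirroring what the patch does, not the patch itself; it again assumes the A/B beans from the example above): commons-lang3 turns the ParameterizedType for A<String> into a binding map from A's type variable T to java.lang.String, which encoderFor can then consult when it later reaches the bare TypeVariable on A's getter:
    
    ```
    import java.lang.reflect.{ParameterizedType, Type, TypeVariable}
    import scala.collection.JavaConverters._
    import org.apache.commons.lang3.reflect.{TypeUtils => JavaTypeUtils}
    
    val pt = classOf[B].getMethod("getStringHolder")
      .getGenericReturnType.asInstanceOf[ParameterizedType]
    val typeVariables: Map[TypeVariable[_], Type] =
      JavaTypeUtils.getTypeArguments(pt).asScala.toMap
    // Prints something like: T -> class java.lang.String
    typeVariables.foreach { case (tv, arg) => println(s"${tv.getName} -> $arg") }
    ```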
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Existing tests have been extended to check correct encoding of a nested bean having type arguments.
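    
    For reference, a hedged end-to-end sketch of what works again with this fix (not the actual suite code; it assumes the A/B beans from the example above with conventional getters/setters and a local SparkSession):
    
    ```
    import java.util.Arrays
    import org.apache.spark.sql.{Encoders, SparkSession}
    
    val spark = SparkSession.builder().master("local[1]").getOrCreate()
    val holder = new A[String](); holder.setValue("hello")
    val b = new B(); b.setStringHolder(holder)
    // Before this fix, deriving the bean encoder threw ENCODER_NOT_FOUND here.
    val ds = spark.createDataset(Arrays.asList(b))(Encoders.bean(classOf[B]))
    assert(ds.head().getStringHolder.getValue == "hello")
    ```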
    
    Closes #42327 from gbloisi-openaire/spark-44634.
    
    Authored-by: Giambattista Bloisi <giambattista.blo...@openaire.eu>
    Signed-off-by: Herman van Hovell <her...@databricks.com>
    (cherry picked from commit d6998979427b6ad3a0f16d6966b3927d40440a60)
    Signed-off-by: Herman van Hovell <her...@databricks.com>
---
 .../spark/sql/catalyst/JavaTypeInference.scala     | 84 +++++-----------------
 .../spark/sql/catalyst/JavaBeanWithGenerics.java   | 41 +++++++++++
 .../sql/catalyst/JavaTypeInferenceSuite.scala      |  4 ++
 3 files changed, 64 insertions(+), 65 deletions(-)

diff --git a/sql/api/src/main/scala/org/apache/spark/sql/catalyst/JavaTypeInference.scala b/sql/api/src/main/scala/org/apache/spark/sql/catalyst/JavaTypeInference.scala
index f352d28a7b5..3d536b735db 100644
--- a/sql/api/src/main/scala/org/apache/spark/sql/catalyst/JavaTypeInference.scala
+++ b/sql/api/src/main/scala/org/apache/spark/sql/catalyst/JavaTypeInference.scala
@@ -18,12 +18,14 @@ package org.apache.spark.sql.catalyst
 
 import java.beans.{Introspector, PropertyDescriptor}
 import java.lang.reflect.{ParameterizedType, Type, TypeVariable}
-import java.util.{ArrayDeque, List => JList, Map => JMap}
+import java.util.{List => JList, Map => JMap}
 import javax.annotation.Nonnull
 
-import scala.annotation.tailrec
+import scala.collection.JavaConverters._
 import scala.reflect.ClassTag
 
+import org.apache.commons.lang3.reflect.{TypeUtils => JavaTypeUtils}
+
 import org.apache.spark.sql.catalyst.encoders.AgnosticEncoder
 import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders.{ArrayEncoder, BinaryEncoder, BoxedBooleanEncoder, BoxedByteEncoder, BoxedDoubleEncoder, BoxedFloatEncoder, BoxedIntEncoder, BoxedLongEncoder, BoxedShortEncoder, DayTimeIntervalEncoder, DEFAULT_JAVA_DECIMAL_ENCODER, EncoderField, IterableEncoder, JavaBeanEncoder, JavaBigIntEncoder, JavaEnumEncoder, LocalDateTimeEncoder, MapEncoder, PrimitiveBooleanEncoder, PrimitiveByteEncoder, PrimitiveDoubleEncoder, PrimitiveFloatEncoder, P [...]
 import org.apache.spark.sql.errors.ExecutionErrors
@@ -57,7 +59,8 @@ object JavaTypeInference {
     encoderFor(beanType, Set.empty).asInstanceOf[AgnosticEncoder[T]]
   }
 
-  private def encoderFor(t: Type, seenTypeSet: Set[Class[_]]): AgnosticEncoder[_] = t match {
+  private def encoderFor(t: Type, seenTypeSet: Set[Class[_]],
+    typeVariables: Map[TypeVariable[_], Type] = Map.empty): AgnosticEncoder[_] = t match {
 
     case c: Class[_] if c == java.lang.Boolean.TYPE => PrimitiveBooleanEncoder
     case c: Class[_] if c == java.lang.Byte.TYPE => PrimitiveByteEncoder
@@ -101,18 +104,24 @@ object JavaTypeInference {
       UDTEncoder(udt, udt.getClass)
 
     case c: Class[_] if c.isArray =>
-      val elementEncoder = encoderFor(c.getComponentType, seenTypeSet)
+      val elementEncoder = encoderFor(c.getComponentType, seenTypeSet, typeVariables)
       ArrayEncoder(elementEncoder, elementEncoder.nullable)
 
-    case ImplementsList(c, Array(elementCls)) =>
-      val element = encoderFor(elementCls, seenTypeSet)
+    case c: Class[_] if classOf[JList[_]].isAssignableFrom(c) =>
+      val element = encoderFor(c.getTypeParameters.array(0), seenTypeSet, typeVariables)
       IterableEncoder(ClassTag(c), element, element.nullable, lenientSerialization = false)
 
-    case ImplementsMap(c, Array(keyCls, valueCls)) =>
-      val keyEncoder = encoderFor(keyCls, seenTypeSet)
-      val valueEncoder = encoderFor(valueCls, seenTypeSet)
+    case c: Class[_] if classOf[JMap[_, _]].isAssignableFrom(c) =>
+      val keyEncoder = encoderFor(c.getTypeParameters.array(0), seenTypeSet, typeVariables)
+      val valueEncoder = encoderFor(c.getTypeParameters.array(1), seenTypeSet, typeVariables)
       MapEncoder(ClassTag(c), keyEncoder, valueEncoder, valueEncoder.nullable)
 
+    case tv: TypeVariable[_] =>
+      encoderFor(typeVariables(tv), seenTypeSet, typeVariables)
+
+    case pt: ParameterizedType =>
+      encoderFor(pt.getRawType, seenTypeSet, JavaTypeUtils.getTypeArguments(pt).asScala.toMap)
+
     case c: Class[_] =>
       if (seenTypeSet.contains(c)) {
         throw ExecutionErrors.cannotHaveCircularReferencesInBeanClassError(c)
@@ -124,7 +133,7 @@ object JavaTypeInference {
       // Note that the fields are ordered by name.
       val fields = properties.map { property =>
         val readMethod = property.getReadMethod
-        val encoder = encoderFor(readMethod.getGenericReturnType, seenTypeSet + c)
+        val encoder = encoderFor(readMethod.getGenericReturnType, seenTypeSet + c, typeVariables)
         // The existence of `javax.annotation.Nonnull`, means this field is not nullable.
         val hasNonNull = readMethod.isAnnotationPresent(classOf[Nonnull])
         EncoderField(
@@ -147,59 +156,4 @@ object JavaTypeInference {
       .filterNot(_.getName == "declaringClass")
       .filter(_.getReadMethod != null)
   }
-
-  private class ImplementsGenericInterface(interface: Class[_]) {
-    assert(interface.isInterface)
-    assert(interface.getTypeParameters.nonEmpty)
-
-    def unapply(t: Type): Option[(Class[_], Array[Type])] = implementsInterface(t).map { cls =>
-      cls -> findTypeArgumentsForInterface(t)
-    }
-
-    @tailrec
-    private def implementsInterface(t: Type): Option[Class[_]] = t match {
-      case pt: ParameterizedType => implementsInterface(pt.getRawType)
-      case c: Class[_] if interface.isAssignableFrom(c) => Option(c)
-      case _ => None
-    }
-
-    private def findTypeArgumentsForInterface(t: Type): Array[Type] = {
-      val queue = new ArrayDeque[(Type, Map[Any, Type])]
-      queue.add(t -> Map.empty)
-      while (!queue.isEmpty) {
-        queue.poll() match {
-          case (pt: ParameterizedType, bindings) =>
-            // translate mappings...
-            val mappedTypeArguments = pt.getActualTypeArguments.map {
-              case v: TypeVariable[_] => bindings(v.getName)
-              case v => v
-            }
-            if (pt.getRawType == interface) {
-              return mappedTypeArguments
-            } else {
-              val mappedTypeArgumentMap = mappedTypeArguments
-                .zipWithIndex.map(_.swap)
-                .toMap[Any, Type]
-              queue.add(pt.getRawType -> mappedTypeArgumentMap)
-            }
-          case (c: Class[_], indexedBindings) =>
-            val namedBindings = c.getTypeParameters.zipWithIndex.map {
-              case (parameter, index) =>
-                parameter.getName -> indexedBindings(index)
-            }.toMap[Any, Type]
-            val superClass = c.getGenericSuperclass
-            if (superClass != null) {
-              queue.add(superClass -> namedBindings)
-            }
-            c.getGenericInterfaces.foreach { iface =>
-              queue.add(iface -> namedBindings)
-            }
-        }
-      }
-      throw ExecutionErrors.unreachableError()
-    }
-  }
-
-  private object ImplementsList extends ImplementsGenericInterface(classOf[JList[_]])
-  private object ImplementsMap extends ImplementsGenericInterface(classOf[JMap[_, _]])
 }
diff --git a/sql/catalyst/src/test/java/org/apache/spark/sql/catalyst/JavaBeanWithGenerics.java b/sql/catalyst/src/test/java/org/apache/spark/sql/catalyst/JavaBeanWithGenerics.java
new file mode 100644
index 00000000000..b84a3122cf8
--- /dev/null
+++ b/sql/catalyst/src/test/java/org/apache/spark/sql/catalyst/JavaBeanWithGenerics.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst;
+
+class JavaBeanWithGenerics<T,A> {
+    private A attribute;
+
+    private T value;
+
+    public A getAttribute() {
+        return attribute;
+    }
+
+    public void setAttribute(A attribute) {
+        this.attribute = attribute;
+    }
+
+    public T getValue() {
+        return value;
+    }
+
+    public void setValue(T value) {
+        this.value = value;
+    }
+}
+
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/JavaTypeInferenceSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/JavaTypeInferenceSuite.scala
index 35f5bf739bf..64399976097 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/JavaTypeInferenceSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/JavaTypeInferenceSuite.scala
@@ -66,6 +66,7 @@ class LeafBean {
   @BeanProperty var period: java.time.Period = _
   @BeanProperty var enum: java.time.Month = _
   @BeanProperty val readOnlyString = "read-only"
+  @BeanProperty var genericNestedBean: JavaBeanWithGenerics[String, String] = _
 
   var nonNullString: String = "value"
   @javax.annotation.Nonnull
@@ -184,6 +185,9 @@ class JavaTypeInferenceSuite extends SparkFunSuite {
       encoderField("date", STRICT_DATE_ENCODER),
       encoderField("duration", DayTimeIntervalEncoder),
       encoderField("enum", JavaEnumEncoder(classTag[java.time.Month])),
+      encoderField("genericNestedBean", JavaBeanEncoder(
+        ClassTag(classOf[JavaBeanWithGenerics[String, String]]),
+        Seq(encoderField("attribute", StringEncoder), encoderField("value", 
StringEncoder)))),
       encoderField("instant", STRICT_INSTANT_ENCODER),
       encoderField("localDate", STRICT_LOCAL_DATE_ENCODER),
       encoderField("localDateTime", LocalDateTimeEncoder),


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
