[GitHub] spark pull request #22527: [SPARK-17952][SQL] Nested Java beans support in c...
Github user asfgit closed the pull request at:

    https://github.com/apache/spark/pull/22527

---

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
Github user michalsenkyr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22527#discussion_r222817293

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala ---
@@ -1098,16 +1098,26 @@ object SQLContext {
       data: Iterator[_],
       beanClass: Class[_],
       attrs: Seq[AttributeReference]): Iterator[InternalRow] = {
-    val extractors =
-      JavaTypeInference.getJavaBeanReadableProperties(beanClass).map(_.getReadMethod)
-    val methodsToConverts = extractors.zip(attrs).map { case (e, attr) =>
-      (e, CatalystTypeConverters.createToCatalystConverter(attr.dataType))
+    def createStructConverter(cls: Class[_], fieldTypes: Seq[DataType]): Any => InternalRow = {
+      val methodConverters =
+        JavaTypeInference.getJavaBeanReadableProperties(cls).zip(fieldTypes)
+          .map { case (property, fieldType) =>
+            val method = property.getReadMethod
+            method -> createConverter(method.getReturnType, fieldType)
+          }
+      value =>
+        if (value == null) null
+        else new GenericInternalRow(
+          methodConverters.map { case (method, converter) =>
+            converter(method.invoke(value))
+          })
--- End diff --

Done
Github user ueshin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22527#discussion_r222520022

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala ---
@@ -1098,16 +1098,26 @@ object SQLContext {
       data: Iterator[_],
       beanClass: Class[_],
       attrs: Seq[AttributeReference]): Iterator[InternalRow] = {
-    val extractors =
-      JavaTypeInference.getJavaBeanReadableProperties(beanClass).map(_.getReadMethod)
-    val methodsToConverts = extractors.zip(attrs).map { case (e, attr) =>
-      (e, CatalystTypeConverters.createToCatalystConverter(attr.dataType))
+    def createStructConverter(cls: Class[_], fieldTypes: Seq[DataType]): Any => InternalRow = {
+      val methodConverters =
+        JavaTypeInference.getJavaBeanReadableProperties(cls).zip(fieldTypes)
+          .map { case (property, fieldType) =>
+            val method = property.getReadMethod
+            method -> createConverter(method.getReturnType, fieldType)
+          }
+      value =>
+        if (value == null) null
+        else new GenericInternalRow(
+          methodConverters.map { case (method, converter) =>
+            converter(method.invoke(value))
+          })
--- End diff --

nit: please use braces for multi-line if-else.
Github user michalsenkyr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22527#discussion_r222415998

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala ---
@@ -1098,16 +1098,24 @@ object SQLContext {
       data: Iterator[_],
       beanClass: Class[_],
       attrs: Seq[AttributeReference]): Iterator[InternalRow] = {
-    val extractors =
-      JavaTypeInference.getJavaBeanReadableProperties(beanClass).map(_.getReadMethod)
-    val methodsToConverts = extractors.zip(attrs).map { case (e, attr) =>
-      (e, CatalystTypeConverters.createToCatalystConverter(attr.dataType))
+    def createStructConverter(cls: Class[_], fieldTypes: Iterator[DataType]): Any => InternalRow = {
+      val methodConverters =
+        JavaTypeInference.getJavaBeanReadableProperties(cls).iterator.zip(fieldTypes)
+          .map { case (property, fieldType) =>
+            val method = property.getReadMethod
+            method -> createConverter(method.getReturnType, fieldType)
+          }.toArray
+      value => new GenericInternalRow(
--- End diff --

You're right. Added. Thanks.
Github user michalsenkyr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22527#discussion_r222415649

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala ---
@@ -1098,16 +1098,24 @@ object SQLContext {
       data: Iterator[_],
       beanClass: Class[_],
       attrs: Seq[AttributeReference]): Iterator[InternalRow] = {
-    val extractors =
-      JavaTypeInference.getJavaBeanReadableProperties(beanClass).map(_.getReadMethod)
-    val methodsToConverts = extractors.zip(attrs).map { case (e, attr) =>
-      (e, CatalystTypeConverters.createToCatalystConverter(attr.dataType))
+    def createStructConverter(cls: Class[_], fieldTypes: Iterator[DataType]): Any => InternalRow = {
--- End diff --

I used `Iterator`s instead of `Seq`s in order to avoid creating intermediate collections. However, I agree it's more concise without that.
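The trade-off being discussed can be sketched outside Spark: eager collection combinators allocate an intermediate collection per step, while a lazy pipeline fuses the steps and materializes once at the end. A minimal, hypothetical illustration (Java streams as a stand-in for Scala's `Iterator`; all names are made up):

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class ZipStyles {
    static final String[] PROPS = {"id", "name"};                  // stand-ins for bean properties
    static final List<String> TYPES = List.of("string", "string"); // stand-ins for field types

    // Eager: build an intermediate list of pairs, then map it to labels.
    static List<String> eager() {
        List<String[]> pairs = IntStream.range(0, PROPS.length)
            .mapToObj(i -> new String[]{PROPS[i], TYPES.get(i)})
            .collect(Collectors.toList());        // intermediate collection allocated here
        return pairs.stream()
            .map(p -> p[0] + ": " + p[1])
            .collect(Collectors.toList());
    }

    // Fused: one pass, nothing materialized until the final collect.
    static List<String> fused() {
        return IntStream.range(0, PROPS.length)
            .mapToObj(i -> PROPS[i] + ": " + TYPES.get(i))
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(eager().equals(fused())); // true
        System.out.println(fused());                 // [id: string, name: string]
    }
}
```

Both produce the same result; the difference is only allocation, which is why dropping the `Iterator` in favor of `Seq` is a readability call, not a correctness one.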
Github user ueshin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22527#discussion_r222183494

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala ---
@@ -1098,16 +1098,24 @@ object SQLContext {
       data: Iterator[_],
       beanClass: Class[_],
       attrs: Seq[AttributeReference]): Iterator[InternalRow] = {
-    val extractors =
-      JavaTypeInference.getJavaBeanReadableProperties(beanClass).map(_.getReadMethod)
-    val methodsToConverts = extractors.zip(attrs).map { case (e, attr) =>
-      (e, CatalystTypeConverters.createToCatalystConverter(attr.dataType))
+    def createStructConverter(cls: Class[_], fieldTypes: Iterator[DataType]): Any => InternalRow = {
--- End diff --

nit: `Seq[DataType]` instead of `Iterator[DataType]`?
Github user ueshin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22527#discussion_r222185482

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala ---
@@ -1098,16 +1098,24 @@ object SQLContext {
       data: Iterator[_],
       beanClass: Class[_],
       attrs: Seq[AttributeReference]): Iterator[InternalRow] = {
-    val extractors =
-      JavaTypeInference.getJavaBeanReadableProperties(beanClass).map(_.getReadMethod)
-    val methodsToConverts = extractors.zip(attrs).map { case (e, attr) =>
-      (e, CatalystTypeConverters.createToCatalystConverter(attr.dataType))
+    def createStructConverter(cls: Class[_], fieldTypes: Iterator[DataType]): Any => InternalRow = {
+      val methodConverters =
+        JavaTypeInference.getJavaBeanReadableProperties(cls).iterator.zip(fieldTypes)
+          .map { case (property, fieldType) =>
+            val method = property.getReadMethod
+            method -> createConverter(method.getReturnType, fieldType)
+          }.toArray
+      value => new GenericInternalRow(
--- End diff --

Shouldn't we check whether `value` is `null`? Also, could you add a test for that case?
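The null check ueshin asks for matters because the struct converter calls the bean's getters reflectively: invoking a getter on a `null` bean throws a `NullPointerException`. A minimal, Spark-free sketch of the failure mode and the fix (all class and method names here are hypothetical stand-ins, not Spark's actual code):

```java
import java.lang.reflect.Method;
import java.util.Arrays;

public class NullSafeConverter {
    // Hypothetical nested bean; the field holding it may be null on a real row.
    public static class Sub {
        private String name = "Sub-1";
        public String getName() { return name; }
    }

    // Converts a Sub into a flat Object[] "row". Without the null check,
    // getter.invoke(value) would throw NullPointerException for a null bean.
    static Object[] toRow(Sub value) throws Exception {
        if (value == null) {
            return null; // a null bean maps to a null struct, not a row of nulls
        }
        Method getter = Sub.class.getMethod("getName");
        return new Object[] { getter.invoke(value) };
    }

    public static void main(String[] args) throws Exception {
        System.out.println(Arrays.toString(toRow(new Sub()))); // [Sub-1]
        System.out.println(toRow(null));                       // null
    }
}
```

Returning `null` for the whole struct (rather than a row of nulls) matches the distinction between a null nested bean and a bean whose fields are all null.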
Github user michalsenkyr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22527#discussion_r222079624

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala ---
@@ -1100,13 +1101,23 @@ object SQLContext {
       attrs: Seq[AttributeReference]): Iterator[InternalRow] = {
     val extractors =
       JavaTypeInference.getJavaBeanReadableProperties(beanClass).map(_.getReadMethod)
-    val methodsToConverts = extractors.zip(attrs).map { case (e, attr) =>
-      (e, CatalystTypeConverters.createToCatalystConverter(attr.dataType))
+    val methodsToTypes = extractors.zip(attrs).map { case (e, attr) =>
+      (e, attr.dataType)
+    }
+    def invoke(element: Any)(tuple: (Method, DataType)): Any = tuple match {
--- End diff --

Good point. Thank you. Changed it in the latest commit.
Github user ueshin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22527#discussion_r221857463

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala ---
@@ -1100,13 +1101,23 @@ object SQLContext {
       attrs: Seq[AttributeReference]): Iterator[InternalRow] = {
     val extractors =
       JavaTypeInference.getJavaBeanReadableProperties(beanClass).map(_.getReadMethod)
-    val methodsToConverts = extractors.zip(attrs).map { case (e, attr) =>
-      (e, CatalystTypeConverters.createToCatalystConverter(attr.dataType))
+    val methodsToTypes = extractors.zip(attrs).map { case (e, attr) =>
+      (e, attr.dataType)
+    }
+    def invoke(element: Any)(tuple: (Method, DataType)): Any = tuple match {
--- End diff --

Can we create the converters before `data.map { ... }` instead of calculating them for each row? I mean something like:

```scala
def converter(e: Method, dt: DataType): Any => Any = dt match {
  case StructType(fields) =>
    val nestedExtractors =
      JavaTypeInference.getJavaBeanReadableProperties(e.getReturnType).map(_.getReadMethod)
    val nestedConverters = nestedExtractors.zip(fields).map { case (extractor, field) =>
      converter(extractor, field.dataType)
    }
    element =>
      val value = e.invoke(element)
      new GenericInternalRow(nestedConverters.map(_(value)))
  case _ =>
    val convert = CatalystTypeConverters.createToCatalystConverter(dt)
    element => convert(e.invoke(element))
}
```

and then

```scala
val converters = extractors.zip(attrs).map { case (e, attr) =>
  converter(e, attr.dataType)
}
data.map { element =>
  new GenericInternalRow(converters.map(_(element))): InternalRow
}
```
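The gist of this suggestion, independent of Spark's types, is to pay the type-dispatch cost once and reuse a cheap per-row closure. A hypothetical sketch of the pattern (the `makeConverter` name and the string "data types" are made up for illustration; they are not Spark APIs):

```java
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;

public class PrecomputedConverters {
    // Dispatch on the type descriptor once, returning a closure that does
    // only the cheap per-value work (a stand-in for createToCatalystConverter).
    static Function<Object, Object> makeConverter(String dataType) {
        switch (dataType) {
            case "string": return v -> v.toString();
            case "int":    return v -> ((Number) v).intValue();
            default:       return v -> v;
        }
    }

    public static void main(String[] args) {
        // Converter built once, outside the per-row loop...
        Function<Object, Object> conv = makeConverter("int");
        // ...then applied to every row without re-dispatching on the type.
        List<Object> rows = List.of(1L, 2L, 3L).stream()
            .map(conv)
            .collect(Collectors.toList());
        System.out.println(rows); // [1, 2, 3]
    }
}
```

For nested structs the dispatch is recursive, so hoisting it out of the row loop avoids re-walking the whole schema for every element.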
Github user michalsenkyr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22527#discussion_r219691829

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala ---
@@ -1100,13 +1101,24 @@ object SQLContext {
       attrs: Seq[AttributeReference]): Iterator[InternalRow] = {
     val extractors =
       JavaTypeInference.getJavaBeanReadableProperties(beanClass).map(_.getReadMethod)
-    val methodsToConverts = extractors.zip(attrs).map { case (e, attr) =>
-      (e, CatalystTypeConverters.createToCatalystConverter(attr.dataType))
+    val methodsToTypes = extractors.zip(attrs).map { case (e, attr) =>
+      (e, attr.dataType)
+    }
+    def invoke(element: Any)(tuple: (Method, DataType)): Any = tuple match {
+      case (e, structType: StructType) =>
+        val value = e.invoke(element)
+        val nestedExtractors = JavaTypeInference.getJavaBeanReadableProperties(value.getClass)
+          .map(desc => desc.getName -> desc.getReadMethod)
+          .toMap
+        new GenericInternalRow(structType.map(nestedProperty =>
+          invoke(value)(nestedExtractors(nestedProperty.name) -> nestedProperty.dataType)
+        ).toArray)
--- End diff --

Right, we don't have to. Just checked: `JavaTypeInference.inferDataType` also uses `JavaTypeInference.getJavaBeanReadableProperties`, so the order should be the same. Also double-checked manually in the Spark shell with a more complex nested bean to be sure.
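The ordering argument can be checked without Spark: `getJavaBeanReadableProperties` is built on `java.beans.Introspector`, and repeated introspections of the same class yield property descriptors in the same order, which is what lets the inferred schema and the extractor array line up field-for-field. A small sketch (the `SubCategory` bean here is a hypothetical stand-in mirroring the one in the PR description):

```java
import java.beans.BeanInfo;
import java.beans.Introspector;
import java.beans.PropertyDescriptor;
import java.util.Arrays;

public class PropertyOrder {
    // Hypothetical bean standing in for the nested Java bean under discussion.
    public static class SubCategory {
        private String id;
        private String name;
        public String getId() { return id; }
        public void setId(String id) { this.id = id; }
        public String getName() { return name; }
        public void setName(String name) { this.name = name; }
    }

    // Introspect a class, returning readable-property names in descriptor order.
    // The Object.class stop class excludes the synthetic "class" property.
    static String[] propertyNames(Class<?> cls) throws Exception {
        BeanInfo info = Introspector.getBeanInfo(cls, Object.class);
        return Arrays.stream(info.getPropertyDescriptors())
            .map(PropertyDescriptor::getName)
            .toArray(String[]::new);
    }

    public static void main(String[] args) throws Exception {
        // Two independent introspections agree on the order.
        System.out.println(Arrays.equals(propertyNames(SubCategory.class),
                                         propertyNames(SubCategory.class))); // true
        System.out.println(Arrays.toString(propertyNames(SubCategory.class)));
    }
}
```

Since both the schema-inference path and the row-extraction path go through the same introspection, relying on that shared order (rather than a name-keyed map) is safe.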
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22527#discussion_r219686433

--- Diff: sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameSuite.java ---
@@ -171,7 +184,12 @@ void validateDataFrameWithBeans(Bean bean, Dataset<Row> df) {
       schema.apply("d"));
     Assert.assertEquals(new StructField("e", DataTypes.createDecimalType(38,0), true, Metadata.empty()),
       schema.apply("e"));
-    Row first = df.select("a", "b", "c", "d", "e").first();
+    Assert.assertEquals(new StructField("f",
+        DataTypes.createStructType(Collections.singletonList(new StructField(
+            "a", IntegerType$.MODULE$, false, Metadata.empty()))),
+        true, Metadata.empty()),
+      schema.apply("f"));
--- End diff --

should be double spaced.
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22527#discussion_r219686429

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala ---
@@ -1100,13 +1101,24 @@ object SQLContext {
       attrs: Seq[AttributeReference]): Iterator[InternalRow] = {
     val extractors =
       JavaTypeInference.getJavaBeanReadableProperties(beanClass).map(_.getReadMethod)
-    val methodsToConverts = extractors.zip(attrs).map { case (e, attr) =>
-      (e, CatalystTypeConverters.createToCatalystConverter(attr.dataType))
+    val methodsToTypes = extractors.zip(attrs).map { case (e, attr) =>
+      (e, attr.dataType)
+    }
+    def invoke(element: Any)(tuple: (Method, DataType)): Any = tuple match {
+      case (e, structType: StructType) =>
+        val value = e.invoke(element)
+        val nestedExtractors = JavaTypeInference.getJavaBeanReadableProperties(value.getClass)
+          .map(desc => desc.getName -> desc.getReadMethod)
+          .toMap
+        new GenericInternalRow(structType.map(nestedProperty =>
+          invoke(value)(nestedExtractors(nestedProperty.name) -> nestedProperty.dataType)
+        ).toArray)
--- End diff --

Why should we use a map here while we don't need it for the root bean?
GitHub user michalsenkyr opened a pull request:

    https://github.com/apache/spark/pull/22527

[SPARK-17952][SQL] Nested Java beans support in createDataFrame

## What changes were proposed in this pull request?

When constructing a DataFrame from a Java bean, using nested beans throws an error despite the [documentation](http://spark.apache.org/docs/latest/sql-programming-guide.html#inferring-the-schema-using-reflection) stating otherwise. This PR aims to add that support.

This PR does not yet add nested beans support in array or List fields. This can be added later or in another PR.

## How was this patch tested?

A nested bean was added to the appropriate unit test. Also manually tested in the Spark shell on code emulating the referenced JIRA:

```
scala> import scala.beans.BeanProperty
import scala.beans.BeanProperty

scala> class SubCategory(@BeanProperty var id: String, @BeanProperty var name: String) extends Serializable
defined class SubCategory

scala> class Category(@BeanProperty var id: String, @BeanProperty var subCategory: SubCategory) extends Serializable
defined class Category

scala> import scala.collection.JavaConverters._
import scala.collection.JavaConverters._

scala> spark.createDataFrame(Seq(new Category("s-111", new SubCategory("sc-111", "Sub-1"))).asJava, classOf[Category])
java.lang.IllegalArgumentException: The value (SubCategory@65130cf2) of the type (SubCategory) cannot be converted to struct<id:string,name:string>
  at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:262)
  at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:238)
  at org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:103)
  at org.apache.spark.sql.catalyst.CatalystTypeConverters$$anonfun$createToCatalystConverter$2.apply(CatalystTypeConverters.scala:396)
  at org.apache.spark.sql.SQLContext$$anonfun$beansToRows$1$$anonfun$apply$1.apply(SQLContext.scala:1108)
  at org.apache.spark.sql.SQLContext$$anonfun$beansToRows$1$$anonfun$apply$1.apply(SQLContext.scala:1108)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
  at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
  at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
  at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
  at org.apache.spark.sql.SQLContext$$anonfun$beansToRows$1.apply(SQLContext.scala:1108)
  at org.apache.spark.sql.SQLContext$$anonfun$beansToRows$1.apply(SQLContext.scala:1106)
  at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
  at scala.collection.Iterator$class.toStream(Iterator.scala:1320)
  at scala.collection.AbstractIterator.toStream(Iterator.scala:1334)
  at scala.collection.TraversableOnce$class.toSeq(TraversableOnce.scala:298)
  at scala.collection.AbstractIterator.toSeq(Iterator.scala:1334)
  at org.apache.spark.sql.SparkSession.createDataFrame(SparkSession.scala:423)
  ... 51 elided
```

New behavior:

```
scala> spark.createDataFrame(Seq(new Category("s-111", new SubCategory("sc-111", "Sub-1"))).asJava, classOf[Category])
res0: org.apache.spark.sql.DataFrame = [id: string, subCategory: struct<id: string, name: string>]

scala> res0.show()
+-----+---------------+
|   id|    subCategory|
+-----+---------------+
|s-111|[sc-111, Sub-1]|
+-----+---------------+
```

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/michalsenkyr/spark SPARK-17952

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/22527.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #22527

----

commit ccea758b069c4622e9b1f71b92167c81cfcd81b8
Author: Michal Senkyr
Date:   2018-09-22T18:25:36Z

    Add nested Java beans support to SQLContext.beansToRow
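The core idea of the fix, stripped of Spark internals, is a recursive converter: leaf values pass through unchanged, while bean-typed fields recurse into a nested row. A hypothetical, Spark-free sketch (bean classes mirror the shell session above; the `convert` helper is made up for illustration):

```java
import java.beans.Introspector;
import java.beans.PropertyDescriptor;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;

public class NestedBeanRows {
    public static class SubCategory {
        private String id = "sc-111";
        private String name = "Sub-1";
        public String getId() { return id; }
        public String getName() { return name; }
    }

    public static class Category {
        private String id = "s-111";
        private SubCategory subCategory = new SubCategory();
        public String getId() { return id; }
        public SubCategory getSubCategory() { return subCategory; }
    }

    // Recursively turn a bean into a nested List "row": Strings and numbers
    // are leaves; anything else is introspected as a nested bean.
    static Object convert(Object value) throws Exception {
        if (value == null || value instanceof String || value instanceof Number) {
            return value;
        }
        List<Object> row = new ArrayList<>();
        for (PropertyDescriptor p :
                Introspector.getBeanInfo(value.getClass(), Object.class)
                            .getPropertyDescriptors()) {
            Method getter = p.getReadMethod();
            row.add(convert(getter.invoke(value)));
        }
        return row;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(convert(new Category())); // [s-111, [sc-111, Sub-1]]
    }
}
```

The output matches the shape of the `res0.show()` row above: the nested bean becomes a nested struct value rather than throwing a conversion error.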