[GitHub] spark pull request #22527: [SPARK-17952][SQL] Nested Java beans support in c...

2018-10-05 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/22527


---




[GitHub] spark pull request #22527: [SPARK-17952][SQL] Nested Java beans support in c...

2018-10-04 Thread michalsenkyr
Github user michalsenkyr commented on a diff in the pull request:

https://github.com/apache/spark/pull/22527#discussion_r222817293
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala ---
@@ -1098,16 +1098,26 @@ object SQLContext {
       data: Iterator[_],
       beanClass: Class[_],
       attrs: Seq[AttributeReference]): Iterator[InternalRow] = {
-    val extractors =
-      JavaTypeInference.getJavaBeanReadableProperties(beanClass).map(_.getReadMethod)
-    val methodsToConverts = extractors.zip(attrs).map { case (e, attr) =>
-      (e, CatalystTypeConverters.createToCatalystConverter(attr.dataType))
+    def createStructConverter(cls: Class[_], fieldTypes: Seq[DataType]): Any => InternalRow = {
+      val methodConverters =
+        JavaTypeInference.getJavaBeanReadableProperties(cls).zip(fieldTypes)
+          .map { case (property, fieldType) =>
+            val method = property.getReadMethod
+            method -> createConverter(method.getReturnType, fieldType)
+          }
+      value =>
+        if (value == null) null
+        else new GenericInternalRow(
+          methodConverters.map { case (method, converter) =>
+            converter(method.invoke(value))
+          })
--- End diff --

Done


---




[GitHub] spark pull request #22527: [SPARK-17952][SQL] Nested Java beans support in c...

2018-10-03 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/22527#discussion_r222520022
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala ---
@@ -1098,16 +1098,26 @@ object SQLContext {
       data: Iterator[_],
       beanClass: Class[_],
       attrs: Seq[AttributeReference]): Iterator[InternalRow] = {
-    val extractors =
-      JavaTypeInference.getJavaBeanReadableProperties(beanClass).map(_.getReadMethod)
-    val methodsToConverts = extractors.zip(attrs).map { case (e, attr) =>
-      (e, CatalystTypeConverters.createToCatalystConverter(attr.dataType))
+    def createStructConverter(cls: Class[_], fieldTypes: Seq[DataType]): Any => InternalRow = {
+      val methodConverters =
+        JavaTypeInference.getJavaBeanReadableProperties(cls).zip(fieldTypes)
+          .map { case (property, fieldType) =>
+            val method = property.getReadMethod
+            method -> createConverter(method.getReturnType, fieldType)
+          }
+      value =>
+        if (value == null) null
+        else new GenericInternalRow(
+          methodConverters.map { case (method, converter) =>
+            converter(method.invoke(value))
+          })
--- End diff --

nit: please use braces for multi-line if-else.
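
i.e., roughly like this (a sketch of the braced form only; indentation is illustrative):

```scala
value =>
  if (value == null) {
    null
  } else {
    new GenericInternalRow(
      methodConverters.map { case (method, converter) =>
        converter(method.invoke(value))
      })
  }
```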


---




[GitHub] spark pull request #22527: [SPARK-17952][SQL] Nested Java beans support in c...

2018-10-03 Thread michalsenkyr
Github user michalsenkyr commented on a diff in the pull request:

https://github.com/apache/spark/pull/22527#discussion_r222415998
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala ---
@@ -1098,16 +1098,24 @@ object SQLContext {
       data: Iterator[_],
       beanClass: Class[_],
       attrs: Seq[AttributeReference]): Iterator[InternalRow] = {
-    val extractors =
-      JavaTypeInference.getJavaBeanReadableProperties(beanClass).map(_.getReadMethod)
-    val methodsToConverts = extractors.zip(attrs).map { case (e, attr) =>
-      (e, CatalystTypeConverters.createToCatalystConverter(attr.dataType))
+    def createStructConverter(cls: Class[_], fieldTypes: Iterator[DataType]): Any => InternalRow = {
+      val methodConverters =
+        JavaTypeInference.getJavaBeanReadableProperties(cls).iterator.zip(fieldTypes)
+          .map { case (property, fieldType) =>
+            val method = property.getReadMethod
+            method -> createConverter(method.getReturnType, fieldType)
+          }.toArray
+      value => new GenericInternalRow(
--- End diff --

You're right. Added. Thanks.


---




[GitHub] spark pull request #22527: [SPARK-17952][SQL] Nested Java beans support in c...

2018-10-03 Thread michalsenkyr
Github user michalsenkyr commented on a diff in the pull request:

https://github.com/apache/spark/pull/22527#discussion_r222415649
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala ---
@@ -1098,16 +1098,24 @@ object SQLContext {
       data: Iterator[_],
       beanClass: Class[_],
       attrs: Seq[AttributeReference]): Iterator[InternalRow] = {
-    val extractors =
-      JavaTypeInference.getJavaBeanReadableProperties(beanClass).map(_.getReadMethod)
-    val methodsToConverts = extractors.zip(attrs).map { case (e, attr) =>
-      (e, CatalystTypeConverters.createToCatalystConverter(attr.dataType))
+    def createStructConverter(cls: Class[_], fieldTypes: Iterator[DataType]): Any => InternalRow = {
--- End diff --

I used `Iterator`s instead of `Seq`s in order to avoid creating 
intermediate collections. However, I agree it's more concise without that.
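
For illustration, a toy comparison of the two shapes (made-up values; only the zip behaviour matters):

```scala
// Seq.zip materializes an intermediate collection of pairs up front.
val pairsSeq: Seq[(Int, String)] = Seq(1, 2, 3).zip(Seq("a", "b", "c"))

// Iterator.zip produces pairs lazily; nothing is materialized until toArray.
val pairsIter: Array[(Int, String)] =
  Seq(1, 2, 3).iterator.zip(Seq("a", "b", "c").iterator).toArray
```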


---




[GitHub] spark pull request #22527: [SPARK-17952][SQL] Nested Java beans support in c...

2018-10-02 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/22527#discussion_r222183494
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala ---
@@ -1098,16 +1098,24 @@ object SQLContext {
       data: Iterator[_],
       beanClass: Class[_],
       attrs: Seq[AttributeReference]): Iterator[InternalRow] = {
-    val extractors =
-      JavaTypeInference.getJavaBeanReadableProperties(beanClass).map(_.getReadMethod)
-    val methodsToConverts = extractors.zip(attrs).map { case (e, attr) =>
-      (e, CatalystTypeConverters.createToCatalystConverter(attr.dataType))
+    def createStructConverter(cls: Class[_], fieldTypes: Iterator[DataType]): Any => InternalRow = {
--- End diff --

nit: `Seq[DataType]` instead of `Iterator[DataType]`?


---




[GitHub] spark pull request #22527: [SPARK-17952][SQL] Nested Java beans support in c...

2018-10-02 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/22527#discussion_r222185482
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala ---
@@ -1098,16 +1098,24 @@ object SQLContext {
       data: Iterator[_],
       beanClass: Class[_],
       attrs: Seq[AttributeReference]): Iterator[InternalRow] = {
-    val extractors =
-      JavaTypeInference.getJavaBeanReadableProperties(beanClass).map(_.getReadMethod)
-    val methodsToConverts = extractors.zip(attrs).map { case (e, attr) =>
-      (e, CatalystTypeConverters.createToCatalystConverter(attr.dataType))
+    def createStructConverter(cls: Class[_], fieldTypes: Iterator[DataType]): Any => InternalRow = {
+      val methodConverters =
+        JavaTypeInference.getJavaBeanReadableProperties(cls).iterator.zip(fieldTypes)
+          .map { case (property, fieldType) =>
+            val method = property.getReadMethod
+            method -> createConverter(method.getReturnType, fieldType)
+          }.toArray
+      value => new GenericInternalRow(
--- End diff --

Should we check whether `value` is `null` here? Also, could you add a test for that case?
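
Something along these lines might work, as a rough sketch only (the `NestedBean`/`OuterBean` names and setup are hypothetical, not the actual test code):

```scala
import scala.beans.BeanProperty
import scala.collection.JavaConverters._

// Hypothetical beans: OuterBean's nested-bean field is left null.
class NestedBean(@BeanProperty var a: Int) extends Serializable
class OuterBean(@BeanProperty var nested: NestedBean) extends Serializable

val df = spark.createDataFrame(Seq(new OuterBean(null)).asJava, classOf[OuterBean])
// The nested struct column should come back as null instead of throwing an NPE.
assert(df.select("nested").first().isNullAt(0))
```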


---




[GitHub] spark pull request #22527: [SPARK-17952][SQL] Nested Java beans support in c...

2018-10-02 Thread michalsenkyr
Github user michalsenkyr commented on a diff in the pull request:

https://github.com/apache/spark/pull/22527#discussion_r222079624
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala ---
@@ -1100,13 +1101,23 @@ object SQLContext {
       attrs: Seq[AttributeReference]): Iterator[InternalRow] = {
     val extractors =
       JavaTypeInference.getJavaBeanReadableProperties(beanClass).map(_.getReadMethod)
-    val methodsToConverts = extractors.zip(attrs).map { case (e, attr) =>
-      (e, CatalystTypeConverters.createToCatalystConverter(attr.dataType))
+    val methodsToTypes = extractors.zip(attrs).map { case (e, attr) =>
+      (e, attr.dataType)
+    }
+    def invoke(element: Any)(tuple: (Method, DataType)): Any = tuple match {
--- End diff --

Good point. Thank you. Changed it in the latest commit.


---




[GitHub] spark pull request #22527: [SPARK-17952][SQL] Nested Java beans support in c...

2018-10-02 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/22527#discussion_r221857463
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala ---
@@ -1100,13 +1101,23 @@ object SQLContext {
       attrs: Seq[AttributeReference]): Iterator[InternalRow] = {
     val extractors =
       JavaTypeInference.getJavaBeanReadableProperties(beanClass).map(_.getReadMethod)
-    val methodsToConverts = extractors.zip(attrs).map { case (e, attr) =>
-      (e, CatalystTypeConverters.createToCatalystConverter(attr.dataType))
+    val methodsToTypes = extractors.zip(attrs).map { case (e, attr) =>
+      (e, attr.dataType)
+    }
+    def invoke(element: Any)(tuple: (Method, DataType)): Any = tuple match {
--- End diff --

Can we create converters before `data.map { ... }` instead of calculating 
converters for each row?

I mean something like:

```scala
def converter(e: Method, dt: DataType): Any => Any = dt match {
  case StructType(fields) =>
    val nestedExtractors =
      JavaTypeInference.getJavaBeanReadableProperties(e.getReturnType).map(_.getReadMethod)
    val nestedConverters =
      nestedExtractors.zip(fields).map { case (extractor, field) =>
        converter(extractor, field.dataType)
      }

    element =>
      val value = e.invoke(element)
      new GenericInternalRow(nestedConverters.map(_(value)))
  case _ =>
    val convert = CatalystTypeConverters.createToCatalystConverter(dt)
    element => convert(e.invoke(element))
}
```
and then
```scala
val converters = extractors.zip(attrs).map { case (e, attr) =>
  converter(e, attr.dataType)
}
data.map { element =>
  new GenericInternalRow(converters.map(_(element))): InternalRow
}
```



---




[GitHub] spark pull request #22527: [SPARK-17952][SQL] Nested Java beans support in c...

2018-09-23 Thread michalsenkyr
Github user michalsenkyr commented on a diff in the pull request:

https://github.com/apache/spark/pull/22527#discussion_r219691829
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala ---
@@ -1100,13 +1101,24 @@ object SQLContext {
       attrs: Seq[AttributeReference]): Iterator[InternalRow] = {
     val extractors =
       JavaTypeInference.getJavaBeanReadableProperties(beanClass).map(_.getReadMethod)
-    val methodsToConverts = extractors.zip(attrs).map { case (e, attr) =>
-      (e, CatalystTypeConverters.createToCatalystConverter(attr.dataType))
+    val methodsToTypes = extractors.zip(attrs).map { case (e, attr) =>
+      (e, attr.dataType)
+    }
+    def invoke(element: Any)(tuple: (Method, DataType)): Any = tuple match {
+      case (e, structType: StructType) =>
+        val value = e.invoke(element)
+        val nestedExtractors = JavaTypeInference.getJavaBeanReadableProperties(value.getClass)
+            .map(desc => desc.getName -> desc.getReadMethod)
+            .toMap
+        new GenericInternalRow(structType.map(nestedProperty =>
+          invoke(value)(nestedExtractors(nestedProperty.name) -> nestedProperty.dataType)
+        ).toArray)
--- End diff --

Right, we don't have to. I just checked, and `JavaTypeInference.inferDataType` also uses `JavaTypeInference.getJavaBeanReadableProperties`, so the order should be the same. I also double-checked manually in the Spark shell with a more complex nested bean to be sure.
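
For reference, the manual check might look roughly like this in the shell (the bean shapes here are illustrative, not the exact ones used):

```scala
import scala.beans.BeanProperty
import scala.collection.JavaConverters._

class Inner(@BeanProperty var name: String, @BeanProperty var count: Int) extends Serializable
class Outer(@BeanProperty var id: String, @BeanProperty var inner: Inner) extends Serializable

val df = spark.createDataFrame(
  Seq(new Outer("o-1", new Inner("x", 1))).asJava, classOf[Outer])
// Both the inferred schema and the extracted values go through
// getJavaBeanReadableProperties, so the struct fields and values should line up.
df.printSchema()
df.show()
```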


---




[GitHub] spark pull request #22527: [SPARK-17952][SQL] Nested Java beans support in c...

2018-09-22 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/22527#discussion_r219686433
  
--- Diff: sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameSuite.java ---
@@ -171,7 +184,12 @@ void validateDataFrameWithBeans(Bean bean, Dataset<Row> df) {
       schema.apply("d"));
     Assert.assertEquals(new StructField("e", DataTypes.createDecimalType(38,0), true,
       Metadata.empty()), schema.apply("e"));
-    Row first = df.select("a", "b", "c", "d", "e").first();
+    Assert.assertEquals(new StructField("f",
+      DataTypes.createStructType(Collections.singletonList(new StructField(
+        "a", IntegerType$.MODULE$, false, Metadata.empty()))),
+      true, Metadata.empty()),
+      schema.apply("f"));
--- End diff --

should be double spaced.


---




[GitHub] spark pull request #22527: [SPARK-17952][SQL] Nested Java beans support in c...

2018-09-22 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/22527#discussion_r219686429
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala ---
@@ -1100,13 +1101,24 @@ object SQLContext {
       attrs: Seq[AttributeReference]): Iterator[InternalRow] = {
     val extractors =
       JavaTypeInference.getJavaBeanReadableProperties(beanClass).map(_.getReadMethod)
-    val methodsToConverts = extractors.zip(attrs).map { case (e, attr) =>
-      (e, CatalystTypeConverters.createToCatalystConverter(attr.dataType))
+    val methodsToTypes = extractors.zip(attrs).map { case (e, attr) =>
+      (e, attr.dataType)
+    }
+    def invoke(element: Any)(tuple: (Method, DataType)): Any = tuple match {
+      case (e, structType: StructType) =>
+        val value = e.invoke(element)
+        val nestedExtractors = JavaTypeInference.getJavaBeanReadableProperties(value.getClass)
+            .map(desc => desc.getName -> desc.getReadMethod)
+            .toMap
+        new GenericInternalRow(structType.map(nestedProperty =>
+          invoke(value)(nestedExtractors(nestedProperty.name) -> nestedProperty.dataType)
+        ).toArray)
--- End diff --

Why should we use a map here when we don't need one for the root bean?
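
e.g. something roughly like this (just a sketch), reusing the positional zip the root bean already relies on:

```scala
case (e, structType: StructType) =>
  val value = e.invoke(element)
  val nestedExtractors =
    JavaTypeInference.getJavaBeanReadableProperties(e.getReturnType).map(_.getReadMethod)
  // Zip the nested bean's read methods with the struct fields by position,
  // the same way the root bean's extractors are zipped with attrs.
  new GenericInternalRow(nestedExtractors.zip(structType.fields).map {
    case (extractor, field) => invoke(value)(extractor -> field.dataType)
  })
```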


---




[GitHub] spark pull request #22527: [SPARK-17952][SQL] Nested Java beans support in c...

2018-09-22 Thread michalsenkyr
GitHub user michalsenkyr opened a pull request:

https://github.com/apache/spark/pull/22527

[SPARK-17952][SQL] Nested Java beans support in createDataFrame

## What changes were proposed in this pull request?

When constructing a DataFrame from a Java bean, using nested beans throws an error despite [documentation](http://spark.apache.org/docs/latest/sql-programming-guide.html#inferring-the-schema-using-reflection) stating otherwise. This PR aims to add that support.

This PR does not yet add nested bean support for array or List fields. That can be added later or in another PR.

## How was this patch tested?

A nested bean was added to the appropriate unit test.

Also manually tested in the Spark shell with code emulating the referenced JIRA:

```
scala> import scala.beans.BeanProperty
import scala.beans.BeanProperty

scala> class SubCategory(@BeanProperty var id: String, @BeanProperty var name: String) extends Serializable
defined class SubCategory

scala> class Category(@BeanProperty var id: String, @BeanProperty var subCategory: SubCategory) extends Serializable
defined class Category

scala> import scala.collection.JavaConverters._
import scala.collection.JavaConverters._

scala> spark.createDataFrame(Seq(new Category("s-111", new SubCategory("sc-111", "Sub-1"))).asJava, classOf[Category])
java.lang.IllegalArgumentException: The value (SubCategory@65130cf2) of the type (SubCategory) cannot be converted to struct
  at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:262)
  at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:238)
  at org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:103)
  at org.apache.spark.sql.catalyst.CatalystTypeConverters$$anonfun$createToCatalystConverter$2.apply(CatalystTypeConverters.scala:396)
  at org.apache.spark.sql.SQLContext$$anonfun$beansToRows$1$$anonfun$apply$1.apply(SQLContext.scala:1108)
  at org.apache.spark.sql.SQLContext$$anonfun$beansToRows$1$$anonfun$apply$1.apply(SQLContext.scala:1108)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
  at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
  at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
  at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
  at org.apache.spark.sql.SQLContext$$anonfun$beansToRows$1.apply(SQLContext.scala:1108)
  at org.apache.spark.sql.SQLContext$$anonfun$beansToRows$1.apply(SQLContext.scala:1106)
  at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
  at scala.collection.Iterator$class.toStream(Iterator.scala:1320)
  at scala.collection.AbstractIterator.toStream(Iterator.scala:1334)
  at scala.collection.TraversableOnce$class.toSeq(TraversableOnce.scala:298)
  at scala.collection.AbstractIterator.toSeq(Iterator.scala:1334)
  at org.apache.spark.sql.SparkSession.createDataFrame(SparkSession.scala:423)
  ... 51 elided
```

New behavior:

```
scala> spark.createDataFrame(Seq(new Category("s-111", new SubCategory("sc-111", "Sub-1"))).asJava, classOf[Category])
res0: org.apache.spark.sql.DataFrame = [id: string, subCategory: struct]

scala> res0.show()
+-----+---------------+
|   id|    subCategory|
+-----+---------------+
|s-111|[sc-111, Sub-1]|
+-----+---------------+
```



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/michalsenkyr/spark SPARK-17952

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/22527.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #22527


commit ccea758b069c4622e9b1f71b92167c81cfcd81b8
Author: Michal Senkyr 
Date:   2018-09-22T18:25:36Z

Add nested Java beans support to SQLContext.beansToRow




---
