[GitHub] spark pull request #22646: [SPARK-25654][SQL] Support for nested JavaBean ar...

2018-10-29 Thread michalsenkyr
Github user michalsenkyr commented on a diff in the pull request:

https://github.com/apache/spark/pull/22646#discussion_r229099482
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala ---
@@ -1115,9 +1126,38 @@ object SQLContext {
 })
 }
 }
-def createConverter(cls: Class[_], dataType: DataType): Any => Any = dataType match {
-  case struct: StructType => createStructConverter(cls, struct.map(_.dataType))
-  case _ => CatalystTypeConverters.createToCatalystConverter(dataType)
+def createConverter(t: Type, dataType: DataType): Any => Any = (t, dataType) match {
+  case (cls: Class[_], struct: StructType) =>
--- End diff --

Frankly, I was not really sure about serializing sets as arrays as the 
result stops behaving like a set, but I found a PR (#18416) where this seems to 
have been permitted, so I will go ahead and add that.
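
For context, a hedged sketch (not the PR's actual code) of what serializing a `java.util.Set` field as a Catalyst array amounts to: the elements survive, but the result no longer behaves like a set once converted.

```scala
import scala.collection.JavaConverters._
import org.apache.spark.sql.catalyst.util.GenericArrayData

// Convert each element of a java.util.Set and collect the results into Catalyst's
// array representation; on the Catalyst side it is just an array, so set semantics are lost.
def setToCatalystArray(set: java.util.Set[_], elementConverter: Any => Any): GenericArrayData =
  new GenericArrayData(set.asScala.map(elementConverter).toArray)
```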


---




[GitHub] spark pull request #22646: [SPARK-25654][SQL] Support for nested JavaBean ar...

2018-10-29 Thread michalsenkyr
Github user michalsenkyr commented on a diff in the pull request:

https://github.com/apache/spark/pull/22646#discussion_r229093388
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala ---
@@ -1115,9 +1126,38 @@ object SQLContext {
 })
 }
 }
-def createConverter(cls: Class[_], dataType: DataType): Any => Any = dataType match {
-  case struct: StructType => createStructConverter(cls, struct.map(_.dataType))
-  case _ => CatalystTypeConverters.createToCatalystConverter(dataType)
+def createConverter(t: Type, dataType: DataType): Any => Any = (t, dataType) match {
--- End diff --

I took a quick look at `CatalystTypeConverters` and I believe there would be a problem: we would not be able to reliably distinguish Java beans from other arbitrary classes. We might use setters or set fields directly on objects that are not prepared for such manipulation, potentially creating hard-to-find errors. This method already assumes a Java bean, so that problem is not present here. Isn't that so?
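
To illustrate the distinction (a hedged example, not Spark code): a JavaBean advertises its state through a getter/setter contract, while an arbitrary class gives no such guarantee, so manipulating it reflectively can bypass its invariants.

```scala
import scala.beans.BeanProperty

// A bean-style class: state is exposed through getI/setI, so reading it via its
// readable properties is well-defined.
class NestedBean(@BeanProperty var i: Int) extends Serializable

// An arbitrary class: no getter/setter contract, so setting fields or calling methods
// reflectively could violate the invariant enforced by the constructor.
class Interval(lo: Int, hi: Int) {
  require(lo <= hi, "lo must not exceed hi")
  def contains(x: Int): Boolean = lo <= x && x <= hi
}
```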


---




[GitHub] spark pull request #22646: [SPARK-25654][SQL] Support for nested JavaBean ar...

2018-10-13 Thread michalsenkyr
Github user michalsenkyr commented on a diff in the pull request:

https://github.com/apache/spark/pull/22646#discussion_r224962460
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala ---
@@ -1115,9 +1126,38 @@ object SQLContext {
 })
 }
 }
-def createConverter(cls: Class[_], dataType: DataType): Any => Any = dataType match {
-  case struct: StructType => createStructConverter(cls, struct.map(_.dataType))
-  case _ => CatalystTypeConverters.createToCatalystConverter(dataType)
+def createConverter(t: Type, dataType: DataType): Any => Any = (t, dataType) match {
+  case (cls: Class[_], struct: StructType) =>
+// bean type
+createStructConverter(cls, struct.map(_.dataType))
+  case (arrayType: Class[_], array: ArrayType) if arrayType.isArray =>
+// array type
+val converter = createConverter(arrayType.getComponentType, array.elementType)
+value => new GenericArrayData(
+  (0 until JavaArray.getLength(value)).map(i =>
+converter(JavaArray.get(value, i))).toArray)
+  case (_, array: ArrayType) =>
+// java.util.List type
+val cls = classOf[java.util.List[_]]
--- End diff --

I think you are right. It would be better to change it to avoid confusion. I also agree with a separate PR for that.


---




[GitHub] spark pull request #22646: [SPARK-25654][SQL] Support for nested JavaBean ar...

2018-10-07 Thread michalsenkyr
Github user michalsenkyr commented on a diff in the pull request:

https://github.com/apache/spark/pull/22646#discussion_r223212772
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala ---
@@ -1098,12 +1099,19 @@ object SQLContext {
   data: Iterator[_],
   beanClass: Class[_],
   attrs: Seq[AttributeReference]): Iterator[InternalRow] = {
+import scala.collection.JavaConverters._
+import java.lang.reflect.{Type, ParameterizedType, Array => JavaArray}
--- End diff --

I didn't want to needlessly add those to the whole file as the reflection stuff is needed only in this method. Ditto with the collection converters. But if you think they are better at the top, I'll move them.


---




[GitHub] spark pull request #22646: [SPARK-25654][SQL] Support for nested JavaBean ar...

2018-10-07 Thread michalsenkyr
Github user michalsenkyr commented on a diff in the pull request:

https://github.com/apache/spark/pull/22646#discussion_r223212724
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala ---
@@ -1115,8 +1123,31 @@ object SQLContext {
 })
 }
 }
-def createConverter(cls: Class[_], dataType: DataType): Any => Any = dataType match {
-  case struct: StructType => createStructConverter(cls, struct.map(_.dataType))
+def createConverter(t: Type, dataType: DataType): Any => Any = (t, dataType) match {
+  case (cls: Class[_], struct: StructType) =>
+createStructConverter(cls, struct.map(_.dataType))
+  case (arrayType: Class[_], array: ArrayType) =>
+val converter = createConverter(arrayType.getComponentType, array.elementType)
+value => new GenericArrayData(
+  (0 until JavaArray.getLength(value)).map(i =>
+converter(JavaArray.get(value, i))).toArray)
+  case (_, array: ArrayType) =>
--- End diff --

Sorry, I should have added a check for `cls.isArray` in the array case. 
That would make it clearer. I will also add a comment to each case with the 
actual type expected for that conversion.
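
For clarity, a hedged sketch of the dispatch that the guard produces (illustrative only; the real code builds converter functions rather than returning strings):

```scala
import org.apache.spark.sql.types.{ArrayType, DataType, StructType}

// A plain Class[_] can match both the bean case and the array case, so the array case
// needs `isArray` to claim only true Java arrays; everything else with an ArrayType
// schema is treated as a java.util.List.
def describeConversion(cls: Class[_], dataType: DataType): String = dataType match {
  case _: ArrayType if cls.isArray => "Java array: iterate via java.lang.reflect.Array"
  case _: ArrayType                => "java.util.List: iterate via the List API"
  case _: StructType               => "nested bean: convert each readable property"
  case _                           => "primitive/other: delegate to CatalystTypeConverters"
}
```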


---




[GitHub] spark issue #22527: [SPARK-17952][SQL] Nested Java beans support in createDa...

2018-10-05 Thread michalsenkyr
Github user michalsenkyr commented on the issue:

https://github.com/apache/spark/pull/22527
  
Thanks! I created a new PR with array, list and map support.


---




[GitHub] spark pull request #22646: Support for nested JavaBean arrays, lists and map...

2018-10-05 Thread michalsenkyr
GitHub user michalsenkyr opened a pull request:

https://github.com/apache/spark/pull/22646

Support for nested JavaBean arrays, lists and maps in createDataFrame

## What changes were proposed in this pull request?

Continuing from #22527, this PR seeks to add support for beans in array, 
list and map fields when creating DataFrames from Java beans.

## How was this patch tested?

Appropriate unit tests were amended.

Also manually tested in Spark shell:

```
scala> import scala.beans.BeanProperty
import scala.beans.BeanProperty

scala> class Nested(@BeanProperty var i: Int) extends Serializable
defined class Nested

scala> class Test(@BeanProperty var array: Array[Nested], @BeanProperty var list: java.util.List[Nested], @BeanProperty var map: java.util.Map[Integer, Nested]) extends Serializable
defined class Test

scala> import scala.collection.JavaConverters._
import scala.collection.JavaConverters._

scala> val array = Array(new Nested(1))
array: Array[Nested] = Array(Nested@757ad227)

scala> val list = Seq(new Nested(2), new Nested(3)).asJava
list: java.util.List[Nested] = [Nested@633dce39, Nested@4dd28982]

scala> val map = Map(Int.box(1) -> new Nested(4), Int.box(2) -> new Nested(5)).asJava
map: java.util.Map[Integer,Nested] = {1=Nested@57421e4e, 2=Nested@5a75bad4}

scala> val df = spark.createDataFrame(Seq(new Test(array, list, map)).asJava, classOf[Test])
df: org.apache.spark.sql.DataFrame = [array: array<struct<i:int>>, list: array<struct<i:int>> ... 1 more field]

scala> df.show()
+-----+----------+--------------------+
|array|      list|                 map|
+-----+----------+--------------------+
|[[1]]|[[2], [3]]|[1 -> [4], 2 -> [5]]|
+-----+----------+--------------------+
```

Previous behavior:

```
scala> val df = spark.createDataFrame(Seq(new Test(array, list, map)).asJava, classOf[Test])
java.lang.IllegalArgumentException: The value (Nested@3dedc8b8) of the type (Nested) cannot be converted to struct<i:int>
  at 
org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:262)
  at 
org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:238)
  at 
org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:103)
  at 
org.apache.spark.sql.catalyst.CatalystTypeConverters$ArrayConverter$$anonfun$toCatalystImpl$1.apply(CatalystTypeConverters.scala:162)
  at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at 
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
  at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
  at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
  at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
  at 
org.apache.spark.sql.catalyst.CatalystTypeConverters$ArrayConverter.toCatalystImpl(CatalystTypeConverters.scala:162)
  at 
org.apache.spark.sql.catalyst.CatalystTypeConverters$ArrayConverter.toCatalystImpl(CatalystTypeConverters.scala:154)
  at 
org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:103)
  at 
org.apache.spark.sql.catalyst.CatalystTypeConverters$$anonfun$createToCatalystConverter$2.apply(CatalystTypeConverters.scala:396)
  at 
org.apache.spark.sql.SQLContext$$anonfun$createStructConverter$1$1$$anonfun$apply$1.apply(SQLContext.scala:1114)
  at 
org.apache.spark.sql.SQLContext$$anonfun$createStructConverter$1$1$$anonfun$apply$1.apply(SQLContext.scala:1113)
  at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at 
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
  at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
  at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
  at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
  at 
org.apache.spark.sql.SQLContext$$anonfun$createStructConverter$1$1.apply(SQLContext.scala:1113)
  at 
org.apache.spark.sql.SQLContext$$anonfun$createStructConverter$1$1.apply(SQLContext.scala:1108)
  at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
  at scala.collection.Iterator$class.toStream(Iterator.scala:1320)
  at scala.collection.AbstractIterator.toStream(Iterator.scala:1334)
  at scala.collection.TraversableOnce$class.toSeq(TraversableOnce.scala:298)

[GitHub] spark pull request #22527: [SPARK-17952][SQL] Nested Java beans support in c...

2018-10-04 Thread michalsenkyr
Github user michalsenkyr commented on a diff in the pull request:

https://github.com/apache/spark/pull/22527#discussion_r222817293
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala ---
@@ -1098,16 +1098,26 @@ object SQLContext {
   data: Iterator[_],
   beanClass: Class[_],
   attrs: Seq[AttributeReference]): Iterator[InternalRow] = {
-val extractors =
-  JavaTypeInference.getJavaBeanReadableProperties(beanClass).map(_.getReadMethod)
-val methodsToConverts = extractors.zip(attrs).map { case (e, attr) =>
-  (e, CatalystTypeConverters.createToCatalystConverter(attr.dataType))
+def createStructConverter(cls: Class[_], fieldTypes: Seq[DataType]): Any => InternalRow = {
+  val methodConverters =
+JavaTypeInference.getJavaBeanReadableProperties(cls).zip(fieldTypes)
+  .map { case (property, fieldType) =>
+val method = property.getReadMethod
+method -> createConverter(method.getReturnType, fieldType)
+  }
+  value =>
+if (value == null) null
+else new GenericInternalRow(
+  methodConverters.map { case (method, converter) =>
+converter(method.invoke(value))
+  })
--- End diff --

Done


---




[GitHub] spark issue #22527: [SPARK-17952][SQL] Nested Java beans support in createDa...

2018-10-03 Thread michalsenkyr
Github user michalsenkyr commented on the issue:

https://github.com/apache/spark/pull/22527
  
@ueshin Yes. I am already working on array/list support. Will add maps as well. It shouldn't require a rewrite now that the code is restructured, just new cases in the pattern match. So I think it's OK to do in another PR.
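
For example, a hedged sketch of what the map case might look like (illustrative only, not the final PR code; `keyConverter` and `valueConverter` stand in for converters that `createConverter` would build recursively):

```scala
import scala.collection.JavaConverters._
import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, GenericArrayData}

// Convert a java.util.Map field into Catalyst's MapData representation by converting
// the keys and values of each entry and pairing the resulting arrays.
def javaMapToCatalyst(keyConverter: Any => Any, valueConverter: Any => Any): Any => Any =
  value => {
    val entries = value.asInstanceOf[java.util.Map[AnyRef, AnyRef]].entrySet.asScala.toArray
    new ArrayBasedMapData(
      new GenericArrayData(entries.map(e => keyConverter(e.getKey))),
      new GenericArrayData(entries.map(e => valueConverter(e.getValue))))
  }
```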


---




[GitHub] spark pull request #22527: [SPARK-17952][SQL] Nested Java beans support in c...

2018-10-03 Thread michalsenkyr
Github user michalsenkyr commented on a diff in the pull request:

https://github.com/apache/spark/pull/22527#discussion_r222415998
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala ---
@@ -1098,16 +1098,24 @@ object SQLContext {
   data: Iterator[_],
   beanClass: Class[_],
   attrs: Seq[AttributeReference]): Iterator[InternalRow] = {
-val extractors =
-  JavaTypeInference.getJavaBeanReadableProperties(beanClass).map(_.getReadMethod)
-val methodsToConverts = extractors.zip(attrs).map { case (e, attr) =>
-  (e, CatalystTypeConverters.createToCatalystConverter(attr.dataType))
+def createStructConverter(cls: Class[_], fieldTypes: Iterator[DataType]): Any => InternalRow = {
+  val methodConverters =
+JavaTypeInference.getJavaBeanReadableProperties(cls).iterator.zip(fieldTypes)
+  .map { case (property, fieldType) =>
+val method = property.getReadMethod
+method -> createConverter(method.getReturnType, fieldType)
+  }.toArray
+  value => new GenericInternalRow(
--- End diff --

You're right. Added. Thanks.


---




[GitHub] spark pull request #22527: [SPARK-17952][SQL] Nested Java beans support in c...

2018-10-03 Thread michalsenkyr
Github user michalsenkyr commented on a diff in the pull request:

https://github.com/apache/spark/pull/22527#discussion_r222415649
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala ---
@@ -1098,16 +1098,24 @@ object SQLContext {
   data: Iterator[_],
   beanClass: Class[_],
   attrs: Seq[AttributeReference]): Iterator[InternalRow] = {
-val extractors =
-  JavaTypeInference.getJavaBeanReadableProperties(beanClass).map(_.getReadMethod)
-val methodsToConverts = extractors.zip(attrs).map { case (e, attr) =>
-  (e, CatalystTypeConverters.createToCatalystConverter(attr.dataType))
+def createStructConverter(cls: Class[_], fieldTypes: Iterator[DataType]): Any => InternalRow = {
--- End diff --

I used `Iterator`s instead of `Seq`s in order to avoid creating 
intermediate collections. However, I agree it's more concise without that.
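
As a standalone illustration of the trade-off (nothing Spark-specific): the iterator version streams elements so only the final array is materialized, whereas the `Seq` version allocates an intermediate collection for the `zip`.

```scala
object ZipStyles {
  def main(args: Array[String]): Unit = {
    val methods = Seq("getId", "getName", "getAge")
    val types   = Seq("string", "string", "int")

    // Seq style: zip allocates an intermediate Seq before the final toArray.
    val viaSeq: Array[(String, String)] = methods.zip(types).toArray

    // Iterator style: elements stream through; only toArray materializes a collection.
    val viaIterator: Array[(String, String)] = methods.iterator.zip(types.iterator).toArray

    assert(viaSeq.sameElements(viaIterator))
  }
}
```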


---




[GitHub] spark issue #22527: [SPARK-17952][SQL] Nested Java beans support in createDa...

2018-10-02 Thread michalsenkyr
Github user michalsenkyr commented on the issue:

https://github.com/apache/spark/pull/22527
  
I restructured the code in this commit to allow easier addition of 
array/list support in the future.


---




[GitHub] spark pull request #22527: [SPARK-17952][SQL] Nested Java beans support in c...

2018-10-02 Thread michalsenkyr
Github user michalsenkyr commented on a diff in the pull request:

https://github.com/apache/spark/pull/22527#discussion_r222079624
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala ---
@@ -1100,13 +1101,23 @@ object SQLContext {
   attrs: Seq[AttributeReference]): Iterator[InternalRow] = {
 val extractors =
   JavaTypeInference.getJavaBeanReadableProperties(beanClass).map(_.getReadMethod)
-val methodsToConverts = extractors.zip(attrs).map { case (e, attr) =>
-  (e, CatalystTypeConverters.createToCatalystConverter(attr.dataType))
+val methodsToTypes = extractors.zip(attrs).map { case (e, attr) =>
+  (e, attr.dataType)
+}
+def invoke(element: Any)(tuple: (Method, DataType)): Any = tuple match {
--- End diff --

Good point. Thank you. Changed it in the latest commit


---




[GitHub] spark pull request #22527: [SPARK-17952][SQL] Nested Java beans support in c...

2018-09-23 Thread michalsenkyr
Github user michalsenkyr commented on a diff in the pull request:

https://github.com/apache/spark/pull/22527#discussion_r219691829
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala ---
@@ -1100,13 +1101,24 @@ object SQLContext {
   attrs: Seq[AttributeReference]): Iterator[InternalRow] = {
 val extractors =
   JavaTypeInference.getJavaBeanReadableProperties(beanClass).map(_.getReadMethod)
-val methodsToConverts = extractors.zip(attrs).map { case (e, attr) =>
-  (e, CatalystTypeConverters.createToCatalystConverter(attr.dataType))
+val methodsToTypes = extractors.zip(attrs).map { case (e, attr) =>
+  (e, attr.dataType)
+}
+def invoke(element: Any)(tuple: (Method, DataType)): Any = tuple match {
+  case (e, structType: StructType) =>
+val value = e.invoke(element)
+val nestedExtractors = JavaTypeInference.getJavaBeanReadableProperties(value.getClass)
+.map(desc => desc.getName -> desc.getReadMethod)
+.toMap
+new GenericInternalRow(structType.map(nestedProperty =>
+  invoke(value)(nestedExtractors(nestedProperty.name) -> nestedProperty.dataType)
+).toArray)
--- End diff --

Right, we don't have to. Just checked and `JavaTypeInference.inferDataType` 
also uses `JavaTypeInference.getJavaBeanReadableProperties` so the order should 
be the same. Also double-checked manually in Spark shell with a more complex 
nested bean to be sure.


---




[GitHub] spark pull request #22527: [SPARK-17952][SQL] Nested Java beans support in c...

2018-09-22 Thread michalsenkyr
GitHub user michalsenkyr opened a pull request:

https://github.com/apache/spark/pull/22527

[SPARK-17952][SQL] Nested Java beans support in createDataFrame

## What changes were proposed in this pull request?

When constructing a DataFrame from a Java bean, using nested beans throws 
an error despite 
[documentation](http://spark.apache.org/docs/latest/sql-programming-guide.html#inferring-the-schema-using-reflection)
 stating otherwise. This PR aims to add that support.

This PR does not yet add nested beans support in array or List fields. This 
can be added later or in another PR.

## How was this patch tested?

Nested bean was added to the appropriate unit test.

Also manually tested in Spark shell on code emulating the referenced JIRA:

```
scala> import scala.beans.BeanProperty
import scala.beans.BeanProperty

scala> class SubCategory(@BeanProperty var id: String, @BeanProperty var name: String) extends Serializable
defined class SubCategory

scala> class Category(@BeanProperty var id: String, @BeanProperty var subCategory: SubCategory) extends Serializable
defined class Category

scala> import scala.collection.JavaConverters._
import scala.collection.JavaConverters._

scala> spark.createDataFrame(Seq(new Category("s-111", new SubCategory("sc-111", "Sub-1"))).asJava, classOf[Category])
java.lang.IllegalArgumentException: The value (SubCategory@65130cf2) of the type (SubCategory) cannot be converted to struct<id:string,name:string>
  at 
org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:262)
  at 
org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:238)
  at 
org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:103)
  at 
org.apache.spark.sql.catalyst.CatalystTypeConverters$$anonfun$createToCatalystConverter$2.apply(CatalystTypeConverters.scala:396)
  at 
org.apache.spark.sql.SQLContext$$anonfun$beansToRows$1$$anonfun$apply$1.apply(SQLContext.scala:1108)
  at 
org.apache.spark.sql.SQLContext$$anonfun$beansToRows$1$$anonfun$apply$1.apply(SQLContext.scala:1108)
  at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at 
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
  at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
  at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
  at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
  at 
org.apache.spark.sql.SQLContext$$anonfun$beansToRows$1.apply(SQLContext.scala:1108)
  at 
org.apache.spark.sql.SQLContext$$anonfun$beansToRows$1.apply(SQLContext.scala:1106)
  at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
  at scala.collection.Iterator$class.toStream(Iterator.scala:1320)
  at scala.collection.AbstractIterator.toStream(Iterator.scala:1334)
  at scala.collection.TraversableOnce$class.toSeq(TraversableOnce.scala:298)
  at scala.collection.AbstractIterator.toSeq(Iterator.scala:1334)
  at 
org.apache.spark.sql.SparkSession.createDataFrame(SparkSession.scala:423)
  ... 51 elided
```

New behavior:

```
scala> spark.createDataFrame(Seq(new Category("s-111", new SubCategory("sc-111", "Sub-1"))).asJava, classOf[Category])
res0: org.apache.spark.sql.DataFrame = [id: string, subCategory: struct<id:string,name:string>]

scala> res0.show()
+-----+---------------+
|   id|    subCategory|
+-----+---------------+
|s-111|[sc-111, Sub-1]|
+-----+---------------+
```



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/michalsenkyr/spark SPARK-17952

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/22527.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #22527


commit ccea758b069c4622e9b1f71b92167c81cfcd81b8
Author: Michal Senkyr 
Date:   2018-09-22T18:25:36Z

Add nested Java beans support to SQLContext.beansToRow




---




[GitHub] spark issue #20505: [SPARK-23251][SQL] Add checks for collection element Enc...

2018-02-15 Thread michalsenkyr
Github user michalsenkyr commented on the issue:

https://github.com/apache/spark/pull/20505
  
Yes, that is the idea. Frankly, I am not that familiar with how the 
compiler resolves all the implicit parameters to say confidently what is going 
on. But here's my take:

I did a little more research and found out that the "diverging implicit 
expansion" error means the compiler is able to follow a resolution route that 
can be potentially infinite. I think it might be possible that the compiler is 
following my other implicit methods, trying to fit various collections inside V 
with the hope of satisfying the `<:<` condition some time in the future before 
finally giving up.
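
For reference, a minimal self-contained sketch (deliberately unrelated to Spark's encoders) of how an implicit search can diverge: resolving `Enc[A]` requires an `Enc` of a strictly larger type, so the search could continue forever and the compiler cuts it off with that error.

```scala
trait Enc[A]

object DivergenceDemo {
  // Resolving Enc[A] demands Enc[List[A]], which demands Enc[List[List[A]]], and so on.
  implicit def viaList[A](implicit inner: Enc[List[A]]): Enc[A] = new Enc[A] {}

  // Uncommenting the next line fails with a "diverging implicit expansion" error:
  // implicitly[Enc[Int]]
}
```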

Just to be sure multi-level collections work with my change, I successfully 
tried this:
```
scala> implicitly[Encoder[Map[Seq[Map[String, Seq[Long]]], List[Array[Map[String, Int]]]]]]
res5: org.apache.spark.sql.Encoder[Map[Seq[Map[String,Seq[Long]]],List[Array[Map[String,Int]]]]] = class[value[0]: map<array<map<string,array<bigint>>>,array<array<map<string,int>>>>]
```

One thing that doesn't make sense to me, however, is the information the compiler gives when enabling implicit resolution logging via `-Xlog-implicits`:
```
scala> implicitly[Encoder[Map[String, Any]]]
:25: newCheckedSetEncoder is not a valid implicit value for 
org.apache.spark.sql.Encoder[Map[String,Any]] because:
hasMatchingSymbol reported error: polymorphic expression cannot be 
instantiated to expected type;
 found   : [T[_], E]org.apache.spark.sql.Encoder[T[E]]
 required: org.apache.spark.sql.Encoder[Map[String,Any]]
   implicitly[Encoder[Map[String, Any]]]
 ^
:25: newCheckedSetEncoder is not a valid implicit value for 
org.apache.spark.sql.Encoder[String] because:
hasMatchingSymbol reported error: polymorphic expression cannot be 
instantiated to expected type;
 found   : [T[_], E]org.apache.spark.sql.Encoder[T[E]]
 required: org.apache.spark.sql.Encoder[String]
   implicitly[Encoder[Map[String, Any]]]
 ^
:25: newCheckedMapEncoder is not a valid implicit value for 
org.apache.spark.sql.Encoder[String] because:
hasMatchingSymbol reported error: polymorphic expression cannot be 
instantiated to expected type;
 found   : [T[_, _], K, V]org.apache.spark.sql.Encoder[T[K,V]]
 required: org.apache.spark.sql.Encoder[String]
   implicitly[Encoder[Map[String, Any]]]
 ^
:25: newCheckedSequenceEncoder is not a valid implicit value for 
org.apache.spark.sql.Encoder[String] because:
hasMatchingSymbol reported error: polymorphic expression cannot be 
instantiated to expected type;
 found   : [T[_], E]org.apache.spark.sql.Encoder[T[E]]
 required: org.apache.spark.sql.Encoder[String]
   implicitly[Encoder[Map[String, Any]]]
 ^
:25: materializing requested 
reflect.runtime.universe.type.TypeTag[A] using 
`package`.this.materializeTypeTag[A](scala.reflect.runtime.`package`.universe)
   implicitly[Encoder[Map[String, Any]]]
 ^
:25: newCheckedMapEncoder is not a valid implicit value for 
org.apache.spark.sql.Encoder[E] because:
hasMatchingSymbol reported error: diverging implicit expansion for type 
org.apache.spark.sql.Encoder[K]
starting with method newStringEncoder in class SQLImplicits
   implicitly[Encoder[Map[String, Any]]]
 ^
:25: newCheckedSetEncoder is not a valid implicit value for 
org.apache.spark.sql.Encoder[Any] because:
hasMatchingSymbol reported error: ambiguous implicit values:
 both method newIntEncoder in class SQLImplicits of type => 
org.apache.spark.sql.Encoder[Int]
 and method newLongEncoder in class SQLImplicits of type => 
org.apache.spark.sql.Encoder[Long]
 match expected type org.apache.spark.sql.Encoder[E]
   implicitly[Encoder[Map[String, Any]]]
 ^
:25: newCheckedMapEncoder is not a valid implicit value for 
org.apache.spark.sql.Encoder[Any] because:
hasMatchingSymbol reported error: diverging implicit expansion for type 
org.apache.spark.sql.Encoder[K]
starting with method newStringEncoder in class SQLImplicits
   implicitly[Encoder[Map[String, Any]]]
 ^
:25: materializing requested 
reflect.runtime.universe.type.TypeTag[A] using 
`package`.this.materializeTypeTag[A](scala.reflect.runtime.`package`.universe)
   implicitly[Encoder[Map[String, Any]]]
 ^
:25: newCheckedMapEncoder is not a valid implicit value for 
org.apache.spark.sql.Encoder[E] because:
hasMatchingSymbol reported error: diverging implicit expansion for type 
org.apache.spark.sql.Encoder[K]
starting with method newStringEncoder in class SQLImplicits
   implicitly[Encoder[Map[String, 

[GitHub] spark pull request #20505: [SPARK-23251][SQL] Add checks for collection elem...

2018-02-11 Thread michalsenkyr
Github user michalsenkyr commented on a diff in the pull request:

https://github.com/apache/spark/pull/20505#discussion_r167437023
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/SQLImplicits.scala ---
@@ -165,11 +165,15 @@ abstract class SQLImplicits extends LowPrioritySQLImplicits {
  def newProductSeqEncoder[A <: Product : TypeTag]: Encoder[Seq[A]] = ExpressionEncoder()

  /** @since 2.2.0 */
-  implicit def newSequenceEncoder[T <: Seq[_] : TypeTag]: Encoder[T] = ExpressionEncoder()
+  implicit def newSequenceEncoder[T[_], E : Encoder]
--- End diff --

Just letting you know that I updated this PR with the above fix. MiMa is 
now passing.


---




[GitHub] spark pull request #20505: [SPARK-23251][SQL] Add checks for collection elem...

2018-02-05 Thread michalsenkyr
Github user michalsenkyr commented on a diff in the pull request:

https://github.com/apache/spark/pull/20505#discussion_r165903346
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/SQLImplicits.scala ---
@@ -165,11 +165,15 @@ abstract class SQLImplicits extends LowPrioritySQLImplicits {
  def newProductSeqEncoder[A <: Product : TypeTag]: Encoder[Seq[A]] = ExpressionEncoder()

  /** @since 2.2.0 */
-  implicit def newSequenceEncoder[T <: Seq[_] : TypeTag]: Encoder[T] = ExpressionEncoder()
+  implicit def newSequenceEncoder[T[_], E : Encoder]
--- End diff --

Looks like we are. I can add new methods and make the old ones not 
implicit. That should fix MiMa. Although that might add to the clutter that's 
already in this class. Is that OK? 
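
A hedged sketch of that approach (names and signatures are illustrative, not the final PR code): the old member stays so MiMa sees no binary-incompatible removal, but only the new element-checked method remains implicit.

```scala
import scala.reflect.runtime.universe.TypeTag
import org.apache.spark.sql.Encoder
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder

abstract class SQLImplicitsSketch {
  /** Old signature kept for binary compatibility, but no longer implicit. */
  def newSequenceEncoder[T <: Seq[_] : TypeTag]: Encoder[T] = ExpressionEncoder()

  /** New implicit that additionally requires an Encoder for the element type. */
  implicit def newCheckedSequenceEncoder[T[_], E](
      implicit enc: Encoder[E], ev: T[E] <:< Seq[E], tag: TypeTag[T[E]]): Encoder[T[E]] =
    ExpressionEncoder()
}
```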


---




[GitHub] spark pull request #20505: [SPARK-23251][SQL] Add checks for collection elem...

2018-02-04 Thread michalsenkyr
GitHub user michalsenkyr opened a pull request:

https://github.com/apache/spark/pull/20505

[SPARK-23251][SQL] Add checks for collection element Encoders

Implicit methods of `SQLImplicits` providing Encoders for collections did 
not check for
Encoders for their elements. This resulted in unrelated error messages 
during run-time instead of proper failures during compilation.

## What changes were proposed in this pull request?

`Seq`, `Map` and `Set` `Encoder` providers' type parameters and implicit 
parameters were modified to facilitate the appropriate checks.

Unfortunately, this doesn't result in the desired "Unable to find encoder 
for type stored in a Dataset" error as the Scala compiler generates a different 
error when an implicit cannot be found due to missing arguments (see 
[ContextErrors](https://github.com/scala/scala/blob/v2.11.12/src/compiler/scala/tools/nsc/typechecker/ContextErrors.scala#L77)).
 `@implicitNotFound` is not used in this case.

## How was this patch tested?

New behavior:
```scala
scala> implicitly[Encoder[Map[String, Any]]]
<console>:25: error: diverging implicit expansion for type org.apache.spark.sql.Encoder[Map[String,Any]]
starting with method newStringEncoder in class SQLImplicits
   implicitly[Encoder[Map[String, Any]]]
 ^
```

Old behavior:
```scala
scala> implicitly[Encoder[Map[String, Any]]]
java.lang.ClassNotFoundException: scala.Any
  at 
scala.reflect.internal.util.AbstractFileClassLoader.findClass(AbstractFileClassLoader.scala:62)
  at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
  at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
  at java.lang.Class.forName0(Native Method)
  at java.lang.Class.forName(Class.java:348)
  at 
scala.reflect.runtime.JavaMirrors$JavaMirror.javaClass(JavaMirrors.scala:555)
  at 
scala.reflect.runtime.JavaMirrors$JavaMirror$$anonfun$classToJava$1.apply(JavaMirrors.scala:1211)
  at 
scala.reflect.runtime.JavaMirrors$JavaMirror$$anonfun$classToJava$1.apply(JavaMirrors.scala:1203)
  at 
scala.reflect.runtime.TwoWayCaches$TwoWayCache$$anonfun$toJava$1.apply(TwoWayCaches.scala:49)
  at scala.reflect.runtime.Gil$class.gilSynchronized(Gil.scala:19)
  at 
scala.reflect.runtime.JavaUniverse.gilSynchronized(JavaUniverse.scala:16)
  at 
scala.reflect.runtime.TwoWayCaches$TwoWayCache.toJava(TwoWayCaches.scala:44)
  at 
scala.reflect.runtime.JavaMirrors$JavaMirror.classToJava(JavaMirrors.scala:1203)
  at 
scala.reflect.runtime.JavaMirrors$JavaMirror.runtimeClass(JavaMirrors.scala:194)
  at 
scala.reflect.runtime.JavaMirrors$JavaMirror.runtimeClass(JavaMirrors.scala:54)
  at 
org.apache.spark.sql.catalyst.ScalaReflection$.getClassFromType(ScalaReflection.scala:700)
  at 
org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$dataTypeFor$1.apply(ScalaReflection.scala:84)
  at 
org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$dataTypeFor$1.apply(ScalaReflection.scala:65)
  at 
scala.reflect.internal.tpe.TypeConstraints$UndoLog.undo(TypeConstraints.scala:56)
  at 
org.apache.spark.sql.catalyst.ScalaReflection$class.cleanUpReflectionObjects(ScalaReflection.scala:824)
  at 
org.apache.spark.sql.catalyst.ScalaReflection$.cleanUpReflectionObjects(ScalaReflection.scala:39)
  at 
org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$dataTypeFor(ScalaReflection.scala:64)
  at 
org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor$1.apply(ScalaReflection.scala:512)
  at 
org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor$1.apply(ScalaReflection.scala:445)
  at 
scala.reflect.internal.tpe.TypeConstraints$UndoLog.undo(TypeConstraints.scala:56)
  at 
org.apache.spark.sql.catalyst.ScalaReflection$class.cleanUpReflectionObjects(ScalaReflection.scala:824)
  at 
org.apache.spark.sql.catalyst.ScalaReflection$.cleanUpReflectionObjects(ScalaReflection.scala:39)
  at 
org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor(ScalaReflection.scala:445)
  at 
org.apache.spark.sql.catalyst.ScalaReflection$.serializerFor(ScalaReflection.scala:434)
  at 
org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$.apply(ExpressionEncoder.scala:71)
  at org.apache.spark.sql.SQLImplicits.newMapEncoder(SQLImplicits.scala:172)
  ... 49 elided
```

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/michalsenkyr/spark SPARK-23251

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/20505

[GitHub] spark pull request #16986: [SPARK-18891][SQL] Support for Scala Map collecti...

2017-06-10 Thread michalsenkyr
Github user michalsenkyr commented on a diff in the pull request:

https://github.com/apache/spark/pull/16986#discussion_r121266969
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/DatasetPrimitiveSuite.scala ---
@@ -258,6 +265,80 @@ class DatasetPrimitiveSuite extends QueryTest with 
SharedSQLContext {
   ListClass(List(1)) -> Queue("test" -> SeqClass(Seq(2
   }
 
+  test("arbitrary maps") {
+checkDataset(Seq(Map(1 -> 2)).toDS(), Map(1 -> 2))
+checkDataset(Seq(Map(1.toLong -> 2.toLong)).toDS(), Map(1.toLong -> 
2.toLong))
+checkDataset(Seq(Map(1.toDouble -> 2.toDouble)).toDS(), Map(1.toDouble 
-> 2.toDouble))
+checkDataset(Seq(Map(1.toFloat -> 2.toFloat)).toDS(), Map(1.toFloat -> 
2.toFloat))
+checkDataset(Seq(Map(1.toByte -> 2.toByte)).toDS(), Map(1.toByte -> 
2.toByte))
+checkDataset(Seq(Map(1.toShort -> 2.toShort)).toDS(), Map(1.toShort -> 
2.toShort))
+checkDataset(Seq(Map(true -> false)).toDS(), Map(true -> false))
+checkDataset(Seq(Map("test1" -> "test2")).toDS(), Map("test1" -> 
"test2"))
+checkDataset(Seq(Map(Tuple1(1) -> Tuple1(2))).toDS(), Map(Tuple1(1) -> 
Tuple1(2)))
+checkDataset(Seq(Map(1 -> Tuple1(2))).toDS(), Map(1 -> Tuple1(2)))
+checkDataset(Seq(Map("test" -> 2.toLong)).toDS(), Map("test" -> 
2.toLong))
+
+checkDataset(Seq(LHMap(1 -> 2)).toDS(), LHMap(1 -> 2))
+checkDataset(Seq(LHMap(1.toLong -> 2.toLong)).toDS(), LHMap(1.toLong 
-> 2.toLong))
+checkDataset(Seq(LHMap(1.toDouble -> 2.toDouble)).toDS(), 
LHMap(1.toDouble -> 2.toDouble))
+checkDataset(Seq(LHMap(1.toFloat -> 2.toFloat)).toDS(), 
LHMap(1.toFloat -> 2.toFloat))
+checkDataset(Seq(LHMap(1.toByte -> 2.toByte)).toDS(), LHMap(1.toByte 
-> 2.toByte))
+checkDataset(Seq(LHMap(1.toShort -> 2.toShort)).toDS(), 
LHMap(1.toShort -> 2.toShort))
+checkDataset(Seq(LHMap(true -> false)).toDS(), LHMap(true -> false))
+checkDataset(Seq(LHMap("test1" -> "test2")).toDS(), LHMap("test1" -> 
"test2"))
+checkDataset(Seq(LHMap(Tuple1(1) -> Tuple1(2))).toDS(), 
LHMap(Tuple1(1) -> Tuple1(2)))
+checkDataset(Seq(LHMap(1 -> Tuple1(2))).toDS(), LHMap(1 -> Tuple1(2)))
+checkDataset(Seq(LHMap("test" -> 2.toLong)).toDS(), LHMap("test" -> 
2.toLong))
--- End diff --

Added as a separate test case (same as sequences)


---



[GitHub] spark pull request #16986: [SPARK-18891][SQL] Support for Scala Map collecti...

2017-06-10 Thread michalsenkyr
Github user michalsenkyr commented on a diff in the pull request:

https://github.com/apache/spark/pull/16986#discussion_r121265890
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/DatasetPrimitiveSuite.scala ---
@@ -258,6 +265,80 @@ class DatasetPrimitiveSuite extends QueryTest with 
SharedSQLContext {
   ListClass(List(1)) -> Queue("test" -> SeqClass(Seq(2
   }
 
+  test("arbitrary maps") {
+checkDataset(Seq(Map(1 -> 2)).toDS(), Map(1 -> 2))
+checkDataset(Seq(Map(1.toLong -> 2.toLong)).toDS(), Map(1.toLong -> 
2.toLong))
+checkDataset(Seq(Map(1.toDouble -> 2.toDouble)).toDS(), Map(1.toDouble 
-> 2.toDouble))
+checkDataset(Seq(Map(1.toFloat -> 2.toFloat)).toDS(), Map(1.toFloat -> 
2.toFloat))
+checkDataset(Seq(Map(1.toByte -> 2.toByte)).toDS(), Map(1.toByte -> 
2.toByte))
+checkDataset(Seq(Map(1.toShort -> 2.toShort)).toDS(), Map(1.toShort -> 
2.toShort))
+checkDataset(Seq(Map(true -> false)).toDS(), Map(true -> false))
+checkDataset(Seq(Map("test1" -> "test2")).toDS(), Map("test1" -> 
"test2"))
+checkDataset(Seq(Map(Tuple1(1) -> Tuple1(2))).toDS(), Map(Tuple1(1) -> 
Tuple1(2)))
+checkDataset(Seq(Map(1 -> Tuple1(2))).toDS(), Map(1 -> Tuple1(2)))
+checkDataset(Seq(Map("test" -> 2.toLong)).toDS(), Map("test" -> 
2.toLong))
+
+checkDataset(Seq(LHMap(1 -> 2)).toDS(), LHMap(1 -> 2))
+checkDataset(Seq(LHMap(1.toLong -> 2.toLong)).toDS(), LHMap(1.toLong 
-> 2.toLong))
+checkDataset(Seq(LHMap(1.toDouble -> 2.toDouble)).toDS(), 
LHMap(1.toDouble -> 2.toDouble))
+checkDataset(Seq(LHMap(1.toFloat -> 2.toFloat)).toDS(), 
LHMap(1.toFloat -> 2.toFloat))
+checkDataset(Seq(LHMap(1.toByte -> 2.toByte)).toDS(), LHMap(1.toByte 
-> 2.toByte))
+checkDataset(Seq(LHMap(1.toShort -> 2.toShort)).toDS(), 
LHMap(1.toShort -> 2.toShort))
+checkDataset(Seq(LHMap(true -> false)).toDS(), LHMap(true -> false))
+checkDataset(Seq(LHMap("test1" -> "test2")).toDS(), LHMap("test1" -> 
"test2"))
+checkDataset(Seq(LHMap(Tuple1(1) -> Tuple1(2))).toDS(), 
LHMap(Tuple1(1) -> Tuple1(2)))
+checkDataset(Seq(LHMap(1 -> Tuple1(2))).toDS(), LHMap(1 -> Tuple1(2)))
+checkDataset(Seq(LHMap("test" -> 2.toLong)).toDS(), LHMap("test" -> 
2.toLong))
+  }
+
+  ignore("SPARK-19104: map and product combinations") {
--- End diff --

I added these tests for issue [SPARK-19104 CompileException with Map and 
Case Class in Spark 2.1.0](https://issues.apache.org/jira/browse/SPARK-19104) 
as I thought I could fix it as part of this PR. However, I found out that it 
was a more complicated issue than I anticipated so I left the tests there and 
ignored them. I can remove them.


---



[GitHub] spark pull request #18009: [SPARK-18891][SQL] Support for specific Java List...

2017-06-10 Thread michalsenkyr
Github user michalsenkyr commented on a diff in the pull request:

https://github.com/apache/spark/pull/18009#discussion_r121263319
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/DatasetPrimitiveSuite.scala ---
@@ -28,6 +28,8 @@ case class SeqClass(s: Seq[Int])
 
 case class ListClass(l: List[Int])
 
+case class JListClass(l: java.util.List[Int])
--- End diff --

I did not notice that there is separate inference code for Java classes. It 
would certainly be nice to unify the code for Java and Scala classes. Is this 
already being worked on/planned?

I moved the `List` support to `JavaTypeInference` and rewrote tests 
accordingly.


---



[GitHub] spark issue #16986: [SPARK-18891][SQL] Support for Scala Map collection type...

2017-06-04 Thread michalsenkyr
Github user michalsenkyr commented on the issue:

https://github.com/apache/spark/pull/16986
  
So I tried to simplify the code as much as possible, removing unneeded parameters. I must admit I am not entirely sure whether I am handling all the data types correctly, but everything seems to work.


---



[GitHub] spark pull request #16986: [SPARK-18891][SQL] Support for Scala Map collecti...

2017-06-04 Thread michalsenkyr
Github user michalsenkyr commented on a diff in the pull request:

https://github.com/apache/spark/pull/16986#discussion_r120010531
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala ---
@@ -652,6 +653,299 @@ case class MapObjects private(
   }
 }
 
+object CollectObjectsToMap {
+  private val curId = new java.util.concurrent.atomic.AtomicInteger()
+
+  /**
+   * Construct an instance of CollectObjects case class.
+   *
+   * @param keyFunction The function applied on the key collection 
elements.
+   * @param keyInputData An expression that when evaluated returns a key 
collection object.
+   * @param keyElementType The data type of key elements in the collection.
+   * @param valueFunction The function applied on the value collection 
elements.
+   * @param valueInputData An expression that when evaluated returns a 
value collection object.
+   * @param valueElementType The data type of value elements in the 
collection.
+   * @param collClass The type of the resulting collection.
+   */
+  def apply(
+  keyFunction: Expression => Expression,
+  keyInputData: Expression,
+  keyElementType: DataType,
+  valueFunction: Expression => Expression,
+  valueInputData: Expression,
+  valueElementType: DataType,
+  collClass: Class[_]): CollectObjectsToMap = {
+val id = curId.getAndIncrement()
+val keyLoopValue = s"CollectObjectsToMap_keyLoopValue$id"
+val keyLoopIsNull = s"CollectObjectsToMap_keyLoopIsNull$id"
+val keyLoopVar = LambdaVariable(keyLoopValue, keyLoopIsNull, 
keyElementType)
+val valueLoopValue = s"CollectObjectsToMap_valueLoopValue$id"
+val valueLoopIsNull = s"CollectObjectsToMap_valueLoopIsNull$id"
+val valueLoopVar = LambdaVariable(valueLoopValue, valueLoopIsNull, 
valueElementType)
+val tupleLoopVar = s"CollectObjectsToMap_tupleLoopValue$id"
+val builderValue = s"CollectObjectsToMap_builderValue$id"
+CollectObjectsToMap(
+  keyLoopValue, keyLoopIsNull, keyElementType, 
keyFunction(keyLoopVar), keyInputData,
+  valueLoopValue, valueLoopIsNull, valueElementType, 
valueFunction(valueLoopVar),
+  valueInputData,
+  tupleLoopVar, collClass, builderValue)
+  }
+}
+
+/**
+ * An equivalent to the [[MapObjects]] case class but returning an 
ObjectType containing
+ * a Scala collection constructed using the associated builder, obtained 
by calling `newBuilder`
+ * on the collection's companion object.
+ *
+ * @param keyLoopValue the name of the loop variable that is used when 
iterating over the key
+ * collection, and which is used as input for the 
`keyLambdaFunction`
+ * @param keyLoopIsNull the nullability of the loop variable that is used 
when iterating over
+ *  the key collection, and which is used as input for 
the `keyLambdaFunction`
+ * @param keyLoopVarDataType the data type of the loop variable that is 
used when iterating over
+ *   the key collection, and which is used as 
input for the
+ *   `keyLambdaFunction`
+ * @param keyLambdaFunction A function that takes the `keyLoopVar` as 
input, and is used as
+ *  a lambda function to handle collection 
elements.
+ * @param keyInputData An expression that when evaluated returns a 
collection object.
+ * @param valueLoopValue the name of the loop variable that is used when 
iterating over the value
+ *   collection, and which is used as input for the 
`valueLambdaFunction`
+ * @param valueLoopIsNull the nullability of the loop variable that is 
used when iterating over
+ *the value collection, and which is used as input 
for the
+ *`valueLambdaFunction`
+ * @param valueLoopVarDataType the data type of the loop variable that is 
used when iterating over
+ * the value collection, and which is used as 
input for the
+ * `valueLambdaFunction`
+ * @param valueLambdaFunction A function that takes the `valueLoopVar` as 
input, and is used as
+ *a lambda function to handle collection 
elements.
+ * @param valueInputData An expression that when evaluated returns a 
collection object.
+ * @param tupleLoopValue the name of the loop variable that holds the 
tuple to be added to the
+  *  resulting map (used only for Scala Map)
+ * @param collClass The type of the resulting collection.
+ * @param builderValue The name of the builder variable used to construct 
the resultin

[GitHub] spark pull request #16986: [SPARK-18891][SQL] Support for Scala Map collecti...

2017-06-04 Thread michalsenkyr
Github user michalsenkyr commented on a diff in the pull request:

https://github.com/apache/spark/pull/16986#discussion_r120010461
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala ---
@@ -652,6 +653,299 @@ case class MapObjects private(
   }
 }
 
+object CollectObjectsToMap {
+  private val curId = new java.util.concurrent.atomic.AtomicInteger()
+
+  /**
+   * Construct an instance of CollectObjects case class.
+   *
+   * @param keyFunction The function applied on the key collection 
elements.
+   * @param keyInputData An expression that when evaluated returns a key 
collection object.
+   * @param keyElementType The data type of key elements in the collection.
+   * @param valueFunction The function applied on the value collection 
elements.
+   * @param valueInputData An expression that when evaluated returns a 
value collection object.
+   * @param valueElementType The data type of value elements in the 
collection.
+   * @param collClass The type of the resulting collection.
+   */
+  def apply(
+  keyFunction: Expression => Expression,
+  keyInputData: Expression,
+  keyElementType: DataType,
+  valueFunction: Expression => Expression,
+  valueInputData: Expression,
+  valueElementType: DataType,
+  collClass: Class[_]): CollectObjectsToMap = {
+val id = curId.getAndIncrement()
+val keyLoopValue = s"CollectObjectsToMap_keyLoopValue$id"
+val keyLoopIsNull = s"CollectObjectsToMap_keyLoopIsNull$id"
+val keyLoopVar = LambdaVariable(keyLoopValue, keyLoopIsNull, 
keyElementType)
+val valueLoopValue = s"CollectObjectsToMap_valueLoopValue$id"
+val valueLoopIsNull = s"CollectObjectsToMap_valueLoopIsNull$id"
+val valueLoopVar = LambdaVariable(valueLoopValue, valueLoopIsNull, 
valueElementType)
+val tupleLoopVar = s"CollectObjectsToMap_tupleLoopValue$id"
+val builderValue = s"CollectObjectsToMap_builderValue$id"
+CollectObjectsToMap(
+  keyLoopValue, keyLoopIsNull, keyElementType, 
keyFunction(keyLoopVar), keyInputData,
+  valueLoopValue, valueLoopIsNull, valueElementType, 
valueFunction(valueLoopVar),
+  valueInputData,
+  tupleLoopVar, collClass, builderValue)
+  }
+}
+
+/**
+ * An equivalent to the [[MapObjects]] case class but returning an 
ObjectType containing
+ * a Scala collection constructed using the associated builder, obtained 
by calling `newBuilder`
+ * on the collection's companion object.
+ *
+ * @param keyLoopValue the name of the loop variable that is used when 
iterating over the key
+ * collection, and which is used as input for the 
`keyLambdaFunction`
+ * @param keyLoopIsNull the nullability of the loop variable that is used 
when iterating over
+ *  the key collection, and which is used as input for 
the `keyLambdaFunction`
+ * @param keyLoopVarDataType the data type of the loop variable that is 
used when iterating over
+ *   the key collection, and which is used as 
input for the
+ *   `keyLambdaFunction`
+ * @param keyLambdaFunction A function that takes the `keyLoopVar` as 
input, and is used as
+ *  a lambda function to handle collection 
elements.
+ * @param keyInputData An expression that when evaluated returns a 
collection object.
+ * @param valueLoopValue the name of the loop variable that is used when 
iterating over the value
+ *   collection, and which is used as input for the 
`valueLambdaFunction`
+ * @param valueLoopIsNull the nullability of the loop variable that is 
used when iterating over
+ *the value collection, and which is used as input 
for the
+ *`valueLambdaFunction`
+ * @param valueLoopVarDataType the data type of the loop variable that is 
used when iterating over
+ * the value collection, and which is used as 
input for the
+ * `valueLambdaFunction`
+ * @param valueLambdaFunction A function that takes the `valueLoopVar` as 
input, and is used as
+ *a lambda function to handle collection 
elements.
+ * @param valueInputData An expression that when evaluated returns a 
collection object.
+ * @param tupleLoopValue the name of the loop variable that holds the 
tuple to be added to the
+  *  resulting map (used only for Scala Map)
+ * @param collClass The type of the resulting collection.
+ * @param builderValue The name of the builder variable used to construct 
the resultin

[GitHub] spark pull request #16986: [SPARK-18891][SQL] Support for Scala Map collecti...

2017-06-04 Thread michalsenkyr
Github user michalsenkyr commented on a diff in the pull request:

https://github.com/apache/spark/pull/16986#discussion_r120010289
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala ---
@@ -652,6 +653,299 @@ case class MapObjects private(
   }
 }
 
+object CollectObjectsToMap {
+  private val curId = new java.util.concurrent.atomic.AtomicInteger()
+
+  /**
+   * Construct an instance of CollectObjects case class.
+   *
+   * @param keyFunction The function applied on the key collection 
elements.
+   * @param keyInputData An expression that when evaluated returns a key 
collection object.
+   * @param keyElementType The data type of key elements in the collection.
+   * @param valueFunction The function applied on the value collection 
elements.
+   * @param valueInputData An expression that when evaluated returns a 
value collection object.
+   * @param valueElementType The data type of value elements in the 
collection.
+   * @param collClass The type of the resulting collection.
+   */
+  def apply(
+  keyFunction: Expression => Expression,
+  keyInputData: Expression,
+  keyElementType: DataType,
+  valueFunction: Expression => Expression,
+  valueInputData: Expression,
+  valueElementType: DataType,
+  collClass: Class[_]): CollectObjectsToMap = {
+val id = curId.getAndIncrement()
+val keyLoopValue = s"CollectObjectsToMap_keyLoopValue$id"
+val keyLoopIsNull = s"CollectObjectsToMap_keyLoopIsNull$id"
+val keyLoopVar = LambdaVariable(keyLoopValue, keyLoopIsNull, 
keyElementType)
+val valueLoopValue = s"CollectObjectsToMap_valueLoopValue$id"
+val valueLoopIsNull = s"CollectObjectsToMap_valueLoopIsNull$id"
+val valueLoopVar = LambdaVariable(valueLoopValue, valueLoopIsNull, 
valueElementType)
+val tupleLoopVar = s"CollectObjectsToMap_tupleLoopValue$id"
+val builderValue = s"CollectObjectsToMap_builderValue$id"
+CollectObjectsToMap(
+  keyLoopValue, keyLoopIsNull, keyElementType, 
keyFunction(keyLoopVar), keyInputData,
+  valueLoopValue, valueLoopIsNull, valueElementType, 
valueFunction(valueLoopVar),
+  valueInputData,
+  tupleLoopVar, collClass, builderValue)
+  }
+}
+
+/**
+ * An equivalent to the [[MapObjects]] case class but returning an 
ObjectType containing
+ * a Scala collection constructed using the associated builder, obtained 
by calling `newBuilder`
+ * on the collection's companion object.
+ *
+ * @param keyLoopValue the name of the loop variable that is used when 
iterating over the key
+ * collection, and which is used as input for the 
`keyLambdaFunction`
+ * @param keyLoopIsNull the nullability of the loop variable that is used 
when iterating over
+ *  the key collection, and which is used as input for 
the `keyLambdaFunction`
+ * @param keyLoopVarDataType the data type of the loop variable that is 
used when iterating over
+ *   the key collection, and which is used as 
input for the
+ *   `keyLambdaFunction`
+ * @param keyLambdaFunction A function that takes the `keyLoopVar` as 
input, and is used as
+ *  a lambda function to handle collection 
elements.
+ * @param keyInputData An expression that when evaluated returns a 
collection object.
+ * @param valueLoopValue the name of the loop variable that is used when 
iterating over the value
+ *   collection, and which is used as input for the 
`valueLambdaFunction`
+ * @param valueLoopIsNull the nullability of the loop variable that is 
used when iterating over
+ *the value collection, and which is used as input 
for the
+ *`valueLambdaFunction`
+ * @param valueLoopVarDataType the data type of the loop variable that is 
used when iterating over
+ * the value collection, and which is used as 
input for the
+ * `valueLambdaFunction`
+ * @param valueLambdaFunction A function that takes the `valueLoopVar` as 
input, and is used as
+ *a lambda function to handle collection 
elements.
+ * @param valueInputData An expression that when evaluated returns a 
collection object.
+ * @param tupleLoopValue the name of the loop variable that holds the 
tuple to be added to the
+  *  resulting map (used only for Scala Map)
+ * @param collClass The type of the resulting collection.
+ * @param builderValue The name of the builder variable used to construct 
the resultin

[GitHub] spark pull request #16986: [SPARK-18891][SQL] Support for Scala Map collecti...

2017-06-04 Thread michalsenkyr
Github user michalsenkyr commented on a diff in the pull request:

https://github.com/apache/spark/pull/16986#discussion_r120009967
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala ---
@@ -652,6 +653,299 @@ case class MapObjects private(
   }
 }
 
+object CollectObjectsToMap {
+  private val curId = new java.util.concurrent.atomic.AtomicInteger()
+
+  /**
+   * Construct an instance of CollectObjects case class.
+   *
+   * @param keyFunction The function applied on the key collection 
elements.
+   * @param keyInputData An expression that when evaluated returns a key 
collection object.
+   * @param keyElementType The data type of key elements in the collection.
+   * @param valueFunction The function applied on the value collection 
elements.
+   * @param valueInputData An expression that when evaluated returns a 
value collection object.
+   * @param valueElementType The data type of value elements in the 
collection.
+   * @param collClass The type of the resulting collection.
+   */
+  def apply(
+  keyFunction: Expression => Expression,
+  keyInputData: Expression,
+  keyElementType: DataType,
+  valueFunction: Expression => Expression,
+  valueInputData: Expression,
+  valueElementType: DataType,
+  collClass: Class[_]): CollectObjectsToMap = {
+val id = curId.getAndIncrement()
+val keyLoopValue = s"CollectObjectsToMap_keyLoopValue$id"
+val keyLoopIsNull = s"CollectObjectsToMap_keyLoopIsNull$id"
--- End diff --

Yes. A key in `MapData` cannot be null. However, since the function takes 
two `ArrayData`s as input, I figured we shouldn't count on this requirement 
necessarily being fulfilled. As `CollectObjectsToMap` is a class separate from 
its usage in `ScalaReflection`, I tried to make it as generic and as similar to 
`MapObjects` as possible, so it can be used elsewhere without having to make 
sure additional preconditions are met.
It also produces a generic `Map` which has implementations that can support 
null keys. Right now, the only check that prevents this is 
[here](https://github.com/apache/spark/pull/16986/files/7af9b0625a245d34943b7192532c93f2aafac635#diff-e436c96ea839dfe446837ab2a3531f93R933).
 If there is ever a need to support these kinds of `Map`s in the future, this 
should make the job easier.
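
For illustration, a plain Scala `Map` has no such restriction (a small snippet for context only, not part of the PR):

```scala
// A plain Scala Map can hold a null key, unlike MapData (illustrative only).
val m: Map[String, Int] = Map((null: String) -> 1)
assert(m(null) == 1)
```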





[GitHub] spark issue #16986: [SPARK-18891][SQL] Support for Scala Map collection type...

2017-05-22 Thread michalsenkyr
Github user michalsenkyr commented on the issue:

https://github.com/apache/spark/pull/16986
  
That was because of my other PR, which just got accepted; it was just a matter 
of appending unit tests. I resolved the conflict from the browser for now. I can 
rebase later if merge commits are not encouraged in PRs.





[GitHub] spark pull request #16986: [SPARK-18891][SQL] Support for Map collection typ...

2017-05-18 Thread michalsenkyr
Github user michalsenkyr commented on a diff in the pull request:

https://github.com/apache/spark/pull/16986#discussion_r117363594
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala 
---
@@ -329,35 +329,19 @@ object ScalaReflection extends ScalaReflection {
 }
 UnresolvedMapObjects(mapFunction, getPath, Some(cls))
 
-  case t if t <:< localTypeOf[Map[_, _]] =>
+  case t if t <:< localTypeOf[Map[_, _]] || t <:< 
localTypeOf[java.util.Map[_, _]] =>
--- End diff --

Alright. Should I remove the case condition modifications in this PR or 
leave them as they are and remove them in the next one?





[GitHub] spark pull request #18011: [SPARK-19089][SQL] Add support for nested sequenc...

2017-05-16 Thread michalsenkyr
GitHub user michalsenkyr opened a pull request:

https://github.com/apache/spark/pull/18011

[SPARK-19089][SQL] Add support for nested sequences

## What changes were proposed in this pull request?

Replaced the specific sequence encoders with a generic sequence encoder to 
enable nesting of sequences.

Does not add support for nested arrays as that cannot be solved in this way.

## How was this patch tested?

```bash
build/mvn -DskipTests clean package && dev/run-tests
```

Additionally in Spark shell:

```
scala> Seq(Seq(Seq(1))).toDS.collect()
res0: Array[Seq[Seq[Int]]] = Array(List(List(1)))
```

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/michalsenkyr/spark dataset-seq-nested

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/18011.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #18011


commit dd3bf0113cbf66ebf784f68d7f602c39f4a46b8b
Author: Michal Senkyr <mike.sen...@gmail.com>
Date:   2017-05-17T00:18:41Z

Replace specific sequence encoders with generic sequence encoder







[GitHub] spark pull request #18009: [SPARK-18891][SQL] Support for specific Java List...

2017-05-16 Thread michalsenkyr
GitHub user michalsenkyr opened a pull request:

https://github.com/apache/spark/pull/18009

[SPARK-18891][SQL] Support for specific Java List subtypes

## What changes were proposed in this pull request?

Add support for specific Java `List` subtypes in deserialization as well as 
a generic implicit encoder.

All `List` subtypes are supported by using either the size-specifying 
constructor (one `int` parameter) or the default constructor.

Interfaces/abstract classes use the following implementations:

* `java.util.List`, `java.util.AbstractList` or 
`java.util.AbstractSequentialList` => `java.util.ArrayList`
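
A minimal sketch of the instantiation strategy described above (the helper name is made up; the actual PR code differs):

```scala
// Hypothetical helper mirroring the described strategy: map the listed
// interfaces/abstract classes to java.util.ArrayList, then prefer the
// size-specifying constructor and fall back to the default one.
import scala.util.Try

def newJavaList(cls: Class[_], size: Int): java.util.List[_] = {
  val concrete =
    if (cls == classOf[java.util.List[_]] ||
        cls == classOf[java.util.AbstractList[_]] ||
        cls == classOf[java.util.AbstractSequentialList[_]]) {
      classOf[java.util.ArrayList[_]]
    } else cls
  Try(concrete.getConstructor(classOf[Int]).newInstance(size.asInstanceOf[AnyRef]))
    .getOrElse(concrete.getConstructor().newInstance())
    .asInstanceOf[java.util.List[_]]
}

newJavaList(classOf[java.util.LinkedList[_]], 10)  // no int constructor, so the default one is used
```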

## How was this patch tested?

```bash
build/mvn -DskipTests clean package && dev/run-tests
```

Additionally in Spark shell:

```
scala> val jlist = new java.util.LinkedList[Int]; jlist.add(1)
jlist: java.util.LinkedList[Int] = [1]
res0: Boolean = true

scala> Seq(jlist).toDS().map(_.element()).collect()
res1: Array[Int] = Array(1)
```

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/michalsenkyr/spark dataset-java-lists

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/18009.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #18009


commit 1cd6b6b4cdb0ff8a5cc42a105c7a8d4bc16fc0c1
Author: Michal Senkyr <mike.sen...@gmail.com>
Date:   2017-04-16T15:54:40Z

Add arbitrary Java List serialization/deserialization support







[GitHub] spark issue #16986: [SPARK-18891][SQL] Support for Map collection types

2017-04-16 Thread michalsenkyr
Github user michalsenkyr commented on the issue:

https://github.com/apache/spark/pull/16986
  
Rebased onto the current master and integrated a few minor changes from the 
code review of #16541, in case anyone is still interested in this feature.





[GitHub] spark issue #16541: [SPARK-19088][SQL] Optimize sequence type deserializatio...

2017-03-30 Thread michalsenkyr
Github user michalsenkyr commented on the issue:

https://github.com/apache/spark/pull/16541
  
@ueshin Thanks for the fix





[GitHub] spark issue #16541: [SPARK-19088][SQL] Optimize sequence type deserializatio...

2017-03-26 Thread michalsenkyr
Github user michalsenkyr commented on the issue:

https://github.com/apache/spark/pull/16541
  
Thanks. Made the suggested changes in my latest commit.

I also encountered a minor problem when doing final testing. When using a 
collection type that is a type alias (e.g., scala.List), the companion object's 
`newBuilder` could not be found. Fixed it by dealiasing the collection type 
before obtaining the companion object.
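
Roughly what the fix looks like with `scala-reflect` (a sketch under that assumption, not the exact patch):

```scala
// Sketch: dealias the requested type (e.g. scala.List => scala.collection.immutable.List)
// before looking up the companion object and its newBuilder method.
import scala.reflect.runtime.universe._

val mirror = runtimeMirror(getClass.getClassLoader)

def companionNewBuilder(tpe: Type): Any = {
  val dealiased = tpe.dealias
  val companion = dealiased.typeSymbol.asClass.companion.asModule
  val instance  = mirror.reflectModule(companion).instance
  instance.getClass.getMethod("newBuilder").invoke(instance)
}

companionNewBuilder(typeOf[List[Int]])  // works even though scala.List is an alias
```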





[GitHub] spark pull request #16541: [SPARK-19088][SQL] Optimize sequence type deseria...

2017-03-19 Thread michalsenkyr
Github user michalsenkyr commented on a diff in the pull request:

https://github.com/apache/spark/pull/16541#discussion_r106810993
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
 ---
@@ -589,6 +590,170 @@ case class MapObjects private(
   }
 }
 
+object CollectObjects {
+  private val curId = new java.util.concurrent.atomic.AtomicInteger()
+
+  /**
+   * Construct an instance of CollectObjects case class.
+   *
+   * @param function The function applied on the collection elements.
+   * @param inputData An expression that when evaluated returns a 
collection object.
+   * @param elementType The data type of elements in the collection.
+   * @param collClass The type of the resulting collection.
+   */
+  def apply(
+ function: Expression => Expression,
+ inputData: Expression,
+ elementType: DataType,
+ collClass: Class[_]): CollectObjects = {
+val loopValue = "CollectObjects_loopValue" + curId.getAndIncrement()
+val loopIsNull = "CollectObjects_loopIsNull" + curId.getAndIncrement()
+val loopVar = LambdaVariable(loopValue, loopIsNull, elementType)
+val builderValue = "CollectObjects_builderValue" + 
curId.getAndIncrement()
--- End diff --

Altered vals in `MapObjects` to share the same `curId` instead





[GitHub] spark pull request #16541: [SPARK-19088][SQL] Optimize sequence type deseria...

2017-03-19 Thread michalsenkyr
Github user michalsenkyr commented on a diff in the pull request:

https://github.com/apache/spark/pull/16541#discussion_r106810940
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/SequenceBenchmark.scala
 ---
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.benchmark
+
+import org.apache.spark.util.Benchmark
+
+/**
+ * Benchmark [[Seq]], [[List]] and [[scala.collection.mutable.Queue]] 
serialization
+ * performance.
+ * To run this:
+ *  1. replace ignore(...) with test(...)
+ *  2. build/sbt "sql/test-only *benchmark.SequenceBenchmark"
+ *
+ * Benchmarks in this file are skipped in normal builds.
+ */
+class SequenceBenchmark extends BenchmarkBase {
--- End diff --

Removed, but it's still useful to know that the change didn't affect 
performance in any negative way; it introduced a different approach to 
collection construction, after all. Should I also remove the results from the 
PR description?





[GitHub] spark pull request #16541: [SPARK-19088][SQL] Optimize sequence type deseria...

2017-03-19 Thread michalsenkyr
Github user michalsenkyr commented on a diff in the pull request:

https://github.com/apache/spark/pull/16541#discussion_r106810789
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
 ---
@@ -589,6 +590,170 @@ case class MapObjects private(
   }
 }
 
+object CollectObjects {
+  private val curId = new java.util.concurrent.atomic.AtomicInteger()
+
+  /**
+   * Construct an instance of CollectObjects case class.
+   *
+   * @param function The function applied on the collection elements.
+   * @param inputData An expression that when evaluated returns a 
collection object.
+   * @param elementType The data type of elements in the collection.
+   * @param collClass The type of the resulting collection.
+   */
+  def apply(
+ function: Expression => Expression,
+ inputData: Expression,
+ elementType: DataType,
+ collClass: Class[_]): CollectObjects = {
+val loopValue = "CollectObjects_loopValue" + curId.getAndIncrement()
+val loopIsNull = "CollectObjects_loopIsNull" + curId.getAndIncrement()
+val loopVar = LambdaVariable(loopValue, loopIsNull, elementType)
+val builderValue = "CollectObjects_builderValue" + 
curId.getAndIncrement()
+CollectObjects(loopValue, loopIsNull, elementType, function(loopVar), 
inputData,
+  collClass, builderValue)
+  }
+}
+
+/**
+ * An equivalent to the [[MapObjects]] case class but returning an 
ObjectType containing
+ * a Scala collection constructed using the associated builder, obtained 
by calling `newBuilder`
+ * on the collection's companion object.
+ *
+ * @param loopValue the name of the loop variable that used when iterate 
the collection, and used
+ *  as input for the `lambdaFunction`
+ * @param loopIsNull the nullity of the loop variable that used when 
iterate the collection, and
+ *   used as input for the `lambdaFunction`
+ * @param loopVarDataType the data type of the loop variable that used 
when iterate the collection,
+ *and used as input for the `lambdaFunction`
+ * @param lambdaFunction A function that take the `loopVar` as input, and 
used as lambda function
+ *   to handle collection elements.
+ * @param inputData An expression that when evaluated returns a collection 
object.
+ * @param collClass The type of the resulting collection.
+ * @param builderValue The name of the builder variable used to construct 
the resulting collection.
+ */
+case class CollectObjects private(
--- End diff --

Yes, we actually can. I merged `CollectObjects` into `MapObjects` in my 
next commit.





[GitHub] spark issue #16541: [SPARK-19088][SQL] Optimize sequence type deserializatio...

2017-03-15 Thread michalsenkyr
Github user michalsenkyr commented on the issue:

https://github.com/apache/spark/pull/16541
  
That seems to be the case here, yes.

What about the other benefits I mentioned (adding support for Java `List`s 
and future Scala 2.13 compatibility)? I think the codegen is also more 
straightforward/clear (and much shorter).





[GitHub] spark issue #16541: [SPARK-19088][SQL] Optimize sequence type deserializatio...

2017-03-10 Thread michalsenkyr
Github user michalsenkyr commented on the issue:

https://github.com/apache/spark/pull/16541
  
Well, technically yes. But I would say it's a little more than that.

The current approach to deserialization of `Seq`s is to copy the data into 
an array, construct a `WrappedArray` (which extends `Seq`) and optionally copy 
the data (again) into the new collection. This needs to go through all the 
elements twice if anything other than a `Seq` (or `WrappedArray` directly) is 
requested.
This PR takes a more straightforward approach and constructs a mutable 
collection builder (which is defined for every Scala collection), adds the 
elements to it and retrieves the result.
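
In plain collection-library terms (illustrative only, not the generated code):

```scala
// Old approach: materialize an array, wrap it, then pay a second pass for e.g. List.
val asArray: Array[Int] = Array(1, 2, 3)
val asSeq: Seq[Int]     = asArray.toSeq      // WrappedArray, cheap
val asList: List[Int]   = asSeq.to[List]     // second pass over the elements

// Builder approach: construct the requested collection in a single pass.
val builder = List.newBuilder[Int]
builder.sizeHint(asArray.length)
asArray.foreach(builder += _)
val direct: List[Int] = builder.result()
```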

I assumed this would be a performance improvement and am quite surprised 
that there is no difference. I think this might be because the improvement is 
so small that it drowns in the usual operation overhead. Sadly, I do not have 
the resources to measure the operation on larger amounts of data, as the 
benchmarks fail on larger collections on my setup.

I am also not familiar enough with Spark's internals to determine whether 
@kiszk's suggestion would improve operations in other ways, e.g. in a cluster 
environment. That is why I decided to implement it but keep it separate from 
my proposed changes for the time being.

As for other benefits this PR would bring besides performance improvements:

* I would like to implement Java `List`s support in a manner similar to 
what I did with `Map`s in #16986 (I would also ask you to take a look at that 
when you have the time) by just slightly altering the code.
* I didn't know this when I implemented it, but Scala 2.13 will introduce 
a major [collections 
rework](https://www.scala-lang.org/blog/2017/02/28/collections-rework.html) 
that will change the way the `to` method (used for conversions in the current 
solution) works (see the sketch below). This will require a rewrite, whereas I 
believe the method used here will remain largely the same.
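
The `to` change mentioned in the last bullet, in a nutshell (the 2.13 line is commented out because it only compiles on 2.13):

```scala
// Scala 2.11/2.12: `to` is parameterized by the target collection type
// and driven by an implicit CanBuildFrom.
val xs = Vector(1, 2, 3)
val l1: List[Int] = xs.to[List]

// Scala 2.13: `to` takes a collection factory value instead.
// val l2: List[Int] = xs.to(List)
```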





[GitHub] spark issue #16541: [SPARK-19088][SQL] Optimize sequence type deserializatio...

2017-03-04 Thread michalsenkyr
Github user michalsenkyr commented on the issue:

https://github.com/apache/spark/pull/16541
  
Also please note the [UnsafeArrayData-producing 
branch](https://github.com/michalsenkyr/spark/compare/dataset-seq-builder...michalsenkyr:dataset-seq-builder-unsafe)
 that is not yet merged into this branch. I'd like to get somebody's opinion on 
it before merging.





[GitHub] spark issue #16541: [SPARK-19088][SQL] Optimize sequence type deserializatio...

2017-03-04 Thread michalsenkyr
Github user michalsenkyr commented on the issue:

https://github.com/apache/spark/pull/16541
  
Would it be possible for somebody to review this PR for me? I have a few 
ideas that depend on this and I'd like to get to work on them, most notably 
support for Java `List`s.
Maybe @cloud-fan could take a look at this?





[GitHub] spark issue #16986: [SPARK-18891][SQL] Support for Map collection types

2017-02-26 Thread michalsenkyr
Github user michalsenkyr commented on the issue:

https://github.com/apache/spark/pull/16986
  
Added support for Java Maps, with pre-allocation (capacity argument on the 
constructor) and sensible defaults for interfaces/abstract classes. Also 
includes implicit encoders.
Updated the codegen in the description (only a cosmetic change) and added 
codegen for Java Map.





[GitHub] spark pull request #16986: [SPARK-18891][SQL] Support for Map collection typ...

2017-02-18 Thread michalsenkyr
GitHub user michalsenkyr opened a pull request:

https://github.com/apache/spark/pull/16986

[SPARK-18891][SQL] Support for Map collection types

## What changes were proposed in this pull request?

Add support for arbitrary Scala `Map` types in deserialization as well as a 
generic implicit encoder.

Used the builder approach as in #16541 to construct any provided `Map` type 
upon deserialization.
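
The same idea in plain Scala, for maps (illustrative only, not the generated code):

```scala
// Build the requested Map type directly from key/value pairs via its builder.
val keys   = Array(1, 2, 3)
val values = Array("a", "b", "c")

val builder = scala.collection.immutable.HashMap.newBuilder[Int, String]
var i = 0
while (i < keys.length) {
  builder += (keys(i) -> values(i))
  i += 1
}
val result: Map[Int, String] = builder.result()
```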

Please note that this PR also adds (ignored) tests for issue [SPARK-19104 
CompileException with Map and Case Class in Spark 
2.1.0](https://issues.apache.org/jira/browse/SPARK-19104) but doesn't solve it.

Resulting codegen for `Seq(Map(1 -> 
2)).toDS().map(identity).queryExecution.debug.codegen`:

```
/* 001 */ public Object generate(Object[] references) {
/* 002 */   return new GeneratedIterator(references);
/* 003 */ }
/* 004 */
/* 005 */ final class GeneratedIterator extends 
org.apache.spark.sql.execution.BufferedRowIterator {
/* 006 */   private Object[] references;
/* 007 */   private scala.collection.Iterator[] inputs;
/* 008 */   private scala.collection.Iterator inputadapter_input;
/* 009 */   private boolean CollectObjectsToMap_loopIsNull1;
/* 010 */   private int CollectObjectsToMap_loopValue0;
/* 011 */   private boolean CollectObjectsToMap_loopIsNull3;
/* 012 */   private int CollectObjectsToMap_loopValue2;
/* 013 */   private UnsafeRow deserializetoobject_result;
/* 014 */   private 
org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder 
deserializetoobject_holder;
/* 015 */   private 
org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter 
deserializetoobject_rowWriter;
/* 016 */   private scala.collection.immutable.Map mapelements_argValue;
/* 017 */   private UnsafeRow mapelements_result;
/* 018 */   private 
org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder 
mapelements_holder;
/* 019 */   private 
org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter 
mapelements_rowWriter;
/* 020 */   private UnsafeRow serializefromobject_result;
/* 021 */   private 
org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder 
serializefromobject_holder;
/* 022 */   private 
org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter 
serializefromobject_rowWriter;
/* 023 */   private 
org.apache.spark.sql.catalyst.expressions.codegen.UnsafeArrayWriter 
serializefromobject_arrayWriter;
/* 024 */   private 
org.apache.spark.sql.catalyst.expressions.codegen.UnsafeArrayWriter 
serializefromobject_arrayWriter1;
/* 025 */
/* 026 */   public GeneratedIterator(Object[] references) {
/* 027 */ this.references = references;
/* 028 */   }
/* 029 */
/* 030 */   public void init(int index, scala.collection.Iterator[] inputs) 
{
/* 031 */ partitionIndex = index;
/* 032 */ this.inputs = inputs;
/* 033 */ wholestagecodegen_init_0();
/* 034 */ wholestagecodegen_init_1();
/* 035 */
/* 036 */   }
/* 037 */
/* 038 */   private void wholestagecodegen_init_0() {
/* 039 */ inputadapter_input = inputs[0];
/* 040 */
/* 041 */ deserializetoobject_result = new UnsafeRow(1);
/* 042 */ this.deserializetoobject_holder = new 
org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(deserializetoobject_result,
 32);
/* 043 */ this.deserializetoobject_rowWriter = new 
org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(deserializetoobject_holder,
 1);
/* 044 */
/* 045 */ mapelements_result = new UnsafeRow(1);
/* 046 */ this.mapelements_holder = new 
org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(mapelements_result,
 32);
/* 047 */ this.mapelements_rowWriter = new 
org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(mapelements_holder,
 1);
/* 048 */ serializefromobject_result = new UnsafeRow(1);
/* 049 */ this.serializefromobject_holder = new 
org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(serializefromobject_result,
 32);
/* 050 */ this.serializefromobject_rowWriter = new 
org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(serializefromobject_holder,
 1);
/* 051 */ this.serializefromobject_arrayWriter = new 
org.apache.spark.sql.catalyst.expressions.codegen.UnsafeArrayWriter();
/* 052 */
/* 053 */   }
/* 054 */
/* 055 */   private void wholestagecodegen_init_1() {
/* 056 */ this.serializefromobject_arrayWriter1 = new 
org.apache.spark.sql.catalyst.expressions.codegen.UnsafeArrayWriter();
/* 057 */
/* 058 */   }
/* 059 */
/* 060 */   protected void processNext() throws java.io.IOException {
/* 061 */ while (inputadapter_input.hasNext() && !stopEarly()) {
/* 062 */   InternalRow inputadapter_row = (InternalRow) 
inputadapter

[GitHub] spark issue #16541: [SPARK-19088][SQL] Optimize sequence type deserializatio...

2017-02-02 Thread michalsenkyr
Github user michalsenkyr commented on the issue:

https://github.com/apache/spark/pull/16541
  
Apologies for taking so long.

I tried modifying the serialization logic as best as I could to serialize 
into `UnsafeArrayData` ([branch 
diff](https://github.com/michalsenkyr/spark/compare/dataset-seq-builder...michalsenkyr:dataset-seq-builder-unsafe)).
I first had to convert to an array to use `fromPrimitiveArray` on the 
result. That's probably why the benchmark came out slightly worse:

```
OpenJDK 64-Bit Server VM 1.8.0_121-b13 on Linux 4.9.6-1-ARCH
AMD A10-4600M APU with Radeon(tm) HD Graphics
collect:                                 Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
Seq                                            256 /  287          0,0     255670,1       1,0X
List                                           161 /  220          0,0     161091,7       1,6X
mutable.Queue                                  304 /  324          0,0     303823,3       0,8X
```

I am not entirely sure how `GenericArrayData` and `UnsafeArrayData` are 
handled during transformations and shuffles though, so it's possible that more 
complex tests will reveal better performance. However, I'm not sure that I can 
test this properly on my single-machine setup. I'd definitely be interested in 
benchmark results on a cluster setup.
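
For context, the two representations involved, in simplified form (assumed API usage, not code from the branch):

```scala
// GenericArrayData can wrap the deserialized elements directly, while
// UnsafeArrayData.fromPrimitiveArray needs a primitive array first,
// which is where the extra copy mentioned above comes from.
import org.apache.spark.sql.catalyst.util.{GenericArrayData, UnsafeArrayData}

val elements: Seq[Int] = Seq(1, 2, 3)

val generic: GenericArrayData = new GenericArrayData(elements)
val unsafe: UnsafeArrayData   = UnsafeArrayData.fromPrimitiveArray(elements.toArray)
```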

Generated code:

```
/* 001 */ public Object generate(Object[] references) {
/* 002 */   return new GeneratedIterator(references);
/* 003 */ }
/* 004 */
/* 005 */ final class GeneratedIterator extends 
org.apache.spark.sql.execution.BufferedRowIterator {
/* 006 */   private Object[] references;
/* 007 */   private scala.collection.Iterator[] inputs;
/* 008 */   private scala.collection.Iterator inputadapter_input;
/* 009 */   private boolean CollectObjects_loopIsNull1;
/* 010 */   private int CollectObjects_loopValue0;
/* 011 */   private UnsafeRow deserializetoobject_result;
/* 012 */   private 
org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder 
deserializetoobject_holder;
/* 013 */   private 
org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter 
deserializetoobject_rowWriter;
/* 014 */   private scala.collection.immutable.List mapelements_argValue;
/* 015 */   private UnsafeRow mapelements_result;
/* 016 */   private 
org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder 
mapelements_holder;
/* 017 */   private 
org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter 
mapelements_rowWriter;
/* 018 */   private scala.collection.immutable.List 
serializefromobject_argValue;
/* 019 */   private UnsafeRow serializefromobject_result;
/* 020 */   private 
org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder 
serializefromobject_holder;
/* 021 */   private 
org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter 
serializefromobject_rowWriter;
/* 022 */   private 
org.apache.spark.sql.catalyst.expressions.codegen.UnsafeArrayWriter 
serializefromobject_arrayWriter;
/* 023 */
/* 024 */   public GeneratedIterator(Object[] references) {
/* 025 */ this.references = references;
/* 026 */   }
/* 027 */
/* 028 */   public void init(int index, scala.collection.Iterator[] inputs) 
{
/* 029 */ partitionIndex = index;
/* 030 */ this.inputs = inputs;
/* 031 */ inputadapter_input = inputs[0];
/* 032 */
/* 033 */ deserializetoobject_result = new UnsafeRow(1);
/* 034 */ this.deserializetoobject_holder = new 
org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(deserializetoobject_result,
 32);
/* 035 */ this.deserializetoobject_rowWriter = new 
org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(deserializetoobject_holder,
 1);
/* 036 */
/* 037 */ mapelements_result = new UnsafeRow(1);
/* 038 */ this.mapelements_holder = new 
org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(mapelements_result,
 32);
/* 039 */ this.mapelements_rowWriter = new 
org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(mapelements_holder,
 1);
/* 040 */
/* 041 */ serializefromobject_result = new UnsafeRow(1);
/* 042 */ this.serializefromobject_holder = new 
org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(serializefromobject_result,
 32);
/* 043 */ this.serializefromobject_rowWriter = new 
org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(serializefromobject_holder,
 1);
/* 044 */ this.serializefromobject_arrayWriter = new 
org.apache.spark.sql.catalyst.expressions.codegen.UnsafeArrayWriter();
/* 045 */
/* 046 */   }
/* 047 */
/* 048 */   protected void

[GitHub] spark issue #16541: [SPARK-19088][SQL] Optimize sequence type deserializatio...

2017-01-18 Thread michalsenkyr
Github user michalsenkyr commented on the issue:

https://github.com/apache/spark/pull/16541
  
I added the benchmarks based on the code you provided, but I am getting 
almost the same results before and after the optimization (see description). So 
either the added benefit is really small or I didn't write/tune the benchmarks 
quite right. I would appreciate it if you could take a look at them.
I will also take a look at the `UnsafeArrayData` optimization later and try 
to include it in my next commit.





[GitHub] spark issue #16541: [SPARK-19088][SQL] Optimize sequence type deserializatio...

2017-01-15 Thread michalsenkyr
Github user michalsenkyr commented on the issue:

https://github.com/apache/spark/pull/16541
  
Added benchmarks.

I didn't find any standardized way of benchmarking codegen, so I wrote a 
simple script for the Spark shell. The benchmarks were run on a laptop, so the 
collections couldn't be too large. Nevertheless, the benchmarks are 
consistently (even if not significantly) faster.
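
Roughly the shape of that script (not the actual one; sizes and labels are made up, and it assumes a Spark shell with `spark.implicits._` in scope):

```scala
// Crude timing helper plus two datasets that exercise deserialization in map().
def time[T](label: String)(body: => T): T = {
  val start  = System.nanoTime()
  val result = body
  println(f"$label: ${(System.nanoTime() - start) / 1e6}%.1f ms")
  result
}

val lists = Seq.fill(10000)(List.tabulate(100)(identity)).toDS()
time("List")(lists.map(_.sum).collect())

val queues = Seq.fill(10000)(scala.collection.mutable.Queue.tabulate(100)(identity)).toDS()
time("mutable.Queue")(queues.map(_.sum).collect())
```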





[GitHub] spark pull request #16546: [WIP][SQL] Put check in ExpressionEncoder.fromRow...

2017-01-12 Thread michalsenkyr
Github user michalsenkyr commented on a diff in the pull request:

https://github.com/apache/spark/pull/16546#discussion_r95927063
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala 
---
@@ -120,6 +120,32 @@ object ScalaReflection extends ScalaReflection {
   }
 
   /**
+   * Returns the element type for Seq[_] and its subclass.
+   */
+  def getElementTypeForSeq(t: `Type`): Option[`Type`] = {
+if (!(t <:< localTypeOf[Seq[_]])) {
+  return None
+}
+val TypeRef(_, _, elementTypeList) = t
--- End diff --

This would probably be better solved by using `match` and `typeParams`. 
Something like this:

```scala
t match {
  case TypeRef(_, _, Seq(elementType)) => Some(elementType)
  case _ =>
    t.baseClasses.find { c =>
      val cType = c.asClass.toType
      cType <:< localTypeOf[Seq[_]] && cType.typeParams.nonEmpty
    }.map(t.baseType(_).typeParams.head)
}
```

I'm also not sure whether types with more than one type parameter are handled 
correctly.





[GitHub] spark pull request #16541: [SPARK-19088][SQL] Optimize sequence type deseria...

2017-01-12 Thread michalsenkyr
Github user michalsenkyr commented on a diff in the pull request:

https://github.com/apache/spark/pull/16541#discussion_r95923927
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
 ---
@@ -589,6 +590,171 @@ case class MapObjects private(
   }
 }
 
+object CollectObjects {
+  private val curId = new java.util.concurrent.atomic.AtomicInteger()
+
+  /**
+   * Construct an instance of CollectObjects case class.
+   *
+   * @param function The function applied on the collection elements.
+   * @param inputData An expression that when evaluated returns a 
collection object.
+   * @param elementType The data type of elements in the collection.
+   * @param collClass The type of the resulting collection.
+   */
+  def apply(
+ function: Expression => Expression,
+ inputData: Expression,
+ elementType: DataType,
+ collClass: Class[_]): CollectObjects = {
+val loopValue = "CollectObjects_loopValue" + curId.getAndIncrement()
+val loopIsNull = "CollectObjects_loopIsNull" + curId.getAndIncrement()
+val loopVar = LambdaVariable(loopValue, loopIsNull, elementType)
+val builderValue = "CollectObjects_builderValue" + 
curId.getAndIncrement()
+CollectObjects(loopValue, loopIsNull, elementType, function(loopVar), 
inputData,
+  collClass, builderValue)
+  }
+}
+
+/**
+ * An equivalent to the [[MapObjects]] case class but returning an 
ObjectType containing
+ * a Scala collection constructed using the associated builder, obtained 
by calling `newBuilder`
+ * on the collection's companion object.
+ *
+ * @param loopValue the name of the loop variable that used when iterate 
the collection, and used
+ *  as input for the `lambdaFunction`
+ * @param loopIsNull the nullity of the loop variable that used when 
iterate the collection, and
+ *   used as input for the `lambdaFunction`
+ * @param loopVarDataType the data type of the loop variable that used 
when iterate the collection,
+ *and used as input for the `lambdaFunction`
+ * @param lambdaFunction A function that take the `loopVar` as input, and 
used as lambda function
+ *   to handle collection elements.
+ * @param inputData An expression that when evaluated returns a collection 
object.
+ * @param collClass The type of the resulting collection.
+ * @param builderValue The name of the builder variable used to construct 
the resulting collection.
+ */
+case class CollectObjects private(
+loopValue: String,
+loopIsNull: String,
+loopVarDataType: DataType,
+lambdaFunction: Expression,
+inputData: Expression,
+collClass: Class[_],
+builderValue: String) extends Expression with NonSQLExpression {
+
+  override def nullable: Boolean = inputData.nullable
+
+  override def children: Seq[Expression] = lambdaFunction :: inputData :: 
Nil
+
+  override def eval(input: InternalRow): Any =
+throw new UnsupportedOperationException("Only code-generated 
evaluation is supported")
+
+  override def dataType: DataType = ObjectType(collClass)
+
+  override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
+val collObjectName = s"${collClass.getName}$$.MODULE$$"
+val getBuilderVar = s"$collObjectName.newBuilder()"
--- End diff --

Actually, I would have to modify `serializeFor` as you did 
[here](https://github.com/apache/spark/pull/16546/files#diff-89643554d9757dd3e91abff1cc6096c7L520)
 in order to extract the required element type.





[GitHub] spark pull request #16541: [SPARK-19088][SQL] Optimize sequence type deseria...

2017-01-12 Thread michalsenkyr
Github user michalsenkyr commented on a diff in the pull request:

https://github.com/apache/spark/pull/16541#discussion_r95921320
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
 ---
@@ -589,6 +590,171 @@ case class MapObjects private(
   }
 }
 
+object CollectObjects {
+  private val curId = new java.util.concurrent.atomic.AtomicInteger()
+
+  /**
+   * Construct an instance of CollectObjects case class.
+   *
+   * @param function The function applied on the collection elements.
+   * @param inputData An expression that when evaluated returns a 
collection object.
+   * @param elementType The data type of elements in the collection.
+   * @param collClass The type of the resulting collection.
+   */
+  def apply(
+ function: Expression => Expression,
+ inputData: Expression,
+ elementType: DataType,
+ collClass: Class[_]): CollectObjects = {
+val loopValue = "CollectObjects_loopValue" + curId.getAndIncrement()
+val loopIsNull = "CollectObjects_loopIsNull" + curId.getAndIncrement()
+val loopVar = LambdaVariable(loopValue, loopIsNull, elementType)
+val builderValue = "CollectObjects_builderValue" + 
curId.getAndIncrement()
+CollectObjects(loopValue, loopIsNull, elementType, function(loopVar), 
inputData,
+  collClass, builderValue)
+  }
+}
+
+/**
+ * An equivalent to the [[MapObjects]] case class but returning an 
ObjectType containing
+ * a Scala collection constructed using the associated builder, obtained 
by calling `newBuilder`
+ * on the collection's companion object.
+ *
+ * @param loopValue the name of the loop variable that used when iterate 
the collection, and used
+ *  as input for the `lambdaFunction`
+ * @param loopIsNull the nullity of the loop variable that used when 
iterate the collection, and
+ *   used as input for the `lambdaFunction`
+ * @param loopVarDataType the data type of the loop variable that used 
when iterate the collection,
+ *and used as input for the `lambdaFunction`
+ * @param lambdaFunction A function that take the `loopVar` as input, and 
used as lambda function
+ *   to handle collection elements.
+ * @param inputData An expression that when evaluated returns a collection 
object.
+ * @param collClass The type of the resulting collection.
+ * @param builderValue The name of the builder variable used to construct 
the resulting collection.
+ */
+case class CollectObjects private(
+loopValue: String,
+loopIsNull: String,
+loopVarDataType: DataType,
+lambdaFunction: Expression,
+inputData: Expression,
+collClass: Class[_],
+builderValue: String) extends Expression with NonSQLExpression {
+
+  override def nullable: Boolean = inputData.nullable
+
+  override def children: Seq[Expression] = lambdaFunction :: inputData :: 
Nil
+
+  override def eval(input: InternalRow): Any =
+throw new UnsupportedOperationException("Only code-generated 
evaluation is supported")
+
+  override def dataType: DataType = ObjectType(collClass)
+
+  override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
+val collObjectName = s"${collClass.getName}$$.MODULE$$"
+val getBuilderVar = s"$collObjectName.newBuilder()"
--- End diff --

I added the `Seq` builder fallback. However, there is presently no 
collection that Spark supports that doesn't provide a builder. You can try it 
out on your branch with `Range`.





[GitHub] spark pull request #16541: [SPARK-19088][SQL] Optimize sequence type deseria...

2017-01-12 Thread michalsenkyr
Github user michalsenkyr commented on a diff in the pull request:

https://github.com/apache/spark/pull/16541#discussion_r95909808
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
 ---
@@ -589,6 +590,171 @@ case class MapObjects private(
   }
 }
 
+object CollectObjects {
+  private val curId = new java.util.concurrent.atomic.AtomicInteger()
+
+  /**
+   * Construct an instance of CollectObjects case class.
+   *
+   * @param function The function applied on the collection elements.
+   * @param inputData An expression that when evaluated returns a 
collection object.
+   * @param elementType The data type of elements in the collection.
+   * @param collClass The type of the resulting collection.
+   */
+  def apply(
+ function: Expression => Expression,
+ inputData: Expression,
+ elementType: DataType,
+ collClass: Class[_]): CollectObjects = {
+val loopValue = "CollectObjects_loopValue" + curId.getAndIncrement()
+val loopIsNull = "CollectObjects_loopIsNull" + curId.getAndIncrement()
+val loopVar = LambdaVariable(loopValue, loopIsNull, elementType)
+val builderValue = "CollectObjects_builderValue" + 
curId.getAndIncrement()
+CollectObjects(loopValue, loopIsNull, elementType, function(loopVar), 
inputData,
+  collClass, builderValue)
+  }
+}
+
+/**
+ * An equivalent to the [[MapObjects]] case class but returning an 
ObjectType containing
+ * a Scala collection constructed using the associated builder, obtained 
by calling `newBuilder`
+ * on the collection's companion object.
+ *
+ * @param loopValue the name of the loop variable that used when iterate 
the collection, and used
+ *  as input for the `lambdaFunction`
+ * @param loopIsNull the nullity of the loop variable that used when 
iterate the collection, and
+ *   used as input for the `lambdaFunction`
+ * @param loopVarDataType the data type of the loop variable that used 
when iterate the collection,
+ *and used as input for the `lambdaFunction`
+ * @param lambdaFunction A function that take the `loopVar` as input, and 
used as lambda function
+ *   to handle collection elements.
+ * @param inputData An expression that when evaluated returns a collection 
object.
+ * @param collClass The type of the resulting collection.
+ * @param builderValue The name of the builder variable used to construct 
the resulting collection.
+ */
+case class CollectObjects private(
+loopValue: String,
+loopIsNull: String,
+loopVarDataType: DataType,
+lambdaFunction: Expression,
+inputData: Expression,
+collClass: Class[_],
+builderValue: String) extends Expression with NonSQLExpression {
+
+  override def nullable: Boolean = inputData.nullable
+
+  override def children: Seq[Expression] = lambdaFunction :: inputData :: 
Nil
+
+  override def eval(input: InternalRow): Any =
+throw new UnsupportedOperationException("Only code-generated 
evaluation is supported")
+
+  override def dataType: DataType = ObjectType(collClass)
+
+  override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
+val collObjectName = s"${collClass.getName}$$.MODULE$$"
+val getBuilderVar = s"$collObjectName.newBuilder()"
--- End diff --

Interesting. I didn't realize `Range` breaks that rule.
Furthermore, it would be practically impossible to deserialize back into 
`Range`. So I guess the best way to do this will be to use a general `Seq` 
builder if the collection doesn't provide its own.
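
Conceptually, the fallback mirrors the companion-object lookup the generated code performs (a sketch, not the actual codegen):

```scala
// Use the collection companion's newBuilder when it exists; otherwise fall
// back to a plain Seq builder (e.g. for Range, whose companion has none).
import scala.collection.mutable
import scala.util.Try

def builderFor(collClass: Class[_]): mutable.Builder[Any, _] = {
  val viaCompanion = Try {
    val companion = Class.forName(collClass.getName + "$").getField("MODULE$").get(null)
    companion.getClass.getMethod("newBuilder").invoke(companion)
      .asInstanceOf[mutable.Builder[Any, _]]
  }
  viaCompanion.getOrElse(Seq.newBuilder[Any])
}

builderFor(classOf[List[_]])                           // List's own builder
builderFor(classOf[scala.collection.immutable.Range])  // falls back to a Seq builder
```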





[GitHub] spark issue #16541: [SPARK-19088][SQL] Optimize sequence type deserializatio...

2017-01-11 Thread michalsenkyr
Github user michalsenkyr commented on the issue:

https://github.com/apache/spark/pull/16541
  
Added a codegen comparison for a simple `List` dataset.
I will also prepare a benchmark and add some results later, covering `List`, 
`mutable.Queue` and `Seq`: `List` and `mutable.Queue` should benefit from the 
change (one less pass), while `Seq` should be approximately the same (as it is 
a supertype of `WrappedArray` and therefore skips the final conversion in the 
original approach).
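
That is, the intermediate `WrappedArray` already satisfies a `Seq` result, while e.g. `List` still pays a conversion (small illustration only):

```scala
val wrapped: Seq[Int] = Array(1, 2, 3)    // implicit wrapping yields a WrappedArray; no copy
val asList: List[Int] = wrapped.to[List]  // the extra pass List paid in the original approach
```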





[GitHub] spark pull request #16541: [SPARK-19088][SQL] Optimize sequence type deseria...

2017-01-11 Thread michalsenkyr
Github user michalsenkyr commented on a diff in the pull request:

https://github.com/apache/spark/pull/16541#discussion_r95667424
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
 ---
@@ -589,6 +590,171 @@ case class MapObjects private(
   }
 }
 
+object CollectObjects {
+  private val curId = new java.util.concurrent.atomic.AtomicInteger()
+
+  /**
+   * Construct an instance of CollectObjects case class.
+   *
+   * @param function The function applied on the collection elements.
+   * @param inputData An expression that when evaluated returns a 
collection object.
+   * @param elementType The data type of elements in the collection.
+   * @param collClass The type of the resulting collection.
+   */
+  def apply(
+ function: Expression => Expression,
+ inputData: Expression,
+ elementType: DataType,
+ collClass: Class[_]): CollectObjects = {
+val loopValue = "CollectObjects_loopValue" + curId.getAndIncrement()
+val loopIsNull = "CollectObjects_loopIsNull" + curId.getAndIncrement()
+val loopVar = LambdaVariable(loopValue, loopIsNull, elementType)
+val builderValue = "CollectObjects_builderValue" + 
curId.getAndIncrement()
+CollectObjects(loopValue, loopIsNull, elementType, function(loopVar), 
inputData,
+  collClass, builderValue)
+  }
+}
+
+/**
+ * An equivalent to the [[MapObjects]] case class but returning an 
ObjectType containing
+ * a Scala collection constructed using the associated builder, obtained 
by calling `newBuilder`
+ * on the collection's companion object.
+ *
+ * @param loopValue the name of the loop variable that used when iterate 
the collection, and used
+ *  as input for the `lambdaFunction`
+ * @param loopIsNull the nullity of the loop variable that used when 
iterate the collection, and
+ *   used as input for the `lambdaFunction`
+ * @param loopVarDataType the data type of the loop variable that used 
when iterate the collection,
+ *and used as input for the `lambdaFunction`
+ * @param lambdaFunction A function that take the `loopVar` as input, and 
used as lambda function
+ *   to handle collection elements.
+ * @param inputData An expression that when evaluated returns a collection 
object.
+ * @param collClass The type of the resulting collection.
+ * @param builderValue The name of the builder variable used to construct 
the resulting collection.
+ */
+case class CollectObjects private(
+loopValue: String,
+loopIsNull: String,
+loopVarDataType: DataType,
+lambdaFunction: Expression,
+inputData: Expression,
+collClass: Class[_],
+builderValue: String) extends Expression with NonSQLExpression {
+
+  override def nullable: Boolean = inputData.nullable
+
+  override def children: Seq[Expression] = lambdaFunction :: inputData :: 
Nil
+
+  override def eval(input: InternalRow): Any =
+throw new UnsupportedOperationException("Only code-generated 
evaluation is supported")
+
+  override def dataType: DataType = ObjectType(collClass)
+
+  override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
+val collObjectName = s"${collClass.getName}$$.MODULE$$"
+val getBuilderVar = s"$collObjectName.newBuilder()"
--- End diff --

Yes, it does. As the [`TraversableLike` 
documentation](http://www.scala-lang.org/api/2.12.1/scala/collection/TraversableLike.html) 
specifies:

> Collection classes mixing in this trait ... also need to provide a method 
**newBuilder** which creates a builder for collections of the same kind.





[GitHub] spark issue #16541: [SPARK-19088][SQL] Optimize sequence type deserializatio...

2017-01-10 Thread michalsenkyr
Github user michalsenkyr commented on the issue:

https://github.com/apache/spark/pull/16541
  
Also, the new `CollectObjects` copies quite a bit of code from 
`MapObjects`. Should I move the code into a common trait in order to reduce 
duplication, or should I leave it as is?





[GitHub] spark pull request #16541: [SPARK-19088][SQL] Optimize sequence type deseria...

2017-01-10 Thread michalsenkyr
GitHub user michalsenkyr opened a pull request:

https://github.com/apache/spark/pull/16541

[SPARK-19088][SQL] Optimize sequence type deserialization codegen

## What changes were proposed in this pull request?

Optimization of the arbitrary Scala sequence deserialization introduced by 
#16240.

The previous implementation constructed an array which was then converted 
by `to`. This required two passes in most cases.

This implementation attempts to remedy that by using `Builder`s provided by 
the `newBuilder` method on every Scala collection's companion object to build 
the resulting collection directly.

## How was this patch tested?

```bash
./build/mvn -DskipTests clean package && ./dev/run-tests
```

Additionally in Spark Shell:

```scala
case class QueueClass(q: scala.collection.immutable.Queue[Int])

spark.createDataset(Seq(List(1,2,3))).map(x => 
QueueClass(scala.collection.immutable.Queue(x: _*))).map(_.q.dequeue).collect
```

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/michalsenkyr/spark dataset-seq-builder

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/16541.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #16541


commit 4aaef15844848e16dc68d4be4d0a013c8ddcd7d7
Author: Michal Senkyr <mike.sen...@gmail.com>
Date:   2017-01-10T23:00:34Z

Rewrote sequence deserialization implementation to use builders







[GitHub] spark issue #16240: [SPARK-16792][SQL] Dataset containing a Case Class with ...

2017-01-05 Thread michalsenkyr
Github user michalsenkyr commented on the issue:

https://github.com/apache/spark/pull/16240
  
I'm not sure how to run the MiMa tests locally, so I tried my best to figure 
out what was necessary. Hope this fixes it.
The downside of the fix is that I had to restore the original methods in 
`SQLImplicits`. I removed the `implicit` keyword and added deprecation 
annotations, as only the new methods should be used from now on. Old code 
importing the methods explicitly should be fine now.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16240: [SPARK-16792][SQL] Dataset containing a Case Clas...

2017-01-03 Thread michalsenkyr
Github user michalsenkyr commented on a diff in the pull request:

https://github.com/apache/spark/pull/16240#discussion_r94504343
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/DatasetPrimitiveSuite.scala ---
@@ -130,6 +130,30 @@ class DatasetPrimitiveSuite extends QueryTest with 
SharedSQLContext {
 checkDataset(Seq(Array(Tuple1(1))).toDS(), Array(Tuple1(1)))
   }
 
+  test("arbitrary sequences") {
--- End diff --

I added some sequence-product combination tests.
Nested sequences were never supported (tried on master and 2.0.2).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16240: [SPARK-16792][SQL] Dataset containing a Case Clas...

2016-12-24 Thread michalsenkyr
Github user michalsenkyr commented on a diff in the pull request:

https://github.com/apache/spark/pull/16240#discussion_r93816269
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/SQLImplicits.scala 
---
@@ -100,31 +97,36 @@ abstract class SQLImplicits {
   // Seqs
 
   /** @since 1.6.1 */
-  implicit def newIntSeqEncoder: Encoder[Seq[Int]] = ExpressionEncoder()
+  implicit def newIntSeqEncoder[T <: Seq[Int] : TypeTag]: Encoder[T] = 
ExpressionEncoder()
 
   /** @since 1.6.1 */
-  implicit def newLongSeqEncoder: Encoder[Seq[Long]] = ExpressionEncoder()
+  implicit def newLongSeqEncoder[T <: Seq[Long] : TypeTag]: Encoder[T] = 
ExpressionEncoder()
 
   /** @since 1.6.1 */
-  implicit def newDoubleSeqEncoder: Encoder[Seq[Double]] = 
ExpressionEncoder()
+  implicit def newDoubleSeqEncoder[T <: Seq[Double] : TypeTag]: Encoder[T] 
= ExpressionEncoder()
 
   /** @since 1.6.1 */
-  implicit def newFloatSeqEncoder: Encoder[Seq[Float]] = 
ExpressionEncoder()
+  implicit def newFloatSeqEncoder[T <: Seq[Float] : TypeTag]: Encoder[T] = 
ExpressionEncoder()
 
   /** @since 1.6.1 */
-  implicit def newByteSeqEncoder: Encoder[Seq[Byte]] = ExpressionEncoder()
+  implicit def newByteSeqEncoder[T <: Seq[Byte] : TypeTag]: Encoder[T] = 
ExpressionEncoder()
 
   /** @since 1.6.1 */
-  implicit def newShortSeqEncoder: Encoder[Seq[Short]] = 
ExpressionEncoder()
+  implicit def newShortSeqEncoder[T <: Seq[Short] : TypeTag]: Encoder[T] = 
ExpressionEncoder()
 
   /** @since 1.6.1 */
-  implicit def newBooleanSeqEncoder: Encoder[Seq[Boolean]] = 
ExpressionEncoder()
+  implicit def newBooleanSeqEncoder[T <: Seq[Boolean] : TypeTag]: 
Encoder[T] = ExpressionEncoder()
 
   /** @since 1.6.1 */
-  implicit def newStringSeqEncoder: Encoder[Seq[String]] = 
ExpressionEncoder()
+  implicit def newStringSeqEncoder[T <: Seq[String] : TypeTag]: Encoder[T] 
= ExpressionEncoder()
 
   /** @since 1.6.1 */
-  implicit def newProductSeqEncoder[A <: Product : TypeTag]: 
Encoder[Seq[A]] = ExpressionEncoder()
+  implicit def newProductSeqEncoder[A <: Product, T <: Seq[A] : TypeTag]: 
Encoder[T] =
--- End diff --

You are right. `Seq` is declared covariant in its element type, so it works and 
solves all the problems I was having. Thanks
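
As a quick illustration of why the covariance makes the bound line up (just a 
compile-time check, not part of the patch):

```scala
// Seq is declared as Seq[+A], so List[Int] <: Seq[Int], and a bound such as
// T <: Seq[Int] therefore admits T = List[Int]. This line compiles only
// because of that subtyping relationship.
implicitly[List[Int] <:< Seq[Int]]
```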


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16240: [SPARK-16792][SQL] Dataset containing a Case Clas...

2016-12-23 Thread michalsenkyr
Github user michalsenkyr commented on a diff in the pull request:

https://github.com/apache/spark/pull/16240#discussion_r93807181
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala 
---
@@ -312,12 +312,46 @@ object ScalaReflection extends ScalaReflection {
   "array",
   ObjectType(classOf[Array[Any]]))
 
-StaticInvoke(
+val wrappedArray = StaticInvoke(
   scala.collection.mutable.WrappedArray.getClass,
   ObjectType(classOf[Seq[_]]),
   "make",
   array :: Nil)
 
+if (localTypeOf[scala.collection.mutable.WrappedArray[_]] <:< 
t.erasure) {
+  wrappedArray
+} else {
+  // Convert to another type using `to`
+  val cls = mirror.runtimeClass(t.typeSymbol.asClass)
+  import scala.collection.generic.CanBuildFrom
+  import scala.reflect.ClassTag
+  import scala.util.{Try, Success}
--- End diff --

Done. I tried looking up the code style you mentioned, but only found the 
[Databricks' Scala Code Style 
Guide](https://github.com/databricks/scala-style-guide#exception-handling-try-vs-try).
 And that is not mentioned in the Spark docs as far as I know.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16240: [SPARK-16792][SQL] Dataset containing a Case Clas...

2016-12-23 Thread michalsenkyr
Github user michalsenkyr commented on a diff in the pull request:

https://github.com/apache/spark/pull/16240#discussion_r93805236
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/SQLImplicits.scala 
---
@@ -100,31 +97,36 @@ abstract class SQLImplicits {
   // Seqs
 
   /** @since 1.6.1 */
-  implicit def newIntSeqEncoder: Encoder[Seq[Int]] = ExpressionEncoder()
+  implicit def newIntSeqEncoder[T <: Seq[Int] : TypeTag]: Encoder[T] = 
ExpressionEncoder()
 
   /** @since 1.6.1 */
-  implicit def newLongSeqEncoder: Encoder[Seq[Long]] = ExpressionEncoder()
+  implicit def newLongSeqEncoder[T <: Seq[Long] : TypeTag]: Encoder[T] = 
ExpressionEncoder()
 
   /** @since 1.6.1 */
-  implicit def newDoubleSeqEncoder: Encoder[Seq[Double]] = 
ExpressionEncoder()
+  implicit def newDoubleSeqEncoder[T <: Seq[Double] : TypeTag]: Encoder[T] 
= ExpressionEncoder()
 
   /** @since 1.6.1 */
-  implicit def newFloatSeqEncoder: Encoder[Seq[Float]] = 
ExpressionEncoder()
+  implicit def newFloatSeqEncoder[T <: Seq[Float] : TypeTag]: Encoder[T] = 
ExpressionEncoder()
 
   /** @since 1.6.1 */
-  implicit def newByteSeqEncoder: Encoder[Seq[Byte]] = ExpressionEncoder()
+  implicit def newByteSeqEncoder[T <: Seq[Byte] : TypeTag]: Encoder[T] = 
ExpressionEncoder()
 
   /** @since 1.6.1 */
-  implicit def newShortSeqEncoder: Encoder[Seq[Short]] = 
ExpressionEncoder()
+  implicit def newShortSeqEncoder[T <: Seq[Short] : TypeTag]: Encoder[T] = 
ExpressionEncoder()
 
   /** @since 1.6.1 */
-  implicit def newBooleanSeqEncoder: Encoder[Seq[Boolean]] = 
ExpressionEncoder()
+  implicit def newBooleanSeqEncoder[T <: Seq[Boolean] : TypeTag]: 
Encoder[T] = ExpressionEncoder()
 
   /** @since 1.6.1 */
-  implicit def newStringSeqEncoder: Encoder[Seq[String]] = 
ExpressionEncoder()
+  implicit def newStringSeqEncoder[T <: Seq[String] : TypeTag]: Encoder[T] 
= ExpressionEncoder()
 
   /** @since 1.6.1 */
-  implicit def newProductSeqEncoder[A <: Product : TypeTag]: 
Encoder[Seq[A]] = ExpressionEncoder()
+  implicit def newProductSeqEncoder[A <: Product : TypeTag, T <: Seq[A] : 
TypeTag]: Encoder[T] =
--- End diff --

This one is the same as all the other ones, just with Product subclasses. 
If you were concerned about the `TypeTag` on `A`, it was actually not needed as 
`T`'s tag already contains all the information. I just tested it to be sure and 
removed it.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #16240: [SPARK-16792][SQL] Dataset containing a Case Class with ...

2016-12-22 Thread michalsenkyr
Github user michalsenkyr commented on the issue:

https://github.com/apache/spark/pull/16240
  
I actually read that but IDEA complained when I tried to place the 
`Product` encoder into a separate trait. So I opted for specificity.
However, I tried it again right now and even though IDEA still complains, 
scalac compiles it, all the tests pass and it all works. So I will change it.
I will also test whether this fixes the `toDS`-specific problems before 
pushing the changes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16329: [SPARK-16046][DOCS] Aggregations in the Spark SQL...

2016-12-22 Thread michalsenkyr
Github user michalsenkyr commented on a diff in the pull request:

https://github.com/apache/spark/pull/16329#discussion_r93682192
  
--- Diff: 
examples/src/main/scala/org/apache/spark/examples/sql/UserDefinedTypedAggregation.scala
 ---
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.examples.sql
+
+// $example on:typed_custom_aggregation$
+import org.apache.spark.sql.expressions.Aggregator
+import org.apache.spark.sql.Encoder
+import org.apache.spark.sql.Encoders
+import org.apache.spark.sql.SparkSession
+// $example off:typed_custom_aggregation$
+
+object UserDefinedTypedAggregation {
+
+  // $example on:typed_custom_aggregation$
+  case class Employee(name: String, salary: Long)
+  case class Average(var sum: Long, var count: Long)
+
+  object MyAverage extends Aggregator[Employee, Average, Double] {
+// A zero value for this aggregation. Should satisfy the property that 
any b + zero = b
+def zero: Average = Average(0L, 0L)
+// Combine two values to produce a new value. For performance, the 
function may modify `buffer`
+// and return it instead of constructing a new object
+def reduce(buffer: Average, employee: Employee): Average = {
+  buffer.sum += employee.salary
+  buffer.count += 1
+  buffer
+}
+// Merge two intermediate values
+def merge(b1: Average, b2: Average): Average = Average(b1.sum + 
b2.sum, b1.count + b2.count)
--- End diff --

Personally, I prefer consistency. When I saw this, I immediately wondered 
whether there was a specific reason you did it this way.
I'd rather see both methods use the same paradigm, in this case probably 
the immutable one, as the option of mutability is already mentioned in the 
comment above.
Or you could mention it again in the comment on this method if you want to 
provide examples of both. As it stands, it just seems a little confusing.
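
For illustration, a sketch of what the consistently immutable variant could look 
like (this drops the `var`s on `Average`, which only the mutating `reduce` needs):

```scala
case class Employee(name: String, salary: Long)
case class Average(sum: Long, count: Long)

// Immutable `reduce`, written in the same style as `merge`: it constructs a
// new buffer instead of mutating and returning the one passed in.
def reduce(buffer: Average, employee: Employee): Average =
  Average(buffer.sum + employee.salary, buffer.count + 1)

// `merge` then reads the same way.
def merge(b1: Average, b2: Average): Average =
  Average(b1.sum + b2.sum, b1.count + b2.count)
```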


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #16329: [SPARK-16046][DOCS] Aggregations in the Spark SQL progra...

2016-12-21 Thread michalsenkyr
Github user michalsenkyr commented on the issue:

https://github.com/apache/spark/pull/16329
  
If you are having trouble building Javadoc, try switching to Java 7 
temporarily. Java 8 introduced stricter Javadoc rules that may fail the docs 
build. Unfortunately Jenkins doesn't catch these errors, so new ones get 
introduced over time.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #16240: [SPARK-16792][SQL] Dataset containing a Case Class with ...

2016-12-20 Thread michalsenkyr
Github user michalsenkyr commented on the issue:

https://github.com/apache/spark/pull/16240
  
None of them. The compilation will fail. That is why I had to provide those 
additional implicits.

```
scala> class Test[T]
defined class Test

scala> implicit def test1[T <: Seq[String]]: Test[T] = null
test1: [T <: Seq[String]]=> Test[T]

scala> implicit def test2[T <: Product]: Test[T] = null
test2: [T <: Product]=> Test[T]

scala> def test[T : Test](t: T) = null
test: [T](t: T)(implicit evidence$1: Test[T])Null

scala> test(List("abc"))
<console>:31: error: ambiguous implicit values:
 both method test1 of type [T <: Seq[String]]=> Test[T]
 and method test2 of type [T <: Product]=> Test[T]
 match expected type Test[List[String]]
   test(List("abc"))
```


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #16157: [SPARK-18723][DOC] Expanded programming guide informatio...

2016-12-15 Thread michalsenkyr
Github user michalsenkyr commented on the issue:

https://github.com/apache/spark/pull/16157
  
Sorry for the delay. You are probably right that the partitioning is 
primarily determined by data locality, that it is therefore usually appropriate, 
and that the text shouldn't be worded to imply otherwise. I will think about how 
to reword it and remove the implementation specifics.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #16240: [SPARK-16792][SQL] Dataset containing a Case Class with ...

2016-12-11 Thread michalsenkyr
Github user michalsenkyr commented on the issue:

https://github.com/apache/spark/pull/16240
  
Possible optimization: Instead of conversions using `to`, we can use 
`Builder`s. This way we could get rid of the conversion overhead. This would 
require adding a new codegen method that would operate similarly to 
`MapObjects` but use a provided `Builder` to build the collection directly.

I will wait for a response to this PR before attempting any more 
modifications.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #16240: [SPARK-16792][SQL] Dataset containing a Case Class with ...

2016-12-10 Thread michalsenkyr
Github user michalsenkyr commented on the issue:

https://github.com/apache/spark/pull/16240
  
Added support for arbitrary sequences.

Queues, ArrayBuffers, and similar collections can now also be used in datasets 
(all are serialized into ArrayType).

I had to alter and add new implicit encoders into `SQLImplicits`. The new 
encoders are for `Seq` with `Product` combination (essentially only `List`) to 
disambiguate between `Seq` and `Product` encoders.

However, I encountered a problem with implicits. When constructing a 
complex Dataset using `Seq.toDS` that includes a `Product` (like a case class) 
and a sequence, the encoder doesn't seem to be created. When constructed with 
`spark.createDataset` or when transforming an existing dataset, there is no 
problem.

I added a workaround by defining a specific implicit just for `Seq`s. This 
makes the problem go away for existing usages; however, other collections cannot 
be constructed by `Seq.toDS` unless `newProductSeqEncoder[A, T]` is created 
with the correct type parameters.

If anybody knows how to fix this, let me know.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #16240: [SPARK-16792][SQL] Dataset containing a Case Class with ...

2016-12-09 Thread michalsenkyr
Github user michalsenkyr commented on the issue:

https://github.com/apache/spark/pull/16240
  
I would like to add that the conversion is specific to `List[_]`. I can add 
support for arbitrary sequence types through the use of `CanBuildFrom` if it is 
desirable.

We can also support `Set`s in this way by serializing them into arrays. See 
SPARK-17414.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16240: [SPARK-16792][SQL] Dataset containing a Case Clas...

2016-12-09 Thread michalsenkyr
GitHub user michalsenkyr opened a pull request:

https://github.com/apache/spark/pull/16240

[SPARK-16792][SQL] Dataset containing a Case Class with a List type causes 
a CompileException (converting sequence to list)

## What changes were proposed in this pull request?

Added a `toList` call at the end of the code generated by 
`ScalaReflection.deserializerFor` if the requested type is `List[_]`.

Care was taken to preserve the original deserialization for `Seq[_]` to 
avoid the overhead of `toList` in cases where it is not needed.

Also fixes [SPARK-16815] Dataset[List[T]] leads to ArrayStoreException
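
A minimal example of the kind of code affected (hedged; assumes a Spark shell 
session with `spark` and its implicits in scope, and is not the exact repro from 
the JIRA):

```scala
import spark.implicits._

case class ListClass(l: List[Int])

// Deserializing back into List[Int] previously failed with a CompileException;
// with the added `toList` it round-trips cleanly.
val ds = Seq(ListClass(List(1, 2, 3))).toDS()
ds.map(_.l.sum).collect()
```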

## How was this patch tested?
```bash
./build/mvn -DskipTests clean package && ./dev/run-tests
```


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/michalsenkyr/spark sql-caseclass-list-fix

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/16240.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #16240


commit c47f1895711d5093e4e677ad7ac5f02fe9eb3b61
Author: Michal Senkyr <mike.sen...@gmail.com>
Date:   2016-12-09T22:36:49Z

Added call to toList if deserializing into List

commit 8c15b475fb053aef19906d6a465309d299ca7b4d
Author: Michal Senkyr <mike.sen...@gmail.com>
Date:   2016-12-09T23:30:49Z

Added unit test




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16157: [SPARK-18723][DOC] Expanded programming guide inf...

2016-12-09 Thread michalsenkyr
Github user michalsenkyr commented on a diff in the pull request:

https://github.com/apache/spark/pull/16157#discussion_r91716176
  
--- Diff: docs/programming-guide.md ---
@@ -347,7 +347,7 @@ Some notes on reading files with Spark:
 
 Apart from text files, Spark's Scala API also supports several other data 
formats:
 
-* `SparkContext.wholeTextFiles` lets you read a directory containing 
multiple small text files, and returns each of them as (filename, content) 
pairs. This is in contrast with `textFile`, which would return one record per 
line in each file.
+* `SparkContext.wholeTextFiles` lets you read a directory containing 
multiple small text files, and returns each of them as (filename, content) 
pairs. This is in contrast with `textFile`, which would return one record per 
line in each file. It takes an optional second argument for controlling the 
minimal number of partitions (by default this is 2). It uses 
[CombineFileInputFormat](https://hadoop.apache.org/docs/current/api/org/apache/hadoop/mapreduce/lib/input/CombineFileInputFormat.html)
 internally in order to process large numbers of small files effectively by 
grouping files on the same executor into a single partition. This can lead to 
sub-optimal partitioning when the file sets would benefit from residing in 
multiple partitions (e.g., larger partitions would not fit in memory, files are 
replicated but a large subset is locally reachable from a single executor, 
subsequent transformations would benefit from multi-core processing). In those 
cases, set the `minPartitions` argument to enforce splitting.
--- End diff --

Yes, it is different in what the elements are. However, there is no 
indication that the partitioning policy differs that much. I always understood 
`textFile`'s partitioning policy as "we will split it if we can" up to 
individual blocks. `wholeTextFiles`' partitioning seems to be more like "we 
will merge it if we can" up to the executor boundary. The polar opposite.

This manifests in the way the developer has to think about handling 
partitions. In `textFile` it is generally safe to let Spark figure it all out 
without sacrificing much in performance, whereas in `wholeTextFiles` you may 
frequently run into performance problems due to having too few partitions.

An alternate solution in this case would be to unite the partitioning 
policies of the `textFile` and `wholeTextFiles` methods by having the latter 
also "split if we can". In this case up to the individual files (presently 
achievable by setting minPartitions to an arbitrary large number). Therefore 
each file would, by default, have its own partition. However, this approach 
would mean a significant change which might break existing applications.
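
As a concrete sketch of the workaround mentioned above (the path and value are 
illustrative only):

```scala
// Force splitting by passing a large minPartitions; with a sufficiently large
// value each file effectively ends up in its own partition.
val perFile = sc.wholeTextFiles("hdfs:///data/many-small-files", minPartitions = 10000)
perFile.getNumPartitions
```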


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16201: [SPARK-3359][DOCS] Fix greater-than symbols in Ja...

2016-12-07 Thread michalsenkyr
Github user michalsenkyr commented on a diff in the pull request:

https://github.com/apache/spark/pull/16201#discussion_r91429780
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/classification/DecisionTreeClassifier.scala
 ---
@@ -81,7 +81,7 @@ class DecisionTreeClassifier @Since("1.4.0") (
* E.g. 10 means that the cache will get checkpointed every 10 
iterations.
* This is only used if cacheNodeIds is true and if the checkpoint 
directory is set in
* [[org.apache.spark.SparkContext]].
-   * Must be >= 1.
+   * Must be {@literal >= 1}.
--- End diff --

Yes, I read about that, but figured that the literal text is probably meant 
to be a logical part of the code, which in this case is `>= 1`.

Making just `>` literal would make sense if you want to minimize the 
cosmetic differences between the characters. However, as I did not know whether 
the resulting code block would retain the same styling in the future, I opted 
for logical separation. If scaladoc later changed the styling of literals to, 
for example, the GitHub style for code, the result would look very bad. `>= 1` 
would still look relatively good.

But if the `{@literal >}` style is widely used in the Spark project, I can 
change it.
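
For reference, the two alternatives under discussion would look like this in the 
doc comment source (attached to a dummy member purely for illustration):

```scala
/**
 * Escaping the whole logical expression (the approach taken in this PR):
 * Must be {@literal >= 1}.
 *
 * Escaping only the problematic character (the alternative raised in review):
 * Must be {@literal >}= 1.
 */
def checkpointInterval: Int = 10
```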


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #16201: [SPARK-3359][DOCS] Fix greater-than symbols in Javadoc t...

2016-12-07 Thread michalsenkyr
Github user michalsenkyr commented on the issue:

https://github.com/apache/spark/pull/16201
  
This time I inspected both the generated Javadoc and Scaladoc. It should be 
fine now.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16201: [SPARK-3359][DOCS] Fix greater-than symbols in Jav...

2016-12-07 Thread michalsenkyr
GitHub user michalsenkyr opened a pull request:

https://github.com/apache/spark/pull/16201

[SPARK-3359][DOCS] Fix greater-than symbols in Javadoc to allow building 
with Java 8

## What changes were proposed in this pull request?

The API documentation build was failing when using Java 8 due to the incorrect 
character `>` in Javadoc.

Replace `>` with `&gt;` in Javadoc to allow the build to pass.

## How was this patch tested?

Documentation was built and inspected manually to ensure it still displays 
correctly in the browser

```
cd docs && jekyll serve
```

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/michalsenkyr/spark javadoc8-gt-fix

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/16201.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #16201


commit 84a7163184e88b827328557c5c9e8882558afef7
Author: Michal Senkyr <mike.sen...@gmail.com>
Date:   2016-12-07T23:56:00Z

Replace > with &gt;




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16157: [SPARK-18723][DOC] Expanded programming guide inf...

2016-12-07 Thread michalsenkyr
Github user michalsenkyr commented on a diff in the pull request:

https://github.com/apache/spark/pull/16157#discussion_r91405625
  
--- Diff: docs/programming-guide.md ---
@@ -347,7 +347,7 @@ Some notes on reading files with Spark:
 
 Apart from text files, Spark's Scala API also supports several other data 
formats:
 
-* `SparkContext.wholeTextFiles` lets you read a directory containing 
multiple small text files, and returns each of them as (filename, content) 
pairs. This is in contrast with `textFile`, which would return one record per 
line in each file.
+* `SparkContext.wholeTextFiles` lets you read a directory containing 
multiple small text files, and returns each of them as (filename, content) 
pairs. This is in contrast with `textFile`, which would return one record per 
line in each file. It takes an optional second argument for controlling the 
minimal number of partitions (by default this is 2). It uses 
[CombineFileInputFormat](https://hadoop.apache.org/docs/current/api/org/apache/hadoop/mapreduce/lib/input/CombineFileInputFormat.html)
 internally in order to process large numbers of small files effectively by 
grouping files on the same executor into a single partition. This can lead to 
sub-optimal partitioning when the file sets would benefit from residing in 
multiple partitions (e.g., larger partitions would not fit in memory, files are 
replicated but a large subset is locally reachable from a single executor, 
subsequent transformations would benefit from multi-core processing). In those 
cases, set the `minPartitions` argument to enforce splitting.
--- End diff --

Also, the number of files in this case is irrelevant. Even with, say, 5 
large files we got only 2 partitions by default.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #16157: [SPARK-18723][DOC] Expanded programming guide informatio...

2016-12-07 Thread michalsenkyr
Github user michalsenkyr commented on the issue:

https://github.com/apache/spark/pull/16157
  
I added a few more sentences describing the cases in which the user might 
want to use the argument. However, I am afraid this might be a little too 
descriptive.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16157: [SPARK-18723][DOC] Expanded programming guide inf...

2016-12-05 Thread michalsenkyr
Github user michalsenkyr commented on a diff in the pull request:

https://github.com/apache/spark/pull/16157#discussion_r90975179
  
--- Diff: docs/programming-guide.md ---
@@ -347,7 +347,7 @@ Some notes on reading files with Spark:
 
 Apart from text files, Spark's Scala API also supports several other data 
formats:
 
-* `SparkContext.wholeTextFiles` lets you read a directory containing 
multiple small text files, and returns each of them as (filename, content) 
pairs. This is in contrast with `textFile`, which would return one record per 
line in each file.
+* `SparkContext.wholeTextFiles` lets you read a directory containing 
multiple small text files, and returns each of them as (filename, content) 
pairs. This is in contrast with `textFile`, which would return one record per 
line in each file. It takes an optional second argument for controlling the 
minimal number of partitions (by default this is 2). It uses 
[CombineFileInputFormat](https://hadoop.apache.org/docs/current/api/org/apache/hadoop/mapreduce/lib/input/CombineFileInputFormat.html)
 internally in order to process large numbers of small files effectively by 
grouping files on the same node into a single split. (This can lead to 
non-optimal partitioning. It is therefore advisable to set the minimal number 
of partitions explicitly.)
--- End diff --

Yes, you are right. A 'node' should probably be an 'executor'. Also 'split' 
should probably be replaced by 'partition' to prevent confusion. I used Hadoop 
terminology because it is used in the linked Hadoop documentation, but Spark 
terminology may be more appropriate here.

As I understand it, this behavior is guaranteed, since setting minPartitions 
directly sets maxSplitSize by dividing the total length of the input files 
([code](https://github.com/apache/spark/blob/39e2bad6a866d27c3ca594d15e574a1da3ee84cc/core/src/main/scala/org/apache/spark/input/WholeTextFileInputFormat.scala#L57)).
 Please note that 
[minSplitSize](https://hadoop.apache.org/docs/current/api/org/apache/hadoop/mapreduce/lib/input/CombineFileInputFormat.html#setMinSplitSizeNode-long-)
 is only used for leftover data.
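
A paraphrase of that relationship (not a verbatim copy of 
`WholeTextFileInputFormat`): the maximum split size is roughly the total input 
length divided by minPartitions, so raising minPartitions shrinks the splits.

```scala
def maxSplitSize(totalLen: Long, minPartitions: Int): Long =
  math.ceil(totalLen.toDouble / math.max(minPartitions, 1)).toLong

// e.g. 1 GiB of input with minPartitions = 100 gives ~10.7 MB splits
maxSplitSize(totalLen = 1L << 30, minPartitions = 100)
```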

As the programming guide already contains quite a detailed description of 
partitioning on `textFile`, I thought it would make sense to mention it in 
`wholeTextFiles` as well. Especially when the behavior differs that much.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16157: [SPARK-18723][DOC] Expanded programming guide inf...

2016-12-05 Thread michalsenkyr
GitHub user michalsenkyr opened a pull request:

https://github.com/apache/spark/pull/16157

[SPARK-18723][DOC] Expanded programming guide information on wholeTex…

## What changes were proposed in this pull request?

Add additional information to wholeTextFiles in the Programming Guide. Also 
explain partitioning policy difference in relation to textFile and its impact 
on performance.

Also added reference to the underlying CombineFileInputFormat

## How was this patch tested?

Manual build of documentation and inspection in browser

```
cd docs
SKIP_API=1 jekyll serve --watch
```

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/michalsenkyr/spark wholeTextFilesExpandedDocs

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/16157.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #16157


commit 487710b79ffbed8eccbd531531149a838eb2fbb2
Author: Michal Senkyr <mike.sen...@gmail.com>
Date:   2016-12-05T21:16:23Z

[SPARK-18723][DOC] Expanded programming guide information on wholeTextFiles

## What changes were proposed in this pull request?

Add additional information to wholeTextFiles in the Programming Guide. Also 
explain partitioning policy difference in relation to textFile and its impact 
on performance.

Also added reference to the underlying CombineFileInputFormat

## How was this patch tested?

Manual build of documentation and inspection in browser

```
cd docs
SKIP_API=1 jekyll serve --watch
```

Author: Michal Senkyr <mike.sen...@gmail.com>




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org