[GitHub] spark pull request #22646: [SPARK-25654][SQL] Support for nested JavaBean ar...
Github user michalsenkyr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22646#discussion_r229099482

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala ---
@@ -1115,9 +1126,38 @@ object SQLContext {
         })
       }
     }
-    def createConverter(cls: Class[_], dataType: DataType): Any => Any = dataType match {
-      case struct: StructType => createStructConverter(cls, struct.map(_.dataType))
-      case _ => CatalystTypeConverters.createToCatalystConverter(dataType)
+    def createConverter(t: Type, dataType: DataType): Any => Any = (t, dataType) match {
+      case (cls: Class[_], struct: StructType) =>
--- End diff --

Frankly, I was not really sure about serializing sets as arrays, as the result stops behaving like a set, but I found a PR (#18416) where this seems to have been permitted, so I will go ahead and add that.

---

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22646: [SPARK-25654][SQL] Support for nested JavaBean ar...
Github user michalsenkyr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22646#discussion_r229093388

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala ---
@@ -1115,9 +1126,38 @@ object SQLContext {
         })
       }
     }
-    def createConverter(cls: Class[_], dataType: DataType): Any => Any = dataType match {
-      case struct: StructType => createStructConverter(cls, struct.map(_.dataType))
-      case _ => CatalystTypeConverters.createToCatalystConverter(dataType)
+    def createConverter(t: Type, dataType: DataType): Any => Any = (t, dataType) match {
--- End diff --

I took a quick look at `CatalystTypeConverters` and I believe there would be a problem in not being able to reliably distinguish Java beans from other arbitrary classes. We might use setters or set fields directly on objects which would not be prepared for such manipulation, potentially creating hard-to-find errors. This method already assumes a Java bean, so that problem is not present here. Isn't that so?
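The distinction above — a Java bean announces its fields through getter/setter conventions, while an arbitrary class exposes nothing you can rely on — can be seen with plain `java.beans` introspection, which is roughly what `JavaTypeInference.getJavaBeanReadableProperties` wraps. A minimal sketch, not part of the PR; `Point` is a made-up bean for illustration:

```java
import java.beans.BeanInfo;
import java.beans.IntrospectionException;
import java.beans.Introspector;
import java.beans.PropertyDescriptor;
import java.util.ArrayList;
import java.util.List;

public class BeanProps {
    // A conventional Java bean: readable property "x" via a getter.
    public static class Point {
        private int x;
        public int getX() { return x; }
        public void setX(int x) { this.x = x; }
    }

    // List the readable property names Introspector reports, minus "class",
    // which Object.getClass() contributes for every type.
    public static List<String> readableProperties(Class<?> cls) {
        try {
            BeanInfo info = Introspector.getBeanInfo(cls);
            List<String> names = new ArrayList<>();
            for (PropertyDescriptor pd : info.getPropertyDescriptors()) {
                if (pd.getReadMethod() != null && !"class".equals(pd.getName())) {
                    names.add(pd.getName());
                }
            }
            return names;
        } catch (IntrospectionException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(readableProperties(Point.class));  // [x]
        // A class with no getters yields no properties, so a converter that
        // assumed bean structure would have nothing safe to work with.
        System.out.println(readableProperties(Object.class)); // []
    }
}
```

A generic converter that set fields directly would bypass this convention entirely, which is the risk the comment describes.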
[GitHub] spark pull request #22646: [SPARK-25654][SQL] Support for nested JavaBean ar...
Github user michalsenkyr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22646#discussion_r224962460

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala ---
@@ -1115,9 +1126,38 @@ object SQLContext {
         })
       }
     }
-    def createConverter(cls: Class[_], dataType: DataType): Any => Any = dataType match {
-      case struct: StructType => createStructConverter(cls, struct.map(_.dataType))
-      case _ => CatalystTypeConverters.createToCatalystConverter(dataType)
+    def createConverter(t: Type, dataType: DataType): Any => Any = (t, dataType) match {
+      case (cls: Class[_], struct: StructType) =>
+        // bean type
+        createStructConverter(cls, struct.map(_.dataType))
+      case (arrayType: Class[_], array: ArrayType) if arrayType.isArray =>
+        // array type
+        val converter = createConverter(arrayType.getComponentType, array.elementType)
+        value => new GenericArrayData(
+          (0 until JavaArray.getLength(value)).map(i =>
+            converter(JavaArray.get(value, i))).toArray)
+      case (_, array: ArrayType) =>
+        // java.util.List type
+        val cls = classOf[java.util.List[_]]
--- End diff --

I think you are right. It would be better to change it to avoid confusion. I also agree with a separate PR for that.
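For the `java.util.List` case in the diff, the converter's job is to map an element converter over the list's values. A hedged sketch of that shape in plain Java (hypothetical names; the real Spark code produces a Catalyst `GenericArrayData` rather than a new `List`):

```java
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;

public class ListConvert {
    // Build a list converter from an element converter: apply the element
    // converter to every item, collecting into a fresh list.
    public static Function<Object, Object> listConverter(Function<Object, Object> elementConverter) {
        return value -> ((List<?>) value).stream()
                .map(elementConverter)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Example element converter: stringify each element.
        Function<Object, Object> toStringConv = Object::toString;
        System.out.println(listConverter(toStringConv).apply(List.of(1, 2, 3)));
    }
}
```

In the PR the element converter is itself produced by a recursive `createConverter` call, so nested beans inside the list are handled by the same mechanism.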
[GitHub] spark pull request #22646: [SPARK-25654][SQL] Support for nested JavaBean ar...
Github user michalsenkyr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22646#discussion_r223212772

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala ---
@@ -1098,12 +1099,19 @@ object SQLContext {
       data: Iterator[_],
       beanClass: Class[_],
       attrs: Seq[AttributeReference]): Iterator[InternalRow] = {
+    import scala.collection.JavaConverters._
+    import java.lang.reflect.{Type, ParameterizedType, Array => JavaArray}
--- End diff --

I didn't want to needlessly add those to the whole file, as the reflection stuff is needed only in this method. Ditto with collection converters. But if you think it is better at the top, I'll move it.
[GitHub] spark pull request #22646: [SPARK-25654][SQL] Support for nested JavaBean ar...
Github user michalsenkyr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22646#discussion_r223212724

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala ---
@@ -1115,8 +1123,31 @@ object SQLContext {
         })
       }
     }
-    def createConverter(cls: Class[_], dataType: DataType): Any => Any = dataType match {
-      case struct: StructType => createStructConverter(cls, struct.map(_.dataType))
+    def createConverter(t: Type, dataType: DataType): Any => Any = (t, dataType) match {
+      case (cls: Class[_], struct: StructType) =>
+        createStructConverter(cls, struct.map(_.dataType))
+      case (arrayType: Class[_], array: ArrayType) =>
+        val converter = createConverter(arrayType.getComponentType, array.elementType)
+        value => new GenericArrayData(
+          (0 until JavaArray.getLength(value)).map(i =>
+            converter(JavaArray.get(value, i))).toArray)
+      case (_, array: ArrayType) =>
--- End diff --

Sorry, I should have added a check for `cls.isArray` in the array case. That would make it clearer. I will also add a comment to each case with the actual type expected for that conversion.
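The `cls.isArray` guard matters because `java.lang.reflect.Array` is the standard way to read elements generically from both primitive and object arrays, which is what the `JavaArray.getLength`/`JavaArray.get` calls in the diff rely on. A small standalone Java sketch (illustrative only, not from the PR):

```java
import java.lang.reflect.Array;

public class ArrayWalk {
    // Reflective length/element access works uniformly for int[] and String[];
    // Array.get boxes primitives automatically, giving Object elements to
    // feed into a recursively built element converter.
    public static Object[] elements(Object array) {
        int n = Array.getLength(array);
        Object[] out = new Object[n];
        for (int i = 0; i < n; i++) {
            out[i] = Array.get(array, i);
        }
        return out;
    }

    public static void main(String[] args) {
        int[] prim = {1, 2, 3};
        String[] objs = {"a", "b"};
        System.out.println(prim.getClass().isArray()); // true
        System.out.println(elements(prim).length);     // 3
        System.out.println(elements(objs)[1]);         // b
    }
}
```

Without the `isArray` check, a plain `Class[_]` paired with an `ArrayType` could only be distinguished from the `java.util.List` case by falling through to the next pattern, which is the confusion the comment addresses.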
[GitHub] spark issue #22527: [SPARK-17952][SQL] Nested Java beans support in createDa...
Github user michalsenkyr commented on the issue:

    https://github.com/apache/spark/pull/22527

Thanks! I created a new PR with array, list and map support.
[GitHub] spark pull request #22646: Support for nested JavaBean arrays, lists and map...
GitHub user michalsenkyr opened a pull request:

    https://github.com/apache/spark/pull/22646

    Support for nested JavaBean arrays, lists and maps in createDataFrame

## What changes were proposed in this pull request?

Continuing from #22527, this PR seeks to add support for beans in array, list and map fields when creating DataFrames from Java beans.

## How was this patch tested?

Appropriate unit tests were amended. Also manually tested in Spark shell:

```
scala> import scala.beans.BeanProperty
import scala.beans.BeanProperty

scala> class Nested(@BeanProperty var i: Int) extends Serializable
defined class Nested

scala> class Test(@BeanProperty var array: Array[Nested], @BeanProperty var list: java.util.List[Nested], @BeanProperty var map: java.util.Map[Integer, Nested]) extends Serializable
defined class Test

scala> import scala.collection.JavaConverters._
import scala.collection.JavaConverters._

scala> val array = Array(new Nested(1))
array: Array[Nested] = Array(Nested@757ad227)

scala> val list = Seq(new Nested(2), new Nested(3)).asJava
list: java.util.List[Nested] = [Nested@633dce39, Nested@4dd28982]

scala> val map = Map(Int.box(1) -> new Nested(4), Int.box(2) -> new Nested(5)).asJava
map: java.util.Map[Integer,Nested] = {1=Nested@57421e4e, 2=Nested@5a75bad4}

scala> val df = spark.createDataFrame(Seq(new Test(array, list, map)).asJava, classOf[Test])
df: org.apache.spark.sql.DataFrame = [array: array<struct<i:int>>, list: array<struct<i:int>> ... 1 more field]

scala> df.show()
+-----+----------+--------------------+
|array|      list|                 map|
+-----+----------+--------------------+
|[[1]]|[[2], [3]]|[1 -> [4], 2 -> [5]]|
+-----+----------+--------------------+
```

Previous behavior:

```
scala> val df = spark.createDataFrame(Seq(new Test(array, list, map)).asJava, classOf[Test])
java.lang.IllegalArgumentException: The value (Nested@3dedc8b8) of the type (Nested) cannot be converted to struct<i:int>
  at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:262)
  at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:238)
  at org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:103)
  at org.apache.spark.sql.catalyst.CatalystTypeConverters$ArrayConverter$$anonfun$toCatalystImpl$1.apply(CatalystTypeConverters.scala:162)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
  at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
  at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
  at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
  at org.apache.spark.sql.catalyst.CatalystTypeConverters$ArrayConverter.toCatalystImpl(CatalystTypeConverters.scala:162)
  at org.apache.spark.sql.catalyst.CatalystTypeConverters$ArrayConverter.toCatalystImpl(CatalystTypeConverters.scala:154)
  at org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:103)
  at org.apache.spark.sql.catalyst.CatalystTypeConverters$$anonfun$createToCatalystConverter$2.apply(CatalystTypeConverters.scala:396)
  at org.apache.spark.sql.SQLContext$$anonfun$createStructConverter$1$1$$anonfun$apply$1.apply(SQLContext.scala:1114)
  at org.apache.spark.sql.SQLContext$$anonfun$createStructConverter$1$1$$anonfun$apply$1.apply(SQLContext.scala:1113)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
  at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
  at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
  at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
  at org.apache.spark.sql.SQLContext$$anonfun$createStructConverter$1$1.apply(SQLContext.scala:1113)
  at org.apache.spark.sql.SQLContext$$anonfun$createStructConverter$1$1.apply(SQLContext.scala:1108)
  at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
  at scala.collection.Iterator$class.toStream(Iterator.scala:1320)
  at scala.collection.AbstractIterator.toStream(Iterator.scala:1334)
  at scala.collection.TraversableOnce$class.toSeq(TraversableOnce.scala:298)
```
[GitHub] spark pull request #22527: [SPARK-17952][SQL] Nested Java beans support in c...
Github user michalsenkyr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22527#discussion_r222817293

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala ---
@@ -1098,16 +1098,26 @@ object SQLContext {
       data: Iterator[_],
       beanClass: Class[_],
       attrs: Seq[AttributeReference]): Iterator[InternalRow] = {
-    val extractors =
-      JavaTypeInference.getJavaBeanReadableProperties(beanClass).map(_.getReadMethod)
-    val methodsToConverts = extractors.zip(attrs).map { case (e, attr) =>
-      (e, CatalystTypeConverters.createToCatalystConverter(attr.dataType))
+    def createStructConverter(cls: Class[_], fieldTypes: Seq[DataType]): Any => InternalRow = {
+      val methodConverters =
+        JavaTypeInference.getJavaBeanReadableProperties(cls).zip(fieldTypes)
+          .map { case (property, fieldType) =>
+            val method = property.getReadMethod
+            method -> createConverter(method.getReturnType, fieldType)
+          }
+      value =>
+        if (value == null) null
+        else new GenericInternalRow(
+          methodConverters.map { case (method, converter) =>
+            converter(method.invoke(value))
+          })
--- End diff --

Done
[GitHub] spark issue #22527: [SPARK-17952][SQL] Nested Java beans support in createDa...
Github user michalsenkyr commented on the issue:

    https://github.com/apache/spark/pull/22527

@ueshin Yes. I am already working on array/list support. Will add maps as well. It shouldn't require a rewrite now that the code is restructured, just new cases in the pattern match. So I think it's OK to do in another PR.
[GitHub] spark pull request #22527: [SPARK-17952][SQL] Nested Java beans support in c...
Github user michalsenkyr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22527#discussion_r222415998

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala ---
@@ -1098,16 +1098,24 @@ object SQLContext {
       data: Iterator[_],
       beanClass: Class[_],
       attrs: Seq[AttributeReference]): Iterator[InternalRow] = {
-    val extractors =
-      JavaTypeInference.getJavaBeanReadableProperties(beanClass).map(_.getReadMethod)
-    val methodsToConverts = extractors.zip(attrs).map { case (e, attr) =>
-      (e, CatalystTypeConverters.createToCatalystConverter(attr.dataType))
+    def createStructConverter(cls: Class[_], fieldTypes: Iterator[DataType]): Any => InternalRow = {
+      val methodConverters =
+        JavaTypeInference.getJavaBeanReadableProperties(cls).iterator.zip(fieldTypes)
+          .map { case (property, fieldType) =>
+            val method = property.getReadMethod
+            method -> createConverter(method.getReturnType, fieldType)
+          }.toArray
+      value => new GenericInternalRow(
--- End diff --

You're right. Added. Thanks.
[GitHub] spark pull request #22527: [SPARK-17952][SQL] Nested Java beans support in c...
Github user michalsenkyr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22527#discussion_r222415649

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala ---
@@ -1098,16 +1098,24 @@ object SQLContext {
       data: Iterator[_],
       beanClass: Class[_],
       attrs: Seq[AttributeReference]): Iterator[InternalRow] = {
-    val extractors =
-      JavaTypeInference.getJavaBeanReadableProperties(beanClass).map(_.getReadMethod)
-    val methodsToConverts = extractors.zip(attrs).map { case (e, attr) =>
-      (e, CatalystTypeConverters.createToCatalystConverter(attr.dataType))
+    def createStructConverter(cls: Class[_], fieldTypes: Iterator[DataType]): Any => InternalRow = {
--- End diff --

I used `Iterator`s instead of `Seq`s in order to avoid creating intermediate collections. However, I agree it's more concise without that.
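The intermediate-collection point generalizes beyond Scala's `Iterator`. As a hedged illustration in plain Java (not from the PR), a lazy `Stream` pipeline defers per-element work until a terminal operation runs, whereas each eager `map` over a materialized collection would allocate a new one per stage:

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class LazyVsEager {
    public static void main(String[] args) {
        AtomicInteger calls = new AtomicInteger();

        // Lazy: building the pipeline applies no element function yet and
        // allocates no intermediate list for the map stage.
        Stream<Integer> lazy = List.of(1, 2, 3).stream()
                .map(x -> { calls.incrementAndGet(); return x * 2; });
        System.out.println(calls.get());          // 0 — map not applied yet

        List<Integer> result = lazy.collect(Collectors.toList());
        System.out.println(calls.get());          // 3
        System.out.println(result);               // [2, 4, 6]
    }
}
```

The counter stays at zero until `collect` forces the pipeline, which is the same laziness the `Iterator`-based version of the diff was exploiting.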
[GitHub] spark issue #22527: [SPARK-17952][SQL] Nested Java beans support in createDa...
Github user michalsenkyr commented on the issue:

    https://github.com/apache/spark/pull/22527

I restructured the code in this commit to allow easier addition of array/list support in the future.
[GitHub] spark pull request #22527: [SPARK-17952][SQL] Nested Java beans support in c...
Github user michalsenkyr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22527#discussion_r222079624

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala ---
@@ -1100,13 +1101,23 @@ object SQLContext {
       attrs: Seq[AttributeReference]): Iterator[InternalRow] = {
     val extractors =
       JavaTypeInference.getJavaBeanReadableProperties(beanClass).map(_.getReadMethod)
-    val methodsToConverts = extractors.zip(attrs).map { case (e, attr) =>
-      (e, CatalystTypeConverters.createToCatalystConverter(attr.dataType))
+    val methodsToTypes = extractors.zip(attrs).map { case (e, attr) =>
+      (e, attr.dataType)
+    }
+    def invoke(element: Any)(tuple: (Method, DataType)): Any = tuple match {
--- End diff --

Good point. Thank you. Changed it in the latest commit.
[GitHub] spark pull request #22527: [SPARK-17952][SQL] Nested Java beans support in c...
Github user michalsenkyr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22527#discussion_r219691829

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala ---
@@ -1100,13 +1101,24 @@ object SQLContext {
       attrs: Seq[AttributeReference]): Iterator[InternalRow] = {
     val extractors =
       JavaTypeInference.getJavaBeanReadableProperties(beanClass).map(_.getReadMethod)
-    val methodsToConverts = extractors.zip(attrs).map { case (e, attr) =>
-      (e, CatalystTypeConverters.createToCatalystConverter(attr.dataType))
+    val methodsToTypes = extractors.zip(attrs).map { case (e, attr) =>
+      (e, attr.dataType)
+    }
+    def invoke(element: Any)(tuple: (Method, DataType)): Any = tuple match {
+      case (e, structType: StructType) =>
+        val value = e.invoke(element)
+        val nestedExtractors = JavaTypeInference.getJavaBeanReadableProperties(value.getClass)
+          .map(desc => desc.getName -> desc.getReadMethod)
+          .toMap
+        new GenericInternalRow(structType.map(nestedProperty =>
+          invoke(value)(nestedExtractors(nestedProperty.name) -> nestedProperty.dataType)
+        ).toArray)
--- End diff --

Right, we don't have to. Just checked, and `JavaTypeInference.inferDataType` also uses `JavaTypeInference.getJavaBeanReadableProperties`, so the order should be the same. Also double-checked manually in Spark shell with a more complex nested bean to be sure.
[GitHub] spark pull request #22527: [SPARK-17952][SQL] Nested Java beans support in c...
GitHub user michalsenkyr opened a pull request:

    https://github.com/apache/spark/pull/22527

    [SPARK-17952][SQL] Nested Java beans support in createDataFrame

## What changes were proposed in this pull request?

When constructing a DataFrame from a Java bean, using nested beans throws an error despite [documentation](http://spark.apache.org/docs/latest/sql-programming-guide.html#inferring-the-schema-using-reflection) stating otherwise. This PR aims to add that support.

This PR does not yet add nested bean support in array or List fields. This can be added later or in another PR.

## How was this patch tested?

A nested bean was added to the appropriate unit test. Also manually tested in Spark shell on code emulating the referenced JIRA:

```
scala> import scala.beans.BeanProperty
import scala.beans.BeanProperty

scala> class SubCategory(@BeanProperty var id: String, @BeanProperty var name: String) extends Serializable
defined class SubCategory

scala> class Category(@BeanProperty var id: String, @BeanProperty var subCategory: SubCategory) extends Serializable
defined class Category

scala> import scala.collection.JavaConverters._
import scala.collection.JavaConverters._

scala> spark.createDataFrame(Seq(new Category("s-111", new SubCategory("sc-111", "Sub-1"))).asJava, classOf[Category])
java.lang.IllegalArgumentException: The value (SubCategory@65130cf2) of the type (SubCategory) cannot be converted to struct<id:string,name:string>
  at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:262)
  at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:238)
  at org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:103)
  at org.apache.spark.sql.catalyst.CatalystTypeConverters$$anonfun$createToCatalystConverter$2.apply(CatalystTypeConverters.scala:396)
  at org.apache.spark.sql.SQLContext$$anonfun$beansToRows$1$$anonfun$apply$1.apply(SQLContext.scala:1108)
  at org.apache.spark.sql.SQLContext$$anonfun$beansToRows$1$$anonfun$apply$1.apply(SQLContext.scala:1108)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
  at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
  at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
  at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
  at org.apache.spark.sql.SQLContext$$anonfun$beansToRows$1.apply(SQLContext.scala:1108)
  at org.apache.spark.sql.SQLContext$$anonfun$beansToRows$1.apply(SQLContext.scala:1106)
  at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
  at scala.collection.Iterator$class.toStream(Iterator.scala:1320)
  at scala.collection.AbstractIterator.toStream(Iterator.scala:1334)
  at scala.collection.TraversableOnce$class.toSeq(TraversableOnce.scala:298)
  at scala.collection.AbstractIterator.toSeq(Iterator.scala:1334)
  at org.apache.spark.sql.SparkSession.createDataFrame(SparkSession.scala:423)
  ... 51 elided
```

New behavior:

```
scala> spark.createDataFrame(Seq(new Category("s-111", new SubCategory("sc-111", "Sub-1"))).asJava, classOf[Category])
res0: org.apache.spark.sql.DataFrame = [id: string, subCategory: struct<id:string,name:string>]

scala> res0.show()
+-----+---------------+
|   id|    subCategory|
+-----+---------------+
|s-111|[sc-111, Sub-1]|
+-----+---------------+
```

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/michalsenkyr/spark SPARK-17952

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/22527.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #22527

----

commit ccea758b069c4622e9b1f71b92167c81cfcd81b8
Author: Michal Senkyr
Date:   2018-09-22T18:25:36Z

    Add nested Java beans support to SQLContext.beansToRow
[GitHub] spark issue #20505: [SPARK-23251][SQL] Add checks for collection element Enc...
Github user michalsenkyr commented on the issue:

    https://github.com/apache/spark/pull/20505

Yes, that is the idea. Frankly, I am not that familiar with how the compiler resolves all the implicit parameters to say confidently what is going on. But here's my take: I did a little more research and found out that the "diverging implicit expansion" error means the compiler is able to follow a resolution route that can be potentially infinite. I think it might be possible that the compiler is following my other implicit methods, trying to fit various collections inside V with the hope of satisfying the `<:<` condition some time in the future before finally giving up.

Just to be sure multi-level collections work with my change, I successfully tried this:

```
scala> implicitly[Encoder[Map[Seq[Map[String, Seq[Long]]], List[Array[Map[String, Int]]]]]]
res5: org.apache.spark.sql.Encoder[Map[Seq[Map[String,Seq[Long]]],List[Array[Map[String,Int]]]]] = class[value[0]: map<array<map<string,array<bigint>>>,array<array<map<string,int>>>>]
```

One thing that doesn't make sense to me, however, is the information the compiler gives me when enabling implicit resolution logging via `-Xlog-implicits`:

```
scala> implicitly[Encoder[Map[String, Any]]]
<console>:25: newCheckedSetEncoder is not a valid implicit value for org.apache.spark.sql.Encoder[Map[String,Any]] because:
hasMatchingSymbol reported error: polymorphic expression cannot be instantiated to expected type;
 found   : [T[_], E]org.apache.spark.sql.Encoder[T[E]]
 required: org.apache.spark.sql.Encoder[Map[String,Any]]
       implicitly[Encoder[Map[String, Any]]]
                 ^
<console>:25: newCheckedSetEncoder is not a valid implicit value for org.apache.spark.sql.Encoder[String] because:
hasMatchingSymbol reported error: polymorphic expression cannot be instantiated to expected type;
 found   : [T[_], E]org.apache.spark.sql.Encoder[T[E]]
 required: org.apache.spark.sql.Encoder[String]
       implicitly[Encoder[Map[String, Any]]]
                 ^
<console>:25: newCheckedMapEncoder is not a valid implicit value for org.apache.spark.sql.Encoder[String] because:
hasMatchingSymbol reported error: polymorphic expression cannot be instantiated to expected type;
 found   : [T[_, _], K, V]org.apache.spark.sql.Encoder[T[K,V]]
 required: org.apache.spark.sql.Encoder[String]
       implicitly[Encoder[Map[String, Any]]]
                 ^
<console>:25: newCheckedSequenceEncoder is not a valid implicit value for org.apache.spark.sql.Encoder[String] because:
hasMatchingSymbol reported error: polymorphic expression cannot be instantiated to expected type;
 found   : [T[_], E]org.apache.spark.sql.Encoder[T[E]]
 required: org.apache.spark.sql.Encoder[String]
       implicitly[Encoder[Map[String, Any]]]
                 ^
<console>:25: materializing requested reflect.runtime.universe.type.TypeTag[A] using `package`.this.materializeTypeTag[A](scala.reflect.runtime.`package`.universe)
       implicitly[Encoder[Map[String, Any]]]
                 ^
<console>:25: newCheckedMapEncoder is not a valid implicit value for org.apache.spark.sql.Encoder[E] because:
hasMatchingSymbol reported error: diverging implicit expansion for type org.apache.spark.sql.Encoder[K]
starting with method newStringEncoder in class SQLImplicits
       implicitly[Encoder[Map[String, Any]]]
                 ^
<console>:25: newCheckedSetEncoder is not a valid implicit value for org.apache.spark.sql.Encoder[Any] because:
hasMatchingSymbol reported error: ambiguous implicit values:
 both method newIntEncoder in class SQLImplicits of type => org.apache.spark.sql.Encoder[Int]
 and method newLongEncoder in class SQLImplicits of type => org.apache.spark.sql.Encoder[Long]
 match expected type org.apache.spark.sql.Encoder[E]
       implicitly[Encoder[Map[String, Any]]]
                 ^
<console>:25: newCheckedMapEncoder is not a valid implicit value for org.apache.spark.sql.Encoder[Any] because:
hasMatchingSymbol reported error: diverging implicit expansion for type org.apache.spark.sql.Encoder[K]
starting with method newStringEncoder in class SQLImplicits
       implicitly[Encoder[Map[String, Any]]]
                 ^
<console>:25: materializing requested reflect.runtime.universe.type.TypeTag[A] using `package`.this.materializeTypeTag[A](scala.reflect.runtime.`package`.universe)
       implicitly[Encoder[Map[String, Any]]]
                 ^
<console>:25: newCheckedMapEncoder is not a valid implicit value for org.apache.spark.sql.Encoder[E] because:
hasMatchingSymbol reported error: diverging implicit expansion for type org.apache.spark.sql.Encoder[K]
starting with method newStringEncoder in class SQLImplicits
       implicitly[Encoder[Map[String,
```
[GitHub] spark pull request #20505: [SPARK-23251][SQL] Add checks for collection elem...
Github user michalsenkyr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20505#discussion_r167437023

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/SQLImplicits.scala ---
@@ -165,11 +165,15 @@ abstract class SQLImplicits extends LowPrioritySQLImplicits {
   def newProductSeqEncoder[A <: Product : TypeTag]: Encoder[Seq[A]] = ExpressionEncoder()

   /** @since 2.2.0 */
-  implicit def newSequenceEncoder[T <: Seq[_] : TypeTag]: Encoder[T] = ExpressionEncoder()
+  implicit def newSequenceEncoder[T[_], E : Encoder]
--- End diff --

Just letting you know that I updated this PR with the above fix. MiMa is now passing.
[GitHub] spark pull request #20505: [SPARK-23251][SQL] Add checks for collection elem...
Github user michalsenkyr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20505#discussion_r165903346

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/SQLImplicits.scala ---
@@ -165,11 +165,15 @@ abstract class SQLImplicits extends LowPrioritySQLImplicits {
   def newProductSeqEncoder[A <: Product : TypeTag]: Encoder[Seq[A]] = ExpressionEncoder()

   /** @since 2.2.0 */
-  implicit def newSequenceEncoder[T <: Seq[_] : TypeTag]: Encoder[T] = ExpressionEncoder()
+  implicit def newSequenceEncoder[T[_], E : Encoder]
--- End diff --

Looks like we are. I can add new methods and make the old ones not implicit. That should fix MiMa. Although that might add to the clutter that's already in this class. Is that OK?
[GitHub] spark pull request #20505: [SPARK-23251][SQL] Add checks for collection elem...
GitHub user michalsenkyr opened a pull request: https://github.com/apache/spark/pull/20505

[SPARK-23251][SQL] Add checks for collection element Encoders

Implicit methods of `SQLImplicits` providing Encoders for collections did not check for Encoders for their elements. This resulted in unrelated error messages during run-time instead of proper failures during compilation.

## What changes were proposed in this pull request?

`Seq`, `Map` and `Set` `Encoder` providers' type parameters and implicit parameters were modified to facilitate the appropriate checks. Unfortunately, this doesn't result in the desired "Unable to find encoder for type stored in a Dataset" error as the Scala compiler generates a different error when an implicit cannot be found due to missing arguments (see [ContextErrors](https://github.com/scala/scala/blob/v2.11.12/src/compiler/scala/tools/nsc/typechecker/ContextErrors.scala#L77)). `@implicitNotFound` is not used in this case.

## How was this patch tested?

New behavior:

```scala
scala> implicitly[Encoder[Map[String, Any]]]
<console>:25: error: diverging implicit expansion for type org.apache.spark.sql.Encoder[Map[String,Any]]
starting with method newStringEncoder in class SQLImplicits
       implicitly[Encoder[Map[String, Any]]]
                 ^
```

Old behavior:

```scala
scala> implicitly[Encoder[Map[String, Any]]]
java.lang.ClassNotFoundException: scala.Any
  at scala.reflect.internal.util.AbstractFileClassLoader.findClass(AbstractFileClassLoader.scala:62)
  at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
  at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
  at java.lang.Class.forName0(Native Method)
  at java.lang.Class.forName(Class.java:348)
  at scala.reflect.runtime.JavaMirrors$JavaMirror.javaClass(JavaMirrors.scala:555)
  at scala.reflect.runtime.JavaMirrors$JavaMirror$$anonfun$classToJava$1.apply(JavaMirrors.scala:1211)
  at scala.reflect.runtime.JavaMirrors$JavaMirror$$anonfun$classToJava$1.apply(JavaMirrors.scala:1203)
  at scala.reflect.runtime.TwoWayCaches$TwoWayCache$$anonfun$toJava$1.apply(TwoWayCaches.scala:49)
  at scala.reflect.runtime.Gil$class.gilSynchronized(Gil.scala:19)
  at scala.reflect.runtime.JavaUniverse.gilSynchronized(JavaUniverse.scala:16)
  at scala.reflect.runtime.TwoWayCaches$TwoWayCache.toJava(TwoWayCaches.scala:44)
  at scala.reflect.runtime.JavaMirrors$JavaMirror.classToJava(JavaMirrors.scala:1203)
  at scala.reflect.runtime.JavaMirrors$JavaMirror.runtimeClass(JavaMirrors.scala:194)
  at scala.reflect.runtime.JavaMirrors$JavaMirror.runtimeClass(JavaMirrors.scala:54)
  at org.apache.spark.sql.catalyst.ScalaReflection$.getClassFromType(ScalaReflection.scala:700)
  at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$dataTypeFor$1.apply(ScalaReflection.scala:84)
  at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$dataTypeFor$1.apply(ScalaReflection.scala:65)
  at scala.reflect.internal.tpe.TypeConstraints$UndoLog.undo(TypeConstraints.scala:56)
  at org.apache.spark.sql.catalyst.ScalaReflection$class.cleanUpReflectionObjects(ScalaReflection.scala:824)
  at org.apache.spark.sql.catalyst.ScalaReflection$.cleanUpReflectionObjects(ScalaReflection.scala:39)
  at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$dataTypeFor(ScalaReflection.scala:64)
  at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor$1.apply(ScalaReflection.scala:512)
  at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor$1.apply(ScalaReflection.scala:445)
  at scala.reflect.internal.tpe.TypeConstraints$UndoLog.undo(TypeConstraints.scala:56)
  at org.apache.spark.sql.catalyst.ScalaReflection$class.cleanUpReflectionObjects(ScalaReflection.scala:824)
  at org.apache.spark.sql.catalyst.ScalaReflection$.cleanUpReflectionObjects(ScalaReflection.scala:39)
  at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor(ScalaReflection.scala:445)
  at org.apache.spark.sql.catalyst.ScalaReflection$.serializerFor(ScalaReflection.scala:434)
  at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$.apply(ExpressionEncoder.scala:71)
  at org.apache.spark.sql.SQLImplicits.newMapEncoder(SQLImplicits.scala:172)
  ... 49 elided
```

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/michalsenkyr/spark SPARK-23251

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/20505
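The element-check approach can be sketched with a toy typeclass. Note this is plain Scala, not Spark's API: `Enc`, `intEnc`, `strEnc` and `mapEnc` are illustrative names standing in for `Encoder` and the `SQLImplicits` providers. Requiring implicit evidence for the element types turns a missing element encoder into a compile-time failure instead of a run-time one.

```scala
// Toy stand-in for Spark's Encoder (illustrative only).
trait Enc[T]

implicit val intEnc: Enc[Int] = new Enc[Int] {}
implicit val strEnc: Enc[String] = new Enc[String] {}

// Like the modified providers: the Map encoder now demands
// encoders for both the key and the value type.
implicit def mapEnc[K, V](implicit ke: Enc[K], ve: Enc[V]): Enc[Map[K, V]] =
  new Enc[Map[K, V]] {}

// Resolves because Enc[String] and Enc[Int] both exist.
val ok = implicitly[Enc[Map[String, Int]]]
// implicitly[Enc[Map[String, Any]]]  // would now fail at compile time: no Enc[Any]
```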
[GitHub] spark pull request #16986: [SPARK-18891][SQL] Support for Scala Map collecti...
Github user michalsenkyr commented on a diff in the pull request: https://github.com/apache/spark/pull/16986#discussion_r121266969 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/DatasetPrimitiveSuite.scala --- @@ -258,6 +265,80 @@ class DatasetPrimitiveSuite extends QueryTest with SharedSQLContext { ListClass(List(1)) -> Queue("test" -> SeqClass(Seq(2 } + test("arbitrary maps") { +checkDataset(Seq(Map(1 -> 2)).toDS(), Map(1 -> 2)) +checkDataset(Seq(Map(1.toLong -> 2.toLong)).toDS(), Map(1.toLong -> 2.toLong)) +checkDataset(Seq(Map(1.toDouble -> 2.toDouble)).toDS(), Map(1.toDouble -> 2.toDouble)) +checkDataset(Seq(Map(1.toFloat -> 2.toFloat)).toDS(), Map(1.toFloat -> 2.toFloat)) +checkDataset(Seq(Map(1.toByte -> 2.toByte)).toDS(), Map(1.toByte -> 2.toByte)) +checkDataset(Seq(Map(1.toShort -> 2.toShort)).toDS(), Map(1.toShort -> 2.toShort)) +checkDataset(Seq(Map(true -> false)).toDS(), Map(true -> false)) +checkDataset(Seq(Map("test1" -> "test2")).toDS(), Map("test1" -> "test2")) +checkDataset(Seq(Map(Tuple1(1) -> Tuple1(2))).toDS(), Map(Tuple1(1) -> Tuple1(2))) +checkDataset(Seq(Map(1 -> Tuple1(2))).toDS(), Map(1 -> Tuple1(2))) +checkDataset(Seq(Map("test" -> 2.toLong)).toDS(), Map("test" -> 2.toLong)) + +checkDataset(Seq(LHMap(1 -> 2)).toDS(), LHMap(1 -> 2)) +checkDataset(Seq(LHMap(1.toLong -> 2.toLong)).toDS(), LHMap(1.toLong -> 2.toLong)) +checkDataset(Seq(LHMap(1.toDouble -> 2.toDouble)).toDS(), LHMap(1.toDouble -> 2.toDouble)) +checkDataset(Seq(LHMap(1.toFloat -> 2.toFloat)).toDS(), LHMap(1.toFloat -> 2.toFloat)) +checkDataset(Seq(LHMap(1.toByte -> 2.toByte)).toDS(), LHMap(1.toByte -> 2.toByte)) +checkDataset(Seq(LHMap(1.toShort -> 2.toShort)).toDS(), LHMap(1.toShort -> 2.toShort)) +checkDataset(Seq(LHMap(true -> false)).toDS(), LHMap(true -> false)) +checkDataset(Seq(LHMap("test1" -> "test2")).toDS(), LHMap("test1" -> "test2")) +checkDataset(Seq(LHMap(Tuple1(1) -> Tuple1(2))).toDS(), LHMap(Tuple1(1) -> Tuple1(2))) +checkDataset(Seq(LHMap(1 -> 
Tuple1(2))).toDS(), LHMap(1 -> Tuple1(2))) +checkDataset(Seq(LHMap("test" -> 2.toLong)).toDS(), LHMap("test" -> 2.toLong)) --- End diff -- Added as a separate test case (same as sequences) --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #16986: [SPARK-18891][SQL] Support for Scala Map collecti...
Github user michalsenkyr commented on a diff in the pull request: https://github.com/apache/spark/pull/16986#discussion_r121265890 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/DatasetPrimitiveSuite.scala --- @@ -258,6 +265,80 @@ class DatasetPrimitiveSuite extends QueryTest with SharedSQLContext { ListClass(List(1)) -> Queue("test" -> SeqClass(Seq(2 } + test("arbitrary maps") { +checkDataset(Seq(Map(1 -> 2)).toDS(), Map(1 -> 2)) +checkDataset(Seq(Map(1.toLong -> 2.toLong)).toDS(), Map(1.toLong -> 2.toLong)) +checkDataset(Seq(Map(1.toDouble -> 2.toDouble)).toDS(), Map(1.toDouble -> 2.toDouble)) +checkDataset(Seq(Map(1.toFloat -> 2.toFloat)).toDS(), Map(1.toFloat -> 2.toFloat)) +checkDataset(Seq(Map(1.toByte -> 2.toByte)).toDS(), Map(1.toByte -> 2.toByte)) +checkDataset(Seq(Map(1.toShort -> 2.toShort)).toDS(), Map(1.toShort -> 2.toShort)) +checkDataset(Seq(Map(true -> false)).toDS(), Map(true -> false)) +checkDataset(Seq(Map("test1" -> "test2")).toDS(), Map("test1" -> "test2")) +checkDataset(Seq(Map(Tuple1(1) -> Tuple1(2))).toDS(), Map(Tuple1(1) -> Tuple1(2))) +checkDataset(Seq(Map(1 -> Tuple1(2))).toDS(), Map(1 -> Tuple1(2))) +checkDataset(Seq(Map("test" -> 2.toLong)).toDS(), Map("test" -> 2.toLong)) + +checkDataset(Seq(LHMap(1 -> 2)).toDS(), LHMap(1 -> 2)) +checkDataset(Seq(LHMap(1.toLong -> 2.toLong)).toDS(), LHMap(1.toLong -> 2.toLong)) +checkDataset(Seq(LHMap(1.toDouble -> 2.toDouble)).toDS(), LHMap(1.toDouble -> 2.toDouble)) +checkDataset(Seq(LHMap(1.toFloat -> 2.toFloat)).toDS(), LHMap(1.toFloat -> 2.toFloat)) +checkDataset(Seq(LHMap(1.toByte -> 2.toByte)).toDS(), LHMap(1.toByte -> 2.toByte)) +checkDataset(Seq(LHMap(1.toShort -> 2.toShort)).toDS(), LHMap(1.toShort -> 2.toShort)) +checkDataset(Seq(LHMap(true -> false)).toDS(), LHMap(true -> false)) +checkDataset(Seq(LHMap("test1" -> "test2")).toDS(), LHMap("test1" -> "test2")) +checkDataset(Seq(LHMap(Tuple1(1) -> Tuple1(2))).toDS(), LHMap(Tuple1(1) -> Tuple1(2))) +checkDataset(Seq(LHMap(1 -> 
Tuple1(2))).toDS(), LHMap(1 -> Tuple1(2))) +checkDataset(Seq(LHMap("test" -> 2.toLong)).toDS(), LHMap("test" -> 2.toLong)) + } + + ignore("SPARK-19104: map and product combinations") { --- End diff -- I added these tests for issue [SPARK-19104 CompileException with Map and Case Class in Spark 2.1.0](https://issues.apache.org/jira/browse/SPARK-19104) as I thought I could fix it as part of this PR. However, I found out that it was a more complicated issue than I anticipated so I left the tests there and ignored them. I can remove them.
[GitHub] spark pull request #18009: [SPARK-18891][SQL] Support for specific Java List...
Github user michalsenkyr commented on a diff in the pull request: https://github.com/apache/spark/pull/18009#discussion_r121263319 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/DatasetPrimitiveSuite.scala --- @@ -28,6 +28,8 @@ case class SeqClass(s: Seq[Int]) case class ListClass(l: List[Int]) +case class JListClass(l: java.util.List[Int]) --- End diff -- I did not notice that there is separate inference code for Java classes. It would certainly be nice to unify the code for Java and Scala classes. Is this already being worked on/planned? I moved the `List` support to `JavaTypeInference` and rewrote tests accordingly.
[GitHub] spark issue #16986: [SPARK-18891][SQL] Support for Scala Map collection type...
Github user michalsenkyr commented on the issue: https://github.com/apache/spark/pull/16986 So I tried to simplify the code as much as possible, removing unneeded parameters. I must admit I am not entirely sure whether I am handling all the data types correctly, but everything seems to work.
[GitHub] spark pull request #16986: [SPARK-18891][SQL] Support for Scala Map collecti...
Github user michalsenkyr commented on a diff in the pull request: https://github.com/apache/spark/pull/16986#discussion_r120010531 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala --- @@ -652,6 +653,299 @@ case class MapObjects private( } } +object CollectObjectsToMap { + private val curId = new java.util.concurrent.atomic.AtomicInteger() + + /** + * Construct an instance of CollectObjects case class. + * + * @param keyFunction The function applied on the key collection elements. + * @param keyInputData An expression that when evaluated returns a key collection object. + * @param keyElementType The data type of key elements in the collection. + * @param valueFunction The function applied on the value collection elements. + * @param valueInputData An expression that when evaluated returns a value collection object. + * @param valueElementType The data type of value elements in the collection. + * @param collClass The type of the resulting collection. 
+ */ + def apply( + keyFunction: Expression => Expression, + keyInputData: Expression, + keyElementType: DataType, + valueFunction: Expression => Expression, + valueInputData: Expression, + valueElementType: DataType, + collClass: Class[_]): CollectObjectsToMap = { +val id = curId.getAndIncrement() +val keyLoopValue = s"CollectObjectsToMap_keyLoopValue$id" +val keyLoopIsNull = s"CollectObjectsToMap_keyLoopIsNull$id" +val keyLoopVar = LambdaVariable(keyLoopValue, keyLoopIsNull, keyElementType) +val valueLoopValue = s"CollectObjectsToMap_valueLoopValue$id" +val valueLoopIsNull = s"CollectObjectsToMap_valueLoopIsNull$id" +val valueLoopVar = LambdaVariable(valueLoopValue, valueLoopIsNull, valueElementType) +val tupleLoopVar = s"CollectObjectsToMap_tupleLoopValue$id" +val builderValue = s"CollectObjectsToMap_builderValue$id" +CollectObjectsToMap( + keyLoopValue, keyLoopIsNull, keyElementType, keyFunction(keyLoopVar), keyInputData, + valueLoopValue, valueLoopIsNull, valueElementType, valueFunction(valueLoopVar), + valueInputData, + tupleLoopVar, collClass, builderValue) + } +} + +/** + * An equivalent to the [[MapObjects]] case class but returning an ObjectType containing + * a Scala collection constructed using the associated builder, obtained by calling `newBuilder` + * on the collection's companion object. + * + * @param keyLoopValue the name of the loop variable that is used when iterating over the key + * collection, and which is used as input for the `keyLambdaFunction` + * @param keyLoopIsNull the nullability of the loop variable that is used when iterating over + * the key collection, and which is used as input for the `keyLambdaFunction` + * @param keyLoopVarDataType the data type of the loop variable that is used when iterating over + * the key collection, and which is used as input for the + * `keyLambdaFunction` + * @param keyLambdaFunction A function that takes the `keyLoopVar` as input, and is used as + * a lambda function to handle collection elements. 
+ * @param keyInputData An expression that when evaluated returns a collection object. + * @param valueLoopValue the name of the loop variable that is used when iterating over the value + * collection, and which is used as input for the `valueLambdaFunction` + * @param valueLoopIsNull the nullability of the loop variable that is used when iterating over + *the value collection, and which is used as input for the + *`valueLambdaFunction` + * @param valueLoopVarDataType the data type of the loop variable that is used when iterating over + * the value collection, and which is used as input for the + * `valueLambdaFunction` + * @param valueLambdaFunction A function that takes the `valueLoopVar` as input, and is used as + *a lambda function to handle collection elements. + * @param valueInputData An expression that when evaluated returns a collection object. + * @param tupleLoopValue the name of the loop variable that holds the tuple to be added to the + * resulting map (used only for Scala Map) + * @param collClass The type of the resulting collection. + * @param builderValue The name of the builder variable used to construct the resultin
[GitHub] spark pull request #16986: [SPARK-18891][SQL] Support for Scala Map collecti...
Github user michalsenkyr commented on a diff in the pull request: https://github.com/apache/spark/pull/16986#discussion_r120009967 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala --- @@ -652,6 +653,299 @@ case class MapObjects private( } } +object CollectObjectsToMap { + private val curId = new java.util.concurrent.atomic.AtomicInteger() + + /** + * Construct an instance of CollectObjects case class. + * + * @param keyFunction The function applied on the key collection elements. + * @param keyInputData An expression that when evaluated returns a key collection object. + * @param keyElementType The data type of key elements in the collection. + * @param valueFunction The function applied on the value collection elements. + * @param valueInputData An expression that when evaluated returns a value collection object. + * @param valueElementType The data type of value elements in the collection. + * @param collClass The type of the resulting collection. + */ + def apply( + keyFunction: Expression => Expression, + keyInputData: Expression, + keyElementType: DataType, + valueFunction: Expression => Expression, + valueInputData: Expression, + valueElementType: DataType, + collClass: Class[_]): CollectObjectsToMap = { +val id = curId.getAndIncrement() +val keyLoopValue = s"CollectObjectsToMap_keyLoopValue$id" +val keyLoopIsNull = s"CollectObjectsToMap_keyLoopIsNull$id" --- End diff -- Yes. A key in `MapData` cannot be null. However, since the function takes two `ArrayData`s as input, I figured that we shouldn't count on this requirement being necessarily fulfilled. As `CollectObjectsToMap` is a class separate from its usage in `ScalaReflection`, I tried to make it as generic and as similar to `MapObjects` as possible, so it can be used elsewhere without having to make sure additional preconditions are met. It also produces a generic `Map` which has implementations that can support null keys. 
Right now, the only check that prevents this is [here](https://github.com/apache/spark/pull/16986/files/7af9b0625a245d34943b7192532c93f2aafac635#diff-e436c96ea839dfe446837ab2a3531f93R933). If there is ever a need to support these kinds of `Map`s in the future, this should make the job easier.
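As a small illustration of why keeping the key-nullability plumbing in the generic expression is reasonable: ordinary Scala `Map` implementations happily accept a `null` key, even though Catalyst's `MapData` forbids one. This snippet is plain Scala with no Spark involved.

```scala
// Plain Scala: immutable Maps tolerate null keys, unlike Catalyst's MapData.
val m = Map[String, Int]((null: String) -> 1, "a" -> 2)

// Lookups use null-safe equality, so the null key is retrievable.
val v = m(null)
```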
[GitHub] spark issue #16986: [SPARK-18891][SQL] Support for Scala Map collection type...
Github user michalsenkyr commented on the issue: https://github.com/apache/spark/pull/16986 That was because of my other PR that just got accepted. Just a matter of appending unit tests. I resolved the conflict from the browser for now. I can rebase later if merge commits are not encouraged in PRs.
[GitHub] spark pull request #16986: [SPARK-18891][SQL] Support for Map collection typ...
Github user michalsenkyr commented on a diff in the pull request: https://github.com/apache/spark/pull/16986#discussion_r117363594 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala --- @@ -329,35 +329,19 @@ object ScalaReflection extends ScalaReflection { } UnresolvedMapObjects(mapFunction, getPath, Some(cls)) - case t if t <:< localTypeOf[Map[_, _]] => + case t if t <:< localTypeOf[Map[_, _]] || t <:< localTypeOf[java.util.Map[_, _]] => --- End diff -- Alright. Should I remove the case condition modifications in this PR or leave them as they are and remove them in the next one?
[GitHub] spark pull request #18011: [SPARK-19089][SQL] Add support for nested sequenc...
GitHub user michalsenkyr opened a pull request: https://github.com/apache/spark/pull/18011

[SPARK-19089][SQL] Add support for nested sequences

## What changes were proposed in this pull request?

Replaced specific sequence encoders with a generic sequence encoder to enable nesting of sequences. Does not add support for nested arrays, as that cannot be solved in this way.

## How was this patch tested?

```bash
build/mvn -DskipTests clean package && dev/run-tests
```

Additionally in Spark shell:

```
scala> Seq(Seq(Seq(1))).toDS.collect()
res0: Array[Seq[Seq[Int]]] = Array(List(List(1)))
```

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/michalsenkyr/spark dataset-seq-nested

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/18011.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #18011

commit dd3bf0113cbf66ebf784f68d7f602c39f4a46b8b
Author: Michal Senkyr <mike.sen...@gmail.com>
Date: 2017-05-17T00:18:41Z

    Replace specific sequence encoders with generic sequence encoder
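Why a single generic provider unlocks nesting can be sketched with a toy typeclass. This is plain Scala, not Spark's API: the `SchemaOf` trait and its `schema` method are illustrative names. A recursive implicit composes with itself, so any nesting depth resolves.

```scala
// Illustrative typeclass (not Spark's API) mimicking encoder derivation.
trait SchemaOf[T] { def schema: String }

implicit val intSchema: SchemaOf[Int] =
  new SchemaOf[Int] { def schema = "int" }

// A single generic provider, like the PR's generic sequence encoder,
// composes with itself and therefore supports arbitrary nesting.
implicit def seqSchema[T](implicit e: SchemaOf[T]): SchemaOf[Seq[T]] =
  new SchemaOf[Seq[T]] { def schema = s"array<${e.schema}>" }

val nested = implicitly[SchemaOf[Seq[Seq[Seq[Int]]]]]
```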
[GitHub] spark pull request #18009: [SPARK-18891][SQL] Support for specific Java List...
GitHub user michalsenkyr opened a pull request: https://github.com/apache/spark/pull/18009

[SPARK-18891][SQL] Support for specific Java List subtypes

## What changes were proposed in this pull request?

Add support for specific Java `List` subtypes in deserialization as well as a generic implicit encoder. All `List` subtypes are supported by using either the size-specifying constructor (one `int` parameter) or the default constructor. Interfaces/abstract classes use the following implementations:

* `java.util.List`, `java.util.AbstractList` or `java.util.AbstractSequentialList` => `java.util.ArrayList`

## How was this patch tested?

```bash
build/mvn -DskipTests clean package && dev/run-tests
```

Additionally in Spark shell:

```
scala> val jlist = new java.util.LinkedList[Int]; jlist.add(1)
jlist: java.util.LinkedList[Int] = [1]
res0: Boolean = true

scala> Seq(jlist).toDS().map(_.element()).collect()
res1: Array[Int] = Array(1)
```

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/michalsenkyr/spark dataset-java-lists

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/18009.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #18009

commit 1cd6b6b4cdb0ff8a5cc42a105c7a8d4bc16fc0c1
Author: Michal Senkyr <mike.sen...@gmail.com>
Date: 2017-04-16T15:54:40Z

    Add arbitrary Java List serialization/deserialization support
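The constructor-selection strategy described above can be sketched as follows. The `newList` helper is hypothetical, not the PR's actual code: it tries the size-specifying `int` constructor first and falls back to the no-argument one.

```scala
// Hypothetical helper mirroring the described strategy: prefer the
// size-specifying constructor, fall back to the default constructor.
def newList(cls: Class[_], size: Int): java.util.List[AnyRef] = {
  val inst =
    try cls.getConstructor(java.lang.Integer.TYPE).newInstance(Integer.valueOf(size))
    catch {
      case _: NoSuchMethodException => cls.getDeclaredConstructor().newInstance()
    }
  inst.asInstanceOf[java.util.List[AnyRef]]
}

// ArrayList exposes ArrayList(int initialCapacity);
// LinkedList has no int constructor, so it falls back to the default one.
val al = newList(classOf[java.util.ArrayList[AnyRef]], 16)
val ll = newList(classOf[java.util.LinkedList[AnyRef]], 16)
```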
[GitHub] spark issue #16986: [SPARK-18891][SQL] Support for Map collection types
Github user michalsenkyr commented on the issue: https://github.com/apache/spark/pull/16986 Rebased onto the current master and integrated a few minor changes from the code review of #16541 in case anyone is still interested in this feature
[GitHub] spark issue #16541: [SPARK-19088][SQL] Optimize sequence type deserializatio...
Github user michalsenkyr commented on the issue: https://github.com/apache/spark/pull/16541 @ueshin Thanks for the fix
[GitHub] spark issue #16541: [SPARK-19088][SQL] Optimize sequence type deserializatio...
Github user michalsenkyr commented on the issue: https://github.com/apache/spark/pull/16541 Thanks. Made the suggested changes in my latest commit. I also encountered a minor problem when doing final testing. When using a collection type that is a type alias (e.g., scala.List), the companion object's `newBuilder` could not be found. Fixed it by dealiasing the collection type before obtaining the companion object.
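For context, the construction pattern the generated deserialization code relies on is the collection companion's `newBuilder`; the dealiasing fix simply makes that companion lookup succeed for aliases such as `scala.List`. A plain Scala sketch of the pattern:

```scala
// What the generated code ultimately does: obtain a builder from the
// collection's companion object, append elements, then call result().
// (scala.List is an alias for scala.collection.immutable.List, so both
// companions resolve to the same newBuilder.)
val builder = scala.collection.immutable.List.newBuilder[Int]
builder += 1
builder += 2
val built = builder.result()
```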
[GitHub] spark pull request #16541: [SPARK-19088][SQL] Optimize sequence type deseria...
Github user michalsenkyr commented on a diff in the pull request: https://github.com/apache/spark/pull/16541#discussion_r106810993

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala ---
```
@@ -589,6 +590,170 @@ case class MapObjects private(
   }
 }
 
+object CollectObjects {
+  private val curId = new java.util.concurrent.atomic.AtomicInteger()
+
+  /**
+   * Construct an instance of CollectObjects case class.
+   *
+   * @param function The function applied on the collection elements.
+   * @param inputData An expression that when evaluated returns a collection object.
+   * @param elementType The data type of elements in the collection.
+   * @param collClass The type of the resulting collection.
+   */
+  def apply(
+      function: Expression => Expression,
+      inputData: Expression,
+      elementType: DataType,
+      collClass: Class[_]): CollectObjects = {
+    val loopValue = "CollectObjects_loopValue" + curId.getAndIncrement()
+    val loopIsNull = "CollectObjects_loopIsNull" + curId.getAndIncrement()
+    val loopVar = LambdaVariable(loopValue, loopIsNull, elementType)
+    val builderValue = "CollectObjects_builderValue" + curId.getAndIncrement()
```
--- End diff --

Altered vals in `MapObjects` to share the same `curId` instead
[GitHub] spark pull request #16541: [SPARK-19088][SQL] Optimize sequence type deseria...
Github user michalsenkyr commented on a diff in the pull request: https://github.com/apache/spark/pull/16541#discussion_r106810940

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/SequenceBenchmark.scala ---
```
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.benchmark
+
+import org.apache.spark.util.Benchmark
+
+/**
+ * Benchmark [[Seq]], [[List]] and [[scala.collection.mutable.Queue]] serialization
+ * performance.
+ * To run this:
+ *  1. replace ignore(...) with test(...)
+ *  2. build/sbt "sql/test-only *benchmark.SequenceBenchmark"
+ *
+ * Benchmarks in this file are skipped in normal builds.
+ */
+class SequenceBenchmark extends BenchmarkBase {
```
--- End diff --

Removed, but it's still useful to know that the change didn't affect performance in any negative way; it introduced a different approach to collection construction, after all. Should I also remove the results from the PR description?
[GitHub] spark pull request #16541: [SPARK-19088][SQL] Optimize sequence type deseria...
Github user michalsenkyr commented on a diff in the pull request: https://github.com/apache/spark/pull/16541#discussion_r106810789

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala --- @@ -589,6 +590,170 @@ (quotes the same `CollectObjects` addition as in the r106810993 comment above, down to the `CollectObjects` case class declaration) --- End diff --

Yes, we actually can. I merged `CollectObjects` into `MapObjects` in my next commit.
[GitHub] spark issue #16541: [SPARK-19088][SQL] Optimize sequence type deserializatio...
Github user michalsenkyr commented on the issue: https://github.com/apache/spark/pull/16541 That seems to be the case here, yes. What about the other benefits I mentioned (adding support for Java `List`s and future Scala 2.13 compatibility)? I think the codegen is also more straightforward/clear (and much shorter).
[GitHub] spark issue #16541: [SPARK-19088][SQL] Optimize sequence type deserializatio...
Github user michalsenkyr commented on the issue: https://github.com/apache/spark/pull/16541 Well, technically yes. But I would say it's a little more than that. The current approach to deserialization of `Seq`s is to copy the data into an array, construct a `WrappedArray` (which extends `Seq`) and optionally copy the data (again) into the new collection. This needs to go through all the elements twice if anything other than a `Seq` (or `WrappedArray` directly) is requested. This PR takes a more straightforward approach: it constructs a mutable collection builder (which is defined for every Scala collection), adds the elements to it and retrieves the result. I assumed this would be a performance improvement and am quite surprised that there is no difference. But I think this might be due to the improvement being so small as to drown in the usual operation overhead. Sadly, I do not have the resources to measure the operation on larger amounts of data, having the benchmarks fail on larger collections on my setup. I am also not that familiar with Spark's internals to determine whether @kiszk's suggestion would improve operations in other ways, e.g. during operations in the cluster environment. That is why I decided to implement it but keep it separate from my proposed changes for the time being. As to other benefits that this PR would bring besides performance improvements: * I would like to implement Java `List`s support in a manner similar to what I did with `Map`s in #16986 (I would also ask you to take a look at that when you have the time) by just slightly altering the code. * I didn't know this when I implemented this, but Scala 2.13 will introduce a major [collections rework](https://www.scala-lang.org/blog/2017/02/28/collections-rework.html) that will change the way the `to` method (used for conversions in the current solution) works. This will require a rewrite, whereas I believe the method used here will remain largely the same.
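The two strategies described above can be sketched in plain Scala, standalone and outside Spark — `viaSeq` and `viaBuilder` are hypothetical names for illustration, not the PR's code:

```scala
import scala.collection.mutable

// Current approach: materialize the decoded elements as a Seq first,
// then copy again into the requested collection type (two traversals).
def viaSeq(elements: Array[Int]): mutable.Queue[Int] = {
  val seq: Seq[Int] = elements.toSeq  // first copy/wrap
  mutable.Queue(seq: _*)              // second copy into the target type
}

// Builder approach: every Scala collection companion defines newBuilder,
// so the target collection is filled directly in a single traversal.
def viaBuilder(elements: Array[Int]): mutable.Queue[Int] = {
  val builder = mutable.Queue.newBuilder[Int]
  builder.sizeHint(elements.length)   // pre-allocate when the size is known
  elements.foreach(builder += _)
  builder.result()
}
```

Both produce the same collection; the builder path simply avoids the intermediate `Seq`.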
[GitHub] spark issue #16541: [SPARK-19088][SQL] Optimize sequence type deserializatio...
Github user michalsenkyr commented on the issue: https://github.com/apache/spark/pull/16541 Also please note the [UnsafeArrayData-producing branch](https://github.com/michalsenkyr/spark/compare/dataset-seq-builder...michalsenkyr:dataset-seq-builder-unsafe) that is not yet merged into this branch. I'd like to get somebody's opinion on that before I do it.
[GitHub] spark issue #16541: [SPARK-19088][SQL] Optimize sequence type deserializatio...
Github user michalsenkyr commented on the issue: https://github.com/apache/spark/pull/16541 Would it be possible for somebody to review this PR for me? I have a few ideas that are dependent on this and I'd like to get to work on them, most notably support for Java Lists. Maybe @cloud-fan could take a look at this?
[GitHub] spark issue #16986: [SPARK-18891][SQL] Support for Map collection types
Github user michalsenkyr commented on the issue: https://github.com/apache/spark/pull/16986 Added support for Java Maps with support for pre-allocation (capacity argument on constructor) and sensible defaults for interfaces/abstract classes. Also includes implicit encoders. Updated codegen in description (only a cosmetic change) and added codegen for Java Map.
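The pre-allocation and interface-default behaviour described here can be sketched with plain Java reflection from Scala. This is an illustration only — `newJavaMap` is a made-up helper, not the actual generated code:

```scala
import java.lang.reflect.Modifier

// Pick a concrete Map class for interfaces/abstract classes, and
// pre-allocate via the (int) capacity constructor when one exists,
// falling back to the no-arg constructor otherwise.
def newJavaMap(cls: Class[_], size: Int): java.util.Map[Any, Any] = {
  val concrete: Class[_] =
    if (cls.isInterface || Modifier.isAbstract(cls.getModifiers))
      classOf[java.util.HashMap[_, _]]  // sensible default
    else cls
  val instance =
    try concrete.getConstructor(classOf[Int]).newInstance(Integer.valueOf(size))
    catch {
      case _: NoSuchMethodException =>
        // e.g. java.util.TreeMap has no capacity constructor
        concrete.getDeclaredConstructor().newInstance()
    }
  instance.asInstanceOf[java.util.Map[Any, Any]]
}
```

Requesting `java.util.Map` yields a pre-sized `HashMap`; requesting a concrete class without a capacity constructor still works via the fallback.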
[GitHub] spark pull request #16986: [SPARK-18891][SQL] Support for Map collection typ...
GitHub user michalsenkyr opened a pull request: https://github.com/apache/spark/pull/16986

[SPARK-18891][SQL] Support for Map collection types

## What changes were proposed in this pull request?

Add support for arbitrary Scala `Map` types in deserialization as well as a generic implicit encoder. Used the builder approach as in #16541 to construct any provided `Map` type upon deserialization. Please note that this PR also adds (ignored) tests for issue [SPARK-19104 CompileException with Map and Case Class in Spark 2.1.0](https://issues.apache.org/jira/browse/SPARK-19104) but doesn't solve it.

Resulting codegen for `Seq(Map(1 -> 2)).toDS().map(identity).queryExecution.debug.codegen`:

```
/* 001 */ public Object generate(Object[] references) {
/* 002 */   return new GeneratedIterator(references);
/* 003 */ }
/* 004 */
/* 005 */ final class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator {
/* 006 */   private Object[] references;
/* 007 */   private scala.collection.Iterator[] inputs;
/* 008 */   private scala.collection.Iterator inputadapter_input;
/* 009 */   private boolean CollectObjectsToMap_loopIsNull1;
/* 010 */   private int CollectObjectsToMap_loopValue0;
/* 011 */   private boolean CollectObjectsToMap_loopIsNull3;
/* 012 */   private int CollectObjectsToMap_loopValue2;
/* 013 */   private UnsafeRow deserializetoobject_result;
/* 014 */   private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder deserializetoobject_holder;
/* 015 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter deserializetoobject_rowWriter;
/* 016 */   private scala.collection.immutable.Map mapelements_argValue;
/* 017 */   private UnsafeRow mapelements_result;
/* 018 */   private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder mapelements_holder;
/* 019 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter mapelements_rowWriter;
/* 020 */   private UnsafeRow serializefromobject_result;
/* 021 */   private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder serializefromobject_holder;
/* 022 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter serializefromobject_rowWriter;
/* 023 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeArrayWriter serializefromobject_arrayWriter;
/* 024 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeArrayWriter serializefromobject_arrayWriter1;
/* 025 */
/* 026 */   public GeneratedIterator(Object[] references) {
/* 027 */     this.references = references;
/* 028 */   }
/* 029 */
/* 030 */   public void init(int index, scala.collection.Iterator[] inputs) {
/* 031 */     partitionIndex = index;
/* 032 */     this.inputs = inputs;
/* 033 */     wholestagecodegen_init_0();
/* 034 */     wholestagecodegen_init_1();
/* 035 */
/* 036 */   }
/* 037 */
/* 038 */   private void wholestagecodegen_init_0() {
/* 039 */     inputadapter_input = inputs[0];
/* 040 */
/* 041 */     deserializetoobject_result = new UnsafeRow(1);
/* 042 */     this.deserializetoobject_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(deserializetoobject_result, 32);
/* 043 */     this.deserializetoobject_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(deserializetoobject_holder, 1);
/* 044 */
/* 045 */     mapelements_result = new UnsafeRow(1);
/* 046 */     this.mapelements_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(mapelements_result, 32);
/* 047 */     this.mapelements_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(mapelements_holder, 1);
/* 048 */     serializefromobject_result = new UnsafeRow(1);
/* 049 */     this.serializefromobject_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(serializefromobject_result, 32);
/* 050 */     this.serializefromobject_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(serializefromobject_holder, 1);
/* 051 */     this.serializefromobject_arrayWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeArrayWriter();
/* 052 */
/* 053 */   }
/* 054 */
/* 055 */   private void wholestagecodegen_init_1() {
/* 056 */     this.serializefromobject_arrayWriter1 = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeArrayWriter();
/* 057 */
/* 058 */   }
/* 059 */
/* 060 */   protected void processNext() throws java.io.IOException {
/* 061 */     while (inputadapter_input.hasNext() && !stopEarly()) {
/* 062 */       InternalRow inputadapter_row = (InternalRow) inputadapter
```
[GitHub] spark issue #16541: [SPARK-19088][SQL] Optimize sequence type deserializatio...
Github user michalsenkyr commented on the issue: https://github.com/apache/spark/pull/16541 Apologies for taking so long. I tried modifying the serialization logic as best as I could to serialize into `UnsafeArrayData` ([branch diff](https://github.com/michalsenkyr/spark/compare/dataset-seq-builder...michalsenkyr:dataset-seq-builder-unsafe)). I had to first convert into an array to use `fromPrimitiveArray` on the result. That's probably the reason why the benchmark came up slightly worse:

```
OpenJDK 64-Bit Server VM 1.8.0_121-b13 on Linux 4.9.6-1-ARCH
AMD A10-4600M APU with Radeon(tm) HD Graphics

collect:               Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------
Seq                          256 /  287          0,0      255670,1       1,0X
List                         161 /  220          0,0      161091,7       1,6X
mutable.Queue                304 /  324          0,0      303823,3       0,8X
```

I am not entirely sure how `GenericArrayData` and `UnsafeArrayData` are handled on transformations and shuffles though, so it's possible that more complex tests will reveal better performance. However, I'm not sure that I can test this properly on my single-machine setup. I'd definitely be interested in benchmark results on a cluster setup.
Generated code:

```
/* 001 */ public Object generate(Object[] references) {
/* 002 */   return new GeneratedIterator(references);
/* 003 */ }
/* 004 */
/* 005 */ final class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator {
/* 006 */   private Object[] references;
/* 007 */   private scala.collection.Iterator[] inputs;
/* 008 */   private scala.collection.Iterator inputadapter_input;
/* 009 */   private boolean CollectObjects_loopIsNull1;
/* 010 */   private int CollectObjects_loopValue0;
/* 011 */   private UnsafeRow deserializetoobject_result;
/* 012 */   private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder deserializetoobject_holder;
/* 013 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter deserializetoobject_rowWriter;
/* 014 */   private scala.collection.immutable.List mapelements_argValue;
/* 015 */   private UnsafeRow mapelements_result;
/* 016 */   private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder mapelements_holder;
/* 017 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter mapelements_rowWriter;
/* 018 */   private scala.collection.immutable.List serializefromobject_argValue;
/* 019 */   private UnsafeRow serializefromobject_result;
/* 020 */   private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder serializefromobject_holder;
/* 021 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter serializefromobject_rowWriter;
/* 022 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeArrayWriter serializefromobject_arrayWriter;
/* 023 */
/* 024 */   public GeneratedIterator(Object[] references) {
/* 025 */     this.references = references;
/* 026 */   }
/* 027 */
/* 028 */   public void init(int index, scala.collection.Iterator[] inputs) {
/* 029 */     partitionIndex = index;
/* 030 */     this.inputs = inputs;
/* 031 */     inputadapter_input = inputs[0];
/* 032 */
/* 033 */     deserializetoobject_result = new UnsafeRow(1);
/* 034 */     this.deserializetoobject_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(deserializetoobject_result, 32);
/* 035 */     this.deserializetoobject_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(deserializetoobject_holder, 1);
/* 036 */
/* 037 */     mapelements_result = new UnsafeRow(1);
/* 038 */     this.mapelements_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(mapelements_result, 32);
/* 039 */     this.mapelements_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(mapelements_holder, 1);
/* 040 */
/* 041 */     serializefromobject_result = new UnsafeRow(1);
/* 042 */     this.serializefromobject_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(serializefromobject_result, 32);
/* 043 */     this.serializefromobject_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(serializefromobject_holder, 1);
/* 044 */     this.serializefromobject_arrayWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeArrayWriter();
/* 045 */
/* 046 */   }
/* 047 */
/* 048 */   protected void
```
[GitHub] spark issue #16541: [SPARK-19088][SQL] Optimize sequence type deserializatio...
Github user michalsenkyr commented on the issue: https://github.com/apache/spark/pull/16541 I added the benchmarks based on the code you provided but I am getting almost the same results before and after the optimization (see description). So either the added benefit is really small or I didn't write/tune the benchmarks quite right. I would appreciate it if you could take a look at them. I will also take a look at the `UnsafeArrayData` optimization later and try to include it in my next commit.
[GitHub] spark issue #16541: [SPARK-19088][SQL] Optimize sequence type deserializatio...
Github user michalsenkyr commented on the issue: https://github.com/apache/spark/pull/16541 Added benchmarks. I didn't find any standardized way of benchmarking codegen so I wrote a simple script for Spark Shell. Benchmarks were run on a laptop so the collections couldn't be too large. Nevertheless, the benchmarks are consistently (even if not significantly) faster.
[GitHub] spark pull request #16546: [WIP][SQL] Put check in ExpressionEncoder.fromRow...
Github user michalsenkyr commented on a diff in the pull request: https://github.com/apache/spark/pull/16546#discussion_r95927063

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala ---
```
@@ -120,6 +120,32 @@ object ScalaReflection extends ScalaReflection {
   }
 
   /**
+   * Returns the element type for Seq[_] and its subclass.
+   */
+  def getElementTypeForSeq(t: `Type`): Option[`Type`] = {
+    if (!(t <:< localTypeOf[Seq[_]])) {
+      return None
+    }
+    val TypeRef(_, _, elementTypeList) = t
```
--- End diff --

This would probably be better solved by using `match` and `typeParams`. Something like this:

```scala
t match {
  case TypeRef(_, _, Seq(elementType)) => Some(elementType)
  case _ => t.baseClasses.find { c =>
    val cType = c.asClass.toType
    cType <:< localTypeOf[Seq[_]] && cType.typeParams.nonEmpty
  }.map(t.baseType(_).typeParams.head)
}
```

Also not sure whether types with more than one type parameter are handled correctly.
[GitHub] spark pull request #16541: [SPARK-19088][SQL] Optimize sequence type deseria...
Github user michalsenkyr commented on a diff in the pull request: https://github.com/apache/spark/pull/16541#discussion_r95923927

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala --- @@ -589,6 +590,171 @@ (quotes the same `CollectObjects` addition as above, down to the `getBuilderVar` line in `doGenCode`) --- End diff --

Actually, I would have to modify `serializeFor` as you did [here](https://github.com/apache/spark/pull/16546/files#diff-89643554d9757dd3e91abff1cc6096c7L520) in order to extract the required element type.
[GitHub] spark pull request #16541: [SPARK-19088][SQL] Optimize sequence type deseria...
Github user michalsenkyr commented on a diff in the pull request: https://github.com/apache/spark/pull/16541#discussion_r95921320 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala --- @@ -589,6 +590,171 @@ case class MapObjects private( } } +object CollectObjects { + private val curId = new java.util.concurrent.atomic.AtomicInteger() + + /** + * Construct an instance of CollectObjects case class. + * + * @param function The function applied on the collection elements. + * @param inputData An expression that when evaluated returns a collection object. + * @param elementType The data type of elements in the collection. + * @param collClass The type of the resulting collection. + */ + def apply( + function: Expression => Expression, + inputData: Expression, + elementType: DataType, + collClass: Class[_]): CollectObjects = { +val loopValue = "CollectObjects_loopValue" + curId.getAndIncrement() +val loopIsNull = "CollectObjects_loopIsNull" + curId.getAndIncrement() +val loopVar = LambdaVariable(loopValue, loopIsNull, elementType) +val builderValue = "CollectObjects_builderValue" + curId.getAndIncrement() +CollectObjects(loopValue, loopIsNull, elementType, function(loopVar), inputData, + collClass, builderValue) + } +} + +/** + * An equivalent to the [[MapObjects]] case class but returning an ObjectType containing + * a Scala collection constructed using the associated builder, obtained by calling `newBuilder` + * on the collection's companion object. 
+ * + * @param loopValue the name of the loop variable that used when iterate the collection, and used + * as input for the `lambdaFunction` + * @param loopIsNull the nullity of the loop variable that used when iterate the collection, and + * used as input for the `lambdaFunction` + * @param loopVarDataType the data type of the loop variable that used when iterate the collection, + *and used as input for the `lambdaFunction` + * @param lambdaFunction A function that take the `loopVar` as input, and used as lambda function + * to handle collection elements. + * @param inputData An expression that when evaluated returns a collection object. + * @param collClass The type of the resulting collection. + * @param builderValue The name of the builder variable used to construct the resulting collection. + */ +case class CollectObjects private( +loopValue: String, +loopIsNull: String, +loopVarDataType: DataType, +lambdaFunction: Expression, +inputData: Expression, +collClass: Class[_], +builderValue: String) extends Expression with NonSQLExpression { + + override def nullable: Boolean = inputData.nullable + + override def children: Seq[Expression] = lambdaFunction :: inputData :: Nil + + override def eval(input: InternalRow): Any = +throw new UnsupportedOperationException("Only code-generated evaluation is supported") + + override def dataType: DataType = ObjectType(collClass) + + override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { +val collObjectName = s"${collClass.getName}$$.MODULE$$" +val getBuilderVar = s"$collObjectName.newBuilder()" --- End diff -- I added the `Seq` builder fallback. However, there is presently no collection that Spark supports that doesn't provide a builder. You can try it out on your branch with `Range`. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. 
If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
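The fallback mentioned in the comment above can be sketched outside of codegen roughly like this. This is an illustrative helper (the `builderFor` name and the reflection approach are assumptions for demonstration, not the actual generated code): it looks up `newBuilder` on the collection's companion object and falls back to a general `Seq` builder when the companion does not provide one.

```scala
import scala.collection.mutable
import scala.util.Try

// Look up `newBuilder` on the collection's companion object reflectively;
// fall back to a general Seq builder when the companion doesn't provide one.
def builderFor(collClass: Class[_]): mutable.Builder[Any, Any] = {
  val viaCompanion = Try {
    val companion = Class.forName(collClass.getName + "$").getField("MODULE$").get(null)
    companion.getClass.getMethod("newBuilder").invoke(companion)
      .asInstanceOf[mutable.Builder[Any, Any]]
  }
  // Range's companion, for example, has no newBuilder, so this path is taken
  viaCompanion.getOrElse(Seq.newBuilder[Any])
}
```

The generated code performs the equivalent lookup at compile time rather than via runtime reflection; the sketch only shows the decision logic.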
[GitHub] spark pull request #16541: [SPARK-19088][SQL] Optimize sequence type deseria...
Github user michalsenkyr commented on a diff in the pull request: https://github.com/apache/spark/pull/16541#discussion_r95909808 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala --- @@ -589,6 +590,171 @@ case class MapObjects private( ... +val collObjectName = s"${collClass.getName}$$.MODULE$$" +val getBuilderVar = s"$collObjectName.newBuilder()" --- End diff -- Interesting. I didn't realize `Range` breaks that rule. Furthermore, it would be practically impossible to deserialize back into `Range`. So I guess the best way to do this will be to use a general `Seq` builder if the collection doesn't provide its own.
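The `Range` exception can be checked with a quick reflection probe (illustrative only; the `companionHasNewBuilder` helper is not part of the patch). A `Range` is defined by start/end/step rather than by its elements, so its companion object exposes no element-by-element builder, unlike `List`, `Vector`, or `Queue`:

```scala
// Check whether a collection's companion object exposes a `newBuilder` method,
// which the generated `$collObjectName.newBuilder()` call relies on.
def companionHasNewBuilder(className: String): Boolean = {
  val companion = Class.forName(className + "$").getField("MODULE$").get(null)
  companion.getClass.getMethods.exists(_.getName == "newBuilder")
}
```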
[GitHub] spark issue #16541: [SPARK-19088][SQL] Optimize sequence type deserializatio...
Github user michalsenkyr commented on the issue: https://github.com/apache/spark/pull/16541 Added codegen comparison for a simple `List` dataset. I will also prepare a benchmark and add some results later, covering `List`, `mutable.Queue` and `Seq`. `List` and `mutable.Queue` should benefit from the change (one less pass), while `Seq` should stay approximately the same (as it is a supertype of `WrappedArray` and therefore skips the final conversion in the original approach).
[GitHub] spark pull request #16541: [SPARK-19088][SQL] Optimize sequence type deseria...
Github user michalsenkyr commented on a diff in the pull request: https://github.com/apache/spark/pull/16541#discussion_r95667424 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala --- @@ -589,6 +590,171 @@ case class MapObjects private( ... +val collObjectName = s"${collClass.getName}$$.MODULE$$" +val getBuilderVar = s"$collObjectName.newBuilder()" --- End diff -- Yes, it does. As the [`TraversableLike` documentation](http://www.scala-lang.org/api/2.12.1/scala/collection/TraversableLike.html) specifies: > Collection classes mixing in this trait ... also need to provide a method **newBuilder** which creates a builder for collections of the same kind.
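That contract can be illustrated with plain library usage (not the generated code): each companion's `newBuilder` yields a builder for that exact collection kind, so elements can be appended one by one and `result()` produces the target collection directly.

```scala
import scala.collection.immutable.Queue

// The companion's newBuilder returns a builder for that exact collection kind
val builder = Queue.newBuilder[Int]
builder += 1
builder += 2
builder += 3
val q: Queue[Int] = builder.result()
```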
[GitHub] spark issue #16541: [SPARK-19088][SQL] Optimize sequence type deserializatio...
Github user michalsenkyr commented on the issue: https://github.com/apache/spark/pull/16541 Also, the new `CollectObjects` copies quite a bit of code from `MapObjects`. Should I move the code into a common trait in order to reduce duplication, or should I leave it as is?
[GitHub] spark pull request #16541: [SPARK-19088][SQL] Optimize sequence type deseria...
GitHub user michalsenkyr opened a pull request:

    https://github.com/apache/spark/pull/16541

[SPARK-19088][SQL] Optimize sequence type deserialization codegen

## What changes were proposed in this pull request?

Optimization of arbitrary Scala sequence deserialization introduced by #16240.

The previous implementation constructed an array which was then converted by `to`. This required two passes in most cases. This implementation attempts to remedy that by using `Builder`s provided by the `newBuilder` method on every Scala collection's companion object to build the resulting collection directly.

## How was this patch tested?

```bash
./build/mvn -DskipTests clean package && ./dev/run-tests
```

Additionally in Spark Shell:

```scala
case class QueueClass(q: scala.collection.immutable.Queue[Int])

spark.createDataset(Seq(List(1,2,3))).map(x => QueueClass(scala.collection.immutable.Queue(x: _*))).map(_.q.dequeue).collect
```

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/michalsenkyr/spark dataset-seq-builder

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/16541.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #16541

commit 4aaef15844848e16dc68d4be4d0a013c8ddcd7d7
Author: Michal Senkyr <mike.sen...@gmail.com>
Date: 2017-01-10T23:00:34Z

    Rewrote sequence deserialization implementation to use builders
[GitHub] spark issue #16240: [SPARK-16792][SQL] Dataset containing a Case Class with ...
Github user michalsenkyr commented on the issue: https://github.com/apache/spark/pull/16240 Not sure how to run MiMa tests locally, so I tried my best to figure out what was necessary. Hope this fixes it. The downside of the fix is that I had to restore the original methods in `SQLImplicits`. I removed the `implicit` keyword and added deprecation annotations, as only the new methods should be used from now on. Old code importing the methods explicitly should be fine now.
[GitHub] spark pull request #16240: [SPARK-16792][SQL] Dataset containing a Case Clas...
Github user michalsenkyr commented on a diff in the pull request: https://github.com/apache/spark/pull/16240#discussion_r94504343 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/DatasetPrimitiveSuite.scala --- @@ -130,6 +130,30 @@ class DatasetPrimitiveSuite extends QueryTest with SharedSQLContext { checkDataset(Seq(Array(Tuple1(1))).toDS(), Array(Tuple1(1))) } + test("arbitrary sequences") { --- End diff -- I added some sequence-product combination tests. Nested sequences were never supported (tried on master and 2.0.2).
[GitHub] spark pull request #16240: [SPARK-16792][SQL] Dataset containing a Case Clas...
Github user michalsenkyr commented on a diff in the pull request: https://github.com/apache/spark/pull/16240#discussion_r93816269 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/SQLImplicits.scala --- @@ -100,31 +97,36 @@ abstract class SQLImplicits { // Seqs /** @since 1.6.1 */ - implicit def newIntSeqEncoder: Encoder[Seq[Int]] = ExpressionEncoder() + implicit def newIntSeqEncoder[T <: Seq[Int] : TypeTag]: Encoder[T] = ExpressionEncoder() /** @since 1.6.1 */ - implicit def newLongSeqEncoder: Encoder[Seq[Long]] = ExpressionEncoder() + implicit def newLongSeqEncoder[T <: Seq[Long] : TypeTag]: Encoder[T] = ExpressionEncoder() /** @since 1.6.1 */ - implicit def newDoubleSeqEncoder: Encoder[Seq[Double]] = ExpressionEncoder() + implicit def newDoubleSeqEncoder[T <: Seq[Double] : TypeTag]: Encoder[T] = ExpressionEncoder() /** @since 1.6.1 */ - implicit def newFloatSeqEncoder: Encoder[Seq[Float]] = ExpressionEncoder() + implicit def newFloatSeqEncoder[T <: Seq[Float] : TypeTag]: Encoder[T] = ExpressionEncoder() /** @since 1.6.1 */ - implicit def newByteSeqEncoder: Encoder[Seq[Byte]] = ExpressionEncoder() + implicit def newByteSeqEncoder[T <: Seq[Byte] : TypeTag]: Encoder[T] = ExpressionEncoder() /** @since 1.6.1 */ - implicit def newShortSeqEncoder: Encoder[Seq[Short]] = ExpressionEncoder() + implicit def newShortSeqEncoder[T <: Seq[Short] : TypeTag]: Encoder[T] = ExpressionEncoder() /** @since 1.6.1 */ - implicit def newBooleanSeqEncoder: Encoder[Seq[Boolean]] = ExpressionEncoder() + implicit def newBooleanSeqEncoder[T <: Seq[Boolean] : TypeTag]: Encoder[T] = ExpressionEncoder() /** @since 1.6.1 */ - implicit def newStringSeqEncoder: Encoder[Seq[String]] = ExpressionEncoder() + implicit def newStringSeqEncoder[T <: Seq[String] : TypeTag]: Encoder[T] = ExpressionEncoder() /** @since 1.6.1 */ - implicit def newProductSeqEncoder[A <: Product : TypeTag]: Encoder[Seq[A]] = ExpressionEncoder() + implicit def newProductSeqEncoder[A <: Product, T <: Seq[A] : TypeTag]: 
Encoder[T] = --- End diff -- You are right. `Seq` is declared covariant on `T`, so it works and solves all the problems I was having. Thanks
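The covariance point can be checked with a tiny snippet (the `Person` class and `bounded` helper are illustrative names, not the PR's code). Because `Seq` is declared `Seq[+A]`, `List[Person]` is a subtype of `Seq[Person]`, so a bound of the form `T <: Seq[A]` unifies with a `List` of a case class directly:

```scala
case class Person(name: String)

// A <: Product matches any case class; T <: Seq[A] matches List[Person]
// because Seq is covariant in its element type.
def bounded[A <: Product, T <: Seq[A]](t: T): T = t

val people: List[Person] = bounded(List(Person("abc")))
```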
[GitHub] spark pull request #16240: [SPARK-16792][SQL] Dataset containing a Case Clas...
Github user michalsenkyr commented on a diff in the pull request: https://github.com/apache/spark/pull/16240#discussion_r93807181 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala --- @@ -312,12 +312,46 @@ object ScalaReflection extends ScalaReflection { "array", ObjectType(classOf[Array[Any]])) -StaticInvoke( +val wrappedArray = StaticInvoke( scala.collection.mutable.WrappedArray.getClass, ObjectType(classOf[Seq[_]]), "make", array :: Nil) +if (localTypeOf[scala.collection.mutable.WrappedArray[_]] <:< t.erasure) { + wrappedArray +} else { + // Convert to another type using `to` + val cls = mirror.runtimeClass(t.typeSymbol.asClass) + import scala.collection.generic.CanBuildFrom + import scala.reflect.ClassTag + import scala.util.{Try, Success} --- End diff -- Done. I tried looking up the code style you mentioned, but only found the [Databricks Scala Code Style Guide](https://github.com/databricks/scala-style-guide#exception-handling-try-vs-try), and that is not mentioned in the Spark docs as far as I know.
[GitHub] spark pull request #16240: [SPARK-16792][SQL] Dataset containing a Case Clas...
Github user michalsenkyr commented on a diff in the pull request: https://github.com/apache/spark/pull/16240#discussion_r93805236 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/SQLImplicits.scala --- @@ -100,31 +97,36 @@ abstract class SQLImplicits { // Seqs /** @since 1.6.1 */ - implicit def newIntSeqEncoder: Encoder[Seq[Int]] = ExpressionEncoder() + implicit def newIntSeqEncoder[T <: Seq[Int] : TypeTag]: Encoder[T] = ExpressionEncoder() /** @since 1.6.1 */ - implicit def newLongSeqEncoder: Encoder[Seq[Long]] = ExpressionEncoder() + implicit def newLongSeqEncoder[T <: Seq[Long] : TypeTag]: Encoder[T] = ExpressionEncoder() /** @since 1.6.1 */ - implicit def newDoubleSeqEncoder: Encoder[Seq[Double]] = ExpressionEncoder() + implicit def newDoubleSeqEncoder[T <: Seq[Double] : TypeTag]: Encoder[T] = ExpressionEncoder() /** @since 1.6.1 */ - implicit def newFloatSeqEncoder: Encoder[Seq[Float]] = ExpressionEncoder() + implicit def newFloatSeqEncoder[T <: Seq[Float] : TypeTag]: Encoder[T] = ExpressionEncoder() /** @since 1.6.1 */ - implicit def newByteSeqEncoder: Encoder[Seq[Byte]] = ExpressionEncoder() + implicit def newByteSeqEncoder[T <: Seq[Byte] : TypeTag]: Encoder[T] = ExpressionEncoder() /** @since 1.6.1 */ - implicit def newShortSeqEncoder: Encoder[Seq[Short]] = ExpressionEncoder() + implicit def newShortSeqEncoder[T <: Seq[Short] : TypeTag]: Encoder[T] = ExpressionEncoder() /** @since 1.6.1 */ - implicit def newBooleanSeqEncoder: Encoder[Seq[Boolean]] = ExpressionEncoder() + implicit def newBooleanSeqEncoder[T <: Seq[Boolean] : TypeTag]: Encoder[T] = ExpressionEncoder() /** @since 1.6.1 */ - implicit def newStringSeqEncoder: Encoder[Seq[String]] = ExpressionEncoder() + implicit def newStringSeqEncoder[T <: Seq[String] : TypeTag]: Encoder[T] = ExpressionEncoder() /** @since 1.6.1 */ - implicit def newProductSeqEncoder[A <: Product : TypeTag]: Encoder[Seq[A]] = ExpressionEncoder() + implicit def newProductSeqEncoder[A <: Product : TypeTag, T <: Seq[A] : 
TypeTag]: Encoder[T] = --- End diff -- This one is the same as all the other ones, just with `Product` subclasses. If you were concerned about the `TypeTag` on `A`, it was actually not needed as `T`'s tag already contains all the information. I just tested it to be sure and removed it.
[GitHub] spark issue #16240: [SPARK-16792][SQL] Dataset containing a Case Class with ...
Github user michalsenkyr commented on the issue: https://github.com/apache/spark/pull/16240 I actually read that, but IDEA complained when I tried to place the `Product` encoder into a separate trait, so I opted for specificity. However, I tried it again just now and even though IDEA still complains, scalac compiles it, all the tests pass and it all works. So I will change it. I will also test whether this fixes the `toDS`-specific problems before pushing the changes.
[GitHub] spark pull request #16329: [SPARK-16046][DOCS] Aggregations in the Spark SQL...
Github user michalsenkyr commented on a diff in the pull request: https://github.com/apache/spark/pull/16329#discussion_r93682192 --- Diff: examples/src/main/scala/org/apache/spark/examples/sql/UserDefinedTypedAggregation.scala --- @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.examples.sql + +// $example on:typed_custom_aggregation$ +import org.apache.spark.sql.expressions.Aggregator +import org.apache.spark.sql.Encoder +import org.apache.spark.sql.Encoders +import org.apache.spark.sql.SparkSession +// $example off:typed_custom_aggregation$ + +object UserDefinedTypedAggregation { + + // $example on:typed_custom_aggregation$ + case class Employee(name: String, salary: Long) + case class Average(var sum: Long, var count: Long) + + object MyAverage extends Aggregator[Employee, Average, Double] { +// A zero value for this aggregation. Should satisfy the property that any b + zero = b +def zero: Average = Average(0L, 0L) +// Combine two values to produce a new value. 
For performance, the function may modify `buffer` +// and return it instead of constructing a new object +def reduce(buffer: Average, employee: Employee): Average = { + buffer.sum += employee.salary + buffer.count += 1 + buffer +} +// Merge two intermediate values +def merge(b1: Average, b2: Average): Average = Average(b1.sum + b2.sum, b1.count + b2.count) --- End diff -- Personally, I prefer consistency. When I saw this, I immediately wondered whether there is a specific reason you did it this way. I'd rather see both methods use the same paradigm. In this case probably the immutable one, as the option of mutability is already mentioned in the comment above. Or you can mention it again in the comment on this method if you want to provide examples of both. This way it just seems a little confusing.
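For reference, a fully immutable version of the two methods from the example could look like the sketch below (case classes reproduced from the example; the immutable `reduce` trades the buffer-reuse optimization for uniform style):

```scala
case class Employee(name: String, salary: Long)
case class Average(sum: Long, count: Long)

// Immutable style for both steps: construct a new Average instead of
// mutating the buffer, matching the paradigm already used by merge
def reduce(buffer: Average, employee: Employee): Average =
  Average(buffer.sum + employee.salary, buffer.count + 1)

def merge(b1: Average, b2: Average): Average =
  Average(b1.sum + b2.sum, b1.count + b2.count)
```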
[GitHub] spark issue #16329: [SPARK-16046][DOCS] Aggregations in the Spark SQL progra...
Github user michalsenkyr commented on the issue: https://github.com/apache/spark/pull/16329 If you are having trouble building Javadoc, try switching to Java 7 temporarily. Java 8 introduced stricter Javadoc rules that may fail the docs build. Unfortunately Jenkins doesn't enforce them, so new errors get introduced over time.
[GitHub] spark issue #16240: [SPARK-16792][SQL] Dataset containing a Case Class with ...
Github user michalsenkyr commented on the issue: https://github.com/apache/spark/pull/16240 None of them. The compilation will fail. That is why I had to provide those additional implicits.

```
scala> class Test[T]
defined class Test

scala> implicit def test1[T <: Seq[String]]: Test[T] = null
test1: [T <: Seq[String]]=> Test[T]

scala> implicit def test2[T <: Product]: Test[T] = null
test2: [T <: Product]=> Test[T]

scala> def test[T : Test](t: T) = null
test: [T](t: T)(implicit evidence$1: Test[T])Null

scala> test(List("abc"))
<console>:31: error: ambiguous implicit values:
 both method test1 of type [T <: Seq[String]]=> Test[T]
 and method test2 of type [T <: Product]=> Test[T]
 match expected type Test[List[String]]
       test(List("abc"))
```
[GitHub] spark issue #16157: [SPARK-18723][DOC] Expanded programming guide informatio...
Github user michalsenkyr commented on the issue: https://github.com/apache/spark/pull/16157 Sorry for the delay. You are probably right that the partitioning is primarily determined by data locality, that it is therefore appropriate in some cases, and that the guide shouldn't be worded so as to imply it usually isn't. I will think about how to word it more appropriately and remove the implementation specifics.
[GitHub] spark issue #16240: [SPARK-16792][SQL] Dataset containing a Case Class with ...
Github user michalsenkyr commented on the issue: https://github.com/apache/spark/pull/16240 Possible optimization: instead of conversions using `to`, we can use `Builder`s. This way we could get rid of the conversion overhead. This would require adding a new codegen method that would operate similarly to `MapObjects` but use a provided `Builder` to build the collection directly. I will wait for a response to this PR before attempting any more modifications.
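The difference between the two strategies can be shown outside of codegen with plain Scala (illustrative only; the generated code operates on Catalyst arrays, not `Array[Int]`):

```scala
val elems = Array(1, 2, 3)

// Original approach: two passes. Materialize an intermediate Seq
// (a WrappedArray in the generated code), then convert with toList/`to`.
val twoPass: List[Int] = elems.toSeq.toList

// Builder approach: one pass. Append straight into the target
// collection's builder and produce the result directly.
val builder = List.newBuilder[Int]
elems.foreach(builder += _)
val onePass: List[Int] = builder.result()
```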
[GitHub] spark issue #16240: [SPARK-16792][SQL] Dataset containing a Case Class with ...
Github user michalsenkyr commented on the issue: https://github.com/apache/spark/pull/16240 Added support for arbitrary sequences. Now also Queues, ArrayBuffers and such can be used in datasets (all are serialized into ArrayType). I had to alter and add new implicit encoders in `SQLImplicits`. The new encoders are for the `Seq` with `Product` combination (essentially only `List`) to disambiguate between the `Seq` and `Product` encoders. However, I encountered a problem with implicits. When constructing a complex Dataset using `Seq.toDS` that includes a `Product` (like a case class) and a sequence, the encoder doesn't seem to be created. When constructed with `spark.createDataset` or when transforming an existing dataset, there is no problem. I added a workaround by defining a specific implicit just for `Seq`s. This makes the problem go away for existing usages; however, other collections cannot be constructed by `Seq.toDS` unless `newProductSeqEncoder[A, T]` is created with the correct type parameters. If anybody knows how to fix this, let me know.
[GitHub] spark issue #16240: [SPARK-16792][SQL] Dataset containing a Case Class with ...
Github user michalsenkyr commented on the issue: https://github.com/apache/spark/pull/16240 I would like to add that the conversion is specific to `List[_]`. I can add support for arbitrary sequence types through the use of `CanBuildFrom` if it is desirable. We can also support `Set`s in this way by serializing them into arrays. See SPARK-17414.
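A rough sketch of the `CanBuildFrom`-based approach (Scala 2.12-era API: `CanBuildFrom` was removed in Scala 2.13; the `fromArray` helper name is hypothetical). The caller picks the target collection type and the implicit supplies a matching builder, the same trick `breakOut` uses:

```scala
import scala.collection.generic.CanBuildFrom

// Convert an array into any target sequence type C chosen by the caller;
// the implicit CanBuildFrom provides a builder for C.
def fromArray[A, C](xs: Array[A])(implicit cbf: CanBuildFrom[Nothing, A, C]): C = {
  val b = cbf()
  xs.foreach(b += _)
  b.result()
}
```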
[GitHub] spark pull request #16240: [SPARK-16792][SQL] Dataset containing a Case Clas...
GitHub user michalsenkyr opened a pull request: https://github.com/apache/spark/pull/16240

[SPARK-16792][SQL] Dataset containing a Case Class with a List type causes a CompileException (converting sequence to list)

## What changes were proposed in this pull request?

Added a `toList` call at the end of the code generated by `ScalaReflection.deserializerFor` if the requested type is `List[_]`. Care was taken to preserve the original deserialization for `Seq[_]` to avoid the overhead of `toList` in cases where it is not needed.

Also fixes [SPARK-16815] Dataset[List[T]] leads to ArrayStoreException

## How was this patch tested?

```bash
./build/mvn -DskipTests clean package && ./dev/run-tests
```

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/michalsenkyr/spark sql-caseclass-list-fix

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/16240.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #16240

commit c47f1895711d5093e4e677ad7ac5f02fe9eb3b61
Author: Michal Senkyr <mike.sen...@gmail.com>
Date: 2016-12-09T22:36:49Z

    Added call to toList if deserializing into List

commit 8c15b475fb053aef19906d6a465309d299ca7b4d
Author: Michal Senkyr <mike.sen...@gmail.com>
Date: 2016-12-09T23:30:49Z

    Added unit test
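The idea behind the fix can be illustrated in plain Scala (no Spark required): a generated deserializer naturally produces an array-backed `Seq`, which is not a `List`, so a downcast to `List` fails at runtime; appending `.toList` only when the requested type is `List[_]` makes the result type correct without penalizing plain `Seq[_]` requests. `deserializeToSeq` below merely stands in for the generated code:

```scala
object ToListSketch {
  // Stand-in for the code generated by ScalaReflection.deserializerFor:
  // it yields an array-backed Seq (WrappedArray / ArraySeq), not a List.
  def deserializeToSeq(data: Array[Int]): Seq[Int] = data.toSeq

  def main(args: Array[String]): Unit = {
    val raw = deserializeToSeq(Array(1, 2, 3))
    assert(!raw.isInstanceOf[List[_]])  // array-backed, not a List

    val fixed = raw.toList              // the appended conversion from the fix
    assert(fixed.isInstanceOf[List[_]])
    assert(fixed == List(1, 2, 3))
  }
}
```

Because the conversion is appended only for `List[_]` targets, callers requesting `Seq[_]` keep the cheaper array-backed representation.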
[GitHub] spark pull request #16157: [SPARK-18723][DOC] Expanded programming guide inf...
Github user michalsenkyr commented on a diff in the pull request: https://github.com/apache/spark/pull/16157#discussion_r91716176

--- Diff: docs/programming-guide.md ---

@@ -347,7 +347,7 @@ Some notes on reading files with Spark: Apart from text files, Spark's Scala API also supports several other data formats:

-* `SparkContext.wholeTextFiles` lets you read a directory containing multiple small text files, and returns each of them as (filename, content) pairs. This is in contrast with `textFile`, which would return one record per line in each file.

+* `SparkContext.wholeTextFiles` lets you read a directory containing multiple small text files, and returns each of them as (filename, content) pairs. This is in contrast with `textFile`, which would return one record per line in each file. It takes an optional second argument for controlling the minimal number of partitions (by default this is 2). It uses [CombineFileInputFormat](https://hadoop.apache.org/docs/current/api/org/apache/hadoop/mapreduce/lib/input/CombineFileInputFormat.html) internally in order to process large numbers of small files effectively by grouping files on the same executor into a single partition. This can lead to sub-optimal partitioning when the file sets would benefit from residing in multiple partitions (e.g., larger partitions would not fit in memory, files are replicated but a large subset is locally reachable from a single executor, subsequent transformations would benefit from multi-core processing). In those cases, set the `minPartitions` argument to enforce splitting.

--- End diff --

Yes, it is different in what the elements are. However, there is no indication that the partitioning policy differs that much. I always understood `textFile`'s partitioning policy as "we will split it if we can", down to individual blocks. `wholeTextFiles`' partitioning seems to be more like "we will merge it if we can", up to the executor boundary. The polar opposite.

This manifests in how the developer has to think about handling partitions. With `textFile` it is generally safe to let Spark figure it all out without sacrificing much performance, whereas with `wholeTextFiles` you may frequently run into performance problems due to having too few partitions. An alternative solution here would be to unify the partitioning policies of `textFile` and `wholeTextFiles` by having the latter also "split if we can", in this case down to individual files (presently achievable by setting minPartitions to an arbitrarily large number). Each file would then, by default, have its own partition. However, this approach would mean a significant change which might break existing applications.
[GitHub] spark pull request #16201: [SPARK-3359][DOCS] Fix greater-than symbols in Ja...
Github user michalsenkyr commented on a diff in the pull request: https://github.com/apache/spark/pull/16201#discussion_r91429780

--- Diff: mllib/src/main/scala/org/apache/spark/ml/classification/DecisionTreeClassifier.scala ---

@@ -81,7 +81,7 @@ class DecisionTreeClassifier @Since("1.4.0") (
 * E.g. 10 means that the cache will get checkpointed every 10 iterations.
 * This is only used if cacheNodeIds is true and if the checkpoint directory is set in
 * [[org.apache.spark.SparkContext]].
- * Must be >= 1.
+ * Must be {@literal >= 1}.

--- End diff --

Yes, I read about that but figured that the literal text is probably meant to be a logical part of code, which in this case is `>= 1`. Making just `>` literal would make sense if you want to minimize the cosmetic differences between the characters. However, as I did not know whether the resulting code block would retain the same styling in the future, I opted for logical separation. If scaladoc later changed the styling of literals to, for example, the GitHub style for code, the result would look very bad, whereas `>= 1` would still look relatively good. But if the `{@literal >}` style is widely used in the Spark project, I can change it.
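For illustration, the two escaping styles under discussion would look like this side by side in a doc comment (an illustrative fragment, not the actual Spark source):

```java
/**
 * Style used in this PR: the whole expression is one logical literal.
 * Must be {@literal >= 1}.
 *
 * Alternative style: escape only the problematic character.
 * Must be {@literal >}= 1.
 */
```

Both forms are valid Javadoc; the trade-off is only in which characters share the literal's rendering if the styling of `{@literal}` output ever changes.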
[GitHub] spark issue #16201: [SPARK-3359][DOCS] Fix greater-than symbols in Javadoc t...
Github user michalsenkyr commented on the issue: https://github.com/apache/spark/pull/16201

This time I inspected both the generated Javadoc and Scaladoc. It should be fine now.
[GitHub] spark pull request #16201: [SPARK-3359][DOCS] Fix greater-than symbols in Jav...
GitHub user michalsenkyr opened a pull request: https://github.com/apache/spark/pull/16201

[SPARK-3359][DOCS] Fix greater-than symbols in Javadoc to allow building with Java 8

## What changes were proposed in this pull request?

The API documentation build was failing when using Java 8 due to the incorrect character `>` in Javadoc. Replace `>` with `{@literal >}` in Javadoc to allow the build to pass.

## How was this patch tested?

Documentation was built and inspected manually to ensure it still displays correctly in the browser:

```
cd docs && jekyll serve
```

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/michalsenkyr/spark javadoc8-gt-fix

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/16201.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #16201

commit 84a7163184e88b827328557c5c9e8882558afef7
Author: Michal Senkyr <mike.sen...@gmail.com>
Date: 2016-12-07T23:56:00Z

    Replace > with …
[GitHub] spark pull request #16157: [SPARK-18723][DOC] Expanded programming guide inf...
Github user michalsenkyr commented on a diff in the pull request: https://github.com/apache/spark/pull/16157#discussion_r91405625

--- Diff: docs/programming-guide.md ---

@@ -347,7 +347,7 @@ Some notes on reading files with Spark: Apart from text files, Spark's Scala API also supports several other data formats:

-* `SparkContext.wholeTextFiles` lets you read a directory containing multiple small text files, and returns each of them as (filename, content) pairs. This is in contrast with `textFile`, which would return one record per line in each file.

+* `SparkContext.wholeTextFiles` lets you read a directory containing multiple small text files, and returns each of them as (filename, content) pairs. This is in contrast with `textFile`, which would return one record per line in each file. It takes an optional second argument for controlling the minimal number of partitions (by default this is 2). It uses [CombineFileInputFormat](https://hadoop.apache.org/docs/current/api/org/apache/hadoop/mapreduce/lib/input/CombineFileInputFormat.html) internally in order to process large numbers of small files effectively by grouping files on the same executor into a single partition. This can lead to sub-optimal partitioning when the file sets would benefit from residing in multiple partitions (e.g., larger partitions would not fit in memory, files are replicated but a large subset is locally reachable from a single executor, subsequent transformations would benefit from multi-core processing). In those cases, set the `minPartitions` argument to enforce splitting.

--- End diff --

Also, the number of files in this case is irrelevant. Even with, say, 5 large files we only got 2 partitions by default.
[GitHub] spark issue #16157: [SPARK-18723][DOC] Expanded programming guide informatio...
Github user michalsenkyr commented on the issue: https://github.com/apache/spark/pull/16157

I added a few more sentences describing the cases in which the user might want to use the argument. However, I am afraid this might be a little too descriptive.
[GitHub] spark pull request #16157: [SPARK-18723][DOC] Expanded programming guide inf...
Github user michalsenkyr commented on a diff in the pull request: https://github.com/apache/spark/pull/16157#discussion_r90975179

--- Diff: docs/programming-guide.md ---

@@ -347,7 +347,7 @@ Some notes on reading files with Spark: Apart from text files, Spark's Scala API also supports several other data formats:

-* `SparkContext.wholeTextFiles` lets you read a directory containing multiple small text files, and returns each of them as (filename, content) pairs. This is in contrast with `textFile`, which would return one record per line in each file.

+* `SparkContext.wholeTextFiles` lets you read a directory containing multiple small text files, and returns each of them as (filename, content) pairs. This is in contrast with `textFile`, which would return one record per line in each file. It takes an optional second argument for controlling the minimal number of partitions (by default this is 2). It uses [CombineFileInputFormat](https://hadoop.apache.org/docs/current/api/org/apache/hadoop/mapreduce/lib/input/CombineFileInputFormat.html) internally in order to process large numbers of small files effectively by grouping files on the same node into a single split. (This can lead to non-optimal partitioning. It is therefore advisable to set the minimal number of partitions explicitly.)

--- End diff --

Yes, you are right. A 'node' should probably be an 'executor'. Also, 'split' should probably be replaced by 'partition' to prevent confusion. I used Hadoop terminology because it is used in the linked Hadoop documentation, but Spark terminology may be more appropriate here.

As I understand it, this behavior is guaranteed because setting minPartitions directly sets maxSplitSize by dividing the length of the file ([code](https://github.com/apache/spark/blob/39e2bad6a866d27c3ca594d15e574a1da3ee84cc/core/src/main/scala/org/apache/spark/input/WholeTextFileInputFormat.scala#L57)). Please note that [minSplitSize](https://hadoop.apache.org/docs/current/api/org/apache/hadoop/mapreduce/lib/input/CombineFileInputFormat.html#setMinSplitSizeNode-long-) is only used for leftover data.

As the programming guide already contains quite a detailed description of partitioning for `textFile`, I thought it would make sense to mention it for `wholeTextFiles` as well, especially since the behavior differs that much.
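The arithmetic described above can be sketched as follows. This helper only mirrors the idea (total input length divided by minPartitions caps the combined-split size, forcing at least that many splits); it is not Spark's actual code:

```scala
object SplitSizeSketch {
  // Capping each combined split at totalLength / minPartitions means
  // CombineFileInputFormat must emit at least minPartitions splits.
  def maxSplitSize(totalLength: Long, minPartitions: Int): Long =
    math.ceil(totalLength.toDouble / math.max(minPartitions, 1)).toLong

  def main(args: Array[String]): Unit = {
    // 1 GB of small files with the default minPartitions = 2:
    // two combined splits of at most ~512 MB each.
    println(maxSplitSize(1L << 30, 2))   // prints 536870912
    // Asking for 100 partitions caps each split at roughly 10 MB.
    println(maxSplitSize(1L << 30, 100)) // prints 10737419
  }
}
```

This is why raising `minPartitions` is the lever for splitting: it shrinks the per-split byte budget rather than directly setting a partition count.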
[GitHub] spark pull request #16157: [SPARK-18723][DOC] Expanded programming guide inf...
GitHub user michalsenkyr opened a pull request: https://github.com/apache/spark/pull/16157

[SPARK-18723][DOC] Expanded programming guide information on wholeTex…

## What changes were proposed in this pull request?

Add additional information on wholeTextFiles in the Programming Guide. Also explain the partitioning policy difference in relation to textFile and its impact on performance, and add a reference to the underlying CombineFileInputFormat.

## How was this patch tested?

Manual build of the documentation and inspection in a browser:

```
cd docs
SKIP_API=1 jekyll serve --watch
```

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/michalsenkyr/spark wholeTextFilesExpandedDocs

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/16157.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #16157

commit 487710b79ffbed8eccbd531531149a838eb2fbb2
Author: Michal Senkyr <mike.sen...@gmail.com>
Date: 2016-12-05T21:16:23Z

    [SPARK-18723][DOC] Expanded programming guide information on wholeTextFiles