[GitHub] spark pull request #18416: [SPARK-21204][SQL][WIP] Add support for Scala Set...

2017-06-28 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/18416#discussion_r124712549
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/DatasetPrimitiveSuite.scala ---
@@ -339,6 +340,28 @@ class DatasetPrimitiveSuite extends QueryTest with SharedSQLContext {
       LHMapClass(LHMap(1 -> 2)) -> LHMap("test" -> MapClass(Map(3 -> 4))))
   }
 
+  test("arbitrary sets") {
--- End diff ---

Added a test for it.




[GitHub] spark pull request #18416: [SPARK-21204][SQL][WIP] Add support for Scala Set...

2017-06-28 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/18416#discussion_r124712535
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala ---
@@ -834,6 +834,140 @@ case class CollectObjectsToMap private(
   }
 }
 
+object CollectObjectsToSet {
+  private val curId = new java.util.concurrent.atomic.AtomicInteger()
+
+  /**
+   * Construct an instance of the CollectObjectsToSet case class.
+   *
+   * @param function The function applied on the collection elements.
+   * @param inputData An expression that when evaluated returns a collection object.
+   * @param collClass The type of the resulting collection.
+   */
+  def apply(
+      function: Expression => Expression,
+      inputData: Expression,
+      collClass: Class[_]): CollectObjectsToSet = {
+    val id = curId.getAndIncrement()
+    val loopValue = s"CollectObjectsToSet_loopValue$id"
+    val loopIsNull = s"CollectObjectsToSet_loopIsNull$id"
+    val arrayType = inputData.dataType.asInstanceOf[ArrayType]
+    val loopVar = LambdaVariable(loopValue, loopIsNull, arrayType.elementType)
+    CollectObjectsToSet(
+      loopValue, loopIsNull, function(loopVar), inputData, collClass)
+  }
+}
+
+/**
+ * Expression used to convert a Catalyst Array to an external Scala `Set`.
+ * The collection is constructed using the associated builder, obtained by calling `newBuilder`
+ * on the collection's companion object.
+ *
+ * Notice that when we convert a Catalyst array which contains duplicated elements to an external
+ * Scala `Set`, the elements will be de-duplicated.
+ *
+ * @param loopValue the name of the loop variable that is used when iterating over the value
+ *                  collection, and which is used as input for the `lambdaFunction`
+ * @param loopIsNull the nullability of the loop variable that is used when iterating over
+ *                   the value collection, and which is used as input for the `lambdaFunction`
+ * @param lambdaFunction A function that takes the `loopValue` as input, and is used as
+ *                       a lambda function to handle collection elements.
+ * @param inputData An expression that when evaluated returns an array object.
+ * @param collClass The type of the resulting collection.
+ */
+case class CollectObjectsToSet private(
+    loopValue: String,
+    loopIsNull: String,
+    lambdaFunction: Expression,
+    inputData: Expression,
+    collClass: Class[_]) extends Expression with NonSQLExpression {
+
+  override def nullable: Boolean = inputData.nullable
+
+  override def children: Seq[Expression] = lambdaFunction :: inputData :: Nil
+
+  override def eval(input: InternalRow): Any =
+    throw new UnsupportedOperationException("Only code-generated evaluation is supported")
+
+  override def dataType: DataType = ObjectType(collClass)
+
+  override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
+    // Data with a PythonUserDefinedType is actually stored with the data type of its sqlType.
+    def inputDataType(dataType: DataType) = dataType match {
+      case p: PythonUserDefinedType => p.sqlType
+      case _ => dataType
+    }
+
+    val arrayType = inputDataType(inputData.dataType).asInstanceOf[ArrayType]
+    val loopValueJavaType = ctx.javaType(arrayType.elementType)
+    ctx.addMutableState("boolean", loopIsNull, "")
+    ctx.addMutableState(loopValueJavaType, loopValue, "")
+    val genFunction = lambdaFunction.genCode(ctx)
+
+    val genInputData = inputData.genCode(ctx)
+    val dataLength = ctx.freshName("dataLength")
+    val loopIndex = ctx.freshName("loopIndex")
+    val builderValue = ctx.freshName("builderValue")
+
+    val getLength = s"${genInputData.value}.numElements()"
+    val getLoopVar = ctx.getValue(genInputData.value, arrayType.elementType, loopIndex)
+
+    // Make a copy of the data if it's unsafe-backed
+    def makeCopyIfInstanceOf(clazz: Class[_ <: Any], value: String) =
+      s"$value instanceof ${clazz.getSimpleName}? $value.copy() : $value"
+    val genFunctionValue =
+      lambdaFunction.dataType match {
+        case StructType(_) => makeCopyIfInstanceOf(classOf[UnsafeRow], genFunction.value)
+        case ArrayType(_, _) => makeCopyIfInstanceOf(classOf[UnsafeArrayData], genFunction.value)
+        case MapType(_, _, _) => makeCopyIfInstanceOf(classOf[UnsafeMapData], genFunction.value)
+        case _ => genFunction.value
+      }
+
+    val loopNullCheck = s"$loopIsNull = ${genInputData.value}.isNullAt($loopIndex);"
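
The quoted codegen builds the result through the companion object's builder, as the doc comment describes. As a rough model in plain Scala (illustrative only; `collectToSet` is an invented name, not the PR's):

```scala
// Simplified model of the conversion CollectObjectsToSet generates:
// iterate the Catalyst array, apply the element lambda, and append the
// results into a builder obtained from the Set companion object.
def collectToSet[A](arrayData: Seq[A], lambda: A => A): Set[A] = {
  val builder = Set.newBuilder[A]    // `newBuilder` on the companion, per the doc comment
  builder.sizeHint(arrayData.length)
  var i = 0
  while (i < arrayData.length) {     // mirrors the generated loop over numElements()
    builder += lambda(arrayData(i))
    i += 1
  }
  builder.result()                   // a Set, so duplicated elements collapse
}
```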

[GitHub] spark pull request #18416: [SPARK-21204][SQL][WIP] Add support for Scala Set...

2017-06-27 Thread maropu
Github user maropu commented on a diff in the pull request:

https://github.com/apache/spark/pull/18416#discussion_r124332920
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/DatasetPrimitiveSuite.scala ---
@@ -339,6 +340,28 @@ class DatasetPrimitiveSuite extends QueryTest with SharedSQLContext {
       LHMapClass(LHMap(1 -> 2)) -> LHMap("test" -> MapClass(Map(3 -> 4))))
   }
 
+  test("arbitrary sets") {
--- End diff ---

Better to test null cases?
```
Seq(Seq(Some(1), None), Seq(Some(2))).toDF("c").as[Set[Int]]
```
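
Fleshed out, such a test might look like the sketch below (hypothetical; the expected behavior for null elements is exactly what the test would pin down):

```scala
test("sets with null elements") {
  // None encodes as a null array element. Set[Int] has a primitive element
  // type with no natural representation for null, so the test should assert
  // either a clean error or a defined fallback once those semantics are chosen.
  val ds = Seq(Seq(Some(1), None), Seq(Some(2))).toDF("c").as[Set[Int]]
  ds.collect()  // assert on the result (or the thrown error) here
}
```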





[GitHub] spark pull request #18416: [SPARK-21204][SQL][WIP] Add support for Scala Set...

2017-06-27 Thread maropu
Github user maropu commented on a diff in the pull request:

https://github.com/apache/spark/pull/18416#discussion_r124332023
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala ---
@@ -834,6 +834,140 @@ case class CollectObjectsToMap private(
   }
 }
 
+object CollectObjectsToSet {
+  private val curId = new java.util.concurrent.atomic.AtomicInteger()
+
+  /**
+   * Construct an instance of the CollectObjectsToSet case class.
+   *
+   * @param function The function applied on the collection elements.
+   * @param inputData An expression that when evaluated returns a collection object.
+   * @param collClass The type of the resulting collection.
+   */
+  def apply(
+      function: Expression => Expression,
+      inputData: Expression,
+      collClass: Class[_]): CollectObjectsToSet = {
+    val id = curId.getAndIncrement()
+    val loopValue = s"CollectObjectsToSet_loopValue$id"
+    val loopIsNull = s"CollectObjectsToSet_loopIsNull$id"
+    val arrayType = inputData.dataType.asInstanceOf[ArrayType]
+    val loopVar = LambdaVariable(loopValue, loopIsNull, arrayType.elementType)
+    CollectObjectsToSet(
+      loopValue, loopIsNull, function(loopVar), inputData, collClass)
+  }
+}
+
+/**
+ * Expression used to convert a Catalyst Array to an external Scala `Set`.
+ * The collection is constructed using the associated builder, obtained by calling `newBuilder`
+ * on the collection's companion object.
+ *
+ * Notice that when we convert a Catalyst array which contains duplicated elements to an external
+ * Scala `Set`, the elements will be de-duplicated.
+ *
+ * @param loopValue the name of the loop variable that is used when iterating over the value
+ *                  collection, and which is used as input for the `lambdaFunction`
+ * @param loopIsNull the nullability of the loop variable that is used when iterating over
+ *                   the value collection, and which is used as input for the `lambdaFunction`
+ * @param lambdaFunction A function that takes the `loopValue` as input, and is used as
+ *                       a lambda function to handle collection elements.
+ * @param inputData An expression that when evaluated returns an array object.
+ * @param collClass The type of the resulting collection.
+ */
+case class CollectObjectsToSet private(
+    loopValue: String,
+    loopIsNull: String,
+    lambdaFunction: Expression,
+    inputData: Expression,
+    collClass: Class[_]) extends Expression with NonSQLExpression {
+
+  override def nullable: Boolean = inputData.nullable
+
+  override def children: Seq[Expression] = lambdaFunction :: inputData :: Nil
+
+  override def eval(input: InternalRow): Any =
+    throw new UnsupportedOperationException("Only code-generated evaluation is supported")
+
+  override def dataType: DataType = ObjectType(collClass)
+
+  override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
+    // Data with a PythonUserDefinedType is actually stored with the data type of its sqlType.
+    def inputDataType(dataType: DataType) = dataType match {
+      case p: PythonUserDefinedType => p.sqlType
+      case _ => dataType
+    }
+
+    val arrayType = inputDataType(inputData.dataType).asInstanceOf[ArrayType]
+    val loopValueJavaType = ctx.javaType(arrayType.elementType)
+    ctx.addMutableState("boolean", loopIsNull, "")
+    ctx.addMutableState(loopValueJavaType, loopValue, "")
+    val genFunction = lambdaFunction.genCode(ctx)
+
+    val genInputData = inputData.genCode(ctx)
+    val dataLength = ctx.freshName("dataLength")
+    val loopIndex = ctx.freshName("loopIndex")
+    val builderValue = ctx.freshName("builderValue")
+
+    val getLength = s"${genInputData.value}.numElements()"
+    val getLoopVar = ctx.getValue(genInputData.value, arrayType.elementType, loopIndex)
+
+    // Make a copy of the data if it's unsafe-backed
+    def makeCopyIfInstanceOf(clazz: Class[_ <: Any], value: String) =
+      s"$value instanceof ${clazz.getSimpleName}? $value.copy() : $value"
+    val genFunctionValue =
+      lambdaFunction.dataType match {
+        case StructType(_) => makeCopyIfInstanceOf(classOf[UnsafeRow], genFunction.value)
+        case ArrayType(_, _) => makeCopyIfInstanceOf(classOf[UnsafeArrayData], genFunction.value)
+        case MapType(_, _, _) => makeCopyIfInstanceOf(classOf[UnsafeMapData], genFunction.value)
+        case _ => genFunction.value
+      }
+
+    val loopNullCheck = s"$loopIsNull = ${genInputData.value}.isNullAt($loopIndex);"
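
The de-duplication called out in the doc comment above is user-visible. A rough sketch of the effect, assuming the patch is applied and `spark.implicits._` is in scope:

```scala
// A column of arrays with duplicated elements...
val ds = Seq(Seq(1, 1, 2, 2, 3)).toDF("c").as[Set[Int]]

// ...collapses the duplicates when read back as a Scala Set.
ds.collect()  // expected: Array(Set(1, 2, 3))
```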

[GitHub] spark pull request #18416: [SPARK-21204][SQL][WIP] Add support for Scala Set...

2017-06-25 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/18416#discussion_r123920728
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala ---
@@ -992,6 +1123,128 @@ case class ExternalMapToCatalyst private(
   }
 }
 
+object ExternalSetToCatalystArray {
+  private val curId = new java.util.concurrent.atomic.AtomicInteger()
+
+  def apply(
+      inputSet: Expression,
+      elementType: DataType,
+      elementConverter: Expression => Expression,
+      elementNullable: Boolean): ExternalSetToCatalystArray = {
+    val id = curId.getAndIncrement()
+    val elementName = "ExternalSetToCatalystArray_element" + id
+    val elementIsNull = "ExternalSetToCatalystArray_element_isNull" + id
+
+    ExternalSetToCatalystArray(
+      elementName,
+      elementIsNull,
+      elementType,
+      elementConverter(LambdaVariable(elementName, elementIsNull, elementType, elementNullable)),
+      inputSet
+    )
+  }
+}
+
+/**
+ * Converts a Scala/Java set object into catalyst array format, by applying the converter while
+ * iterating over the set.
+ *
+ * @param element the name of the set element variable used when iterating over the set, and
+ *                used as input for the `elementConverter`
+ * @param elementIsNull the nullability of the element variable used when iterating over the
+ *                      set, and used as input for the `elementConverter`
+ * @param elementType the data type of the element variable used when iterating over the set,
+ *                    and used as input for the `elementConverter`
+ * @param elementConverter A function that takes the `element` as input, and converts it to
+ *                         catalyst array format.
+ * @param child An expression that when evaluated returns the input set object.
+ */
+case class ExternalSetToCatalystArray private(
+    element: String,
+    elementIsNull: String,
+    elementType: DataType,
+    elementConverter: Expression,
+    child: Expression)
+  extends UnaryExpression with NonSQLExpression {
+
+  override def foldable: Boolean = false
+
+  override def dataType: ArrayType = ArrayType(
+    elementType = elementConverter.dataType, containsNull = elementConverter.nullable)
+
+  override def eval(input: InternalRow): Any =
+    throw new UnsupportedOperationException("Only code-generated evaluation is supported")
+
+  override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
+    val inputSet = child.genCode(ctx)
+    val genElementConverter = elementConverter.genCode(ctx)
+    val length = ctx.freshName("length")
+    val index = ctx.freshName("index")
+
+    val iter = ctx.freshName("iter")
+    val (defineIterator, defineElement) = child.dataType match {
+      case ObjectType(cls) if classOf[java.util.Set[_]].isAssignableFrom(cls) =>
+        val javaIteratorCls = classOf[java.util.Iterator[_]].getName
--- End diff ---

I'd prefer to leave Java set support to another PR.
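
For the Scala-set path that stays in this PR, the iterator-driven conversion the codegen models can be sketched in plain Scala (simplified; the real code emits Java that writes a Catalyst array, and `setToCatalystArray` is an invented name):

```scala
// Simplified model of ExternalSetToCatalystArray: walk the set with its
// iterator and convert each element into its Catalyst representation.
def setToCatalystArray[A, B](inputSet: Set[A], elementConverter: A => B): Array[Any] = {
  val converted = new Array[Any](inputSet.size)
  val iter = inputSet.iterator
  var index = 0
  while (iter.hasNext) {
    val element = iter.next()
    // Null elements stay null; everything else goes through the converter.
    converted(index) = if (element == null) null else elementConverter(element)
    index += 1
  }
  converted
}
```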





[GitHub] spark pull request #18416: [SPARK-21204][SQL][WIP] Add support for Scala Set...

2017-06-25 Thread viirya
GitHub user viirya opened a pull request:

https://github.com/apache/spark/pull/18416

[SPARK-21204][SQL][WIP] Add support for Scala Set collection types in serialization

## What changes were proposed in this pull request?

Currently we can't produce a `Dataset` containing a `Set` in Spark SQL, because there is no corresponding internal data type for a `Set`. With this change, a `Set` is serialized to an array.
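
As a rough usage sketch (the case class and values here are hypothetical), a `Set` field surfaces as an `ArrayType` column:

```scala
import spark.implicits._

case class SetRecord(id: Int, tags: Set[String])

val ds = Seq(SetRecord(1, Set("a", "b")), SetRecord(2, Set("c"))).toDS()
ds.printSchema()
// Illustrative schema: the Set[String] field maps to array<string>.
// root
//  |-- id: integer (nullable = false)
//  |-- tags: array (nullable = true)
//  |    |-- element: string (containsNull = true)
ds.collect()  // Array(SetRecord(1, Set(a, b)), SetRecord(2, Set(c)))
```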

## How was this patch tested?

Added unit tests.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/viirya/spark-1 SPARK-21204

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/18416.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #18416


commit db1b91ed3fd0979841d40a20e7db0f64c62a947d
Author: Liang-Chi Hsieh 
Date:   2017-06-25T10:21:27Z

Add support for Scala Set collection types in serialization.



