spark git commit: [SPARK-16288][SQL] Implement inline table generating function

2016-07-07 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/branch-2.0 bb4b0419b -> e32c29d86


[SPARK-16288][SQL] Implement inline table generating function

This PR implements the `inline` table-generating function.

Pass the Jenkins tests with new testcase.

Author: Dongjoon Hyun 

Closes #13976 from dongjoon-hyun/SPARK-16288.

(cherry picked from commit 88134e736829f5f93a82879c08cb191f175ff8af)
Signed-off-by: Reynold Xin 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e32c29d8
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e32c29d8
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e32c29d8

Branch: refs/heads/branch-2.0
Commit: e32c29d86d4cc7ebe8e485c4221b5a10366b3d7d
Parents: bb4b041
Author: Dongjoon Hyun 
Authored: Mon Jul 4 01:57:45 2016 +0800
Committer: Reynold Xin 
Committed: Thu Jul 7 21:08:45 2016 -0700

--
 .../catalyst/analysis/FunctionRegistry.scala|  1 +
 .../sql/catalyst/expressions/generators.scala   | 35 
 .../expressions/GeneratorExpressionSuite.scala  | 59 +--
 .../spark/sql/GeneratorFunctionSuite.scala  | 60 
 .../spark/sql/hive/HiveSessionCatalog.scala |  5 +-
 5 files changed, 124 insertions(+), 36 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/e32c29d8/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
--
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
index e7f335f..021bec7 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
@@ -165,6 +165,7 @@ object FunctionRegistry {
 expression[Explode]("explode"),
 expression[Greatest]("greatest"),
 expression[If]("if"),
+expression[Inline]("inline"),
 expression[IsNaN]("isnan"),
 expression[IfNull]("ifnull"),
 expression[IsNull]("isnull"),

http://git-wip-us.apache.org/repos/asf/spark/blob/e32c29d8/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala
--
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala
index 4e91cc5..99b97c8 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala
@@ -195,3 +195,38 @@ case class Explode(child: Expression) extends ExplodeBase(child, position = fals
   extended = "> SELECT _FUNC_(array(10,20));\n  0\t10\n  1\t20")
 // scalastyle:on line.size.limit
 case class PosExplode(child: Expression) extends ExplodeBase(child, position = true)
+
+/**
+ * Explodes an array of structs into a table.
+ */
+@ExpressionDescription(
+  usage = "_FUNC_(a) - Explodes an array of structs into a table.",
+  extended = "> SELECT _FUNC_(array(struct(1, 'a'), struct(2, 'b')));\n  [1,a]\n  [2,b]")
+case class Inline(child: Expression) extends UnaryExpression with Generator with CodegenFallback {
+
+  override def children: Seq[Expression] = child :: Nil
+
+  override def checkInputDataTypes(): TypeCheckResult = child.dataType match {
+case ArrayType(et, _) if et.isInstanceOf[StructType] =>
+  TypeCheckResult.TypeCheckSuccess
+case _ =>
+  TypeCheckResult.TypeCheckFailure(
+s"input to function $prettyName should be array of struct type, not ${child.dataType}")
+  }
+
+  override def elementSchema: StructType = child.dataType match {
+case ArrayType(et : StructType, _) => et
+  }
+
+  private lazy val numFields = elementSchema.fields.length
+
+  override def eval(input: InternalRow): TraversableOnce[InternalRow] = {
+val inputArray = child.eval(input).asInstanceOf[ArrayData]
+if (inputArray == null) {
+  Nil
+} else {
+  for (i <- 0 until inputArray.numElements())
+yield inputArray.getStruct(i, numFields)
+}
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/e32c29d8/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/GeneratorExpressionSuite.scala
--
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/GeneratorExpressionSuite.scala
 

spark git commit: [SPARK-16288][SQL] Implement inline table generating function

2016-07-03 Thread wenchen
Repository: spark
Updated Branches:
  refs/heads/master 54b27c179 -> 88134e736


[SPARK-16288][SQL] Implement inline table generating function

## What changes were proposed in this pull request?

This PR implements the `inline` table-generating function.

## How was this patch tested?

Pass the Jenkins tests with new testcase.

Author: Dongjoon Hyun 

Closes #13976 from dongjoon-hyun/SPARK-16288.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/88134e73
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/88134e73
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/88134e73

Branch: refs/heads/master
Commit: 88134e736829f5f93a82879c08cb191f175ff8af
Parents: 54b27c1
Author: Dongjoon Hyun 
Authored: Mon Jul 4 01:57:45 2016 +0800
Committer: Wenchen Fan 
Committed: Mon Jul 4 01:57:45 2016 +0800

--
 .../catalyst/analysis/FunctionRegistry.scala|  1 +
 .../sql/catalyst/expressions/generators.scala   | 35 
 .../expressions/GeneratorExpressionSuite.scala  | 59 +--
 .../spark/sql/GeneratorFunctionSuite.scala  | 60 
 .../spark/sql/hive/HiveSessionCatalog.scala |  5 +-
 5 files changed, 124 insertions(+), 36 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/88134e73/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
--
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
index e7f335f..021bec7 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
@@ -165,6 +165,7 @@ object FunctionRegistry {
 expression[Explode]("explode"),
 expression[Greatest]("greatest"),
 expression[If]("if"),
+expression[Inline]("inline"),
 expression[IsNaN]("isnan"),
 expression[IfNull]("ifnull"),
 expression[IsNull]("isnull"),

http://git-wip-us.apache.org/repos/asf/spark/blob/88134e73/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala
--
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala
index 4e91cc5..99b97c8 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala
@@ -195,3 +195,38 @@ case class Explode(child: Expression) extends ExplodeBase(child, position = fals
   extended = "> SELECT _FUNC_(array(10,20));\n  0\t10\n  1\t20")
 // scalastyle:on line.size.limit
 case class PosExplode(child: Expression) extends ExplodeBase(child, position = true)
+
+/**
+ * Explodes an array of structs into a table.
+ */
+@ExpressionDescription(
+  usage = "_FUNC_(a) - Explodes an array of structs into a table.",
+  extended = "> SELECT _FUNC_(array(struct(1, 'a'), struct(2, 'b')));\n  [1,a]\n  [2,b]")
+case class Inline(child: Expression) extends UnaryExpression with Generator with CodegenFallback {
+
+  override def children: Seq[Expression] = child :: Nil
+
+  override def checkInputDataTypes(): TypeCheckResult = child.dataType match {
+case ArrayType(et, _) if et.isInstanceOf[StructType] =>
+  TypeCheckResult.TypeCheckSuccess
+case _ =>
+  TypeCheckResult.TypeCheckFailure(
+s"input to function $prettyName should be array of struct type, not ${child.dataType}")
+  }
+
+  override def elementSchema: StructType = child.dataType match {
+case ArrayType(et : StructType, _) => et
+  }
+
+  private lazy val numFields = elementSchema.fields.length
+
+  override def eval(input: InternalRow): TraversableOnce[InternalRow] = {
+val inputArray = child.eval(input).asInstanceOf[ArrayData]
+if (inputArray == null) {
+  Nil
+} else {
+  for (i <- 0 until inputArray.numElements())
+yield inputArray.getStruct(i, numFields)
+}
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/88134e73/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/GeneratorExpressionSuite.scala
--
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/GeneratorExpressionSuite.scala