[GitHub] spark pull request #21082: [SPARK-22239][SQL][Python] Enable grouped aggrega...

2018-06-12 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/21082


---




[GitHub] spark pull request #21082: [SPARK-22239][SQL][Python] Enable grouped aggrega...

2018-06-08 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/21082#discussion_r194133638
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala ---
@@ -424,6 +424,21 @@ abstract class SparkStrategies extends 
QueryPlanner[SparkPlan] {
 }
   }
 
+  object Window extends Strategy {
+def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
+  case PhysicalWindow(
+WindowFunctionType.SQL, windowExprs, partitionSpec, orderSpec, 
child) =>
+execution.window.WindowExec(
+  windowExprs, partitionSpec, orderSpec, planLater(child)) :: Nil
+
+  case PhysicalWindow(
+WindowFunctionType.Python, windowExprs, partitionSpec, orderSpec, 
child) =>
+execution.python.WindowInPandasExec(
+  windowExprs, partitionSpec, orderSpec, planLater(child)) :: Nil
--- End diff --

Added
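
For context on the two branches above, a sketch (not part of the PR) of how the planning difference is expected to show up from the Python side; the session, data, and UDF names below are made up for illustration:

```
from pyspark.sql import SparkSession, Window
from pyspark.sql.functions import mean, pandas_udf, PandasUDFType

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([(1, 1.0), (1, 2.0), (2, 3.0)], ("id", "v"))

@pandas_udf('double', PandasUDFType.GROUPED_AGG)
def mean_udf(v):
    return v.mean()

w = Window.partitionBy('id')

# A built-in window function should be planned as WindowExec, while the grouped
# aggregate pandas UDF should be planned as the new WindowInPandasExec node.
df.withColumn('mean_v', mean(df['v']).over(w)).explain()
df.withColumn('mean_v', mean_udf(df['v']).over(w)).explain()
```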


---




[GitHub] spark pull request #21082: [SPARK-22239][SQL][Python] Enable grouped aggrega...

2018-06-08 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/21082#discussion_r194133573
  
--- Diff: python/pyspark/worker.py ---
@@ -128,6 +128,17 @@ def wrapped(*series):
 return lambda *a: (wrapped(*a), arrow_return_type)
 
 
+def wrap_window_agg_pandas_udf(f, return_type):
+arrow_return_type = to_arrow_type(return_type)
+
+def wrapped(*series):
+import pandas as pd
+result = f(*series)
+return pd.Series([result]).repeat(len(series[0]))
--- End diff --

Added comments to describe the function


---




[GitHub] spark pull request #21082: [SPARK-22239][SQL][Python] Enable grouped aggrega...

2018-06-08 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/21082#discussion_r194130366
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/PythonUDF.scala
 ---
@@ -34,7 +34,12 @@ object PythonUDF {
 e.isInstanceOf[PythonUDF] && 
SCALAR_TYPES.contains(e.asInstanceOf[PythonUDF].evalType)
   }
 
-  def isGroupAggPandasUDF(e: Expression): Boolean = {
+  def isGroupedAggPandasUDF(e: Expression): Boolean = {
+e.isInstanceOf[PythonUDF] &&
+  e.asInstanceOf[PythonUDF].evalType == 
PythonEvalType.SQL_GROUPED_AGG_PANDAS_UDF
+  }
+
+  def isWindowPandasUDF(e: Expression): Boolean = {
--- End diff --

Sounds good


---




[GitHub] spark pull request #21082: [SPARK-22239][SQL][Python] Enable grouped aggrega...

2018-06-08 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/21082#discussion_r194129021
  
--- Diff: python/pyspark/sql/tests.py ---
@@ -5181,6 +5190,235 @@ def test_invalid_args(self):
 'mixture.*aggregate function.*group aggregate pandas 
UDF'):
 df.groupby(df.id).agg(mean_udf(df.v), mean(df.v)).collect()
 
+
+@unittest.skipIf(
+not _have_pandas or not _have_pyarrow,
+_pandas_requirement_message or _pyarrow_requirement_message)
+class WindowPandasUDFTests(ReusedSQLTestCase):
+@property
+def data(self):
+from pyspark.sql.functions import array, explode, col, lit
+return self.spark.range(10).toDF('id') \
+.withColumn("vs", array([lit(i * 1.0) + col('id') for i in 
range(20, 30)])) \
+.withColumn("v", explode(col('vs'))) \
+.drop('vs') \
+.withColumn('w', lit(1.0))
+
+@property
+def python_plus_one(self):
+from pyspark.sql.functions import udf
+return udf(lambda v: v + 1, 'double')
+
+@property
+def pandas_scalar_time_two(self):
+from pyspark.sql.functions import pandas_udf, PandasUDFType
+return pandas_udf(lambda v: v * 2, 'double')
+
+@property
+def pandas_agg_mean_udf(self):
+from pyspark.sql.functions import pandas_udf, PandasUDFType
+
+@pandas_udf('double', PandasUDFType.GROUPED_AGG)
+def avg(v):
+return v.mean()
+return avg
+
+@property
+def pandas_agg_max_udf(self):
+from pyspark.sql.functions import pandas_udf, PandasUDFType
+
+@pandas_udf('double', PandasUDFType.GROUPED_AGG)
+def max(v):
+return v.max()
+return max
+
+@property
+def pandas_agg_min_udf(self):
+from pyspark.sql.functions import pandas_udf, PandasUDFType
+
+@pandas_udf('double', PandasUDFType.GROUPED_AGG)
+def min(v):
+return v.min()
+return min
+
+@property
+def unbounded_window(self):
+return Window.partitionBy('id') \
+.rowsBetween(Window.unboundedPreceding, 
Window.unboundedFollowing)
+
+@property
+def ordered_window(self):
+return Window.partitionBy('id').orderBy('v')
+
+@property
+def unpartitioned_window(self):
+return Window.partitionBy()
--- End diff --

I think we can rely on `Window.partitionBy()` returning an unbounded window here; 
otherwise there might be too many combinations to test. But I am OK with adding tests for 
`Window.partitionBy().rowsBetween(Window.unboundedPreceding, 
Window.unboundedFollowing)` in addition to the existing ones. WDYT?
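
For reference, a minimal sketch of the two window specs being compared (nothing here beyond what the tests in this thread already use):

```
from pyspark.sql import Window

# The two specs under discussion. The first relies on an unordered window
# defaulting to an unbounded frame (the assumption in the comment above);
# the second spells the unbounded frame out explicitly.
unpartitioned_window = Window.partitionBy()
explicit_unbounded_window = Window.partitionBy().rowsBetween(
    Window.unboundedPreceding, Window.unboundedFollowing)
```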


---




[GitHub] spark pull request #21082: [SPARK-22239][SQL][Python] Enable grouped aggrega...

2018-06-06 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/21082#discussion_r193327794
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/python/WindowInPandasExec.scala
 ---
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.python
+
+import java.io.File
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.{SparkEnv, TaskContext}
+import org.apache.spark.api.python.{ChainedPythonFunctions, PythonEvalType}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.physical._
+import org.apache.spark.sql.execution.{GroupedIterator, SparkPlan, 
UnaryExecNode}
+import org.apache.spark.sql.types.{DataType, StructField, StructType}
+import org.apache.spark.util.Utils
+
+case class WindowInPandasExec(
+windowExpression: Seq[NamedExpression],
+partitionSpec: Seq[Expression],
+orderSpec: Seq[SortOrder],
+child: SparkPlan) extends UnaryExecNode {
+
+  override def output: Seq[Attribute] =
+child.output ++ windowExpression.map(_.toAttribute)
+
+  override def requiredChildDistribution: Seq[Distribution] = {
+if (partitionSpec.isEmpty) {
+  // Only show warning when the number of bytes is larger than 100 MB?
+  logWarning("No Partition Defined for Window operation! Moving all 
data to a single "
++ "partition, this can cause serious performance degradation.")
+  AllTuples :: Nil
+} else {
+  ClusteredDistribution(partitionSpec) :: Nil
+}
+  }
+
+  override def requiredChildOrdering: Seq[Seq[SortOrder]] =
+Seq(partitionSpec.map(SortOrder(_, Ascending)) ++ orderSpec)
+
+  override def outputOrdering: Seq[SortOrder] = child.outputOrdering
+
+  override def outputPartitioning: Partitioning = child.outputPartitioning
+
+  private def collectFunctions(udf: PythonUDF): (ChainedPythonFunctions, 
Seq[Expression]) = {
+udf.children match {
+  case Seq(u: PythonUDF) =>
+val (chained, children) = collectFunctions(u)
+(ChainedPythonFunctions(chained.funcs ++ Seq(udf.func)), children)
+  case children =>
+// There should not be any other UDFs, or the children can't be 
evaluated directly.
+assert(children.forall(_.find(_.isInstanceOf[PythonUDF]).isEmpty))
+(ChainedPythonFunctions(Seq(udf.func)), udf.children)
+}
+  }
+
+  /**
+   * Create the resulting projection.
+   *
+   * This method uses Code Generation. It can only be used on the executor 
side.
+   *
+   * @param expressions unbound ordered function expressions.
+   * @return the final resulting projection.
+   */
+  private[this] def createResultProjection(expressions: Seq[Expression]): 
UnsafeProjection = {
+val references = expressions.zipWithIndex.map{ case (e, i) =>
--- End diff --

`map{` -> `map {`


---




[GitHub] spark pull request #21082: [SPARK-22239][SQL][Python] Enable grouped aggrega...

2018-06-06 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/21082#discussion_r193326454
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
 ---
@@ -268,3 +269,40 @@ object PhysicalAggregation {
 case _ => None
   }
 }
+
+/**
+ * An extractor used when planning physical execution of a window. This 
extractor outputs
+ * the window function type of the logical window.
+ *
+ * The input logical window must contain same type of window functions, 
which is ensured by
+ * the rule ExtractWindowExpressions in the analyzer.
+ */
+object PhysicalWindow {
+  // windowFunctionType, windowExpression, partitionSpec, orderSpec, child
+  type ReturnType =
--- End diff --

nit: `private type`


---




[GitHub] spark pull request #21082: [SPARK-22239][SQL][Python] Enable grouped aggrega...

2018-06-06 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/21082#discussion_r193323738
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala
 ---
@@ -297,6 +297,37 @@ trait WindowFunction extends Expression {
   def frame: WindowFrame = UnspecifiedFrame
 }
 
+/**
+ * Case objects that describe whether a window function is a SQL window 
function or a Python
+ * user-defined window function.
+ */
+sealed trait WindowFunctionType
+
+object WindowFunctionType {
+  case object SQL extends WindowFunctionType
+  case object Python extends WindowFunctionType
+
+  def functionType(windowExpression: NamedExpression): WindowFunctionType 
= {
+val t = windowExpression.collectFirst {
+  case _: WindowFunction | _: AggregateFunction => SQL
+  case udf: PythonUDF if PythonUDF.isWindowPandasUDF(udf) => Python
+}
+
+// Normally a window expression would either have either a SQL window 
function, a SQL
--- End diff --

`either have either` ... :-).


---




[GitHub] spark pull request #21082: [SPARK-22239][SQL][Python] Enable grouped aggrega...

2018-06-06 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/21082#discussion_r193323414
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala ---
@@ -424,6 +424,21 @@ abstract class SparkStrategies extends 
QueryPlanner[SparkPlan] {
 }
   }
 
+  object Window extends Strategy {
+def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
+  case PhysicalWindow(
+WindowFunctionType.SQL, windowExprs, partitionSpec, orderSpec, 
child) =>
+execution.window.WindowExec(
+  windowExprs, partitionSpec, orderSpec, planLater(child)) :: Nil
+
+  case PhysicalWindow(
+WindowFunctionType.Python, windowExprs, partitionSpec, orderSpec, 
child) =>
+execution.python.WindowInPandasExec(
+  windowExprs, partitionSpec, orderSpec, planLater(child)) :: Nil
--- End diff --

tiny nit: I would add a newline below


---




[GitHub] spark pull request #21082: [SPARK-22239][SQL][Python] Enable grouped aggrega...

2018-06-06 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/21082#discussion_r193320743
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/PythonUDF.scala
 ---
@@ -34,7 +34,12 @@ object PythonUDF {
 e.isInstanceOf[PythonUDF] && 
SCALAR_TYPES.contains(e.asInstanceOf[PythonUDF].evalType)
   }
 
-  def isGroupAggPandasUDF(e: Expression): Boolean = {
+  def isGroupedAggPandasUDF(e: Expression): Boolean = {
+e.isInstanceOf[PythonUDF] &&
+  e.asInstanceOf[PythonUDF].evalType == 
PythonEvalType.SQL_GROUPED_AGG_PANDAS_UDF
+  }
+
+  def isWindowPandasUDF(e: Expression): Boolean = {
--- End diff --

@icexelloss, can we maybe do:

```
def isWindowPandasUDF(e: Expression): Boolean = isGroupedAggPandasUDF(e)
```

and explain why they can be the same ..


---




[GitHub] spark pull request #21082: [SPARK-22239][SQL][Python] Enable grouped aggrega...

2018-05-31 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/21082#discussion_r192182678
  
--- Diff: python/pyspark/worker.py ---
@@ -128,6 +128,17 @@ def wrapped(*series):
 return lambda *a: (wrapped(*a), arrow_return_type)
 
 
+def wrap_window_agg_pandas_udf(f, return_type):
+arrow_return_type = to_arrow_type(return_type)
+
+def wrapped(*series):
+import pandas as pd
+result = f(*series)
+return pd.Series([result]).repeat(len(series[0]))
--- End diff --

Yes - I tried to do this on the Java side, but it's tricky and complicated 
to merge the input rows and the output of the UDF when they are not a 1-1 mapping. So I 
ended up doing this.


---




[GitHub] spark pull request #21082: [SPARK-22239][SQL][Python] Enable grouped aggrega...

2018-05-31 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/21082#discussion_r192151282
  
--- Diff: python/pyspark/worker.py ---
@@ -128,6 +128,17 @@ def wrapped(*series):
 return lambda *a: (wrapped(*a), arrow_return_type)
 
 
+def wrap_window_agg_pandas_udf(f, return_type):
+arrow_return_type = to_arrow_type(return_type)
+
+def wrapped(*series):
+import pandas as pd
+result = f(*series)
+return pd.Series([result]).repeat(len(series[0]))
--- End diff --

So, this is the only place where the behavior diverges (by repeating?), 
and therefore it needs a window-specific attribute?


---




[GitHub] spark pull request #21082: [SPARK-22239][SQL][Python] Enable grouped aggrega...

2018-05-31 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/21082#discussion_r192150984
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/PythonUDF.scala
 ---
@@ -34,7 +34,12 @@ object PythonUDF {
 e.isInstanceOf[PythonUDF] && 
SCALAR_TYPES.contains(e.asInstanceOf[PythonUDF].evalType)
   }
 
-  def isGroupAggPandasUDF(e: Expression): Boolean = {
+  def isGroupedAggPandasUDF(e: Expression): Boolean = {
+e.isInstanceOf[PythonUDF] &&
+  e.asInstanceOf[PythonUDF].evalType == 
PythonEvalType.SQL_GROUPED_AGG_PANDAS_UDF
+  }
+
+  def isWindowPandasUDF(e: Expression): Boolean = {
--- End diff --

Hm .. this bit looks a bit odd because the condition is the same but only 
the name is different. We should at least leave a comment ...


---




[GitHub] spark pull request #21082: [SPARK-22239][SQL][Python] Enable grouped aggrega...

2018-05-31 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/21082#discussion_r192146812
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
 ---
@@ -112,12 +113,19 @@ trait CheckAnalysis extends PredicateHelper {
 failAnalysis("An offset window function can only be evaluated 
in an ordered " +
   s"row-based window frame with a single offset: $w")
 
+  case _ @ WindowExpression(_: PythonUDF,
+WindowSpecDefinition(_, _, frame: SpecifiedWindowFrame))
+  if !frame.isUnbounded =>
+failAnalysis(s"Only unbounded window frame is supported with 
Pandas UDFs.")
--- End diff --

leading `s` seems not needed.
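
As a usage-level illustration of the check being discussed, a sketch of a query it is meant to reject; the session, data, and UDF names are made up for the example:

```
from pyspark.sql import SparkSession, Window
from pyspark.sql.functions import pandas_udf, PandasUDFType

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([(1, 1.0), (1, 2.0), (2, 3.0)], ("id", "v"))

@pandas_udf('double', PandasUDFType.GROUPED_AGG)
def mean_udf(v):
    return v.mean()

# A bounded frame combined with a grouped aggregate pandas UDF; per the check
# above, this query is expected to fail analysis with
# "Only unbounded window frame is supported with Pandas UDFs."
bounded = Window.partitionBy('id').orderBy('v').rowsBetween(-1, 1)
df.withColumn('m', mean_udf(df['v']).over(bounded))
```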


---




[GitHub] spark pull request #21082: [SPARK-22239][SQL][Python] Enable grouped aggrega...

2018-05-31 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/21082#discussion_r192146449
  
--- Diff: python/pyspark/worker.py ---
@@ -128,6 +128,17 @@ def wrapped(*series):
 return lambda *a: (wrapped(*a), arrow_return_type)
 
 
+def wrap_window_agg_pandas_udf(f, return_type):
+arrow_return_type = to_arrow_type(return_type)
+
+def wrapped(*series):
+import pandas as pd
+result = f(*series)
+return pd.Series([result]).repeat(len(series[0]))
--- End diff --

Let's leave a short comment where we are here.


---




[GitHub] spark pull request #21082: [SPARK-22239][SQL][Python] Enable grouped aggrega...

2018-05-31 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/21082#discussion_r192142018
  
--- Diff: python/pyspark/sql/tests.py ---
@@ -5181,6 +5190,235 @@ def test_invalid_args(self):
 'mixture.*aggregate function.*group aggregate pandas 
UDF'):
 df.groupby(df.id).agg(mean_udf(df.v), mean(df.v)).collect()
 
+
+@unittest.skipIf(
+not _have_pandas or not _have_pyarrow,
+_pandas_requirement_message or _pyarrow_requirement_message)
+class WindowPandasUDFTests(ReusedSQLTestCase):
+@property
+def data(self):
+from pyspark.sql.functions import array, explode, col, lit
+return self.spark.range(10).toDF('id') \
+.withColumn("vs", array([lit(i * 1.0) + col('id') for i in 
range(20, 30)])) \
+.withColumn("v", explode(col('vs'))) \
+.drop('vs') \
+.withColumn('w', lit(1.0))
+
+@property
+def python_plus_one(self):
+from pyspark.sql.functions import udf
+return udf(lambda v: v + 1, 'double')
+
+@property
+def pandas_scalar_time_two(self):
+from pyspark.sql.functions import pandas_udf, PandasUDFType
+return pandas_udf(lambda v: v * 2, 'double')
+
+@property
+def pandas_agg_mean_udf(self):
+from pyspark.sql.functions import pandas_udf, PandasUDFType
+
+@pandas_udf('double', PandasUDFType.GROUPED_AGG)
+def avg(v):
+return v.mean()
+return avg
+
+@property
+def pandas_agg_max_udf(self):
+from pyspark.sql.functions import pandas_udf, PandasUDFType
+
+@pandas_udf('double', PandasUDFType.GROUPED_AGG)
+def max(v):
+return v.max()
+return max
+
+@property
+def pandas_agg_min_udf(self):
+from pyspark.sql.functions import pandas_udf, PandasUDFType
+
+@pandas_udf('double', PandasUDFType.GROUPED_AGG)
+def min(v):
+return v.min()
+return min
+
+@property
+def unbounded_window(self):
+return Window.partitionBy('id') \
+.rowsBetween(Window.unboundedPreceding, 
Window.unboundedFollowing)
+
+@property
+def ordered_window(self):
+return Window.partitionBy('id').orderBy('v')
+
+@property
+def unpartitioned_window(self):
+return Window.partitionBy()
--- End diff --

Shall we test `Window.rowsBetween(Window.unboundedPreceding, 
Window.unboundedFollowing)` too?


---




[GitHub] spark pull request #21082: [SPARK-22239][SQL][Python] Enable grouped aggrega...

2018-05-31 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/21082#discussion_r192140997
  
--- Diff: python/pyspark/sql/tests.py ---
@@ -5181,6 +5190,235 @@ def test_invalid_args(self):
 'mixture.*aggregate function.*group aggregate pandas 
UDF'):
 df.groupby(df.id).agg(mean_udf(df.v), mean(df.v)).collect()
 
+
+@unittest.skipIf(
+not _have_pandas or not _have_pyarrow,
+_pandas_requirement_message or _pyarrow_requirement_message)
+class WindowPandasUDFTests(ReusedSQLTestCase):
+@property
+def data(self):
+from pyspark.sql.functions import array, explode, col, lit
+return self.spark.range(10).toDF('id') \
+.withColumn("vs", array([lit(i * 1.0) + col('id') for i in 
range(20, 30)])) \
+.withColumn("v", explode(col('vs'))) \
+.drop('vs') \
+.withColumn('w', lit(1.0))
+
+@property
+def python_plus_one(self):
+from pyspark.sql.functions import udf
+return udf(lambda v: v + 1, 'double')
+
+@property
+def pandas_scalar_time_two(self):
+from pyspark.sql.functions import pandas_udf, PandasUDFType
+return pandas_udf(lambda v: v * 2, 'double')
+
+@property
+def pandas_agg_mean_udf(self):
+from pyspark.sql.functions import pandas_udf, PandasUDFType
+
+@pandas_udf('double', PandasUDFType.GROUPED_AGG)
+def avg(v):
+return v.mean()
+return avg
+
+@property
+def pandas_agg_max_udf(self):
+from pyspark.sql.functions import pandas_udf, PandasUDFType
+
+@pandas_udf('double', PandasUDFType.GROUPED_AGG)
+def max(v):
+return v.max()
+return max
+
+@property
+def pandas_agg_min_udf(self):
+from pyspark.sql.functions import pandas_udf, PandasUDFType
+
+@pandas_udf('double', PandasUDFType.GROUPED_AGG)
+def min(v):
+return v.min()
+return min
+
+@property
+def unbounded_window(self):
+return Window.partitionBy('id') \
+.rowsBetween(Window.unboundedPreceding, 
Window.unboundedFollowing)
+
+@property
+def ordered_window(self):
+return Window.partitionBy('id').orderBy('v')
+
+@property
+def unpartitioned_window(self):
+return Window.partitionBy()
+
+def test_simple(self):
+from pyspark.sql.functions import pandas_udf, PandasUDFType, 
percent_rank, mean, max
+
+df = self.data
+w = self.unbounded_window
+
+mean_udf = self.pandas_agg_mean_udf
+
+result1 = df.withColumn('mean_v', mean_udf(df['v']).over(w))
+expected1 = df.withColumn('mean_v', mean(df['v']).over(w))
+
+result2 = df.select(mean_udf(df['v']).over(w))
+expected2 = df.select(mean(df['v']).over(w))
+
+self.assertPandasEqual(expected1.toPandas(), result1.toPandas())
+self.assertPandasEqual(expected2.toPandas(), result2.toPandas())
+
+def test_multiple_udfs(self):
+from pyspark.sql.functions import max, min, mean
+
+df = self.data
+w = self.unbounded_window
+
+result1 = df.withColumn('mean_v', 
self.pandas_agg_mean_udf(df['v']).over(w)) \
+.withColumn('max_v', 
self.pandas_agg_max_udf(df['v']).over(w)) \
+.withColumn('min_w', 
self.pandas_agg_min_udf(df['w']).over(w)) \
--- End diff --

Trailing `\`.


---




[GitHub] spark pull request #21082: [SPARK-22239][SQL][Python] Enable grouped aggrega...

2018-05-31 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/21082#discussion_r192138739
  
--- Diff: python/pyspark/sql/functions.py ---
@@ -2321,7 +2323,30 @@ def pandas_udf(f=None, returnType=None, 
functionType=None):
|  2|6.0|
+---+---+
 
-   .. seealso:: :meth:`pyspark.sql.GroupedData.agg`
+   This example shows using grouped aggregated UDFs as window 
functions. Note that only
+   unbounded window frame is supported at the moment:
+
+   >>> from pyspark.sql.functions import pandas_udf, PandasUDFType
+   >>> from pyspark.sql import Window
+   >>> df = spark.createDataFrame(
+   ... [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
+   ... ("id", "v"))
+   >>> @pandas_udf("double", PandasUDFType.GROUPED_AGG)  # doctest: 
+SKIP
+   ... def mean_udf(v):
+   ... return v.mean()
+   >>> w = Window.partitionBy('id')
--- End diff --

Shall we explicitly show unbounded boundaries?
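
If the boundaries were spelled out, the example might look like the following sketch (data and UDF mirror the doctest in the diff above; only the window spec changes):

```
from pyspark.sql import SparkSession, Window
from pyspark.sql.functions import pandas_udf, PandasUDFType

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame(
    [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], ("id", "v"))

@pandas_udf("double", PandasUDFType.GROUPED_AGG)
def mean_udf(v):
    return v.mean()

# Same as the doctest, but with the unbounded frame made explicit.
w = Window.partitionBy('id').rowsBetween(
    Window.unboundedPreceding, Window.unboundedFollowing)
df.withColumn('mean_v', mean_udf(df['v']).over(w)).show()
```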


---




[GitHub] spark pull request #21082: [SPARK-22239][SQL][Python] Enable grouped aggrega...

2018-05-30 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/21082#discussion_r191801865
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/python/WindowInPandasExec.scala
 ---
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.python
+
+import java.io.File
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.{SparkEnv, TaskContext}
+import org.apache.spark.api.python.{ChainedPythonFunctions, PythonEvalType}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.physical._
+import org.apache.spark.sql.execution.{GroupedIterator, SparkPlan, 
UnaryExecNode}
+import org.apache.spark.sql.types.{DataType, StructField, StructType}
+import org.apache.spark.util.Utils
+
+case class WindowInPandasExec(
+windowExpression: Seq[NamedExpression],
+partitionSpec: Seq[Expression],
+orderSpec: Seq[SortOrder],
+child: SparkPlan) extends UnaryExecNode {
+
+  override def output: Seq[Attribute] =
--- End diff --

I am not sure. Currently python and window are under different package 
namespaces (execution.python and execution.window). To create a common base 
class we would probably need to refactor the namespace hierarchy somehow and remove 
some of the `private[python]` and `private[window]`. 

I think we will face this problem when trying to do rolling windows with 
pandas UDFs, because we will likely want to reuse some of the code under the 
`execution.window` package, but I am not sure we should resolve the python/window 
namespace split in this PR.


---




[GitHub] spark pull request #21082: [SPARK-22239][SQL][Python] Enable grouped aggrega...

2018-05-30 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/21082#discussion_r191740985
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/python/WindowInPandasExec.scala
 ---
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.python
+
+import java.io.File
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.{SparkEnv, TaskContext}
+import org.apache.spark.api.python.{ChainedPythonFunctions, PythonEvalType}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.physical._
+import org.apache.spark.sql.execution.{GroupedIterator, SparkPlan, 
UnaryExecNode}
+import org.apache.spark.sql.types.{DataType, StructField, StructType}
+import org.apache.spark.util.Utils
+
+case class WindowInPandasExec(
+windowExpression: Seq[NamedExpression],
+partitionSpec: Seq[Expression],
+orderSpec: Seq[SortOrder],
+child: SparkPlan) extends UnaryExecNode {
+
+  override def output: Seq[Attribute] =
--- End diff --

Minor comment: should we create a common base class for native windows and 
Python windows?


---




[GitHub] spark pull request #21082: [SPARK-22239][SQL][Python] Enable grouped aggrega...

2018-05-24 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/21082#discussion_r190633480
  
--- Diff: python/pyspark/sql/functions.py ---
@@ -2301,10 +2301,12 @@ def pandas_udf(f=None, returnType=None, 
functionType=None):
The returned scalar can be either a python primitive type, e.g., 
`int` or `float`
or a numpy data type, e.g., `numpy.int64` or `numpy.float64`.
 
-   :class:`ArrayType`, :class:`MapType` and :class:`StructType` are 
currently not supported as
-   output types.
+   :class:`MapType` and :class:`StructType` are currently not 
supported as output types.
--- End diff --

Track in: https://issues.apache.org/jira/browse/SPARK-23633


---




[GitHub] spark pull request #21082: [SPARK-22239][SQL][Python] Enable grouped aggrega...

2018-05-23 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/21082#discussion_r190340070
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 ---
@@ -1869,6 +1870,8 @@ class Analyzer(
   case window: WindowExpression => window.windowSpec
 }.distinct
 
+val windowFunctionType = WindowFunctionType.functionType(expr)
--- End diff --

Oh, now I understand that the long block is for `groupBy`, so the tuple 
will be the group key, right? I misread the block as being for `map` or something. 
Thanks!


---




[GitHub] spark pull request #21082: [SPARK-22239][SQL][Python] Enable grouped aggrega...

2018-05-23 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/21082#discussion_r190334520
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 ---
@@ -1869,6 +1870,8 @@ class Analyzer(
   case window: WindowExpression => window.windowSpec
 }.distinct
 
+val windowFunctionType = WindowFunctionType.functionType(expr)
--- End diff --

Inlined.


---




[GitHub] spark pull request #21082: [SPARK-22239][SQL][Python] Enable grouped aggrega...

2018-05-23 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/21082#discussion_r190242582
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 ---
@@ -1869,6 +1870,8 @@ class Analyzer(
   case window: WindowExpression => window.windowSpec
 }.distinct
 
+val windowFunctionType = WindowFunctionType.functionType(expr)
--- End diff --

Ah I see. Let me inline this.


---




[GitHub] spark pull request #21082: [SPARK-22239][SQL][Python] Enable grouped aggrega...

2018-05-23 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/21082#discussion_r190177055
  
--- Diff: python/pyspark/sql/tests.py ---
@@ -5181,6 +5190,235 @@ def test_invalid_args(self):
 'mixture.*aggregate function.*group aggregate pandas 
UDF'):
 df.groupby(df.id).agg(mean_udf(df.v), mean(df.v)).collect()
 
+
+@unittest.skipIf(
+not _have_pandas or not _have_pyarrow,
+_pandas_requirement_message or _pyarrow_requirement_message)
+class WindowPandasUDFTests(ReusedSQLTestCase):
+@property
+def data(self):
+from pyspark.sql.functions import array, explode, col, lit
+return self.spark.range(10).toDF('id') \
+.withColumn("vs", array([lit(i * 1.0) + col('id') for i in 
range(20, 30)])) \
+.withColumn("v", explode(col('vs'))) \
+.drop('vs') \
+.withColumn('w', lit(1.0))
+
+@property
+def python_plus_one(self):
--- End diff --

Sure.


---




[GitHub] spark pull request #21082: [SPARK-22239][SQL][Python] Enable grouped aggrega...

2018-05-23 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/21082#discussion_r190176898
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 ---
@@ -1869,6 +1870,8 @@ class Analyzer(
   case window: WindowExpression => window.windowSpec
 }.distinct
 
+val windowFunctionType = WindowFunctionType.functionType(expr)
--- End diff --

Yeah, I can see it is included in the tuple, but it seems like it is not used 
after that and is dropped at 
https://github.com/apache/spark/pull/21082/files#diff-57b3d87be744b7d79a9beacf8e5e5eb2R1886?


---




[GitHub] spark pull request #21082: [SPARK-22239][SQL][Python] Enable grouped aggrega...

2018-05-22 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/21082#discussion_r190065854
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
 ---
@@ -268,3 +269,38 @@ object PhysicalAggregation {
 case _ => None
   }
 }
+
+/**
+ * An extractor used when planning physical execution of a window. This 
extractor outputs
+ * the window function type of the logical window.
+ *
+ * The input logical window must contain same type of window functions, 
which is ensured by
+ * the rule ExtractWindowExpressions in the analyzer.
+ */
+object PhysicalWindow {
+  // windowFunctionType, windowExpression, partitionSpec, orderSpec, child
+  type ReturnType =
+(WindowFunctionType, Seq[NamedExpression], Seq[Expression], 
Seq[SortOrder], LogicalPlan)
+
+  def unapply(a: Any): Option[ReturnType] = a match {
+case expr @ logical.Window(windowExpressions, partitionSpec, 
orderSpec, child) =>
+
+  if (windowExpressions.isEmpty) {
+throw new AnalysisException(s"Window expression is empty in $expr")
--- End diff --

I added comments to explain this.


---




[GitHub] spark pull request #21082: [SPARK-22239][SQL][Python] Enable grouped aggrega...

2018-05-22 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/21082#discussion_r190061061
  
--- Diff: python/pyspark/sql/tests.py ---
@@ -5181,6 +5190,235 @@ def test_invalid_args(self):
 'mixture.*aggregate function.*group aggregate pandas 
UDF'):
 df.groupby(df.id).agg(mean_udf(df.v), mean(df.v)).collect()
 
+
+@unittest.skipIf(
+not _have_pandas or not _have_pyarrow,
+_pandas_requirement_message or _pyarrow_requirement_message)
+class WindowPandasUDFTests(ReusedSQLTestCase):
+@property
+def data(self):
+from pyspark.sql.functions import array, explode, col, lit
+return self.spark.range(10).toDF('id') \
+.withColumn("vs", array([lit(i * 1.0) + col('id') for i in 
range(20, 30)])) \
+.withColumn("v", explode(col('vs'))) \
+.drop('vs') \
+.withColumn('w', lit(1.0))
+
+@property
+def python_plus_one(self):
--- End diff --

I agree those should be shared, but maybe let's do it in a separate PR? 
The refactor will probably touch many test cases, and this PR is quite 
large already.


---




[GitHub] spark pull request #21082: [SPARK-22239][SQL][Python] Enable grouped aggrega...

2018-05-22 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/21082#discussion_r190060687
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
 ---
@@ -268,3 +269,38 @@ object PhysicalAggregation {
 case _ => None
   }
 }
+
+/**
+ * An extractor used when planning physical execution of a window. This 
extractor outputs
+ * the window function type of the logical window.
+ *
+ * The input logical window must contain same type of window functions, 
which is ensured by
+ * the rule ExtractWindowExpressions in the analyzer.
+ */
+object PhysicalWindow {
+  // windowFunctionType, windowExpression, partitionSpec, orderSpec, child
+  type ReturnType =
+(WindowFunctionType, Seq[NamedExpression], Seq[Expression], 
Seq[SortOrder], LogicalPlan)
+
+  def unapply(a: Any): Option[ReturnType] = a match {
+case expr @ logical.Window(windowExpressions, partitionSpec, 
orderSpec, child) =>
+
+  if (windowExpressions.isEmpty) {
+throw new AnalysisException(s"Window expression is empty in $expr")
--- End diff --

The window expression list should not be empty here, so this should not be reached. 
This is just for safety. Let me add a comment.


---




[GitHub] spark pull request #21082: [SPARK-22239][SQL][Python] Enable grouped aggrega...

2018-05-22 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/21082#discussion_r190060086
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 ---
@@ -1869,6 +1870,8 @@ class Analyzer(
   case window: WindowExpression => window.windowSpec
 }.distinct
 
+val windowFunctionType = WindowFunctionType.functionType(expr)
--- End diff --

This is used for grouping window functions into SQL window functions and 
pandas UDFs and creating different logical nodes for each type. The value is used here: 
https://github.com/apache/spark/pull/21082/files#diff-57b3d87be744b7d79a9beacf8e5e5eb2R1886
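
A usage-level sketch of what that grouping enables; the data and UDF below are hypothetical, and the built-in `mean` and the pandas UDF should end up in separate window nodes of the analyzed plan:

```
from pyspark.sql import SparkSession, Window
from pyspark.sql.functions import mean, pandas_udf, PandasUDFType

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([(1, 1.0), (1, 2.0), (2, 3.0)], ("id", "v"))

@pandas_udf('double', PandasUDFType.GROUPED_AGG)
def mean_udf(v):
    return v.mean()

w = Window.partitionBy('id')

# Mixing a SQL window function and a pandas UDF window function over the same
# spec: the analyzer groups the expressions by window function type, so each
# type gets its own logical Window node.
df.select(mean(df['v']).over(w), mean_udf(df['v']).over(w)).explain(True)
```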


---




[GitHub] spark pull request #21082: [SPARK-22239][SQL][Python] Enable grouped aggrega...

2018-05-18 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/21082#discussion_r189198335
  
--- Diff: python/pyspark/sql/tests.py ---
@@ -5181,6 +5190,235 @@ def test_invalid_args(self):
 'mixture.*aggregate function.*group aggregate pandas 
UDF'):
 df.groupby(df.id).agg(mean_udf(df.v), mean(df.v)).collect()
 
+
+@unittest.skipIf(
+not _have_pandas or not _have_pyarrow,
+_pandas_requirement_message or _pyarrow_requirement_message)
+class WindowPandasUDFTests(ReusedSQLTestCase):
+@property
+def data(self):
+from pyspark.sql.functions import array, explode, col, lit
+return self.spark.range(10).toDF('id') \
+.withColumn("vs", array([lit(i * 1.0) + col('id') for i in 
range(20, 30)])) \
+.withColumn("v", explode(col('vs'))) \
+.drop('vs') \
+.withColumn('w', lit(1.0))
+
+@property
+def python_plus_one(self):
--- End diff --

Shall we move the `pandas_udf`s for tests to a common place?


---




[GitHub] spark pull request #21082: [SPARK-22239][SQL][Python] Enable grouped aggrega...

2018-05-18 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/21082#discussion_r189161794
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 ---
@@ -1869,6 +1870,8 @@ class Analyzer(
   case window: WindowExpression => window.windowSpec
 }.distinct
 
+val windowFunctionType = WindowFunctionType.functionType(expr)
--- End diff --

What's this for? It seems like this is dropped soon after.


---




[GitHub] spark pull request #21082: [SPARK-22239][SQL][Python] Enable grouped aggrega...

2018-05-18 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/21082#discussion_r189216703
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
 ---
@@ -268,3 +269,38 @@ object PhysicalAggregation {
 case _ => None
   }
 }
+
+/**
+ * An extractor used when planning physical execution of a window. This 
extractor outputs
+ * the window function type of the logical window.
+ *
+ * The input logical window must contain same type of window functions, 
which is ensured by
+ * the rule ExtractWindowExpressions in the analyzer.
+ */
+object PhysicalWindow {
+  // windowFunctionType, windowExpression, partitionSpec, orderSpec, child
+  type ReturnType =
+(WindowFunctionType, Seq[NamedExpression], Seq[Expression], 
Seq[SortOrder], LogicalPlan)
+
+  def unapply(a: Any): Option[ReturnType] = a match {
+case expr @ logical.Window(windowExpressions, partitionSpec, 
orderSpec, child) =>
+
+  if (windowExpressions.isEmpty) {
+throw new AnalysisException(s"Window expression is empty in $expr")
--- End diff --

Can we reach here?


---




[GitHub] spark pull request #21082: [SPARK-22239][SQL][Python] Enable grouped aggrega...

2018-05-11 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/21082#discussion_r187663449
  
--- Diff: python/pyspark/worker.py ---
@@ -128,6 +128,17 @@ def wrapped(*series):
 return lambda *a: (wrapped(*a), arrow_return_type)
 
 
+def wrap_window_agg_pandas_udf(f, return_type):
+arrow_return_type = to_arrow_type(return_type)
+
+def wrapped(*series):
+import pandas as pd
+result = f(*series)
+return pd.Series([result]).repeat(len(series[0]))
--- End diff --

Window aggregation results are broadcast to each input row, so we repeat 
the value here to match the input rows.
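
A minimal pandas-only sketch of that broadcast, mirroring the `pd.Series([result]).repeat(...)` line in the diff:

```
import pandas as pd

# One window partition's input column (made-up values).
v = pd.Series([1.0, 2.0, 3.0, 4.0])

# The grouped aggregate UDF produces a single scalar for the partition...
result = v.mean()

# ...which is repeated so there is one output value per input row, matching
# how window aggregation results are broadcast to each row.
broadcast = pd.Series([result]).repeat(len(v))
print(broadcast.tolist())  # [2.5, 2.5, 2.5, 2.5]
```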


---




[GitHub] spark pull request #21082: [SPARK-22239][SQL][Python] Enable grouped aggrega...

2018-05-10 Thread BryanCutler
Github user BryanCutler commented on a diff in the pull request:

https://github.com/apache/spark/pull/21082#discussion_r187479626
  
--- Diff: python/pyspark/worker.py ---
@@ -128,6 +128,17 @@ def wrapped(*series):
 return lambda *a: (wrapped(*a), arrow_return_type)
 
 
+def wrap_window_agg_pandas_udf(f, return_type):
+arrow_return_type = to_arrow_type(return_type)
+
+def wrapped(*series):
+import pandas as pd
+result = f(*series)
+return pd.Series([result]).repeat(len(series[0]))
--- End diff --

Just wondering why this needs to be repeated to the length of the series 
and grouped agg doesn't?


---




[GitHub] spark pull request #21082: [SPARK-22239][SQL][Python] Enable grouped aggrega...

2018-05-02 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/21082#discussion_r185423199
  
--- Diff: python/pyspark/sql/functions.py ---
@@ -2301,10 +2301,12 @@ def pandas_udf(f=None, returnType=None, 
functionType=None):
The returned scalar can be either a python primitive type, e.g., 
`int` or `float`
or a numpy data type, e.g., `numpy.int64` or `numpy.float64`.
 
-   :class:`ArrayType`, :class:`MapType` and :class:`StructType` are 
currently not supported as
-   output types.
+   :class:`MapType` and :class:`StructType` are currently not 
supported as output types.
--- End diff --

Yup, I think that works too. I left a comment only because it looked 
mismatched with this API doc and the SQL programming guide.


---




[GitHub] spark pull request #21082: [SPARK-22239][SQL][Python] Enable grouped aggrega...

2018-05-01 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/21082#discussion_r185339976
  
--- Diff: python/pyspark/sql/functions.py ---
@@ -2301,10 +2301,12 @@ def pandas_udf(f=None, returnType=None, 
functionType=None):
The returned scalar can be either a python primitive type, e.g., 
`int` or `float`
or a numpy data type, e.g., `numpy.int64` or `numpy.float64`.
 
-   :class:`ArrayType`, :class:`MapType` and :class:`StructType` are 
currently not supported as
-   output types.
+   :class:`MapType` and :class:`StructType` are currently not 
supported as output types.
--- End diff --

I am leaning towards keeping this in the API doc and maybe making the 
sql-programming-guide link to this.

I think most users would look at the API docs first rather than the 
sql-programming-guide, so it's probably a bit more convenient to have it here?


---




[GitHub] spark pull request #21082: [SPARK-22239][SQL][Python] Enable grouped aggrega...

2018-04-29 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/21082#discussion_r184875393
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
 ---
@@ -268,3 +269,38 @@ object PhysicalAggregation {
 case _ => None
   }
 }
+
+/**
+ * An extractor used when planning physical execution of a window. This 
extractor outputs
+ * the window function type of the logical window.
+ *
+ * The input logical window must contain same type of window functions, 
which is ensured by
+ * the rule ExtractWindowExpressions in the analyzer.
+ */
+object PhysicalWindow {
+  // windowFunctionType, windowExpression, partitionSpec, orderSpec, child
+  type ReturnType =
+(WindowFunctionType, Seq[NamedExpression], Seq[Expression], 
Seq[SortOrder], LogicalPlan)
+
+  def unapply(a: Any): Option[ReturnType] = a match {
+case expr @ logical.Window(windowExpressions, partitionSpec, 
orderSpec, child) =>
+
+  if (windowExpressions.isEmpty) {
+throw new AnalysisException(s"Window expression is empty in $expr")
+  }
+
+  val windowFunctionType = 
windowExpressions.map(WindowFunctionType.functionType)
+.reduceLeft ( (t1: WindowFunctionType, t2: WindowFunctionType) =>
--- End diff --

(BTW: 
```
.reduceLeft {
  ...
}
```
)


---




[GitHub] spark pull request #21082: [SPARK-22239][SQL][Python] Enable grouped aggrega...

2018-04-29 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/21082#discussion_r184875163
  
--- Diff: python/pyspark/sql/tests.py ---
@@ -5181,6 +5190,236 @@ def test_invalid_args(self):
 'mixture.*aggregate function.*group aggregate pandas 
UDF'):
 df.groupby(df.id).agg(mean_udf(df.v), mean(df.v)).collect()
 
+
+@unittest.skipIf(
+not _have_pandas or not _have_pyarrow,
+_pandas_requirement_message or _pyarrow_requirement_message)
+class WindowPandasUDFTests(ReusedSQLTestCase):
+@property
+def data(self):
+from pyspark.sql.functions import array, explode, col, lit
+return self.spark.range(10).toDF('id') \
+.withColumn("vs", array([lit(i * 1.0) + col('id') for i in 
range(20, 30)])) \
+.withColumn("v", explode(col('vs'))) \
+.drop('vs') \
+.withColumn('w', lit(1.0))
+
+@property
+def python_plus_one(self):
+from pyspark.sql.functions import udf
+return udf(lambda v: v + 1, 'double')
+
+@property
+def pandas_scalar_time_two(self):
+from pyspark.sql.functions import pandas_udf, PandasUDFType
+return pandas_udf(lambda v: v * 2, 'double')
+
+@property
+def pandas_agg_mean_udf(self):
+from pyspark.sql.functions import pandas_udf, PandasUDFType
+
+@pandas_udf('double', PandasUDFType.GROUPED_AGG)
+def avg(v):
+return v.mean()
+return avg
+
+@property
+def pandas_agg_max_udf(self):
+from pyspark.sql.functions import pandas_udf, PandasUDFType
+
+@pandas_udf('double', PandasUDFType.GROUPED_AGG)
+def max(v):
+return v.max()
+return max
+
+@property
+def pandas_agg_min_udf(self):
+from pyspark.sql.functions import pandas_udf, PandasUDFType
+
+@pandas_udf('double', PandasUDFType.GROUPED_AGG)
+def min(v):
+return v.min()
+return min
+
+@property
+def unbounded_window(self):
+return Window.partitionBy('id') \
+.rowsBetween(Window.unboundedPreceding, 
Window.unboundedFollowing)
+
+@property
+def ordered_window(self):
+return Window.partitionBy('id').orderBy('v')
+
+@property
+def unpartitioned_window(self):
+return Window.partitionBy()
+
+def test_simple(self):
+from pyspark.sql.functions import pandas_udf, PandasUDFType, 
percent_rank, mean, max
+
+df = self.data
+w = self.unbounded_window
+
+mean_udf = self.pandas_agg_mean_udf
+
+result1 = df.withColumn('mean_v', mean_udf(df['v']).over(w))
+expected1 = df.withColumn('mean_v', mean(df['v']).over(w))
+
+result2 = df.select(mean_udf(df['v']).over(w))
+expected2 = df.select(mean(df['v']).over(w))
+
+self.assertPandasEqual(expected1.toPandas(), result1.toPandas())
+self.assertPandasEqual(expected2.toPandas(), result2.toPandas())
+
+def test_multiple_udfs(self):
+from pyspark.sql.functions import max, min, mean
+
+df = self.data
+w = self.unbounded_window
+
+result1 = df.withColumn('mean_v', 
self.pandas_agg_mean_udf(df['v']).over(w)) \
+.withColumn('max_v', 
self.pandas_agg_max_udf(df['v']).over(w)) \
+.withColumn('min_w', 
self.pandas_agg_min_udf(df['w']).over(w)) \
+
+expected1 = df.withColumn('mean_v', mean(df['v']).over(w)) \
+  .withColumn('max_v', max(df['v']).over(w)) \
+  .withColumn('min_w', min(df['w']).over(w))
+
+self.assertPandasEqual(expected1.toPandas(), result1.toPandas())
+
+def test_replace_existing(self):
+from pyspark.sql.functions import mean
+
+df = self.data
+w = self.unbounded_window
+
+result1 = df.withColumn('v', 
self.pandas_agg_mean_udf(df['v']).over(w))
+expected1 = df.withColumn('v', mean(df['v']).over(w))
+
+self.assertPandasEqual(expected1.toPandas(), result1.toPandas())
+
+def test_mixed_sql(self):
+from pyspark.sql.functions import mean
+
+df = self.data
+w = self.unbounded_window
+mean_udf = self.pandas_agg_mean_udf
+
+result1 = df.withColumn('v', mean_udf(df['v'] * 2).over(w) + 1)
+expected1 = df.withColumn('v', mean(df['v'] * 2).over(w) + 1)
+
+self.assertPandasEqual(expected1.toPandas(), result1.toPandas())
+

[GitHub] spark pull request #21082: [SPARK-22239][SQL][Python] Enable grouped aggrega...

2018-04-29 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/21082#discussion_r184875140
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/python/WindowInPandasExec.scala
 ---
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.python
+
+import java.io.File
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.{SparkEnv, TaskContext}
+import org.apache.spark.api.python.{ChainedPythonFunctions, PythonEvalType}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.physical._
+import org.apache.spark.sql.execution.{GroupedIterator, SparkPlan, 
UnaryExecNode}
+import org.apache.spark.sql.types.{DataType, StructField, StructType}
+import org.apache.spark.util.Utils
+
+case class WindowInPandasExec(
+windowExpression: Seq[NamedExpression],
+partitionSpec: Seq[Expression],
+orderSpec: Seq[SortOrder],
+child: SparkPlan) extends UnaryExecNode {
+
+  override def output: Seq[Attribute] =
+child.output ++ windowExpression.map(_.toAttribute)
+
+  override def requiredChildDistribution: Seq[Distribution] = {
+if (partitionSpec.isEmpty) {
+  // Only show warning when the number of bytes is larger than 100 MB?
+  logWarning("No Partition Defined for Window operation! Moving all 
data to a single "
++ "partition, this can cause serious performance degradation.")
+  AllTuples :: Nil
+} else ClusteredDistribution(partitionSpec) :: Nil
+  }
+
+  override def requiredChildOrdering: Seq[Seq[SortOrder]] =
+Seq(partitionSpec.map(SortOrder(_, Ascending)) ++ orderSpec)
+
+  override def outputOrdering: Seq[SortOrder] = child.outputOrdering
+
+  override def outputPartitioning: Partitioning = child.outputPartitioning
+
+  private def collectFunctions(udf: PythonUDF): (ChainedPythonFunctions, 
Seq[Expression]) = {
+udf.children match {
+  case Seq(u: PythonUDF) =>
+val (chained, children) = collectFunctions(u)
+(ChainedPythonFunctions(chained.funcs ++ Seq(udf.func)), children)
+  case children =>
+// There should not be any other UDFs, or the children can't be 
evaluated directly.
+assert(children.forall(_.find(_.isInstanceOf[PythonUDF]).isEmpty))
+(ChainedPythonFunctions(Seq(udf.func)), udf.children)
+}
+  }
+
+  /**
+   * Create the resulting projection.
+   *
+   * This method uses Code Generation. It can only be used on the executor 
side.
+   *
+   * @param expressions unbound ordered function expressions.
+   * @return the final resulting projection.
+   */
+  private[this] def createResultProjection(expressions: Seq[Expression]): 
UnsafeProjection = {
+val references = expressions.zipWithIndex.map{ case (e, i) =>
+  // Results of window expressions will be on the right side of 
child's output
+  BoundReference(child.output.size + i, e.dataType, e.nullable)
+}
+val unboundToRefMap = expressions.zip(references).toMap
+val patchedWindowExpression = 
windowExpression.map(_.transform(unboundToRefMap))
+UnsafeProjection.create(
+  child.output ++ patchedWindowExpression,
+  child.output)
+  }
+
+  protected override def doExecute(): RDD[InternalRow] = {
+val inputRDD = child.execute()
+
+val bufferSize = inputRDD.conf.getInt("spark.buffer.size", 65536)
+val reuseWorker = 
inputRDD.conf.getBoolean("spark.python.worker.reuse", defaultValue = true)
+val sessionLocalTimeZone = conf.sessionLocalTimeZone
+val pandasRespectSessionTimeZone = conf.pandasRespectSessionTimeZone
+
+// Extract window expressions and window 

[GitHub] spark pull request #21082: [SPARK-22239][SQL][Python] Enable grouped aggrega...

2018-04-29 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/21082#discussion_r184875120
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/python/WindowInPandasExec.scala
 ---
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.python
+
+import java.io.File
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.{SparkEnv, TaskContext}
+import org.apache.spark.api.python.{ChainedPythonFunctions, PythonEvalType}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.physical._
+import org.apache.spark.sql.execution.{GroupedIterator, SparkPlan, 
UnaryExecNode}
+import org.apache.spark.sql.types.{DataType, StructField, StructType}
+import org.apache.spark.util.Utils
+
+case class WindowInPandasExec(
+windowExpression: Seq[NamedExpression],
+partitionSpec: Seq[Expression],
+orderSpec: Seq[SortOrder],
+child: SparkPlan) extends UnaryExecNode {
+
+  override def output: Seq[Attribute] =
+child.output ++ windowExpression.map(_.toAttribute)
+
+  override def requiredChildDistribution: Seq[Distribution] = {
+if (partitionSpec.isEmpty) {
+  // Only show warning when the number of bytes is larger than 100 MB?
+  logWarning("No Partition Defined for Window operation! Moving all 
data to a single "
++ "partition, this can cause serious performance degradation.")
+  AllTuples :: Nil
+} else ClusteredDistribution(partitionSpec) :: Nil
+  }
+
+  override def requiredChildOrdering: Seq[Seq[SortOrder]] =
+Seq(partitionSpec.map(SortOrder(_, Ascending)) ++ orderSpec)
+
+  override def outputOrdering: Seq[SortOrder] = child.outputOrdering
+
+  override def outputPartitioning: Partitioning = child.outputPartitioning
+
+  private def collectFunctions(udf: PythonUDF): (ChainedPythonFunctions, 
Seq[Expression]) = {
+udf.children match {
+  case Seq(u: PythonUDF) =>
+val (chained, children) = collectFunctions(u)
+(ChainedPythonFunctions(chained.funcs ++ Seq(udf.func)), children)
+  case children =>
+// There should not be any other UDFs, or the children can't be 
evaluated directly.
+assert(children.forall(_.find(_.isInstanceOf[PythonUDF]).isEmpty))
+(ChainedPythonFunctions(Seq(udf.func)), udf.children)
+}
+  }
+
+  /**
+   * Create the resulting projection.
+   *
+   * This method uses Code Generation. It can only be used on the executor 
side.
+   *
+   * @param expressions unbound ordered function expressions.
+   * @return the final resulting projection.
+   */
+  private[this] def createResultProjection(expressions: Seq[Expression]): 
UnsafeProjection = {
+val references = expressions.zipWithIndex.map{ case (e, i) =>
+  // Results of window expressions will be on the right side of 
child's output
+  BoundReference(child.output.size + i, e.dataType, e.nullable)
+}
+val unboundToRefMap = expressions.zip(references).toMap
+val patchedWindowExpression = 
windowExpression.map(_.transform(unboundToRefMap))
+UnsafeProjection.create(
+  child.output ++ patchedWindowExpression,
+  child.output)
+  }
+
+  protected override def doExecute(): RDD[InternalRow] = {
+val inputRDD = child.execute()
+
+val bufferSize = inputRDD.conf.getInt("spark.buffer.size", 65536)
+val reuseWorker = 
inputRDD.conf.getBoolean("spark.python.worker.reuse", defaultValue = true)
+val sessionLocalTimeZone = conf.sessionLocalTimeZone
+val pandasRespectSessionTimeZone = conf.pandasRespectSessionTimeZone
+
+// Extract window expressions and window 

[GitHub] spark pull request #21082: [SPARK-22239][SQL][Python] Enable grouped aggrega...

2018-04-29 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/21082#discussion_r184875102
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/python/WindowInPandasExec.scala
 ---
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.python
+
+import java.io.File
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.{SparkEnv, TaskContext}
+import org.apache.spark.api.python.{ChainedPythonFunctions, PythonEvalType}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.physical._
+import org.apache.spark.sql.execution.{GroupedIterator, SparkPlan, 
UnaryExecNode}
+import org.apache.spark.sql.types.{DataType, StructField, StructType}
+import org.apache.spark.util.Utils
+
+case class WindowInPandasExec(
+windowExpression: Seq[NamedExpression],
+partitionSpec: Seq[Expression],
+orderSpec: Seq[SortOrder],
+child: SparkPlan) extends UnaryExecNode {
+
+  override def output: Seq[Attribute] =
+child.output ++ windowExpression.map(_.toAttribute)
+
+  override def requiredChildDistribution: Seq[Distribution] = {
+if (partitionSpec.isEmpty) {
+  // Only show warning when the number of bytes is larger than 100 MB?
+  logWarning("No Partition Defined for Window operation! Moving all 
data to a single "
++ "partition, this can cause serious performance degradation.")
+  AllTuples :: Nil
+} else ClusteredDistribution(partitionSpec) :: Nil
+  }
+
+  override def requiredChildOrdering: Seq[Seq[SortOrder]] =
+Seq(partitionSpec.map(SortOrder(_, Ascending)) ++ orderSpec)
+
+  override def outputOrdering: Seq[SortOrder] = child.outputOrdering
+
+  override def outputPartitioning: Partitioning = child.outputPartitioning
+
+  private def collectFunctions(udf: PythonUDF): (ChainedPythonFunctions, 
Seq[Expression]) = {
+udf.children match {
+  case Seq(u: PythonUDF) =>
+val (chained, children) = collectFunctions(u)
+(ChainedPythonFunctions(chained.funcs ++ Seq(udf.func)), children)
+  case children =>
+// There should not be any other UDFs, or the children can't be 
evaluated directly.
+assert(children.forall(_.find(_.isInstanceOf[PythonUDF]).isEmpty))
+(ChainedPythonFunctions(Seq(udf.func)), udf.children)
+}
+  }
+
+  /**
+   * Create the resulting projection.
+   *
+   * This method uses Code Generation. It can only be used on the executor 
side.
+   *
+   * @param expressions unbound ordered function expressions.
+   * @return the final resulting projection.
+   */
+  private[this] def createResultProjection(expressions: Seq[Expression]): 
UnsafeProjection = {
+val references = expressions.zipWithIndex.map{ case (e, i) =>
+  // Results of window expressions will be on the right side of 
child's output
+  BoundReference(child.output.size + i, e.dataType, e.nullable)
+}
+val unboundToRefMap = expressions.zip(references).toMap
+val patchedWindowExpression = 
windowExpression.map(_.transform(unboundToRefMap))
+UnsafeProjection.create(
+  child.output ++ patchedWindowExpression,
+  child.output)
+  }
+
+  protected override def doExecute(): RDD[InternalRow] = {
+val inputRDD = child.execute()
+
+val bufferSize = inputRDD.conf.getInt("spark.buffer.size", 65536)
+val reuseWorker = 
inputRDD.conf.getBoolean("spark.python.worker.reuse", defaultValue = true)
+val sessionLocalTimeZone = conf.sessionLocalTimeZone
+val pandasRespectSessionTimeZone = conf.pandasRespectSessionTimeZone
+
+// Extract window expressions and window 

[GitHub] spark pull request #21082: [SPARK-22239][SQL][Python] Enable grouped aggrega...

2018-04-29 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/21082#discussion_r184875086
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/python/WindowInPandasExec.scala
 ---
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.python
+
+import java.io.File
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.{SparkEnv, TaskContext}
+import org.apache.spark.api.python.{ChainedPythonFunctions, PythonEvalType}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.physical._
+import org.apache.spark.sql.execution.{GroupedIterator, SparkPlan, 
UnaryExecNode}
+import org.apache.spark.sql.types.{DataType, StructField, StructType}
+import org.apache.spark.util.Utils
+
+case class WindowInPandasExec(
+windowExpression: Seq[NamedExpression],
+partitionSpec: Seq[Expression],
+orderSpec: Seq[SortOrder],
+child: SparkPlan) extends UnaryExecNode {
+
+  override def output: Seq[Attribute] =
+child.output ++ windowExpression.map(_.toAttribute)
+
+  override def requiredChildDistribution: Seq[Distribution] = {
+if (partitionSpec.isEmpty) {
+  // Only show warning when the number of bytes is larger than 100 MB?
+  logWarning("No Partition Defined for Window operation! Moving all 
data to a single "
++ "partition, this can cause serious performance degradation.")
+  AllTuples :: Nil
+} else ClusteredDistribution(partitionSpec) :: Nil
--- End diff --

nit: I would do 

```
else {

}
```


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21082: [SPARK-22239][SQL][Python] Enable grouped aggrega...

2018-04-29 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/21082#discussion_r184875053
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala ---
@@ -424,6 +424,21 @@ abstract class SparkStrategies extends 
QueryPlanner[SparkPlan] {
 }
   }
 
+  object Window extends Strategy {
+def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
+  case PhysicalWindow(
+  WindowFunctionType.SQL, windowExprs, partitionSpec, orderSpec, 
child) =>
--- End diff --

nit: indent


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21082: [SPARK-22239][SQL][Python] Enable grouped aggrega...

2018-04-29 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/21082#discussion_r184875039
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 ---
@@ -624,7 +624,9 @@ object CollapseRepartition extends Rule[LogicalPlan] {
 object CollapseWindow extends Rule[LogicalPlan] {
   def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
 case w1 @ Window(we1, ps1, os1, w2 @ Window(we2, ps2, os2, grandChild))
-if ps1 == ps2 && os1 == os2 && 
w1.references.intersect(w2.windowOutputSet).isEmpty =>
+if ps1 == ps2 && os1 == os2 && 
w1.references.intersect(w2.windowOutputSet).isEmpty
+ && WindowFunctionType.functionType(we1.head) ==
--- End diff --

nit: indent


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21082: [SPARK-22239][SQL][Python] Enable grouped aggrega...

2018-04-29 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/21082#discussion_r184875000
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
 ---
@@ -112,12 +113,19 @@ trait CheckAnalysis extends PredicateHelper {
 failAnalysis("An offset window function can only be evaluated 
in an ordered " +
   s"row-based window frame with a single offset: $w")
 
+  case w @ WindowExpression(_: PythonUDF,
+  WindowSpecDefinition(_, _, frame: SpecifiedWindowFrame))
--- End diff --

indentation :-)


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21082: [SPARK-22239][SQL][Python] Enable grouped aggrega...

2018-04-29 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/21082#discussion_r184874826
  
--- Diff: python/pyspark/sql/functions.py ---
@@ -2301,10 +2301,12 @@ def pandas_udf(f=None, returnType=None, 
functionType=None):
The returned scalar can be either a python primitive type, e.g., 
`int` or `float`
or a numpy data type, e.g., `numpy.int64` or `numpy.float64`.
 
-   :class:`ArrayType`, :class:`MapType` and :class:`StructType` are 
currently not supported as
-   output types.
+   :class:`MapType` and :class:`StructType` are currently not 
supported as output types.
--- End diff --

@icexelloss, actually, should we keep this note? I think it matches 
https://spark.apache.org/docs/latest/sql-programming-guide.html#supported-sql-types,
 which we documented there and in SQLConf.

Probably just leaving a link would be fine. Removing it is okay with me too.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21082: [SPARK-22239][SQL][Python] Enable grouped aggrega...

2018-04-24 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/21082#discussion_r183769433
  
--- Diff: python/pyspark/sql/functions.py ---
@@ -2321,7 +2323,30 @@ def pandas_udf(f=None, returnType=None, 
functionType=None):
|  2|6.0|
+---+---+
 
-   .. seealso:: :meth:`pyspark.sql.GroupedData.agg`
+   This example shows using grouped aggregated UDFs as window 
functions. Note that only
+   unbounded window frame is supported at the moment:
+
+   >>> from pyspark.sql.functions import pandas_udf, PandasUDFType
+   >>> from pyspark.sql import Window
+   >>> df = spark.createDataFrame(
+   ... [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
+   ... ("id", "v"))
+   >>> @pandas_udf("double", PandasUDFType.GROUPED_AGG)  # doctest: 
+SKIP
--- End diff --

Yes exactly. The idea is that the producer of the UDF can define a grouped 
agg udf, such as a weighted mean, and the consumer can use that UDF in both 
groupby and window, similar to how SQL aggregation functions work.
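
For illustration, a minimal end-to-end sketch of that producer/consumer 
pattern (the `weighted_mean` UDF and the sample dataframe are made up for 
this example and assume a pyarrow-enabled setup; this is not the PR's own 
test code):

```python
from pyspark.sql import SparkSession, Window
from pyspark.sql.functions import pandas_udf, PandasUDFType
import numpy as np

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame(
    [(1, 1.0, 1.0), (1, 2.0, 2.0), (2, 3.0, 1.0), (2, 5.0, 2.0), (2, 10.0, 3.0)],
    ("id", "v", "w"))

# The "producer" defines one grouped aggregate UDF...
@pandas_udf("double", PandasUDFType.GROUPED_AGG)
def weighted_mean(v, w):
    return np.average(v, weights=w)

# ...and the "consumer" can use it with groupby...
df.groupby("id").agg(weighted_mean(df["v"], df["w"]).alias("wm")).show()

# ...or over an unbounded window frame (the only frame supported so far).
win = (Window.partitionBy("id")
       .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing))
df.withColumn("wm", weighted_mean(df["v"], df["w"]).over(win)).show()
```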


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21082: [SPARK-22239][SQL][Python] Enable grouped aggrega...

2018-04-24 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/21082#discussion_r183768758
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
 ---
@@ -268,3 +269,38 @@ object PhysicalAggregation {
 case _ => None
   }
 }
+
+/**
+ * An extractor used when planning physical execution of a window. This 
extractor outputs
+ * the window function type of the logical window.
+ *
+ * The input logical window must contain same type of window functions, 
which is ensured by
+ * the rule ExtractWindowExpressions in the analyzer.
+ */
+object PhysicalWindow {
+  // windowFunctionType, windowExpression, partitionSpec, orderSpec, child
+  type ReturnType =
+(WindowFunctionType, Seq[NamedExpression], Seq[Expression], 
Seq[SortOrder], LogicalPlan)
+
+  def unapply(a: Any): Option[ReturnType] = a match {
+case expr @ logical.Window(windowExpressions, partitionSpec, 
orderSpec, child) =>
+
+  if (windowExpressions.isEmpty) {
+throw new AnalysisException(s"Window expression is empty in $expr")
+  }
+
+  val windowFunctionType = 
windowExpressions.map(WindowFunctionType.functionType)
+.reduceLeft ( (t1: WindowFunctionType, t2: WindowFunctionType) =>
--- End diff --

If we want to do this in the Analyzer, then we would need to carry the 
WindowFunctionType in the logical plan.

I did it this way to avoid changing the logical node. I am open to adding 
WindowFunctionType to the logical plan though. What do other people think?
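
For what it's worth, the user-visible effect of the "one function type per 
Window node" guarantee is that a query mixing SQL and pandas window functions 
should still plan cleanly. A hypothetical sketch (made-up `mean_udf` and 
sample data, assuming pyarrow is available):

```python
from pyspark.sql import SparkSession, Window
from pyspark.sql.functions import pandas_udf, rank, PandasUDFType

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame(
    [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], ("id", "v"))

@pandas_udf("double", PandasUDFType.GROUPED_AGG)
def mean_udf(v):
    return v.mean()

ordered = Window.partitionBy("id").orderBy("v")
unbounded = (Window.partitionBy("id")
             .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing))

# rank() is a SQL window function and mean_udf is a Python one, so the
# analyzer should place them in separate logical Window nodes, planned as
# WindowExec and WindowInPandasExec respectively.
df.select("id", "v",
          rank().over(ordered).alias("r"),
          mean_udf(df["v"]).over(unbounded).alias("m")).explain()
```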


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21082: [SPARK-22239][SQL][Python] Enable grouped aggrega...

2018-04-24 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/21082#discussion_r183572930
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
 ---
@@ -268,3 +269,38 @@ object PhysicalAggregation {
 case _ => None
   }
 }
+
+/**
+ * An extractor used when planning physical execution of a window. This 
extractor outputs
+ * the window function type of the logical window.
+ *
+ * The input logical window must contain same type of window functions, 
which is ensured by
+ * the rule ExtractWindowExpressions in the analyzer.
+ */
+object PhysicalWindow {
+  // windowFunctionType, windowExpression, partitionSpec, orderSpec, child
+  type ReturnType =
+(WindowFunctionType, Seq[NamedExpression], Seq[Expression], 
Seq[SortOrder], LogicalPlan)
+
+  def unapply(a: Any): Option[ReturnType] = a match {
+case expr @ logical.Window(windowExpressions, partitionSpec, 
orderSpec, child) =>
+
+  if (windowExpressions.isEmpty) {
+throw new AnalysisException(s"Window expression is empty in $expr")
+  }
+
+  val windowFunctionType = 
windowExpressions.map(WindowFunctionType.functionType)
+.reduceLeft ( (t1: WindowFunctionType, t2: WindowFunctionType) =>
--- End diff --

Should we do this analysis check in Analyzer? 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21082: [SPARK-22239][SQL][Python] Enable grouped aggrega...

2018-04-24 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/21082#discussion_r183416353
  
--- Diff: python/pyspark/sql/functions.py ---
@@ -2321,7 +2323,30 @@ def pandas_udf(f=None, returnType=None, 
functionType=None):
|  2|6.0|
+---+---+
 
-   .. seealso:: :meth:`pyspark.sql.GroupedData.agg`
+   This example shows using grouped aggregated UDFs as window 
functions. Note that only
+   unbounded window frame is supported at the moment:
+
+   >>> from pyspark.sql.functions import pandas_udf, PandasUDFType
+   >>> from pyspark.sql import Window
+   >>> df = spark.createDataFrame(
+   ... [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
+   ... ("id", "v"))
+   >>> @pandas_udf("double", PandasUDFType.GROUPED_AGG)  # doctest: 
+SKIP
--- End diff --

So we don't have `PandasUDFType.WINDOW_AGG`, and a pandas udf defined as 
`PandasUDFType.GROUPED_AGG` can be used with both `groupby` and `Window`?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21082: [SPARK-22239][SQL][Python] Enable grouped aggrega...

2018-04-24 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/21082#discussion_r183417848
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/PythonUDF.scala
 ---
@@ -34,10 +34,15 @@ object PythonUDF {
 e.isInstanceOf[PythonUDF] && 
SCALAR_TYPES.contains(e.asInstanceOf[PythonUDF].evalType)
   }
 
-  def isGroupAggPandasUDF(e: Expression): Boolean = {
+  def isGroupedAggPandasUDF(e: Expression): Boolean = {
 e.isInstanceOf[PythonUDF] &&
   e.asInstanceOf[PythonUDF].evalType == 
PythonEvalType.SQL_GROUPED_AGG_PANDAS_UDF
   }
+
+  def isWindowPandasUDF(e: Expression): Boolean = {
+e.isInstanceOf[PythonUDF] &&
+e.asInstanceOf[PythonUDF].evalType == 
PythonEvalType.SQL_GROUPED_AGG_PANDAS_UDF
--- End diff --

nit: indent style.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21082: [SPARK-22239][SQL][Python] Enable grouped aggrega...

2018-04-23 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/21082#discussion_r183412509
  
--- Diff: python/pyspark/sql/functions.py ---
@@ -2301,10 +2301,12 @@ def pandas_udf(f=None, returnType=None, 
functionType=None):
The returned scalar can be either a python primitive type, e.g., 
`int` or `float`
or a numpy data type, e.g., `numpy.int64` or `numpy.float64`.
 
-   :class:`ArrayType`, :class:`MapType` and :class:`StructType` are 
currently not supported as
-   output types.
+   :class:`MapType` and :class:`StructType` are currently not 
supported as output types.
 
-   Group aggregate UDFs are used with 
:meth:`pyspark.sql.GroupedData.agg`
+   Group aggregate UDFs are used with 
:meth:`pyspark.sql.GroupedData.agg` and
+   :meth:`pyspark.sql.Window`
+
+   This example show using grouped aggregated UDFS with groupby:
--- End diff --

typo: `shows`.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21082: [SPARK-22239][SQL][Python] Enable grouped aggrega...

2018-04-23 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/21082#discussion_r183412270
  
--- Diff: python/pyspark/sql/functions.py ---
@@ -2301,10 +2301,12 @@ def pandas_udf(f=None, returnType=None, 
functionType=None):
The returned scalar can be either a python primitive type, e.g., 
`int` or `float`
or a numpy data type, e.g., `numpy.int64` or `numpy.float64`.
 
-   :class:`ArrayType`, :class:`MapType` and :class:`StructType` are 
currently not supported as
-   output types.
+   :class:`MapType` and :class:`StructType` are currently not 
supported as output types.
 
-   Group aggregate UDFs are used with 
:meth:`pyspark.sql.GroupedData.agg`
+   Group aggregate UDFs are used with 
:meth:`pyspark.sql.GroupedData.agg` and
+   :meth:`pyspark.sql.Window`
--- End diff --

``` :class:`pyspark.sql.Window` ```?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21082: [SPARK-22239][SQL][Python] Enable grouped aggrega...

2018-04-21 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/21082#discussion_r183220435
  
--- Diff: python/pyspark/sql/tests.py ---
@@ -5156,6 +5156,15 @@ def test_retain_group_columns(self):
 expected1 = df.groupby(df.id).agg(sum(df.v))
 self.assertPandasEqual(expected1.toPandas(), 
result1.toPandas())
 
+def test_array_type(self):
--- End diff --

This is related, but I figured it shouldn't hurt to add an array test in 
GroupedAggPandasUDFTests.
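
Roughly, the kind of array-output aggregation such a test would exercise (a 
sketch with a made-up `min_max` UDF and sample data, not the actual test 
body):

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import ArrayType, DoubleType

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame(
    [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], ("id", "v"))

# A grouped aggregate UDF whose per-group result is an array.
@pandas_udf(ArrayType(DoubleType()), PandasUDFType.GROUPED_AGG)
def min_max(v):
    return [float(v.min()), float(v.max())]

df.groupby("id").agg(min_max(df["v"]).alias("min_max")).show()
```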


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21082: [SPARK-22239][SQL][Python] Enable grouped aggrega...

2018-04-21 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/21082#discussion_r183220392
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala
 ---
@@ -149,7 +149,7 @@ class AnalysisErrorSuite extends AnalysisTest {
   UnresolvedAttribute("a") :: Nil,
   SortOrder(UnresolvedAttribute("b"), Ascending) :: Nil,
   UnspecifiedFrame)).as('window)),
-"not supported within a window function" :: Nil)
+"does not have any window functions" :: Nil)
--- End diff --

This is because an analysis exception is now thrown earlier, by the rule 
ExtractWindowExpressions.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org