Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/22305#discussion_r231416643 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/WindowInPandasExec.scala --- @@ -27,17 +27,62 @@ import org.apache.spark.api.python.{ChainedPythonFunctions, PythonEvalType} import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.catalyst.plans.physical._ -import org.apache.spark.sql.execution.{GroupedIterator, SparkPlan, UnaryExecNode} +import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, ClusteredDistribution, Distribution, Partitioning} +import org.apache.spark.sql.execution.{ExternalAppendOnlyUnsafeRowArray, SparkPlan} import org.apache.spark.sql.execution.arrow.ArrowUtils -import org.apache.spark.sql.types.{DataType, StructField, StructType} +import org.apache.spark.sql.execution.window._ +import org.apache.spark.sql.types._ import org.apache.spark.util.Utils +/** + * This class calculates and outputs windowed aggregates over the rows in a single partition. + * + * It is very similar to [[WindowExec]] and has similar logic. The main difference is that this + * node doesn't not compute any window aggregation values. Instead, it computes the lower and + * upper bound for each window (i.e. window bounds) and pass the data and indices to python work + * to do the actual window aggregation. + * + * It currently materialize all data associated with the same partition key and pass them to + * Python. This is not strictly necessary for sliding windows and can be improved (by slicing + * data into overlapping small chunks and stitch them together). + * + * This class groups window expressions by their window boundaries so that window expressions + * with the same window boundaries can share the same window bounds. The window bounds are + * prepended to the data passed to the python worker. + * + * For example, if we have: + * avg(v) over specifiedwindowframe(RowFrame, -5, 5), + * avg(v) over specifiedwindowframe(RowFrame, UnboundedPreceding, UnboundedFollowing), + * avg(v) over specifiedwindowframe(RowFrame, -3, 3), + * max(v) over specifiedwindowframe(RowFrame, -3, 3) + * + * The python input will look like: + * (lower_bound_w1, upper_bound_w1, lower_bound_w3, upper_bound_w3, v) + * + * where w1 is specifiedwindowframe(RowFrame, -5, 5) + * w2 is specifiedwindowframe(RowFrame, UnboundedPreceding, UnboundedFollowing) + * w3 is specifiedwindowframe(RowFrame, -3, 3) + * + * Note that w2 doesn't have bound indices in the python input because its unbounded window + * so it's bound indices will always be the same. + * + * Unbounded window also have a different eval type, because: + * (1) It doesn't have bound indices as input + * (2) The udf only needs to be evaluated once the in python worker (because the udf is + * deterministic and window bounds are the same for all windows) + * + * The logic to compute window bounds is delegated to [[WindowFunctionFrame]] and shared with + * [[WindowExec]] + * + * Note this doesn't support partial aggregation and all aggregation is computed from the entire + * window. + */ case class WindowInPandasExec( windowExpression: Seq[NamedExpression], partitionSpec: Seq[Expression], orderSpec: Seq[SortOrder], - child: SparkPlan) extends UnaryExecNode { + child: SparkPlan +) extends WindowExecBase(windowExpression, partitionSpec, orderSpec, child) { --- End diff -- nit: style ``` child: SparkPlan) extends ... ```
--- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org