Github user ueshin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22305#discussion_r231416643
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/WindowInPandasExec.scala ---
    @@ -27,17 +27,62 @@ import org.apache.spark.api.python.{ChainedPythonFunctions, PythonEvalType}
     import org.apache.spark.rdd.RDD
     import org.apache.spark.sql.catalyst.InternalRow
     import org.apache.spark.sql.catalyst.expressions._
    -import org.apache.spark.sql.catalyst.plans.physical._
    -import org.apache.spark.sql.execution.{GroupedIterator, SparkPlan, UnaryExecNode}
    +import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, ClusteredDistribution, Distribution, Partitioning}
    +import org.apache.spark.sql.execution.{ExternalAppendOnlyUnsafeRowArray, SparkPlan}
     import org.apache.spark.sql.execution.arrow.ArrowUtils
    -import org.apache.spark.sql.types.{DataType, StructField, StructType}
    +import org.apache.spark.sql.execution.window._
    +import org.apache.spark.sql.types._
     import org.apache.spark.util.Utils
     
    +/**
    + * This class calculates and outputs windowed aggregates over the rows in a single partition.
    + *
    + * It is very similar to [[WindowExec]] and has similar logic. The main difference is that this
    + * node does not compute any window aggregation values. Instead, it computes the lower and upper
    + * bound for each window (i.e. window bounds) and passes the data and indices to the Python
    + * worker to do the actual window aggregation.
    + *
    + * It currently materializes all data associated with the same partition key and passes them to
    + * Python. This is not strictly necessary for sliding windows and can be improved (by slicing
    + * data into small overlapping chunks and stitching them together).
    + *
    + * This class groups window expressions by their window boundaries so that window expressions
    + * with the same window boundaries can share the same window bounds. The window bounds are
    + * prepended to the data passed to the Python worker.
    + *
    + * For example, if we have:
    + *     avg(v) over specifiedwindowframe(RowFrame, -5, 5),
    + *     avg(v) over specifiedwindowframe(RowFrame, UnboundedPreceding, UnboundedFollowing),
    + *     avg(v) over specifiedwindowframe(RowFrame, -3, 3),
    + *     max(v) over specifiedwindowframe(RowFrame, -3, 3)
    + *
    + * The Python input will look like:
    + * (lower_bound_w1, upper_bound_w1, lower_bound_w3, upper_bound_w3, v)
    + *
    + * where w1 is specifiedwindowframe(RowFrame, -5, 5),
    + *       w2 is specifiedwindowframe(RowFrame, UnboundedPreceding, UnboundedFollowing),
    + *       w3 is specifiedwindowframe(RowFrame, -3, 3).
    + *
    + * Note that w2 doesn't have bound indices in the Python input because it is an unbounded
    + * window, so its bound indices are always the same.
    + *
    + * Unbounded windows also have a different eval type, because:
    + * (1) They don't have bound indices as input.
    + * (2) The UDF only needs to be evaluated once in the Python worker (because the UDF is
    + *     deterministic and the window bounds are the same for all windows).
    + *
    + * The logic to compute window bounds is delegated to [[WindowFunctionFrame]] and shared with
    + * [[WindowExec]].
    + *
    + * Note that this doesn't support partial aggregation; all aggregation is computed from the
    + * entire window.
    + */
     case class WindowInPandasExec(
         windowExpression: Seq[NamedExpression],
         partitionSpec: Seq[Expression],
         orderSpec: Seq[SortOrder],
    -    child: SparkPlan) extends UnaryExecNode {
    +    child: SparkPlan
    +) extends WindowExecBase(windowExpression, partitionSpec, orderSpec, child) {
    --- End diff --
    
    nit: style
    
    ```
        child: SparkPlan)
      extends ...
    ```
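
    As an illustration of the w1/w2/w3 layout described in the new Scaladoc above, here is a minimal sketch (not part of this PR) of a query whose window specs match that example. The DataFrame and column names (`df`, `id`, `ts`, `v`) are hypothetical, and the built-in `avg`/`max` are stand-ins for the pandas aggregate UDFs that would actually route through `WindowInPandasExec`:
    
    ```
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.expressions.Window
    import org.apache.spark.sql.functions.{avg, max}
    
    val spark = SparkSession.builder().master("local[*]").getOrCreate()
    import spark.implicits._
    
    // Hypothetical input: partition key `id`, ordering column `ts`, value `v`.
    val df = Seq((1, 1L, 1.0), (1, 2L, 2.0), (1, 3L, 3.0)).toDF("id", "ts", "v")
    
    val base = Window.partitionBy("id").orderBy("ts")
    // w1: bounded row frame -> needs (lower_bound_w1, upper_bound_w1) in the Python input
    val w1 = base.rowsBetween(-5, 5)
    // w2: unbounded row frame -> no bound indices are passed to Python
    val w2 = base.rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
    // w3: bounded row frame -> its bound columns are shared by avg(v) and max(v)
    val w3 = base.rowsBetween(-3, 3)
    
    df.select(
      avg("v").over(w1),
      avg("v").over(w2),
      avg("v").over(w3),
      max("v").over(w3)).show()
    ```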


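    A rough, standalone sketch (hypothetical names, not the PR's actual grouping code) of the boundary-grouping idea from the Scaladoc: frames with identical boundaries share one pair of bound columns, and unbounded frames contribute none:
    
    ```
    // Hypothetical frame descriptor; the real node works with Catalyst frame expressions.
    case class FrameKey(frameType: String, lower: Long, upper: Long) {
      def isUnbounded: Boolean = lower == Long.MinValue && upper == Long.MaxValue
    }
    
    val windowExprs = Seq(
      "avg(v) over w1" -> FrameKey("RowFrame", -5L, 5L),
      "avg(v) over w2" -> FrameKey("RowFrame", Long.MinValue, Long.MaxValue),
      "avg(v) over w3" -> FrameKey("RowFrame", -3L, 3L),
      "max(v) over w3" -> FrameKey("RowFrame", -3L, 3L))
    
    // Distinct bounded frames each contribute one (lower, upper) pair to the Python input.
    val boundedFrames = windowExprs.map(_._2).distinct.filterNot(_.isUnbounded)
    println(s"bound column pairs prepended: ${boundedFrames.size}")  // prints 2 (w1 and w3)
    ```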
