ueshin commented on a change in pull request #22305: [SPARK-24561][SQL][Python] 
User-defined window aggregation functions with Pandas UDF (bounded window)
URL: https://github.com/apache/spark/pull/22305#discussion_r240464740
 
 

 ##########
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/python/WindowInPandasExec.scala
 ##########
 @@ -144,24 +282,107 @@ case class WindowInPandasExec(
         queue.close()
       }
 
-      val inputProj = UnsafeProjection.create(allInputs, child.output)
-      val pythonInput = grouped.map { case (_, rows) =>
-        rows.map { row =>
-          queue.add(row.asInstanceOf[UnsafeRow])
-          inputProj(row)
+      val stream = iter.map { row =>
+        queue.add(row.asInstanceOf[UnsafeRow])
+        row
+      }
+
+      val pythonInput = new Iterator[Iterator[UnsafeRow]] {
+
+        // Manage the stream and the grouping.
+        var nextRow: UnsafeRow = null
+        var nextGroup: UnsafeRow = null
+        var nextRowAvailable: Boolean = false
+        private[this] def fetchNextRow() {
+          nextRowAvailable = stream.hasNext
+          if (nextRowAvailable) {
+            nextRow = stream.next().asInstanceOf[UnsafeRow]
+            nextGroup = grouping(nextRow)
+          } else {
+            nextRow = null
+            nextGroup = null
+          }
+        }
+        fetchNextRow()
+
+        // Manage the current partition.
+        val buffer: ExternalAppendOnlyUnsafeRowArray =
+          new ExternalAppendOnlyUnsafeRowArray(inMemoryThreshold, 
spillThreshold)
+        var bufferIterator: Iterator[UnsafeRow] = _
+
+        val indexRow = new 
SpecificInternalRow(Array.fill(numBoundIndices)(IntegerType))
+
+        val frames = factories.map(_(indexRow))
+
+        private[this] def fetchNextPartition() {
+          // Collect all the rows in the current partition.
+          // Before we start to fetch new input rows, make a copy of nextGroup.
+          val currentGroup = nextGroup.copy()
+
+          // clear last partition
+          buffer.clear()
+
+          while (nextRowAvailable && nextGroup == currentGroup) {
 
 Review comment:
   Oh, I see. yeah, it should work. I missed that they are `UnsafeRow`s. Thanks!
   @icexelloss Could you close #23279 and leave it as is? I am sorry for taking 
your time.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to