beliefer commented on code in PR #41939: URL: https://github.com/apache/spark/pull/41939#discussion_r1277422584
########## sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExec.scala: ########## @@ -95,111 +95,23 @@ case class WindowExec( ) protected override def doExecute(): RDD[InternalRow] = { - // Unwrap the window expressions and window frame factories from the map. - val expressions = windowFrameExpressionFactoryPairs.flatMap(_._1) - val factories = windowFrameExpressionFactoryPairs.map(_._2).toArray - val inMemoryThreshold = conf.windowExecBufferInMemoryThreshold - val spillThreshold = conf.windowExecBufferSpillThreshold val spillSize = longMetric("spillSize") - // Start processing. - child.execute().mapPartitions { stream => - new Iterator[InternalRow] { - - // Get all relevant projections. - val result = createResultProjection(expressions) - val grouping = UnsafeProjection.create(partitionSpec, child.output) - - // Manage the stream and the grouping. - var nextRow: UnsafeRow = null - var nextGroup: UnsafeRow = null - var nextRowAvailable: Boolean = false - private[this] def fetchNextRow(): Unit = { - nextRowAvailable = stream.hasNext - if (nextRowAvailable) { - nextRow = stream.next().asInstanceOf[UnsafeRow] - nextGroup = grouping(nextRow) - } else { - nextRow = null - nextGroup = null - } - } - fetchNextRow() - - // Manage the current partition. - val buffer: ExternalAppendOnlyUnsafeRowArray = - new ExternalAppendOnlyUnsafeRowArray(inMemoryThreshold, spillThreshold) - - var bufferIterator: Iterator[UnsafeRow] = _ - - val windowFunctionResult = new SpecificInternalRow(expressions.map(_.dataType)) - val frames = factories.map(_(windowFunctionResult)) - val numFrames = frames.length - private[this] def fetchNextPartition(): Unit = { - // Collect all the rows in the current partition. - // Before we start to fetch new input rows, make a copy of nextGroup. 
- val currentGroup = nextGroup.copy() - - // clear last partition - buffer.clear() - - while (nextRowAvailable && nextGroup == currentGroup) { - buffer.add(nextRow) - fetchNextRow() - } - - // Setup the frames. - var i = 0 - while (i < numFrames) { - frames(i).prepare(buffer) - i += 1 - } + val evaluatorFactory = + new WindowEvaluatorFactory( + windowExpression, + partitionSpec, + orderSpec, + child.output, + spillSize) - // Setup iteration - rowIndex = 0 - bufferIterator = buffer.generateIterator() - } - - // Iteration - var rowIndex = 0 - - override final def hasNext: Boolean = { - val found = (bufferIterator != null && bufferIterator.hasNext) || nextRowAvailable - if (!found) { - // clear final partition - buffer.clear() - spillSize += buffer.spillSize - } - found - } - - val join = new JoinedRow - override final def next(): InternalRow = { - // Load the next partition if we need to. - if ((bufferIterator == null || !bufferIterator.hasNext) && nextRowAvailable) { - fetchNextPartition() - } - - if (bufferIterator.hasNext) { - val current = bufferIterator.next() - - // Get the results for the window frames. - var i = 0 - while (i < numFrames) { - frames(i).write(rowIndex, current) - i += 1 - } - - // 'Merge' the input row with the window function result - join(current, windowFunctionResult) - rowIndex += 1 - - // Return the projection. - result(join) - } else { - throw new NoSuchElementException - } - } + // Start processing. + if (conf.usePartitionEvaluator) { + child.execute().mapPartitionsWithEvaluator(evaluatorFactory) + } else { + child.execute().mapPartitions { iter => + val evaluator = evaluatorFactory.createEvaluator() + evaluator.eval(0, iter) Review Comment: Thank you for the reminder. I will take a look! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For queries about this service, please contact Infrastructure at: users@infra.apache.org
---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org