beliefer commented on a change in pull request #30387:
URL: https://github.com/apache/spark/pull/30387#discussion_r544781741



##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowFunctionFrame.scala
##########
@@ -147,31 +150,115 @@ class FrameLessOffsetWindowFunctionFrame(
     expressions: Array[OffsetWindowFunction],
     inputSchema: Seq[Attribute],
     newMutableProjection: (Seq[Expression], Seq[Attribute]) => 
MutableProjection,
-    offset: Int)
+    offset: Int,
+    ignoreNulls: Boolean = false)
   extends OffsetWindowFunctionFrameBase(
     target, ordinal, expressions, inputSchema, newMutableProjection, offset) {
 
+  /** The input expression of Lead/Lag. */
+  private lazy val inputExpression = expressions.toSeq.map(_.input).head
+
+  /** The index of input expression in the row. */
+  private lazy val idx = inputAttrs.zipWithIndex.find(_._1 == 
inputExpression).map(_._2).head
+
+  /** Cache some UnsafeRow that will be used many times. */
+  private val rowBuffer = new ArrayBuffer[UnsafeRow]
+
   override def prepare(rows: ExternalAppendOnlyUnsafeRowArray): Unit = {
     input = rows
     inputIterator = input.generateIterator()
     // drain the first few rows if offset is larger than zero
     inputIndex = 0
-    while (inputIndex < offset) {
-      if (inputIterator.hasNext) inputIterator.next()
-      inputIndex += 1
+    if (!ignoreNulls || offset < 0) {
+      while (inputIndex < offset) {
+        if (inputIterator.hasNext) inputIterator.next()
+        inputIndex += 1
+      }
+      inputIndex = offset
     }
-    inputIndex = offset
+  }
+
+  private val doWrite = if (ignoreNulls && offset > 0) {
+    // For illustration, here is one example: the input data contains seven rows,
+    // and the input values of each row are: null, x, null, y, null, z, null.
+    // We use Lead(input, 1) with IGNORE NULLS and the process is as follows:
+    // 1. current row -> null, row buffer: [x], output: x;
+    // 2. current row -> x, row buffer: [null, y], output: y;
+    // 3. current row -> null, row buffer: [y], output: y;
+    // 4. current row -> y, row buffer: [null, z], output: z;
+    // 5. current row -> null, row buffer: [z], output: z;
+    // 6. current row -> z, row buffer: [null], output: null;
+    // 7. current row -> null, row buffer: [], output: null;
+    (index: Int, current: InternalRow) =>
+      while (inputIndex <= index) {
+        if (inputIterator.hasNext) inputIterator.next()
+        inputIndex += 1
+      }
+      while (rowBuffer.filterNot(_ == null).size < offset && inputIndex < 
input.length) {
+        val r = WindowFunctionFrame.getNextOrNull(inputIterator)
+        if (r.isNullAt(idx)) {
+          rowBuffer += null
+        } else {
+          rowBuffer += r
+        }
+        inputIndex += 1
+      }
+      if (rowBuffer.filterNot(_ == null).size == offset) {
+        projection(rowBuffer.filterNot(_ == null).last)
+        rowBuffer.remove(0)
+      } else {
+        // Use default values since the offset row whose input value is not null does not exist.
+        fillDefaultValue(current)
+      }
+  } else if (ignoreNulls && offset < 0) {
+    // For illustration, here is one example: the input data contains seven rows,
+    // and the input values of each row are: null, x, null, y, null, z, null.
+    // We use Lag(input, 1) with IGNORE NULLS and the process is as follows:
+    // 1. current row -> null, row buffer: [], output: null;
+    // 2. current row -> x, row buffer: [], output: null;
+    // 3. current row -> null, row buffer: [x], output: x;
+    // 4. current row -> y, row buffer: [x], output: x;
+    // 5. current row -> null, row buffer: [y], output: y;
+    // 6. current row -> z, row buffer: [y], output: y;
+    // 7. current row -> null, row buffer: [z], output: z;
+    val maxSize = Math.abs(offset)
+    (index: Int, current: InternalRow) =>
+      if (inputIndex >= 0 && inputIndex < input.length) {
+        while (inputIndex < index) {
+          val r = WindowFunctionFrame.getNextOrNull(inputIterator)
+          if (!r.isNullAt(idx)) {
+            if (rowBuffer.size == maxSize) {
+              rowBuffer.remove(0)
+            }
+            rowBuffer += r
+          }
+          inputIndex += 1
+        }
+        if (rowBuffer.size == maxSize) {
+          projection(rowBuffer.head)
+        } else {
+          // Use default values since the offset row whose input value is not null does not exist.
+          fillDefaultValue(current)
+        }
+      } else {
+        // Use default values since the offset row does not exist.
+        fillDefaultValue(current)
+        inputIndex += 1
+      }
+  } else {

Review comment:
       Return current row itself. 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to