[GitHub] [spark] beliefer commented on a change in pull request #30387: [SPARK-33443][SQL] LEAD/LAG should support [ IGNORE NULLS | RESPECT NULLS ]

2020-12-23 Thread GitBox


beliefer commented on a change in pull request #30387:
URL: https://github.com/apache/spark/pull/30387#discussion_r548399696



##
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowFunctionFrame.scala
##
@@ -153,26 +153,24 @@ class FrameLessOffsetWindowFunctionFrame(
   extends OffsetWindowFunctionFrameBase(
 target, ordinal, expressions, inputSchema, newMutableProjection, offset) {
 
-  assert(expressions.toSeq.filterNot(_.input.isInstanceOf[Attribute]).isEmpty)
-
-  /** The input expression of Lead/Lag. */
-  private lazy val inputExpression = expressions.toSeq.map(_.input).head
-
-  /** The index of input expression in the row. */
-  private lazy val idx = inputAttrs.zipWithIndex.find(_._1 == inputExpression).map(_._2).head
-
   /** Holds the UnsafeRow where the input value of the function is not null. */
   private var nextSelectedRow = EmptyRow
 
   // The number of rows skipped to get the next UnsafeRow where the input value of the function
   // is not null.
   private var skippedNonNullCount = 0
 
+  /** Create the projection to determine whether input is null. */
+  private val project =
+    UnsafeProjection.create(Seq(IsNull(expressions.head.input)), inputSchema)
+
+  /** Check if the output value of the first index is null. */
+  private val nullCheck: InternalRow => Boolean = row => project(row).getBoolean(0)

Review comment:
   OK
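   For readers following along, a minimal sketch of the IsNull-projection null check shown in the diff above; the single `value` attribute and the sample rows are assumptions for illustration, not code from the PR:

    import org.apache.spark.sql.catalyst.InternalRow
    import org.apache.spark.sql.catalyst.expressions.{AttributeReference, IsNull, UnsafeProjection}
    import org.apache.spark.sql.types.StringType
    import org.apache.spark.unsafe.types.UTF8String

    // Assumed single-column schema standing in for inputSchema.
    val value = AttributeReference("value", StringType)()
    val inputSchema = Seq(value)

    // Bind IsNull(value) against the schema so the null check does not depend on
    // the physical position of the input expression in the row.
    val project = UnsafeProjection.create(Seq(IsNull(value)), inputSchema)
    val nullCheck: InternalRow => Boolean = row => project(row).getBoolean(0)

    nullCheck(InternalRow(UTF8String.fromString("x")))  // false
    nullCheck(InternalRow(null))                        // true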








[GitHub] [spark] beliefer commented on a change in pull request #30387: [SPARK-33443][SQL] LEAD/LAG should support [ IGNORE NULLS | RESPECT NULLS ]

2020-12-22 Thread GitBox


beliefer commented on a change in pull request #30387:
URL: https://github.com/apache/spark/pull/30387#discussion_r547714735



##
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowFunctionFrame.scala
##
@@ -147,31 +148,133 @@ class FrameLessOffsetWindowFunctionFrame(
 expressions: Array[OffsetWindowFunction],
 inputSchema: Seq[Attribute],
 newMutableProjection: (Seq[Expression], Seq[Attribute]) => 
MutableProjection,
-offset: Int)
+offset: Int,
+ignoreNulls: Boolean = false)
   extends OffsetWindowFunctionFrameBase(
 target, ordinal, expressions, inputSchema, newMutableProjection, offset) {
 
+  /** The input expression of Lead/Lag. */
+  private lazy val inputExpression = expressions.toSeq.map(_.input).head
+
+  /** The index of input expression in the row. */
+  private lazy val idx = inputAttrs.zipWithIndex.find(_._1 == inputExpression).map(_._2).head
+
+  /** Holds the UnsafeRow where the input value of the function is not null. */
+  private var nextSelectedRow = EmptyRow
+
+  /**
+   *  The number of UnsafeRows skipped to get the next UnsafeRow where
+   *  the input value of the function is not null.
+   */
+  private var skipNonNullCount = 0
+
+  /** find the offset row whose input is not null */
+  private def findNextRowWithNonNullInput(): Unit = {
+while (skipNonNullCount < offset && inputIndex < input.length) {
+  val r = WindowFunctionFrame.getNextOrNull(inputIterator)
+  if (!r.isNullAt(idx)) {
+nextSelectedRow = r
+skipNonNullCount += 1
+  }
+  inputIndex += 1
+}
+  }
+
   override def prepare(rows: ExternalAppendOnlyUnsafeRowArray): Unit = {
 input = rows
 inputIterator = input.generateIterator()
 // drain the first few rows if offset is larger than zero
 inputIndex = 0
-while (inputIndex < offset) {
-  if (inputIterator.hasNext) inputIterator.next()
-  inputIndex += 1
+if (ignoreNulls) {
+  findNextRowWithNonNullInput
+} else {
+  while (inputIndex < offset) {
+if (inputIterator.hasNext) inputIterator.next()
+inputIndex += 1
+  }
+  inputIndex = offset
 }
-inputIndex = offset
+  }
+
+  private val doWrite = if (ignoreNulls && offset > 0) {
+// For illustration, here is one example: the input data contains eight rows,
+// and the input values of each row are: null, x, null, null, y, null, z, null.
+// We use Lead(input, 2) with IGNORE NULLS and the process is as follows:
+// 1. current row -> null, next selected row -> y, output: y;
+// 2. current row -> x, next selected row -> z, output: z;
+// 3. current row -> null, next selected row -> z, output: z;
+// 4. current row -> null, next selected row -> z, output: z;
+// 5. current row -> y, next selected row -> empty, output: null;
+// ... next selected row is empty, all following return null.
+(current: InternalRow) =>
+  if (current.isNullAt(idx)) {
+if (nextSelectedRow == EmptyRow) {
+  // Use default values since the offset row whose input value is not null does not exist.
+  fillDefaultValue(current)
+} else {
+  projection(nextSelectedRow)
+}
+  } else {
+skipNonNullCount -= 1
+findNextRowWithNonNullInput
+if (skipNonNullCount == offset) {
+  projection(nextSelectedRow)
+} else {
+  // Use default values since the offset row whose input value is not null does not exist.
+  fillDefaultValue(current)
+  nextSelectedRow = EmptyRow
+}
+  }
+  } else if (ignoreNulls && offset < 0) {
+// For illustration, here is one example: the input data contains eight rows,
+// and the input values of each row are: null, x, null, null, y, null, z, null.
+// We use Lag(input, 1) with IGNORE NULLS and the process is as follows:
+// 1. current row -> null, next selected row -> empty, output: null;
+// 2. current row -> x, next selected row -> empty, output: null;
+// 3. current row -> null, next selected row -> x, output: x;
+// 4. current row -> null, next selected row -> x, output: x;
+// 5. current row -> y, next selected row -> x, output: x;
+// 6. current row -> null, next selected row -> y, output: y;
+// 7. current row -> z, next selected row -> y, output: y;
+// 8. current row -> null, next selected row -> z, output: z;
+val absOffset = Math.abs(offset)
+(current: InternalRow) =>
+  if (nextSelectedRow == EmptyRow && skipNonNullCount == absOffset) {

Review comment:
   Thank you
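   For reference, an end-to-end sketch of the Lead(input, 2) IGNORE NULLS walk-through described in the comment above. It assumes the Column-based lead(e, offset, defaultValue, ignoreNulls) overload this PR adds to functions.scala; the data mirrors the eight values null, x, null, null, y, null, z, null:

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.expressions.Window
    import org.apache.spark.sql.functions.lead

    val spark = SparkSession.builder().master("local[1]").appName("lead-ignore-nulls").getOrCreate()
    import spark.implicits._

    val nullStr: String = null
    val df = Seq(
      (1, nullStr), (2, "x"), (3, nullStr), (4, nullStr),
      (5, "y"), (6, nullStr), (7, "z"), (8, nullStr)).toDF("order", "value")
    val w = Window.orderBy($"order")

    // Rows 1-4 should produce y, z, z, z; from row 5 on there are fewer than two
    // following non-null values, so the default (null) is returned.
    df.select($"order", $"value", lead($"value", 2, null, true).over(w)).show()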









[GitHub] [spark] beliefer commented on a change in pull request #30387: [SPARK-33443][SQL] LEAD/LAG should support [ IGNORE NULLS | RESPECT NULLS ]

2020-12-22 Thread GitBox


beliefer commented on a change in pull request #30387:
URL: https://github.com/apache/spark/pull/30387#discussion_r547714479



##
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowFunctionFrame.scala
##
@@ -147,31 +148,133 @@ class FrameLessOffsetWindowFunctionFrame(
 expressions: Array[OffsetWindowFunction],
 inputSchema: Seq[Attribute],
 newMutableProjection: (Seq[Expression], Seq[Attribute]) => 
MutableProjection,
-offset: Int)
+offset: Int,
+ignoreNulls: Boolean = false)
   extends OffsetWindowFunctionFrameBase(
 target, ordinal, expressions, inputSchema, newMutableProjection, offset) {
 
+  /** The input expression of Lead/Lag. */
+  private lazy val inputExpression = expressions.toSeq.map(_.input).head
+
+  /** The index of input expression in the row. */
+  private lazy val idx = inputAttrs.zipWithIndex.find(_._1 == inputExpression).map(_._2).head
+
+  /** Holds the UnsafeRow where the input value of the function is not null. */
+  private var nextSelectedRow = EmptyRow
+
+  /**
+   *  The number of UnsafeRows skipped to get the next UnsafeRow where
+   *  the input operator by function is not null.
+   */
+  private var skipNonNullCount = 0
+
+  /** find the offset row whose input is not null */
+  private def findNextRowWithNonNullInput(): Unit = {
+while (skipNonNullCount < offset && inputIndex < input.length) {
+  val r = WindowFunctionFrame.getNextOrNull(inputIterator)
+  if (!r.isNullAt(idx)) {
+nextSelectedRow = r
+skipNonNullCount += 1
+  }
+  inputIndex += 1
+}
+  }
+
   override def prepare(rows: ExternalAppendOnlyUnsafeRowArray): Unit = {
 input = rows
 inputIterator = input.generateIterator()
 // drain the first few rows if offset is larger than zero
 inputIndex = 0
-while (inputIndex < offset) {
-  if (inputIterator.hasNext) inputIterator.next()
-  inputIndex += 1
+if (ignoreNulls) {
+  findNextRowWithNonNullInput
+} else {
+  while (inputIndex < offset) {
+if (inputIterator.hasNext) inputIterator.next()
+inputIndex += 1
+  }
+  inputIndex = offset
 }
-inputIndex = offset
+  }
+
+  private val doWrite = if (ignoreNulls && offset > 0) {
+// For illustration, here is one example: the input data contains eight rows,
+// and the input values of each row are: null, x, null, null, y, null, z, null.
+// We use Lead(input, 2) with IGNORE NULLS and the process is as follows:
+// 1. current row -> null, next selected row -> y, output: y;
+// 2. current row -> x, next selected row -> z, output: z;
+// 3. current row -> null, next selected row -> z, output: z;
+// 4. current row -> null, next selected row -> z, output: z;
+// 5. current row -> y, next selected row -> empty, output: null;
+// ... next selected row is empty, all following return null.
+(current: InternalRow) =>
+  if (current.isNullAt(idx)) {
+if (nextSelectedRow == EmptyRow) {
+  // Use default values since the offset row whose input value is not null does not exist.
+  fillDefaultValue(current)
+} else {
+  projection(nextSelectedRow)
+}
+  } else {
+skipNonNullCount -= 1
+findNextRowWithNonNullInput
+if (skipNonNullCount == offset) {
+  projection(nextSelectedRow)
+} else {
+  // Use default values since the offset row whose input value is not null does not exist.
+  fillDefaultValue(current)
+  nextSelectedRow = EmptyRow
+}
+  }
+  } else if (ignoreNulls && offset < 0) {
+// For illustration, here is one example: the input data contains eight rows,
+// and the input values of each row are: null, x, null, null, y, null, z, null.
+// We use Lag(input, 1) with IGNORE NULLS and the process is as follows:
+// 1. current row -> null, next selected row -> empty, output: null;
+// 2. current row -> x, next selected row -> empty, output: null;
+// 3. current row -> null, next selected row -> x, output: x;
+// 4. current row -> null, next selected row -> x, output: x;
+// 5. current row -> y, next selected row -> x, output: x;
+// 6. current row -> null, next selected row -> y, output: y;
+// 7. current row -> z, next selected row -> y, output: y;
+// 8. current row -> z, next selected row -> z, output: z;
+val absOffset = Math.abs(offset)
+(current: InternalRow) =>
+  if (nextSelectedRow == EmptyRow && skipNonNullCount == absOffset) {
+do {
+  val r = WindowFunctionFrame.getNextOrNull(inputIterator)
+  if (!r.isNullAt(idx)) {
+nextSelectedRow = r
+  }
+  inputIndex += 1
+} while (nextSelectedRow == EmptyRow && inputIndex < input.length)
+  }
+  if (nextSelectedRow == EmptyRow) {
+// Use default values since the offset row whose input value is not null does not exist.

[GitHub] [spark] beliefer commented on a change in pull request #30387: [SPARK-33443][SQL] LEAD/LAG should support [ IGNORE NULLS | RESPECT NULLS ]

2020-12-22 Thread GitBox


beliefer commented on a change in pull request #30387:
URL: https://github.com/apache/spark/pull/30387#discussion_r547713527



##
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowFunctionFrame.scala
##
@@ -147,31 +148,133 @@ class FrameLessOffsetWindowFunctionFrame(
 expressions: Array[OffsetWindowFunction],
 inputSchema: Seq[Attribute],
 newMutableProjection: (Seq[Expression], Seq[Attribute]) => 
MutableProjection,
-offset: Int)
+offset: Int,
+ignoreNulls: Boolean = false)
   extends OffsetWindowFunctionFrameBase(
 target, ordinal, expressions, inputSchema, newMutableProjection, offset) {
 
+  /** The input expression of Lead/Lag. */
+  private lazy val inputExpression = expressions.toSeq.map(_.input).head
+
+  /** The index of input expression in the row. */
+  private lazy val idx = inputAttrs.zipWithIndex.find(_._1 == 
inputExpression).map(_._2).head
+
+  /** Holder the UnsafeRow where the input operator by function is not null. */
+  private var nextSelectedRow = EmptyRow
+
+  /**
+   *  The number of UnsafeRows skipped to get the next UnsafeRow where
+   *  the input operator by function is not null.
+   */
+  private var skipNonNullCount = 0
+
+  /** find the offset row whose input is not null */
+  private def findNextRowWithNonNullInput(): Unit = {
+while (skipNonNullCount < offset && inputIndex < input.length) {
+  val r = WindowFunctionFrame.getNextOrNull(inputIterator)
+  if (!r.isNullAt(idx)) {
+nextSelectedRow = r
+skipNonNullCount += 1
+  }
+  inputIndex += 1
+}
+  }
+
   override def prepare(rows: ExternalAppendOnlyUnsafeRowArray): Unit = {
 input = rows
 inputIterator = input.generateIterator()
 // drain the first few rows if offset is larger than zero
 inputIndex = 0
-while (inputIndex < offset) {
-  if (inputIterator.hasNext) inputIterator.next()
-  inputIndex += 1
+if (ignoreNulls) {
+  findNextRowWithNonNullInput
+} else {
+  while (inputIndex < offset) {
+if (inputIterator.hasNext) inputIterator.next()
+inputIndex += 1
+  }
+  inputIndex = offset
 }
-inputIndex = offset
+  }
+
+  private val doWrite = if (ignoreNulls && offset > 0) {
+// For illustration, here is one example: the input data contains eight rows,
+// and the input values of each row are: null, x, null, null, y, null, z, null.
+// We use Lead(input, 2) with IGNORE NULLS and the process is as follows:
+// 1. current row -> null, next selected row -> y, output: y;
+// 2. current row -> x, next selected row -> z, output: z;
+// 3. current row -> null, next selected row -> z, output: z;
+// 4. current row -> null, next selected row -> z, output: z;
+// 5. current row -> y, next selected row -> empty, output: null;
+// ... next selected row is empty, all following return null.
+(current: InternalRow) =>
+  if (current.isNullAt(idx)) {
+if (nextSelectedRow == EmptyRow) {
+  // Use default values since the offset row whose input value is not 
null does not exist.
+  fillDefaultValue(current)
+} else {
+  projection(nextSelectedRow)
+}
+  } else {
+skipNonNullCount -= 1
+findNextRowWithNonNullInput
+if (skipNonNullCount == offset) {
+  projection(nextSelectedRow)
+} else {
+  // Use default values since the offset row whose input value is not 
null does not exist.
+  fillDefaultValue(current)
+  nextSelectedRow = EmptyRow
+}
+  }
+  } else if (ignoreNulls && offset < 0) {
+// For illustration, here is one example: the input data contains eight rows,
+// and the input values of each row are: null, x, null, null, y, null, z, null.
+// We use Lag(input, 1) with IGNORE NULLS and the process is as follows:
+// 1. current row -> null, next selected row -> empty, output: null;
+// 2. current row -> x, next selected row -> empty, output: null;
+// 3. current row -> null, next selected row -> x, output: x;
+// 4. current row -> null, next selected row -> x, output: x;
+// 5. current row -> y, next selected row -> x, output: x;
+// 6. current row -> null, next selected row -> y, output: y;
+// 7. current row -> z, next selected row -> y, output: y;
+// 8. current row -> z, next selected row -> z, output: z;
+val absOffset = Math.abs(offset)
+(current: InternalRow) =>
+  if (nextSelectedRow == EmptyRow && skipNonNullCount == absOffset) {
+do {
+  val r = WindowFunctionFrame.getNextOrNull(inputIterator)
+  if (!r.isNullAt(idx)) {
+nextSelectedRow = r
+  }
+  inputIndex += 1
+} while (nextSelectedRow == EmptyRow && inputIndex < input.length)
+  }
+  if (nextSelectedRow == EmptyRow) {
+// Use default values since the offset row whose input value is not null does not exist.

[GitHub] [spark] beliefer commented on a change in pull request #30387: [SPARK-33443][SQL] LEAD/LAG should support [ IGNORE NULLS | RESPECT NULLS ]

2020-12-22 Thread GitBox


beliefer commented on a change in pull request #30387:
URL: https://github.com/apache/spark/pull/30387#discussion_r547627200



##
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowFunctionFrame.scala
##
@@ -147,31 +148,133 @@ class FrameLessOffsetWindowFunctionFrame(
 expressions: Array[OffsetWindowFunction],
 inputSchema: Seq[Attribute],
 newMutableProjection: (Seq[Expression], Seq[Attribute]) => 
MutableProjection,
-offset: Int)
+offset: Int,
+ignoreNulls: Boolean = false)
   extends OffsetWindowFunctionFrameBase(
 target, ordinal, expressions, inputSchema, newMutableProjection, offset) {
 
+  /** The input expression of Lead/Lag. */
+  private lazy val inputExpression = expressions.toSeq.map(_.input).head
+
+  /** The index of input expression in the row. */
+  private lazy val idx = inputAttrs.zipWithIndex.find(_._1 == 
inputExpression).map(_._2).head
+
+  /** Holder the UnsafeRow where the input operator by function is not null. */
+  private var nextSelectedRow = EmptyRow
+
+  /**
+   *  The number of UnsafeRows skipped to get the next UnsafeRow where
+   *  the input operator by function is not null.
+   */
+  private var skipNonNullCount = 0
+
+  /** find the offset row whose input is not null */
+  private def findNextRowWithNonNullInput(): Unit = {
+while (skipNonNullCount < offset && inputIndex < input.length) {
+  val r = WindowFunctionFrame.getNextOrNull(inputIterator)
+  if (!r.isNullAt(idx)) {
+nextSelectedRow = r
+skipNonNullCount += 1
+  }
+  inputIndex += 1
+}
+  }
+
   override def prepare(rows: ExternalAppendOnlyUnsafeRowArray): Unit = {
 input = rows
 inputIterator = input.generateIterator()
 // drain the first few rows if offset is larger than zero
 inputIndex = 0
-while (inputIndex < offset) {
-  if (inputIterator.hasNext) inputIterator.next()
-  inputIndex += 1
+if (ignoreNulls) {
+  findNextRowWithNonNullInput
+} else {
+  while (inputIndex < offset) {
+if (inputIterator.hasNext) inputIterator.next()
+inputIndex += 1
+  }
+  inputIndex = offset
 }
-inputIndex = offset
+  }
+
+  private val doWrite = if (ignoreNulls && offset > 0) {
+// For illustration, here is one example: the input data contains eight rows,
+// and the input values of each row are: null, x, null, null, y, null, z, null.
+// We use Lead(input, 2) with IGNORE NULLS and the process is as follows:
+// 1. current row -> null, next selected row -> y, output: y;
+// 2. current row -> x, next selected row -> z, output: z;
+// 3. current row -> null, next selected row -> z, output: z;
+// 4. current row -> null, next selected row -> z, output: z;
+// 5. current row -> y, next selected row -> empty, output: null;
+// ... next selected row is empty, all following return null.
+(current: InternalRow) =>
+  if (current.isNullAt(idx)) {
+if (nextSelectedRow == EmptyRow) {
+  // Use default values since the offset row whose input value is not 
null does not exist.
+  fillDefaultValue(current)
+} else {
+  projection(nextSelectedRow)
+}
+  } else {
+skipNonNullCount -= 1
+findNextRowWithNonNullInput
+if (skipNonNullCount == offset) {
+  projection(nextSelectedRow)
+} else {
+  // Use default values since the offset row whose input value is not 
null does not exist.
+  fillDefaultValue(current)
+  nextSelectedRow = EmptyRow
+}
+  }
+  } else if (ignoreNulls && offset < 0) {
+// For illustration, here is one example: the input data contains eight rows,
+// and the input values of each row are: null, x, null, null, y, null, z, null.
+// We use Lag(input, 1) with IGNORE NULLS and the process is as follows:

Review comment:
   OK
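   Similarly, a sketch of the Lag(input, 1) IGNORE NULLS walk-through above, again assuming the Column-based lag overload with ignoreNulls that this PR adds:

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.expressions.Window
    import org.apache.spark.sql.functions.lag

    val spark = SparkSession.builder().master("local[1]").appName("lag-ignore-nulls").getOrCreate()
    import spark.implicits._

    val nullStr: String = null
    val df = Seq(
      (1, nullStr), (2, "x"), (3, nullStr), (4, nullStr),
      (5, "y"), (6, nullStr), (7, "z"), (8, nullStr)).toDF("order", "value")
    val w = Window.orderBy($"order")

    // Expected per the walk-through: null, null, x, x, x, y, y, z.
    df.select($"order", $"value", lag($"value", 1, null, true).over(w)).show()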








[GitHub] [spark] beliefer commented on a change in pull request #30387: [SPARK-33443][SQL] LEAD/LAG should support [ IGNORE NULLS | RESPECT NULLS ]

2020-12-22 Thread GitBox


beliefer commented on a change in pull request #30387:
URL: https://github.com/apache/spark/pull/30387#discussion_r547627102



##
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowFunctionFrame.scala
##
@@ -147,31 +148,133 @@ class FrameLessOffsetWindowFunctionFrame(
 expressions: Array[OffsetWindowFunction],
 inputSchema: Seq[Attribute],
 newMutableProjection: (Seq[Expression], Seq[Attribute]) => 
MutableProjection,
-offset: Int)
+offset: Int,
+ignoreNulls: Boolean = false)
   extends OffsetWindowFunctionFrameBase(
 target, ordinal, expressions, inputSchema, newMutableProjection, offset) {
 
+  /** The input expression of Lead/Lag. */
+  private lazy val inputExpression = expressions.toSeq.map(_.input).head
+
+  /** The index of input expression in the row. */
+  private lazy val idx = inputAttrs.zipWithIndex.find(_._1 == 
inputExpression).map(_._2).head
+
+  /** Holder the UnsafeRow where the input operator by function is not null. */
+  private var nextSelectedRow = EmptyRow
+
+  /**
+   *  The number of UnsafeRows skipped to get the next UnsafeRow where
+   *  the input operator by function is not null.
+   */
+  private var skipNonNullCount = 0
+
+  /** find the offset row whose input is not null */
+  private def findNextRowWithNonNullInput(): Unit = {
+while (skipNonNullCount < offset && inputIndex < input.length) {
+  val r = WindowFunctionFrame.getNextOrNull(inputIterator)
+  if (!r.isNullAt(idx)) {
+nextSelectedRow = r
+skipNonNullCount += 1
+  }
+  inputIndex += 1
+}
+  }
+
   override def prepare(rows: ExternalAppendOnlyUnsafeRowArray): Unit = {
 input = rows
 inputIterator = input.generateIterator()
 // drain the first few rows if offset is larger than zero
 inputIndex = 0
-while (inputIndex < offset) {
-  if (inputIterator.hasNext) inputIterator.next()
-  inputIndex += 1
+if (ignoreNulls) {
+  findNextRowWithNonNullInput
+} else {
+  while (inputIndex < offset) {
+if (inputIterator.hasNext) inputIterator.next()
+inputIndex += 1
+  }
+  inputIndex = offset
 }
-inputIndex = offset
+  }
+
+  private val doWrite = if (ignoreNulls && offset > 0) {
+// For illustration, here is one example: the input data contains eight rows,
+// and the input values of each row are: null, x, null, null, y, null, z, null.
+// We use Lead(input, 2) with IGNORE NULLS and the process is as follows:
+// 1. current row -> null, next selected row -> y, output: y;
+// 2. current row -> x, next selected row -> z, output: z;
+// 3. current row -> null, next selected row -> z, output: z;
+// 4. current row -> null, next selected row -> z, output: z;
+// 5. current row -> y, next selected row -> empty, output: null;
+// ... next selected row is empty, all following return null.
+(current: InternalRow) =>
+  if (current.isNullAt(idx)) {

Review comment:
   OK








[GitHub] [spark] beliefer commented on a change in pull request #30387: [SPARK-33443][SQL] LEAD/LAG should support [ IGNORE NULLS | RESPECT NULLS ]

2020-12-22 Thread GitBox


beliefer commented on a change in pull request #30387:
URL: https://github.com/apache/spark/pull/30387#discussion_r547623075



##
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowFunctionFrame.scala
##
@@ -147,31 +148,133 @@ class FrameLessOffsetWindowFunctionFrame(
 expressions: Array[OffsetWindowFunction],
 inputSchema: Seq[Attribute],
 newMutableProjection: (Seq[Expression], Seq[Attribute]) => 
MutableProjection,
-offset: Int)
+offset: Int,
+ignoreNulls: Boolean = false)
   extends OffsetWindowFunctionFrameBase(
 target, ordinal, expressions, inputSchema, newMutableProjection, offset) {
 
+  /** The input expression of Lead/Lag. */
+  private lazy val inputExpression = expressions.toSeq.map(_.input).head
+
+  /** The index of input expression in the row. */
+  private lazy val idx = inputAttrs.zipWithIndex.find(_._1 == 
inputExpression).map(_._2).head
+
+  /** Holder the UnsafeRow where the input operator by function is not null. */
+  private var nextSelectedRow = EmptyRow
+
+  /**
+   *  The number of UnsafeRows skipped to get the next UnsafeRow where

Review comment:
   OK








[GitHub] [spark] beliefer commented on a change in pull request #30387: [SPARK-33443][SQL] LEAD/LAG should support [ IGNORE NULLS | RESPECT NULLS ]

2020-12-22 Thread GitBox


beliefer commented on a change in pull request #30387:
URL: https://github.com/apache/spark/pull/30387#discussion_r547622869



##
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowFunctionFrame.scala
##
@@ -147,31 +148,133 @@ class FrameLessOffsetWindowFunctionFrame(
 expressions: Array[OffsetWindowFunction],
 inputSchema: Seq[Attribute],
 newMutableProjection: (Seq[Expression], Seq[Attribute]) => 
MutableProjection,
-offset: Int)
+offset: Int,
+ignoreNulls: Boolean = false)
   extends OffsetWindowFunctionFrameBase(
 target, ordinal, expressions, inputSchema, newMutableProjection, offset) {
 
+  /** The input expression of Lead/Lag. */
+  private lazy val inputExpression = expressions.toSeq.map(_.input).head
+
+  /** The index of input expression in the row. */
+  private lazy val idx = inputAttrs.zipWithIndex.find(_._1 == 
inputExpression).map(_._2).head
+
+  /** Holder the UnsafeRow where the input operator by function is not null. */
+  private var nextSelectedRow = EmptyRow
+
+  /**
+   *  The number of UnsafeRows skipped to get the next UnsafeRow where
+   *  the input operator by function is not null.
+   */
+  private var skipNonNullCount = 0

Review comment:
   OK








[GitHub] [spark] beliefer commented on a change in pull request #30387: [SPARK-33443][SQL] LEAD/LAG should support [ IGNORE NULLS | RESPECT NULLS ]

2020-12-22 Thread GitBox


beliefer commented on a change in pull request #30387:
URL: https://github.com/apache/spark/pull/30387#discussion_r547622745



##
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowFunctionFrame.scala
##
@@ -147,31 +148,133 @@ class FrameLessOffsetWindowFunctionFrame(
 expressions: Array[OffsetWindowFunction],
 inputSchema: Seq[Attribute],
 newMutableProjection: (Seq[Expression], Seq[Attribute]) => 
MutableProjection,
-offset: Int)
+offset: Int,
+ignoreNulls: Boolean = false)
   extends OffsetWindowFunctionFrameBase(
 target, ordinal, expressions, inputSchema, newMutableProjection, offset) {
 
+  /** The input expression of Lead/Lag. */
+  private lazy val inputExpression = expressions.toSeq.map(_.input).head
+
+  /** The index of input expression in the row. */
+  private lazy val idx = inputAttrs.zipWithIndex.find(_._1 == 
inputExpression).map(_._2).head
+
+  /** Holder the UnsafeRow where the input operator by function is not null. */
+  private var nextSelectedRow = EmptyRow
+
+  /**

Review comment:
   OK








[GitHub] [spark] beliefer commented on a change in pull request #30387: [SPARK-33443][SQL] LEAD/LAG should support [ IGNORE NULLS | RESPECT NULLS ]

2020-12-22 Thread GitBox


beliefer commented on a change in pull request #30387:
URL: https://github.com/apache/spark/pull/30387#discussion_r547622526



##
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowFunctionFrame.scala
##
@@ -147,31 +148,133 @@ class FrameLessOffsetWindowFunctionFrame(
 expressions: Array[OffsetWindowFunction],
 inputSchema: Seq[Attribute],
 newMutableProjection: (Seq[Expression], Seq[Attribute]) => 
MutableProjection,
-offset: Int)
+offset: Int,
+ignoreNulls: Boolean = false)
   extends OffsetWindowFunctionFrameBase(
 target, ordinal, expressions, inputSchema, newMutableProjection, offset) {
 
+  /** The input expression of Lead/Lag. */
+  private lazy val inputExpression = expressions.toSeq.map(_.input).head
+
+  /** The index of input expression in the row. */
+  private lazy val idx = inputAttrs.zipWithIndex.find(_._1 == 
inputExpression).map(_._2).head
+
+  /** Holder the UnsafeRow where the input operator by function is not null. */
+  private var nextSelectedRow = EmptyRow
+
+  /**
+   *  The number of UnsafeRows skipped to get the next UnsafeRow where
+   *  the input operator by function is not null.
+   */
+  private var skipNonNullCount = 0
+
+  /** find the offset row whose input is not null */
+  private def findNextRowWithNonNullInput(): Unit = {
+while (skipNonNullCount < offset && inputIndex < input.length) {
+  val r = WindowFunctionFrame.getNextOrNull(inputIterator)
+  if (!r.isNullAt(idx)) {
+nextSelectedRow = r
+skipNonNullCount += 1
+  }
+  inputIndex += 1
+}
+  }
+
   override def prepare(rows: ExternalAppendOnlyUnsafeRowArray): Unit = {
 input = rows
 inputIterator = input.generateIterator()
 // drain the first few rows if offset is larger than zero
 inputIndex = 0
-while (inputIndex < offset) {
-  if (inputIterator.hasNext) inputIterator.next()
-  inputIndex += 1
+if (ignoreNulls) {
+  findNextRowWithNonNullInput

Review comment:
   OK

##
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowFunctionFrame.scala
##
@@ -147,31 +148,133 @@ class FrameLessOffsetWindowFunctionFrame(
 expressions: Array[OffsetWindowFunction],
 inputSchema: Seq[Attribute],
 newMutableProjection: (Seq[Expression], Seq[Attribute]) => 
MutableProjection,
-offset: Int)
+offset: Int,
+ignoreNulls: Boolean = false)
   extends OffsetWindowFunctionFrameBase(
 target, ordinal, expressions, inputSchema, newMutableProjection, offset) {
 
+  /** The input expression of Lead/Lag. */
+  private lazy val inputExpression = expressions.toSeq.map(_.input).head
+
+  /** The index of input expression in the row. */
+  private lazy val idx = inputAttrs.zipWithIndex.find(_._1 == 
inputExpression).map(_._2).head
+
+  /** Holder the UnsafeRow where the input operator by function is not null. */
+  private var nextSelectedRow = EmptyRow
+
+  /**
+   *  The number of UnsafeRows skipped to get the next UnsafeRow where
+   *  the input operator by function is not null.
+   */
+  private var skipNonNullCount = 0
+
+  /** find the offset row whose input is not null */
+  private def findNextRowWithNonNullInput(): Unit = {
+while (skipNonNullCount < offset && inputIndex < input.length) {
+  val r = WindowFunctionFrame.getNextOrNull(inputIterator)
+  if (!r.isNullAt(idx)) {
+nextSelectedRow = r
+skipNonNullCount += 1
+  }
+  inputIndex += 1
+}
+  }
+
   override def prepare(rows: ExternalAppendOnlyUnsafeRowArray): Unit = {
 input = rows
 inputIterator = input.generateIterator()
 // drain the first few rows if offset is larger than zero
 inputIndex = 0
-while (inputIndex < offset) {
-  if (inputIterator.hasNext) inputIterator.next()
-  inputIndex += 1
+if (ignoreNulls) {
+  findNextRowWithNonNullInput
+} else {
+  while (inputIndex < offset) {
+if (inputIterator.hasNext) inputIterator.next()
+inputIndex += 1
+  }
+  inputIndex = offset
 }
-inputIndex = offset
+  }
+
+  private val doWrite = if (ignoreNulls && offset > 0) {
+// For illustration, here is one example: the input data contains eight rows,
+// and the input values of each row are: null, x, null, null, y, null, z, null.
+// We use Lead(input, 2) with IGNORE NULLS and the process is as follows:
+// 1. current row -> null, next selected row -> y, output: y;
+// 2. current row -> x, next selected row -> z, output: z;
+// 3. current row -> null, next selected row -> z, output: z;
+// 4. current row -> null, next selected row -> z, output: z;
+// 5. current row -> y, next selected row -> empty, output: null;
+// ... next selected row is empty, all following return null.
+(current: InternalRow) =>
+  if (current.isNullAt(idx)) {
+if (nextSelectedRow == EmptyRow) {
+  // Use default values since the offset row whose input value is not null does not exist.

[GitHub] [spark] beliefer commented on a change in pull request #30387: [SPARK-33443][SQL] LEAD/LAG should support [ IGNORE NULLS | RESPECT NULLS ]

2020-12-22 Thread GitBox


beliefer commented on a change in pull request #30387:
URL: https://github.com/apache/spark/pull/30387#discussion_r547622370



##
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowFunctionFrame.scala
##
@@ -147,31 +148,133 @@ class FrameLessOffsetWindowFunctionFrame(
 expressions: Array[OffsetWindowFunction],
 inputSchema: Seq[Attribute],
 newMutableProjection: (Seq[Expression], Seq[Attribute]) => 
MutableProjection,
-offset: Int)
+offset: Int,
+ignoreNulls: Boolean = false)
   extends OffsetWindowFunctionFrameBase(
 target, ordinal, expressions, inputSchema, newMutableProjection, offset) {
 
+  /** The input expression of Lead/Lag. */
+  private lazy val inputExpression = expressions.toSeq.map(_.input).head
+
+  /** The index of input expression in the row. */
+  private lazy val idx = inputAttrs.zipWithIndex.find(_._1 == 
inputExpression).map(_._2).head
+
+  /** Holder the UnsafeRow where the input operator by function is not null. */
+  private var nextSelectedRow = EmptyRow
+
+  /**
+   *  The number of UnsafeRows skipped to get the next UnsafeRow where
+   *  the input operator by function is not null.
+   */
+  private var skipNonNullCount = 0
+
+  /** find the offset row whose input is not null */
+  private def findNextRowWithNonNullInput(): Unit = {
+while (skipNonNullCount < offset && inputIndex < input.length) {
+  val r = WindowFunctionFrame.getNextOrNull(inputIterator)
+  if (!r.isNullAt(idx)) {
+nextSelectedRow = r
+skipNonNullCount += 1
+  }
+  inputIndex += 1
+}
+  }
+
   override def prepare(rows: ExternalAppendOnlyUnsafeRowArray): Unit = {
 input = rows
 inputIterator = input.generateIterator()
 // drain the first few rows if offset is larger than zero
 inputIndex = 0
-while (inputIndex < offset) {
-  if (inputIterator.hasNext) inputIterator.next()
-  inputIndex += 1
+if (ignoreNulls) {
+  findNextRowWithNonNullInput
+} else {
+  while (inputIndex < offset) {
+if (inputIterator.hasNext) inputIterator.next()
+inputIndex += 1
+  }
+  inputIndex = offset
 }
-inputIndex = offset
+  }
+
+  private val doWrite = if (ignoreNulls && offset > 0) {
+// For illustration, here is one example: the input data contains eight rows,
+// and the input values of each row are: null, x, null, null, y, null, z, null.
+// We use Lead(input, 2) with IGNORE NULLS and the process is as follows:

Review comment:
   OK
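   A plain-Scala model (not Spark code) of the skipping loop in findNextRowWithNonNullInput quoted above: advance the iterator until offset non-null inputs have been seen, keeping the last one. Option[String] stands in for a row whose input may be null.

    def findNextNonNull(it: Iterator[Option[String]], offset: Int): Option[String] = {
      var selected: Option[String] = None
      var seen = 0
      // Mirrors: while (skipNonNullCount < offset && inputIndex < input.length)
      while (seen < offset && it.hasNext) {
        it.next() match {
          case s @ Some(_) => selected = s; seen += 1
          case None => // rows whose input is null are skipped, but still consumed
        }
      }
      if (seen == offset) selected else None
    }

    // The second non-null value ahead of the start is y.
    findNextNonNull(Iterator(None, Some("x"), None, None, Some("y"), None), 2)  // Some("y")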








[GitHub] [spark] beliefer commented on a change in pull request #30387: [SPARK-33443][SQL] LEAD/LAG should support [ IGNORE NULLS | RESPECT NULLS ]

2020-12-22 Thread GitBox


beliefer commented on a change in pull request #30387:
URL: https://github.com/apache/spark/pull/30387#discussion_r547621589



##
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowFunctionFrame.scala
##
@@ -147,31 +148,133 @@ class FrameLessOffsetWindowFunctionFrame(
 expressions: Array[OffsetWindowFunction],
 inputSchema: Seq[Attribute],
 newMutableProjection: (Seq[Expression], Seq[Attribute]) => 
MutableProjection,
-offset: Int)
+offset: Int,
+ignoreNulls: Boolean = false)
   extends OffsetWindowFunctionFrameBase(
 target, ordinal, expressions, inputSchema, newMutableProjection, offset) {
 
+  /** The input expression of Lead/Lag. */
+  private lazy val inputExpression = expressions.toSeq.map(_.input).head
+
+  /** The index of input expression in the row. */
+  private lazy val idx = inputAttrs.zipWithIndex.find(_._1 == inputExpression).map(_._2).head

Review comment:
   Because the child plan of `WindowExec` contains a project that assigns an alias (e.g. w0) to `concat(value, key)`, the `inputAttrs` is `[key, order, value, w0]`.
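   To make the scenario concrete, a small hypothetical example (the key/order/value column names are assumed) where the Lead input is an expression, so it only reaches inputAttrs through the alias the child projection introduces:

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.expressions.Window
    import org.apache.spark.sql.functions.{concat, lead}

    val spark = SparkSession.builder().master("local[1]").getOrCreate()
    import spark.implicits._

    val nullStr: String = null
    val df = Seq(("a", 1, "x"), ("a", 2, nullStr), ("a", 3, "y")).toDF("key", "order", "value")
    val w = Window.partitionBy($"key").orderBy($"order")

    // The LEAD input is concat(value, key), not a bare attribute, so WindowExec's child
    // plan projects it under an alias (e.g. w0) and inputAttrs becomes [key, order, value, w0].
    df.select($"key", $"order", lead(concat($"value", $"key"), 1).over(w)).explain()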








[GitHub] [spark] beliefer commented on a change in pull request #30387: [SPARK-33443][SQL] LEAD/LAG should support [ IGNORE NULLS | RESPECT NULLS ]

2020-12-22 Thread GitBox


beliefer commented on a change in pull request #30387:
URL: https://github.com/apache/spark/pull/30387#discussion_r547621589



##
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowFunctionFrame.scala
##
@@ -147,31 +148,133 @@ class FrameLessOffsetWindowFunctionFrame(
 expressions: Array[OffsetWindowFunction],
 inputSchema: Seq[Attribute],
 newMutableProjection: (Seq[Expression], Seq[Attribute]) => 
MutableProjection,
-offset: Int)
+offset: Int,
+ignoreNulls: Boolean = false)
   extends OffsetWindowFunctionFrameBase(
 target, ordinal, expressions, inputSchema, newMutableProjection, offset) {
 
+  /** The input expression of Lead/Lag. */
+  private lazy val inputExpression = expressions.toSeq.map(_.input).head
+
+  /** The index of input expression in the row. */
+  private lazy val idx = inputAttrs.zipWithIndex.find(_._1 == inputExpression).map(_._2).head

Review comment:
   Because the child plan of `WindowExec` contains a project that assigns an alias (e.g. w0) to `concat(value, key)`, the `inputAttrs` is `[key, order, value, w0]`.








[GitHub] [spark] beliefer commented on a change in pull request #30387: [SPARK-33443][SQL] LEAD/LAG should support [ IGNORE NULLS | RESPECT NULLS ]

2020-12-22 Thread GitBox


beliefer commented on a change in pull request #30387:
URL: https://github.com/apache/spark/pull/30387#discussion_r547621589



##
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowFunctionFrame.scala
##
@@ -147,31 +148,133 @@ class FrameLessOffsetWindowFunctionFrame(
 expressions: Array[OffsetWindowFunction],
 inputSchema: Seq[Attribute],
 newMutableProjection: (Seq[Expression], Seq[Attribute]) => 
MutableProjection,
-offset: Int)
+offset: Int,
+ignoreNulls: Boolean = false)
   extends OffsetWindowFunctionFrameBase(
 target, ordinal, expressions, inputSchema, newMutableProjection, offset) {
 
+  /** The input expression of Lead/Lag. */
+  private lazy val inputExpression = expressions.toSeq.map(_.input).head
+
+  /** The index of input expression in the row. */
+  private lazy val idx = inputAttrs.zipWithIndex.find(_._1 == inputExpression).map(_._2).head

Review comment:
   Because the child plan of `WindowExec` contains a project that assigns an alias (e.g. w0) to `concat(value, key)`.








[GitHub] [spark] beliefer commented on a change in pull request #30387: [SPARK-33443][SQL] LEAD/LAG should support [ IGNORE NULLS | RESPECT NULLS ]

2020-12-18 Thread GitBox


beliefer commented on a change in pull request #30387:
URL: https://github.com/apache/spark/pull/30387#discussion_r545746333



##
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowFunctionFrame.scala
##
@@ -147,31 +150,115 @@ class FrameLessOffsetWindowFunctionFrame(
 expressions: Array[OffsetWindowFunction],
 inputSchema: Seq[Attribute],
 newMutableProjection: (Seq[Expression], Seq[Attribute]) => 
MutableProjection,
-offset: Int)
+offset: Int,
+ignoreNulls: Boolean = false)
   extends OffsetWindowFunctionFrameBase(
 target, ordinal, expressions, inputSchema, newMutableProjection, offset) {
 
+  /** The input expression of Lead/Lag. */
+  private lazy val inputExpression = expressions.toSeq.map(_.input).head
+
+  /** The index of input expression in the row. */
+  private lazy val idx = inputAttrs.zipWithIndex.find(_._1 == 
inputExpression).map(_._2).head
+
+  /** Cache some UnsafeRow that will be used many times. */
+  private val rowBuffer = new ArrayBuffer[UnsafeRow]

Review comment:
   Good idea.








[GitHub] [spark] beliefer commented on a change in pull request #30387: [SPARK-33443][SQL] LEAD/LAG should support [ IGNORE NULLS | RESPECT NULLS ]

2020-12-17 Thread GitBox


beliefer commented on a change in pull request #30387:
URL: https://github.com/apache/spark/pull/30387#discussion_r545000493



##
File path: 
sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala
##
@@ -700,6 +700,44 @@ class DataFrameWindowFunctionsSuite extends QueryTest
 Row("b", 3, null, null, null)))
   }
 
+  test("lead/lag with ignoreNulls") {
+val nullStr: String = null
+val df = Seq(
+  ("a", 0, nullStr),
+  ("a", 1, "x"),
+  ("a", 2, "y"),
+  ("a", 3, "z"),
+  ("a", 4, nullStr),
+  ("b", 1, nullStr),
+  ("b", 2, nullStr)).
+  toDF("key", "order", "value")
+val window = Window.orderBy($"order")
+checkAnswer(
+  df.select(
+$"key",
+$"value",
+$"order",
+lead($"value", 1).over(window),

Review comment:
   The current implementation of the DSL only accepts a Column.








[GitHub] [spark] beliefer commented on a change in pull request #30387: [SPARK-33443][SQL] LEAD/LAG should support [ IGNORE NULLS | RESPECT NULLS ]

2020-12-16 Thread GitBox


beliefer commented on a change in pull request #30387:
URL: https://github.com/apache/spark/pull/30387#discussion_r544781741



##
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowFunctionFrame.scala
##
@@ -147,31 +150,115 @@ class FrameLessOffsetWindowFunctionFrame(
 expressions: Array[OffsetWindowFunction],
 inputSchema: Seq[Attribute],
 newMutableProjection: (Seq[Expression], Seq[Attribute]) => 
MutableProjection,
-offset: Int)
+offset: Int,
+ignoreNulls: Boolean = false)
   extends OffsetWindowFunctionFrameBase(
 target, ordinal, expressions, inputSchema, newMutableProjection, offset) {
 
+  /** The input expression of Lead/Lag. */
+  private lazy val inputExpression = expressions.toSeq.map(_.input).head
+
+  /** The index of input expression in the row. */
+  private lazy val idx = inputAttrs.zipWithIndex.find(_._1 == 
inputExpression).map(_._2).head
+
+  /** Cache some UnsafeRow that will be used many times. */
+  private val rowBuffer = new ArrayBuffer[UnsafeRow]
+
   override def prepare(rows: ExternalAppendOnlyUnsafeRowArray): Unit = {
 input = rows
 inputIterator = input.generateIterator()
 // drain the first few rows if offset is larger than zero
 inputIndex = 0
-while (inputIndex < offset) {
-  if (inputIterator.hasNext) inputIterator.next()
-  inputIndex += 1
+if (!ignoreNulls || offset < 0) {
+  while (inputIndex < offset) {
+if (inputIterator.hasNext) inputIterator.next()
+inputIndex += 1
+  }
+  inputIndex = offset
 }
-inputIndex = offset
+  }
+
+  private val doWrite = if (ignoreNulls && offset > 0) {
+// For illustration, here is one example: the input data contains seven rows,
+// and the input values of each row are: null, x, null, y, null, z, null.
+// We use Lead(input, 1) with IGNORE NULLS and the process is as follows:
+// 1. current row -> null, row buffer: [x], output: x;
+// 2. current row -> x, row buffer: [null, y], output: y;
+// 3. current row -> null, row buffer: [y], output: y;
+// 4. current row -> y, row buffer: [null, z], output: z;
+// 5. current row -> null, row buffer: [z], output: z;
+// 6. current row -> z, row buffer: [null], output: null;
+// 7. current row -> null, row buffer: [], output: null;
+(index: Int, current: InternalRow) =>
+  while (inputIndex <= index) {
+if (inputIterator.hasNext) inputIterator.next()
+inputIndex += 1
+  }
+  while (rowBuffer.filterNot(_ == null).size < offset && inputIndex < 
input.length) {
+val r = WindowFunctionFrame.getNextOrNull(inputIterator)
+if (r.isNullAt(idx)) {
+  rowBuffer += null
+} else {
+  rowBuffer += r
+}
+inputIndex += 1
+  }
+  if (rowBuffer.filterNot(_ == null).size == offset) {
+projection(rowBuffer.filterNot(_ == null).last)
+rowBuffer.remove(0)
+  } else {
+// Use default values since the offset row whose input value is not 
null does not exist.
+fillDefaultValue(current)
+  }
+  } else if (ignoreNulls && offset < 0) {
+// For illustration, here is one example: the input data contains seven rows,
+// and the input values of each row are: null, x, null, y, null, z, null.
+// We use Lag(input, 1) with IGNORE NULLS and the process is as follows:
+// 1. current row -> null, row buffer: [], output: null;
+// 2. current row -> x, row buffer: [], output: null;
+// 3. current row -> null, row buffer: [x], output: x;
+// 4. current row -> y, row buffer: [x], output: x;
+// 5. current row -> null, row buffer: [y], output: y;
+// 6. current row -> z, row buffer: [y], output: y;
+// 7. current row -> null, row buffer: [z], output: z;
+val maxSize = Math.abs(offset)
+(index: Int, current: InternalRow) =>
+  if (inputIndex >= 0 && inputIndex < input.length) {
+while (inputIndex < index) {
+  val r = WindowFunctionFrame.getNextOrNull(inputIterator)
+  if (!r.isNullAt(idx)) {
+if (rowBuffer.size == maxSize) {
+  rowBuffer.remove(0)
+}
+rowBuffer += r
+  }
+  inputIndex += 1
+}
+if (rowBuffer.size == maxSize) {
+  projection(rowBuffer.head)
+} else {
+  // Use default values since the offset row whose input value is not 
null does not exist.
+  fillDefaultValue(current)
+}
+  } else {
+// Use default values since the offset row does not exist.
+fillDefaultValue(current)
+inputIndex += 1
+  }
+  } else {

Review comment:
   Returns the current row itself.






[GitHub] [spark] beliefer commented on a change in pull request #30387: [SPARK-33443][SQL] LEAD/LAG should support [ IGNORE NULLS | RESPECT NULLS ]

2020-12-16 Thread GitBox


beliefer commented on a change in pull request #30387:
URL: https://github.com/apache/spark/pull/30387#discussion_r544781188



##
File path: sql/core/src/main/scala/org/apache/spark/sql/functions.scala
##
@@ -938,8 +938,40 @@ object functions {
* @group window_funcs
* @since 1.4.0
*/
-  def lag(e: Column, offset: Int, defaultValue: Any): Column = withExpr {
-Lag(e.expr, Literal(offset), Literal(defaultValue))
+  def lag(e: Column, offset: Int, defaultValue: Any): Column = {
+lag(e, offset, defaultValue, false)
+  }
+
+  /**
+   * Window function: returns the value that is `offset` rows before the current row, and
+   * `defaultValue` if there are fewer than `offset` rows before the current row. `ignoreNulls`
+   * determines whether null values of the row are included in or eliminated from the calculation.
+   * For example, an `offset` of one will return the previous row at any given point in the
+   * window partition.
+   *
+   * This is equivalent to the LAG function in SQL.
+   *
+   * @group window_funcs
+   * @since 3.1.0

Review comment:
   OK








[GitHub] [spark] beliefer commented on a change in pull request #30387: [SPARK-33443][SQL] LEAD/LAG should support [ IGNORE NULLS | RESPECT NULLS ]

2020-12-16 Thread GitBox


beliefer commented on a change in pull request #30387:
URL: https://github.com/apache/spark/pull/30387#discussion_r544780768



##
File path: sql/core/src/main/scala/org/apache/spark/sql/functions.scala
##
@@ -938,8 +938,40 @@ object functions {
* @group window_funcs
* @since 1.4.0
*/
-  def lag(e: Column, offset: Int, defaultValue: Any): Column = withExpr {
-Lag(e.expr, Literal(offset), Literal(defaultValue))
+  def lag(e: Column, offset: Int, defaultValue: Any): Column = {
+lag(e, offset, defaultValue, false)
+  }
+
+  /**
+   * Window function: returns the value that is `offset` rows before the current row, and
+   * `defaultValue` if there are fewer than `offset` rows before the current row. `ignoreNulls`
+   * determines whether null values of the row are included in or eliminated from the calculation.
+   * For example, an `offset` of one will return the previous row at any given point in the
+   * window partition.
+   *
+   * This is equivalent to the LAG function in SQL.
+   *
+   * @group window_funcs
+   * @since 3.1.0
+   */
+  def lag(columnName: String, offset: Int, defaultValue: Any, ignoreNulls: Boolean): Column = {

Review comment:
   OK
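   A short usage sketch of the lag/lead overloads discussed here; the signatures follow the diff above, and the Column-based lead variant is assumed to mirror lag:

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.expressions.Window
    import org.apache.spark.sql.functions.{lag, lead}

    val spark = SparkSession.builder().master("local[1]").getOrCreate()
    import spark.implicits._

    val nullStr: String = null
    val df = Seq((1, nullStr), (2, "x"), (3, nullStr), (4, "y")).toDF("order", "value")
    val w = Window.orderBy($"order")

    df.select(
      $"order", $"value",
      lag("value", 1, null).over(w),         // existing behaviour (RESPECT NULLS)
      lag("value", 1, null, true).over(w),   // new overload from this PR: IGNORE NULLS
      lead($"value", 1, null, true).over(w)  // Column-based variant, also IGNORE NULLS
    ).show()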








[GitHub] [spark] beliefer commented on a change in pull request #30387: [SPARK-33443][SQL] LEAD/LAG should support [ IGNORE NULLS | RESPECT NULLS ]

2020-12-16 Thread GitBox


beliefer commented on a change in pull request #30387:
URL: https://github.com/apache/spark/pull/30387#discussion_r544089977



##
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExecBase.scala
##
@@ -116,13 +116,21 @@ trait WindowExecBase extends UnaryExecNode {
* [[WindowExpression]]s and factory function for the 
[[WindowFrameFunction]].
*/
   protected lazy val windowFrameExpressionFactoryPairs = {
-type FrameKey = (String, FrameType, Expression, Expression)
+type FrameKey = (String, FrameType, Expression, Expression, Seq[Expression])
 type ExpressionBuffer = mutable.Buffer[Expression]
 val framedFunctions = mutable.Map.empty[FrameKey, (ExpressionBuffer, 
ExpressionBuffer)]
 
 // Add a function and its function to the map for a given frame.
 def collect(tpe: String, fr: SpecifiedWindowFrame, e: Expression, fn: 
Expression): Unit = {
-  val key = (tpe, fr.frameType, fr.lower, fr.upper)
+  val key = fn match {
+// This branch is used for Lead/Lag to support ignoring null.
+// All window frames move in rows. If there are multiple Leads or Lags acting on a row
+// and operating on different input expressions, they should not be moved uniformly
+// by row. Therefore, we put these functions in different window frames.
+case f: FrameLessOffsetWindowFunction if f.ignoreNulls =>
+  (tpe, fr.frameType, fr.lower, fr.upper, f.children.map(_.canonicalized))
+case _ => (tpe, fr.frameType, fr.lower, fr.upper, null)

Review comment:
   OK
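   A sketch of why the frame key needs the function's children once IGNORE NULLS is involved: the two Leads below skip nulls of different inputs, so they advance through the partition at different positions and cannot share a single uniformly-moving frame. The column names a and b are assumptions for illustration:

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.expressions.Window
    import org.apache.spark.sql.functions.lead

    val spark = SparkSession.builder().master("local[1]").getOrCreate()
    import spark.implicits._

    val nullStr: String = null
    val df = Seq(
      (1, "x", nullStr), (2, nullStr, "p"),
      (3, "y", nullStr), (4, nullStr, "q")).toDF("order", "a", "b")
    val w = Window.orderBy($"order")

    // Each LEAD ... IGNORE NULLS is planned in its own window frame, keyed by its input.
    df.select(
      $"order",
      lead($"a", 1, null, true).over(w),
      lead($"b", 1, null, true).over(w)).show()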








[GitHub] [spark] beliefer commented on a change in pull request #30387: [SPARK-33443][SQL] LEAD/LAG should support [ IGNORE NULLS | RESPECT NULLS ]

2020-12-15 Thread GitBox


beliefer commented on a change in pull request #30387:
URL: https://github.com/apache/spark/pull/30387#discussion_r543863851



##
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExecBase.scala
##
@@ -116,13 +116,17 @@ trait WindowExecBase extends UnaryExecNode {
* [[WindowExpression]]s and factory function for the 
[[WindowFrameFunction]].
*/
   protected lazy val windowFrameExpressionFactoryPairs = {
-type FrameKey = (String, FrameType, Expression, Expression)
+type FrameKey = (String, FrameType, Expression, Expression, Expression)
 type ExpressionBuffer = mutable.Buffer[Expression]
 val framedFunctions = mutable.Map.empty[FrameKey, (ExpressionBuffer, 
ExpressionBuffer)]
 
 // Add a function and its function to the map for a given frame.
 def collect(tpe: String, fr: SpecifiedWindowFrame, e: Expression, fn: 
Expression): Unit = {
-  val key = (tpe, fr.frameType, fr.lower, fr.upper)
+  val key = fn match {
+case f: FrameLessOffsetWindowFunction if f.ignoreNulls =>
+  (tpe, fr.frameType, fr.lower, fr.upper, f.canonicalized)

Review comment:
   Thanks for your suggestion.








[GitHub] [spark] beliefer commented on a change in pull request #30387: [SPARK-33443][SQL] LEAD/LAG should support [ IGNORE NULLS | RESPECT NULLS ]

2020-12-04 Thread GitBox


beliefer commented on a change in pull request #30387:
URL: https://github.com/apache/spark/pull/30387#discussion_r535930689



##
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExecBase.scala
##
@@ -136,8 +136,8 @@ trait WindowExecBase extends UnaryExecNode {
   val frame = 
spec.frameSpecification.asInstanceOf[SpecifiedWindowFrame]
   function match {
 case AggregateExpression(f, _, _, _, _) => collect("AGGREGATE", 
frame, e, f)
-case f: FrameLessOffsetWindowFunction =>
-  collect("FRAME_LESS_OFFSET", f.fakeFrame, e, f)
+case f: FrameLessOffsetWindowFunction => collect(
+  s"FRAME_LESS_OFFSET_${f.ignoreNulls}_${f.input.prettyName}", f.fakeFrame, e, f)

Review comment:
   OK








[GitHub] [spark] beliefer commented on a change in pull request #30387: [SPARK-33443][SQL] LEAD/LAG should support [ IGNORE NULLS | RESPECT NULLS ]

2020-11-24 Thread GitBox


beliefer commented on a change in pull request #30387:
URL: https://github.com/apache/spark/pull/30387#discussion_r530151304



##
File path: sql/core/src/main/scala/org/apache/spark/sql/functions.scala
##
@@ -938,8 +938,40 @@ object functions {
* @group window_funcs
* @since 1.4.0
*/
-  def lag(e: Column, offset: Int, defaultValue: Any): Column = withExpr {
-Lag(e.expr, Literal(offset), Literal(defaultValue))
+  def lag(e: Column, offset: Int, defaultValue: Any): Column = {
+lag(e, offset, defaultValue, false)
+  }
+
+  /**
+   * Window function: returns the value that is `offset` rows before the current row, and
+   * `defaultValue` if there is less than `offset` rows before the current row. `ignoreNulls`
+   * determines whether null values of row are included in or eliminated from the calculation.
+   * For example, an `offset` of one will return the previous row at any given point in the
+   * window partition.
+   *
+   * This is equivalent to the LAG function in SQL.
+   *
+   * @group window_funcs
+   * @since 3.1.0
+   */
+  def lag(columnName: String, offset: Int, defaultValue: Any, ignoreNulls: Boolean): Column = {

Review comment:
   cc @HyukjinKwon too.
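
A quick usage sketch of the overload being added here; the `sales` DataFrame and its column names are hypothetical:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, lag}

val w = Window.partitionBy(col("store")).orderBy(col("day"))
sales.select(
  col("store"), col("day"), col("amount"),
  // previous non-null amount within the partition, rather than the plain previous value
  lag("amount", 1, null, ignoreNulls = true).over(w).as("prev_non_null_amount"))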








[GitHub] [spark] beliefer commented on a change in pull request #30387: [SPARK-33443][SQL] LEAD/LAG should support [ IGNORE NULLS | RESPECT NULLS ]

2020-11-20 Thread GitBox


beliefer commented on a change in pull request #30387:
URL: https://github.com/apache/spark/pull/30387#discussion_r527610591



##
File path: sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala
##
@@ -700,6 +700,42 @@ class DataFrameWindowFunctionsSuite extends QueryTest
 Row("b", 3, null, null, null)))
   }
 
+  test("lead with ignoreNulls") {

Review comment:
   I will open another PR to unify the support for `[IGNORE NULLS | RESPECT NULLS]` in the SQL parser.
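
For reference, the SQL-standard form such a parser change would need to accept looks roughly like the following; this is only an illustration of the target syntax, not a claim that the parser accepts it as of this PR (table `t` and its columns are hypothetical):

// Illustration of the [ IGNORE NULLS | RESPECT NULLS ] syntax in a query.
spark.sql("""
  SELECT k,
         lead(v, 1) IGNORE NULLS OVER (PARTITION BY k ORDER BY ts) AS next_non_null_v
  FROM t
""")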








[GitHub] [spark] beliefer commented on a change in pull request #30387: [SPARK-33443][SQL] LEAD/LAG should support [ IGNORE NULLS | RESPECT NULLS ]

2020-11-20 Thread GitBox


beliefer commented on a change in pull request #30387:
URL: https://github.com/apache/spark/pull/30387#discussion_r527610591



##
File path: sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala
##
@@ -700,6 +700,42 @@ class DataFrameWindowFunctionsSuite extends QueryTest
 Row("b", 3, null, null, null)))
   }
 
+  test("lead with ignoreNulls") {

Review comment:
   I will consider it and unify the support for [IGNORE NULLS | RESPECT NULLS] in the SQL parser.








[GitHub] [spark] beliefer commented on a change in pull request #30387: [SPARK-33443][SQL] LEAD/LAG should support [ IGNORE NULLS | RESPECT NULLS ]

2020-11-20 Thread GitBox


beliefer commented on a change in pull request #30387:
URL: https://github.com/apache/spark/pull/30387#discussion_r527571534



##
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExecBase.scala
##
@@ -136,8 +136,8 @@ trait WindowExecBase extends UnaryExecNode {
   val frame = spec.frameSpecification.asInstanceOf[SpecifiedWindowFrame]
   function match {
 case AggregateExpression(f, _, _, _, _) => collect("AGGREGATE", frame, e, f)
-case f: FrameLessOffsetWindowFunction =>
-  collect("FRAME_LESS_OFFSET", f.fakeFrame, e, f)
+case f: FrameLessOffsetWindowFunction => collect(

Review comment:
   Maybe we could execute `lead(a, ignoreNulls = true)` and `lead(b, ignoreNulls = true)` with one window frame, but it would add extra complexity.
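
A tiny plain-Scala sketch (hypothetical data) of why the per-function positions diverge, which is the complexity referred to above:

// With IGNORE NULLS, the next non-null row for each input can sit at a different
// offset, so one shared cursor cannot serve both functions at once.
val rows = Seq((Option.empty[Int], Some(1)), (Some(2), Option.empty[Int]), (Some(3), Some(4)))
val a = rows.map(_._1)   // null, 2, 3
val b = rows.map(_._2)   // 1, null, 4

// First non-null value strictly after position `from` (lead with ignore nulls, offset 1).
def leadIgnoreNulls[T](col: Seq[Option[T]], from: Int): Option[T] =
  col.drop(from + 1).flatten.headOption

assert(leadIgnoreNulls(a, 0) == Some(2))  // found at row 1
assert(leadIgnoreNulls(b, 0) == Some(4))  // row 1 skipped, found at row 2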








[GitHub] [spark] beliefer commented on a change in pull request #30387: [SPARK-33443][SQL] LEAD/LAG should support [ IGNORE NULLS | RESPECT NULLS ]

2020-11-20 Thread GitBox


beliefer commented on a change in pull request #30387:
URL: https://github.com/apache/spark/pull/30387#discussion_r527569444



##
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExecBase.scala
##
@@ -136,8 +136,8 @@ trait WindowExecBase extends UnaryExecNode {
   val frame = spec.frameSpecification.asInstanceOf[SpecifiedWindowFrame]
   function match {
 case AggregateExpression(f, _, _, _, _) => collect("AGGREGATE", frame, e, f)
-case f: FrameLessOffsetWindowFunction =>
-  collect("FRAME_LESS_OFFSET", f.fakeFrame, e, f)
+case f: FrameLessOffsetWindowFunction => collect(
+  s"FRAME_LESS_OFFSET_${f.ignoreNulls}_${f.input.prettyName}", f.fakeFrame, e, f)

Review comment:
   We can't completely distinguish the input by its index alone; different window functions may use the same input.
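
A small sketch of that ambiguity, assuming the Column-based `lag`/`lead` overloads with `ignoreNulls` and a hypothetical DataFrame `df` with columns `k`, `ts`, `v`:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, lag, lead}

// Two different offset functions can reference the very same input column, so keying
// on the input alone (for example by its index) does not identify the function.
val w = Window.partitionBy(col("k")).orderBy(col("ts"))
df.select(
  lag(col("v"), 1, null, ignoreNulls = true).over(w).as("prev_v"),
  lead(col("v"), 2, null, ignoreNulls = true).over(w).as("next_next_v"))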




