[GitHub] spark pull request #16909: [SPARK-13450] Introduce ExternalAppendOnlyUnsafeR...

2017-06-28 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/16909#discussion_r124623866
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExec.scala ---
@@ -323,78 +324,43 @@ case class WindowExec(
       val currentGroup = nextGroup.copy()

       // clear last partition
-      if (sorter != null) {
-        // the last sorter of this task will be cleaned up via task completion listener
-        sorter.cleanupResources()
-        sorter = null
-      } else {
-        rows.clear()
-      }
+      buffer.clear()

       while (nextRowAvailable && nextGroup == currentGroup) {
-        if (sorter == null) {
-          rows += nextRow.copy()
-
-          if (rows.length >= 4096) {
-            // We will not sort the rows, so prefixComparator and recordComparator are null.
-            sorter = UnsafeExternalSorter.create(
-              TaskContext.get().taskMemoryManager(),
-              SparkEnv.get.blockManager,
-              SparkEnv.get.serializerManager,
-              TaskContext.get(),
-              null,
-              null,
-              1024,
-              SparkEnv.get.memoryManager.pageSizeBytes,
-              SparkEnv.get.conf.getLong("spark.shuffle.spill.numElementsForceSpillThreshold",
-                UnsafeExternalSorter.DEFAULT_NUM_ELEMENTS_FOR_SPILL_THRESHOLD),
-              false)
-            rows.foreach { r =>
-              sorter.insertRecord(r.getBaseObject, r.getBaseOffset, r.getSizeInBytes, 0, false)
-            }
-            rows.clear()
-          }
-        } else {
-          sorter.insertRecord(nextRow.getBaseObject, nextRow.getBaseOffset,
-            nextRow.getSizeInBytes, 0, false)
-        }
+        buffer.add(nextRow)
         fetchNextRow()
       }
-      if (sorter != null) {
-        rowBuffer = new ExternalRowBuffer(sorter, inputFields)
-      } else {
-        rowBuffer = new ArrayRowBuffer(rows)
-      }

       // Setup the frames.
       var i = 0
       while (i < numFrames) {
-        frames(i).prepare(rowBuffer.copy())
+        frames(i).prepare(buffer)
--- End diff --

No copy?





[GitHub] spark pull request #16909: [SPARK-13450] Introduce ExternalAppendOnlyUnsafeR...

2017-03-15 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/16909





[GitHub] spark pull request #16909: [SPARK-13450] Introduce ExternalAppendOnlyUnsafeR...

2017-03-08 Thread tejasapatil
Github user tejasapatil commented on a diff in the pull request:

https://github.com/apache/spark/pull/16909#discussion_r105094904
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -674,6 +675,24 @@ object SQLConf {
       .stringConf
       .createWithDefault(TimeZone.getDefault().getID())

+  val WINDOW_EXEC_BUFFER_SPILL_THRESHOLD =
+    buildConf("spark.sql.windowExec.buffer.spill.threshold")
+      .doc("Threshold for number of rows buffered in window operator")
+      .intConf
+      .createWithDefault(4096)
+
+  val SORT_MERGE_JOIN_EXEC_BUFFER_SPILL_THRESHOLD =
+    buildConf("spark.sql.sortMergeJoinExec.buffer.spill.threshold")
+      .doc("Threshold for number of rows buffered in sort merge join operator")
+      .intConf
+      .createWithDefault(Int.MaxValue)
+
+  val CARTESIAN_PRODUCT_EXEC_BUFFER_SPILL_THRESHOLD =
+    buildConf("spark.sql.cartesianProductExec.buffer.spill.threshold")
+      .doc("Threshold for number of rows buffered in cartesian product operator")
+      .intConf
+      .createWithDefault(UnsafeExternalSorter.DEFAULT_NUM_ELEMENTS_FOR_SPILL_THRESHOLD.toInt)
--- End diff --

marked as internal





[GitHub] spark pull request #16909: [SPARK-13450] Introduce ExternalAppendOnlyUnsafeR...

2017-03-08 Thread tejasapatil
Github user tejasapatil commented on a diff in the pull request:

https://github.com/apache/spark/pull/16909#discussion_r105093717
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowFunctionFrame.scala ---
@@ -164,9 +176,12 @@ private[window] final class SlidingWindowFunctionFrame(
   private[this] var inputLowIndex = 0

   /** Prepare the frame for calculating a new partition. Reset all variables. */
-  override def prepare(rows: RowBuffer): Unit = {
+  override def prepare(rows: ExternalAppendOnlyUnsafeRowArray): Unit = {
     input = rows
-    nextRow = rows.next()
+    inputIterator = input.generateIterator()
+    if (inputIterator.hasNext) {
--- End diff --

Technically I was keeping it on par with the earlier code: if there are any `rows` in the buffer, let `nextRow` point to the first one; otherwise `nextRow` was already set to `null`. Anyway, I have changed this.





[GitHub] spark pull request #16909: [SPARK-13450] Introduce ExternalAppendOnlyUnsafeR...

2017-03-08 Thread tejasapatil
Github user tejasapatil commented on a diff in the pull request:

https://github.com/apache/spark/pull/16909#discussion_r105093397
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowFunctionFrame.scala ---
@@ -341,25 +364,27 @@ private[window] final class UnboundedFollowingWindowFunctionFrame(
   override def write(index: Int, current: InternalRow): Unit = {
     var bufferUpdated = index == 0

-    // Duplicate the input to have a new iterator
-    val tmp = input.copy()
-
-    // Drop all rows from the buffer for which the input row value is smaller than
+    // Ignore all the rows from the buffer for which the input row value is smaller than
     // the output row lower bound.
-    tmp.skip(inputIndex)
-    var nextRow = tmp.next()
+    val iterator = input.generateIterator(startIndex = inputIndex)
+
+    def getNextOrNull(iterator: Iterator[UnsafeRow]): UnsafeRow = {
--- End diff --

@hvanhovell @davies : In the windowing case no nulls are expected, so `null` is used as the indicator that no more data is left to be read. Earlier this was done in `RowBuffer.next` [0], which this PR gets rid of (notice that it does not throw any exception once the end is reached). Adding this as a method on the iterator class might be bad, given that this assumption is specific to the windowing usage (i.e. no nulls).

I have created a static method within `WindowFunctionFrame` so that it is shared amongst all the window frame implementations.

[0] : 
https://github.com/apache/spark/blob/417140e441505f20eb5bd4943ce216c3ec6adc10/sql/core/src/main/scala/org/apache/spark/sql/execution/window/RowBuffer.scala#L37
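For reference, a minimal sketch of the helper being discussed, matching the signature shown in the diff above (`null` works as an end-of-data sentinel only because window buffers never contain null rows):

```scala
import org.apache.spark.sql.catalyst.expressions.UnsafeRow

// Returns the next row, or null once the iterator is exhausted.
def getNextOrNull(iterator: Iterator[UnsafeRow]): UnsafeRow = {
  if (iterator.hasNext) iterator.next() else null
}
```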





[GitHub] spark pull request #16909: [SPARK-13450] Introduce ExternalAppendOnlyUnsafeR...

2017-03-08 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/16909#discussion_r105014914
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowFunctionFrame.scala ---
@@ -164,9 +176,12 @@ private[window] final class SlidingWindowFunctionFrame(
   private[this] var inputLowIndex = 0

   /** Prepare the frame for calculating a new partition. Reset all variables. */
-  override def prepare(rows: RowBuffer): Unit = {
+  override def prepare(rows: ExternalAppendOnlyUnsafeRowArray): Unit = {
     input = rows
-    nextRow = rows.next()
+    inputIterator = input.generateIterator()
+    if (inputIterator.hasNext) {
--- End diff --

In all fairness: the old code (`rows.next()`) returns `null` if the group is empty. However, in this case the `rows` buffer should contain one or more rows, so an assert might be better.
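A hedged sketch contrasting the two behaviours discussed here (helper names are illustrative, not from the PR):

```scala
import org.apache.spark.sql.catalyst.expressions.UnsafeRow

// Old RowBuffer.next()-style behaviour: an empty partition silently yields null.
def firstRowOrNull(rows: Iterator[UnsafeRow]): UnsafeRow =
  if (rows.hasNext) rows.next() else null

// Assert-based alternative: fail loudly if the partition is unexpectedly empty.
def firstRowAsserted(rows: Iterator[UnsafeRow]): UnsafeRow = {
  assert(rows.hasNext, "a window partition should contain at least one row")
  rows.next()
}
```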





[GitHub] spark pull request #16909: [SPARK-13450] Introduce ExternalAppendOnlyUnsafeR...

2017-03-08 Thread davies
Github user davies commented on a diff in the pull request:

https://github.com/apache/spark/pull/16909#discussion_r105007318
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowFunctionFrame.scala ---
@@ -341,25 +364,27 @@ private[window] final class UnboundedFollowingWindowFunctionFrame(
   override def write(index: Int, current: InternalRow): Unit = {
     var bufferUpdated = index == 0

-    // Duplicate the input to have a new iterator
-    val tmp = input.copy()
-
-    // Drop all rows from the buffer for which the input row value is smaller than
+    // Ignore all the rows from the buffer for which the input row value is smaller than
     // the output row lower bound.
-    tmp.skip(inputIndex)
-    var nextRow = tmp.next()
+    val iterator = input.generateIterator(startIndex = inputIndex)
+
+    def getNextOrNull(iterator: Iterator[UnsafeRow]): UnsafeRow = {
--- End diff --

If we expect a row somewhere, I'd like to call next(): it will throw an exception if something goes wrong; otherwise you will silently get a wrong result.





[GitHub] spark pull request #16909: [SPARK-13450] Introduce ExternalAppendOnlyUnsafeR...

2017-03-08 Thread davies
Github user davies commented on a diff in the pull request:

https://github.com/apache/spark/pull/16909#discussion_r105006782
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowFunctionFrame.scala ---
@@ -164,9 +176,12 @@ private[window] final class SlidingWindowFunctionFrame(
   private[this] var inputLowIndex = 0

   /** Prepare the frame for calculating a new partition. Reset all variables. */
-  override def prepare(rows: RowBuffer): Unit = {
+  override def prepare(rows: ExternalAppendOnlyUnsafeRowArray): Unit = {
     input = rows
-    nextRow = rows.next()
+    inputIterator = input.generateIterator()
+    if (inputIterator.hasNext) {
--- End diff --

Should this be an assert? Or you may suddenly get a wrong result.





[GitHub] spark pull request #16909: [SPARK-13450] Introduce ExternalAppendOnlyUnsafeR...

2017-03-08 Thread davies
Github user davies commented on a diff in the pull request:

https://github.com/apache/spark/pull/16909#discussion_r105006306
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -674,6 +675,24 @@ object SQLConf {
       .stringConf
       .createWithDefault(TimeZone.getDefault().getID())

+  val WINDOW_EXEC_BUFFER_SPILL_THRESHOLD =
+    buildConf("spark.sql.windowExec.buffer.spill.threshold")
+      .doc("Threshold for number of rows buffered in window operator")
+      .intConf
+      .createWithDefault(4096)
+
+  val SORT_MERGE_JOIN_EXEC_BUFFER_SPILL_THRESHOLD =
+    buildConf("spark.sql.sortMergeJoinExec.buffer.spill.threshold")
+      .doc("Threshold for number of rows buffered in sort merge join operator")
+      .intConf
+      .createWithDefault(Int.MaxValue)
+
+  val CARTESIAN_PRODUCT_EXEC_BUFFER_SPILL_THRESHOLD =
+    buildConf("spark.sql.cartesianProductExec.buffer.spill.threshold")
+      .doc("Threshold for number of rows buffered in cartesian product operator")
+      .intConf
+      .createWithDefault(UnsafeExternalSorter.DEFAULT_NUM_ELEMENTS_FOR_SPILL_THRESHOLD.toInt)
--- End diff --

There are too many knobs; should we mark them as internal?
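For illustration, marking one of these entries internal would look roughly like this; a sketch that assumes Spark's `ConfigBuilder.internal()` is available here:

```scala
val WINDOW_EXEC_BUFFER_SPILL_THRESHOLD =
  buildConf("spark.sql.windowExec.buffer.spill.threshold")
    .internal()  // hide the knob from user-facing documentation
    .doc("Threshold for number of rows buffered in window operator")
    .intConf
    .createWithDefault(4096)
```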





[GitHub] spark pull request #16909: [SPARK-13450] Introduce ExternalAppendOnlyUnsafeR...

2017-03-08 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/16909#discussion_r105001564
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowFunctionFrame.scala ---
@@ -341,25 +364,27 @@ private[window] final class UnboundedFollowingWindowFunctionFrame(
   override def write(index: Int, current: InternalRow): Unit = {
     var bufferUpdated = index == 0

-    // Duplicate the input to have a new iterator
-    val tmp = input.copy()
-
-    // Drop all rows from the buffer for which the input row value is smaller than
+    // Ignore all the rows from the buffer for which the input row value is smaller than
     // the output row lower bound.
-    tmp.skip(inputIndex)
-    var nextRow = tmp.next()
+    val iterator = input.generateIterator(startIndex = inputIndex)
+
+    def getNextOrNull(iterator: Iterator[UnsafeRow]): UnsafeRow = {
--- End diff --

NIT: Perhaps add this to the iterator class, since we are using a similar construct in more than one place?





[GitHub] spark pull request #16909: [SPARK-13450] Introduce ExternalAppendOnlyUnsafeR...

2017-03-08 Thread tejasapatil
Github user tejasapatil commented on a diff in the pull request:

https://github.com/apache/spark/pull/16909#discussion_r104950862
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArray.scala ---
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import java.util.ConcurrentModificationException
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.{SparkEnv, TaskContext}
+import org.apache.spark.internal.Logging
+import org.apache.spark.memory.TaskMemoryManager
+import org.apache.spark.serializer.SerializerManager
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.sql.execution.ExternalAppendOnlyUnsafeRowArray.DefaultInitialSizeOfInMemoryBuffer
+import org.apache.spark.storage.BlockManager
+import org.apache.spark.util.collection.unsafe.sort.{UnsafeExternalSorter, UnsafeSorterIterator}
+
+/**
+ * An append-only array for [[UnsafeRow]]s that spills content to disk when a predefined
+ * threshold of rows is reached.
+ *
+ * Setting spill threshold faces following trade-off:
+ *
+ * - If the spill threshold is too high, the in-memory array may occupy more memory than is
+ *   available, resulting in OOM.
+ * - If the spill threshold is too low, we spill frequently and incur unnecessary disk writes.
+ *   This may lead to a performance regression compared to the normal case of using an
+ *   [[ArrayBuffer]] or [[Array]].
+ */
+private[sql] class ExternalAppendOnlyUnsafeRowArray(
+    taskMemoryManager: TaskMemoryManager,
+    blockManager: BlockManager,
+    serializerManager: SerializerManager,
+    taskContext: TaskContext,
+    initialSize: Int,
+    pageSizeBytes: Long,
+    numRowsSpillThreshold: Int) extends Logging {
+
+  def this(numRowsSpillThreshold: Int) {
+    this(
+      TaskContext.get().taskMemoryManager(),
+      SparkEnv.get.blockManager,
+      SparkEnv.get.serializerManager,
+      TaskContext.get(),
+      1024,
+      SparkEnv.get.memoryManager.pageSizeBytes,
+      numRowsSpillThreshold)
+  }
+
+  private val initialSizeOfInMemoryBuffer =
+    Math.min(DefaultInitialSizeOfInMemoryBuffer, numRowsSpillThreshold)
+
+  private val inMemoryBuffer = if (initialSizeOfInMemoryBuffer > 0) {
+    new ArrayBuffer[UnsafeRow](initialSizeOfInMemoryBuffer)
+  } else {
+    null
+  }
+
+  private var spillableArray: UnsafeExternalSorter = _
+  private var numRows = 0
+
+  // A counter to keep track of total modifications done to this array since its creation.
+  // This helps to invalidate iterators when there are changes done to the backing array.
+  private var modificationsCount: Long = 0
--- End diff --

> this is to detect modifications in the same thread right?

Yes

> When does this happen?

It can happen when a client generates an iterator but has not iterated over the whole array, and later new entries are added to the array OR the array is cleared. Any attempt to use the old iterator after that can give an inconsistent view of the array, and this counter helps invalidate the iterator.

In my opinion, none of the existing places where this array is used hits this scenario. Having said that, I added this to make the data structure robust: new usages would be protected against such misuse, and even bugs introduced in the existing usages would fail fast instead of silently working.
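To make the invalidation scheme concrete, here is a self-contained sketch under simplified assumptions (a plain in-memory buffer standing in for the real class, which also wraps a spillable sorter):

```scala
import java.util.ConcurrentModificationException
import scala.collection.mutable.ArrayBuffer

// Each mutation bumps a counter; iterators capture the counter at creation
// time and fail fast if the backing array has changed since then.
class ModCountedArray[T] {
  private val buffer = ArrayBuffer.empty[T]
  private var modificationsCount: Long = 0

  def add(t: T): Unit = { buffer += t; modificationsCount += 1 }
  def clear(): Unit = { buffer.clear(); modificationsCount += 1 }

  def generateIterator(): Iterator[T] = new Iterator[T] {
    private val expectedModCount = modificationsCount
    private var index = 0

    private def throwIfModified(): Unit = {
      if (expectedModCount != modificationsCount) {
        throw new ConcurrentModificationException(
          "The backing array was modified since this iterator was created")
      }
    }

    override def hasNext: Boolean = { throwIfModified(); index < buffer.length }

    override def next(): T = {
      throwIfModified()
      val result = buffer(index)
      index += 1
      result
    }
  }
}
```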



[GitHub] spark pull request #16909: [SPARK-13450] Introduce ExternalAppendOnlyUnsafeR...

2017-03-08 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/16909#discussion_r104940657
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArray.scala ---
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import java.util.ConcurrentModificationException
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.{SparkEnv, TaskContext}
+import org.apache.spark.internal.Logging
+import org.apache.spark.memory.TaskMemoryManager
+import org.apache.spark.serializer.SerializerManager
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.sql.execution.ExternalAppendOnlyUnsafeRowArray.DefaultInitialSizeOfInMemoryBuffer
+import org.apache.spark.storage.BlockManager
+import org.apache.spark.util.collection.unsafe.sort.{UnsafeExternalSorter, UnsafeSorterIterator}
+
+/**
+ * An append-only array for [[UnsafeRow]]s that spills content to disk when a predefined
+ * threshold of rows is reached.
+ *
+ * Setting spill threshold faces following trade-off:
+ *
+ * - If the spill threshold is too high, the in-memory array may occupy more memory than is
+ *   available, resulting in OOM.
+ * - If the spill threshold is too low, we spill frequently and incur unnecessary disk writes.
+ *   This may lead to a performance regression compared to the normal case of using an
+ *   [[ArrayBuffer]] or [[Array]].
+ */
+private[sql] class ExternalAppendOnlyUnsafeRowArray(
+    taskMemoryManager: TaskMemoryManager,
+    blockManager: BlockManager,
+    serializerManager: SerializerManager,
+    taskContext: TaskContext,
+    initialSize: Int,
+    pageSizeBytes: Long,
+    numRowsSpillThreshold: Int) extends Logging {
+
+  def this(numRowsSpillThreshold: Int) {
+    this(
+      TaskContext.get().taskMemoryManager(),
+      SparkEnv.get.blockManager,
+      SparkEnv.get.serializerManager,
+      TaskContext.get(),
+      1024,
+      SparkEnv.get.memoryManager.pageSizeBytes,
+      numRowsSpillThreshold)
+  }
+
+  private val initialSizeOfInMemoryBuffer =
+    Math.min(DefaultInitialSizeOfInMemoryBuffer, numRowsSpillThreshold)
+
+  private val inMemoryBuffer = if (initialSizeOfInMemoryBuffer > 0) {
+    new ArrayBuffer[UnsafeRow](initialSizeOfInMemoryBuffer)
+  } else {
+    null
+  }
+
+  private var spillableArray: UnsafeExternalSorter = _
+  private var numRows = 0
+
+  // A counter to keep track of total modifications done to this array since its creation.
+  // This helps to invalidate iterators when there are changes done to the backing array.
+  private var modificationsCount: Long = 0
--- End diff --

Just for my understanding: this is to detect modifications in the same thread, right? When does this happen?





[GitHub] spark pull request #16909: [SPARK-13450] Introduce ExternalAppendOnlyUnsafeR...

2017-03-01 Thread tejasapatil
Github user tejasapatil commented on a diff in the pull request:

https://github.com/apache/spark/pull/16909#discussion_r103749494
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArray.scala ---
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import java.util.ConcurrentModificationException
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.{SparkEnv, TaskContext}
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.sql.execution.ExternalAppendOnlyUnsafeRowArray.DefaultInitialSizeOfInMemoryBuffer
+import org.apache.spark.util.collection.unsafe.sort.{UnsafeExternalSorter, UnsafeSorterIterator}
+
+/**
+ * An append-only array for [[UnsafeRow]]s that spills content to disk when a predefined
+ * threshold of rows is reached.
+ *
+ * Setting spill threshold faces following trade-off:
+ *
+ * - If the spill threshold is too high, the in-memory array may occupy more memory than is
+ *   available, resulting in OOM.
+ * - If the spill threshold is too low, we spill frequently and incur unnecessary disk writes.
+ *   This may lead to a performance regression compared to the normal case of using an
+ *   [[ArrayBuffer]] or [[Array]].
+ */
+private[sql] class ExternalAppendOnlyUnsafeRowArray(numRowsSpillThreshold: Int) extends Logging {
--- End diff --

Update: I created a test version by removing the `ArrayBuffer` and letting `UnsafeExternalSorter` alone back the data. Over prod queries, this turned out to be slightly slower, unlike what the micro-benchmark results showed. I will stick with the current approach.





[GitHub] spark pull request #16909: [SPARK-13450] Introduce ExternalAppendOnlyUnsafeR...

2017-02-16 Thread tejasapatil
Github user tejasapatil commented on a diff in the pull request:

https://github.com/apache/spark/pull/16909#discussion_r101693864
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArray.scala ---
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import java.util.ConcurrentModificationException
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.{SparkEnv, TaskContext}
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.sql.execution.ExternalAppendOnlyUnsafeRowArray.DefaultInitialSizeOfInMemoryBuffer
+import org.apache.spark.util.collection.unsafe.sort.{UnsafeExternalSorter, UnsafeSorterIterator}
+
+/**
+ * An append-only array for [[UnsafeRow]]s that spills content to disk when a predefined
+ * threshold of rows is reached.
+ *
+ * Setting spill threshold faces following trade-off:
+ *
+ * - If the spill threshold is too high, the in-memory array may occupy more memory than is
+ *   available, resulting in OOM.
+ * - If the spill threshold is too low, we spill frequently and incur unnecessary disk writes.
+ *   This may lead to a performance regression compared to the normal case of using an
+ *   [[ArrayBuffer]] or [[Array]].
+ */
+private[sql] class ExternalAppendOnlyUnsafeRowArray(numRowsSpillThreshold: Int) extends Logging {
--- End diff --

Good point. My original guess before writing the benchmark was that `ArrayBuffer` would be superior to `UnsafeExternalSorter`, so I went with an impl which initially behaves like `ArrayBuffer` but later switches to `UnsafeExternalSorter`. I won't make this call solely based on the micro-benchmark results, as they might not reflect what actually happens when a query runs: there are other operations happening while the buffer is populated and accessed.





[GitHub] spark pull request #16909: [SPARK-13450] Introduce ExternalAppendOnlyUnsafeR...

2017-02-15 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/16909#discussion_r101454158
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArray.scala ---
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import java.util.ConcurrentModificationException
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.{SparkEnv, TaskContext}
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.sql.execution.ExternalAppendOnlyUnsafeRowArray.DefaultInitialSizeOfInMemoryBuffer
+import org.apache.spark.util.collection.unsafe.sort.{UnsafeExternalSorter, UnsafeSorterIterator}
+
+/**
+ * An append-only array for [[UnsafeRow]]s that spills content to disk when a predefined
+ * threshold of rows is reached.
+ *
+ * Setting spill threshold faces following trade-off:
+ *
+ * - If the spill threshold is too high, the in-memory array may occupy more memory than is
+ *   available, resulting in OOM.
+ * - If the spill threshold is too low, we spill frequently and incur unnecessary disk writes.
+ *   This may lead to a performance regression compared to the normal case of using an
+ *   [[ArrayBuffer]] or [[Array]].
+ */
+private[sql] class ExternalAppendOnlyUnsafeRowArray(numRowsSpillThreshold: Int) extends Logging {
--- End diff --

From the comparison results, `UnsafeExternalSorter` performs slightly better?





[GitHub] spark pull request #16909: [SPARK-13450] Introduce ExternalAppendOnlyUnsafeR...

2017-02-14 Thread davies
Github user davies commented on a diff in the pull request:

https://github.com/apache/spark/pull/16909#discussion_r101214678
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArray.scala ---
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import java.util.ConcurrentModificationException
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.{SparkEnv, TaskContext}
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.sql.execution.ExternalAppendOnlyUnsafeRowArray.DefaultInitialSizeOfInMemoryBuffer
+import org.apache.spark.util.collection.unsafe.sort.{UnsafeExternalSorter, UnsafeSorterIterator}
+
+/**
+ * An append-only array for [[UnsafeRow]]s that spills content to disk when a predefined
+ * threshold of rows is reached.
+ *
+ * Setting spill threshold faces following trade-off:
+ *
+ * - If the spill threshold is too high, the in-memory array may occupy more memory than is
+ *   available, resulting in OOM.
+ * - If the spill threshold is too low, we spill frequently and incur unnecessary disk writes.
+ *   This may lead to a performance regression compared to the normal case of using an
+ *   [[ArrayBuffer]] or [[Array]].
+ */
+private[sql] class ExternalAppendOnlyUnsafeRowArray(numRowsSpillThreshold: Int) extends Logging {
--- End diff --

How does this compare to using `UnsafeExternalSorter` directly? There is a similar use case here:
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProductExec.scala#L42





[GitHub] spark pull request #16909: [SPARK-13450] Introduce ExternalAppendOnlyUnsafeR...

2017-02-14 Thread tejasapatil
Github user tejasapatil commented on a diff in the pull request:

https://github.com/apache/spark/pull/16909#discussion_r101201086
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArray.scala ---
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import java.util.ConcurrentModificationException
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.{SparkEnv, TaskContext}
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.sql.execution.ExternalAppendOnlyUnsafeRowArray.DefaultInitialSizeOfInMemoryBuffer
+import org.apache.spark.util.collection.unsafe.sort.{UnsafeExternalSorter, UnsafeSorterIterator}
+
+/**
+ * An append-only array for [[UnsafeRow]]s that spills content to disk when a predefined
+ * threshold of rows is reached.
+ *
+ * Setting spill threshold faces following trade-off:
+ *
+ * - If the spill threshold is too high, the in-memory array may occupy more memory than is
+ *   available, resulting in OOM.
+ * - If the spill threshold is too low, we spill frequently and incur unnecessary disk writes.
+ *   This may lead to a performance regression compared to the normal case of using an
+ *   [[ArrayBuffer]] or [[Array]].
+ */
+private[sql] class ExternalAppendOnlyUnsafeRowArray(numRowsSpillThreshold: Int) extends Logging {
+  private val initialSizeOfInMemoryBuffer =
+    Math.min(DefaultInitialSizeOfInMemoryBuffer, numRowsSpillThreshold)
+
+  private val inMemoryBuffer = new ArrayBuffer[UnsafeRow](initialSizeOfInMemoryBuffer)
+
+  private var spillableArray: UnsafeExternalSorter = _
+  private var numRows = 0
+
+  // A counter to keep track of total modifications done to this array since its creation.
+  // This helps to invalidate iterators when there are changes done to the backing array.
+  private var modificationsCount: Long = 0
+
+  private var numFieldsPerRow = 0
+
+  def length: Int = numRows
+
+  def isEmpty: Boolean = numRows == 0
+
+  /**
+   * Clears up resources (eg. memory) held by the backing storage
+   */
+  def clear(): Unit = {
+    if (spillableArray != null) {
+      // The last `spillableArray` of this task will be cleaned up via task completion listener
+      // inside `UnsafeExternalSorter`
+      spillableArray.cleanupResources()
+      spillableArray = null
+    } else {
+      inMemoryBuffer.clear()
+    }
+    numFieldsPerRow = 0
+    numRows = 0
+    modificationsCount += 1
+  }
+
+  def add(unsafeRow: UnsafeRow): Unit = {
+    if (numRows < numRowsSpillThreshold) {
+      inMemoryBuffer += unsafeRow.copy()
+    } else {
+      if (spillableArray == null) {
+        logInfo(s"Reached spill threshold of $numRowsSpillThreshold rows, switching to " +
+          s"${classOf[UnsafeExternalSorter].getName}")
+
+        // We will not sort the rows, so prefixComparator and recordComparator are null
+        spillableArray = UnsafeExternalSorter.create(
+          TaskContext.get().taskMemoryManager(),
+          SparkEnv.get.blockManager,
+          SparkEnv.get.serializerManager,
+          TaskContext.get(),
+          null,
+          null,
+          if (numRowsSpillThreshold > 2) numRowsSpillThreshold / 2 else 1,
+          SparkEnv.get.memoryManager.pageSizeBytes,
+          numRowsSpillThreshold,
+          false)
+
+        inMemoryBuffer.foreach(existingUnsafeRow =>
+          spillableArray.insertRecord(
+            existingUnsafeRow.getBaseObject,
+            existingUnsafeRow.getBaseOffset,
+            existingUnsafeRow.getSizeInBytes,
+            0,
+            false)
+        )
+        inMemoryBuffer.clear()
+        numFieldsPerRow = unsafeRow.numFields()

[GitHub] spark pull request #16909: [SPARK-13450] Introduce ExternalAppendOnlyUnsafeR...

2017-02-14 Thread tejasapatil
Github user tejasapatil commented on a diff in the pull request:

https://github.com/apache/spark/pull/16909#discussion_r101199851
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArray.scala ---
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import java.util.ConcurrentModificationException
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.{SparkEnv, TaskContext}
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.util.collection.unsafe.sort.{UnsafeExternalSorter, UnsafeSorterIterator}
+
+/**
+ * An append-only array for [[UnsafeRow]]s that spills content to disk when there is insufficient
+ * space for it to grow.
+ *
+ * Setting spill threshold faces following trade-off:
+ *
+ * - If the spill threshold is too high, the in-memory array may occupy more memory than is
+ *   available, resulting in OOM.
+ * - If the spill threshold is too low, we spill frequently and incur unnecessary disk writes.
+ *   This may lead to a performance regression compared to the normal case of using an
+ *   [[ArrayBuffer]] or [[Array]].
+ */
+private[sql] class ExternalAppendOnlyUnsafeRowArray(numRowsSpillThreshold: Int) extends Logging {
+  private val inMemoryBuffer: ArrayBuffer[UnsafeRow] = ArrayBuffer.empty[UnsafeRow]
+
+  private var spillableArray: UnsafeExternalSorter = null
+  private var numElements = 0
+
+  // A counter to keep track of total additions made to this array since its creation.
+  // This helps to invalidate iterators when there are changes done to the backing array.
+  private var modCount: Long = 0
--- End diff --

EDIT: I changed this to not use the built-in iterator from `ArrayBuffer` and instead use a counter to iterate over the array. However, I still depend on the built-in iterator of `UnsafeExternalSorter`.





[GitHub] spark pull request #16909: [SPARK-13450] Introduce ExternalAppendOnlyUnsafeR...

2017-02-14 Thread tejasapatil
Github user tejasapatil commented on a diff in the pull request:

https://github.com/apache/spark/pull/16909#discussion_r100926134
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala ---
@@ -376,8 +386,15 @@ case class SortMergeJoinExec(

     // A list to hold all matched rows from right side.
     val matches = ctx.freshName("matches")
-    val clsName = classOf[java.util.ArrayList[InternalRow]].getName
-    ctx.addMutableState(clsName, matches, s"$matches = new $clsName();")
+    val clsName = classOf[ExternalAppendOnlyUnsafeRowArray].getName
+
+    val spillThreshold =
--- End diff --

moved to a method





[GitHub] spark pull request #16909: [SPARK-13450] Introduce ExternalAppendOnlyUnsafeR...

2017-02-14 Thread tejasapatil
Github user tejasapatil commented on a diff in the pull request:

https://github.com/apache/spark/pull/16909#discussion_r100899895
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArray.scala ---
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import java.util.ConcurrentModificationException
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.{SparkEnv, TaskContext}
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.util.collection.unsafe.sort.{UnsafeExternalSorter, UnsafeSorterIterator}
+
+/**
+ * An append-only array for [[UnsafeRow]]s that spills content to disk when there is insufficient
+ * space for it to grow.
+ *
+ * Setting spill threshold faces following trade-off:
+ *
+ * - If the spill threshold is too high, the in-memory array may occupy more memory than is
+ *   available, resulting in OOM.
+ * - If the spill threshold is too low, we spill frequently and incur unnecessary disk writes.
+ *   This may lead to a performance regression compared to the normal case of using an
+ *   [[ArrayBuffer]] or [[Array]].
+ */
+private[sql] class ExternalAppendOnlyUnsafeRowArray(numRowsSpillThreshold: Int) extends Logging {
+  private val inMemoryBuffer: ArrayBuffer[UnsafeRow] = ArrayBuffer.empty[UnsafeRow]
+
+  private var spillableArray: UnsafeExternalSorter = null
+  private var numElements = 0
+
+  // A counter to keep track of total additions made to this array since its creation.
+  // This helps to invalidate iterators when there are changes done to the backing array.
+  private var modCount: Long = 0
+
+  private var numFieldPerRow = 0
+
+  def length: Int = numElements
+
+  def isEmpty: Boolean = numElements == 0
+
+  /**
+   * Clears up resources (eg. memory) held by the backing storage
+   */
+  def clear(): Unit = {
+    if (spillableArray != null) {
+      // The last `spillableArray` of this task will be cleaned up via task completion listener
+      // inside `UnsafeExternalSorter`
+      spillableArray.cleanupResources()
+      spillableArray = null
+    } else {
+      inMemoryBuffer.clear()
+    }
+    numFieldPerRow = 0
+    numElements = 0
+    modCount += 1
+  }
+
+  def add(entry: InternalRow): Unit = {
+    val unsafeRow = entry.asInstanceOf[UnsafeRow]
+
+    if (numElements < numRowsSpillThreshold) {
+      inMemoryBuffer += unsafeRow.copy()
+    } else {
+      if (spillableArray == null) {
+        logInfo(s"Reached spill threshold of $numRowsSpillThreshold rows, switching to " +
+          s"${classOf[org.apache.spark.sql.execution.ExternalAppendOnlyUnsafeRowArray].getName}")
--- End diff --

This was a typo. Thanks for pointing this out. Corrected.





[GitHub] spark pull request #16909: [SPARK-13450] Introduce ExternalAppendOnlyUnsafeR...

2017-02-14 Thread tejasapatil
Github user tejasapatil commented on a diff in the pull request:

https://github.com/apache/spark/pull/16909#discussion_r100900763
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArray.scala ---
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import java.util.ConcurrentModificationException
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.{SparkEnv, TaskContext}
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.util.collection.unsafe.sort.{UnsafeExternalSorter, UnsafeSorterIterator}
+
+/**
+ * An append-only array for [[UnsafeRow]]s that spills content to disk when there is insufficient
+ * space for it to grow.
+ *
+ * Setting spill threshold faces following trade-off:
+ *
+ * - If the spill threshold is too high, the in-memory array may occupy more memory than is
+ *   available, resulting in OOM.
+ * - If the spill threshold is too low, we spill frequently and incur unnecessary disk writes.
+ *   This may lead to a performance regression compared to the normal case of using an
+ *   [[ArrayBuffer]] or [[Array]].
+ */
+private[sql] class ExternalAppendOnlyUnsafeRowArray(numRowsSpillThreshold: Int) extends Logging {
+  private val inMemoryBuffer: ArrayBuffer[UnsafeRow] = ArrayBuffer.empty[UnsafeRow]
+
+  private var spillableArray: UnsafeExternalSorter = null
+  private var numElements = 0
+
+  // A counter to keep track of total additions made to this array since its creation.
its creation.
--- End diff --

changed





[GitHub] spark pull request #16909: [SPARK-13450] Introduce ExternalAppendOnlyUnsafeR...

2017-02-14 Thread tejasapatil
Github user tejasapatil commented on a diff in the pull request:

https://github.com/apache/spark/pull/16909#discussion_r100925208
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExec.scala ---
@@ -285,6 +283,9 @@ case class WindowExec(
     val expressions = windowFrameExpressionFactoryPairs.flatMap(_._1)
     val factories = windowFrameExpressionFactoryPairs.map(_._2).toArray

+    var spillThreshold =
+      sqlContext.conf.getConfString("spark.sql.windowExec.buffer.spill.threshold", "4096").toInt
--- End diff --

created a `SQLConf`
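In other words (a hedged sketch; the entry name matches the `SQLConf` diff quoted earlier in this thread), the raw string lookup becomes a typed one:

```scala
val spillThreshold = sqlContext.conf.getConf(SQLConf.WINDOW_EXEC_BUFFER_SPILL_THRESHOLD)
```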





[GitHub] spark pull request #16909: [SPARK-13450] Introduce ExternalAppendOnlyUnsafeR...

2017-02-14 Thread tejasapatil
Github user tejasapatil commented on a diff in the pull request:

https://github.com/apache/spark/pull/16909#discussion_r100807225
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExec.scala ---
@@ -310,10 +311,15 @@ case class WindowExec(
     fetchNextRow()

     // Manage the current partition.
-    val rows = ArrayBuffer.empty[UnsafeRow]
     val inputFields = child.output.length
-    var sorter: UnsafeExternalSorter = null
     var rowBuffer: RowBuffer = null
+    if (sqlContext == null) {
--- End diff --

Removed





[GitHub] spark pull request #16909: [SPARK-13450] Introduce ExternalAppendOnlyUnsafeR...

2017-02-14 Thread tejasapatil
Github user tejasapatil commented on a diff in the pull request:

https://github.com/apache/spark/pull/16909#discussion_r101150812
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArray.scala
 ---
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import java.util.ConcurrentModificationException
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.{SparkEnv, TaskContext}
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.util.collection.unsafe.sort.{UnsafeExternalSorter, 
UnsafeSorterIterator}
+
+/**
+ * An append-only array for [[UnsafeRow]]s that spills content to disk 
when there is insufficient
+ * space for it to grow.
+ *
+ * Setting spill threshold faces following trade-off:
+ *
+ * - If the spill threshold is too high, the in-memory array may occupy 
more memory than is
+ *   available, resulting in OOM.
+ * - If the spill threshold is too low, we spill frequently and incur 
unnecessary disk writes.
+ *   This may lead to a performance regression compared to the normal case 
of using an
+ *   [[ArrayBuffer]] or [[Array]].
+ */
+private[sql] class ExternalAppendOnlyUnsafeRowArray(numRowsSpillThreshold: 
Int) extends Logging {
+  private val inMemoryBuffer: ArrayBuffer[UnsafeRow] = 
ArrayBuffer.empty[UnsafeRow]
+
+  private var spillableArray: UnsafeExternalSorter = null
+  private var numElements = 0
+
+  // A counter to keep track of total additions made to this array since 
its creation.
+  // This helps to invalidate iterators when there are changes done to the 
backing array.
+  private var modCount: Long = 0
+
+  private var numFieldPerRow = 0
+
+  def length: Int = numElements
+
+  def isEmpty: Boolean = numElements == 0
+
+  /**
+   * Clears up resources (eg. memory) held by the backing storage
+   */
+  def clear(): Unit = {
+if (spillableArray != null) {
+  // The last `spillableArray` of this task will be cleaned up via 
task completion listener
+  // inside `UnsafeExternalSorter`
+  spillableArray.cleanupResources()
+  spillableArray = null
+} else {
+  inMemoryBuffer.clear()
+}
+numFieldPerRow = 0
+numElements = 0
+modCount += 1
+  }
+
+  def add(entry: InternalRow): Unit = {
+val unsafeRow = entry.asInstanceOf[UnsafeRow]
+
+if (numElements < numRowsSpillThreshold) {
+  inMemoryBuffer += unsafeRow.copy()
+} else {
+  if (spillableArray == null) {
+logInfo(s"Reached spill threshold of $numRowsSpillThreshold rows, 
switching to " +
+  
s"${classOf[org.apache.spark.sql.execution.ExternalAppendOnlyUnsafeRowArray].getName}")
+
+// We will not sort the rows, so prefixComparator and 
recordComparator are null
+spillableArray = UnsafeExternalSorter.create(
+  TaskContext.get().taskMemoryManager(),
+  SparkEnv.get.blockManager,
+  SparkEnv.get.serializerManager,
+  TaskContext.get(),
+  null,
+  null,
+  if (numRowsSpillThreshold > 2) numRowsSpillThreshold / 2 else 1,
+  SparkEnv.get.memoryManager.pageSizeBytes,
+  numRowsSpillThreshold,
+  false)
+
+inMemoryBuffer.foreach(existingUnsafeRow =>
+  spillableArray.insertRecord(
+existingUnsafeRow.getBaseObject,
+existingUnsafeRow.getBaseOffset,
+existingUnsafeRow.getSizeInBytes,
+0,
+false)
+)
+inMemoryBuffer.clear()
+numFieldPerRow = unsafeRow.numFields()
+  }
+
+  spillableArray.insertRecord(
+unsafeRow.getBaseObject,
+unsafeRow.getBaseOffset,
+unsafeRow.getSizeInBytes,
+0,
+false)

[GitHub] spark pull request #16909: [SPARK-13450] Introduce ExternalAppendOnlyUnsafeR...

2017-02-14 Thread tejasapatil
Github user tejasapatil commented on a diff in the pull request:

https://github.com/apache/spark/pull/16909#discussion_r100899694
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArraySuite.scala
 ---
@@ -0,0 +1,300 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import java.util.ConcurrentModificationException
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark._
+import org.apache.spark.memory.MemoryTestingUtils
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+
+class ExternalAppendOnlyUnsafeRowArraySuite extends SparkFunSuite with 
LocalSparkContext {
+  private val random = new java.util.Random()
+
+  private def createSparkConf(): SparkConf = {
+val conf = new SparkConf(false)
+// Make the Java serializer write a reset instruction (TC_RESET) after 
each object to test
+// for a bug we had with bytes written past the last object in a batch 
(SPARK-2792)
+conf.set("spark.serializer.objectStreamReset", "1")
+conf.set("spark.serializer", 
"org.apache.spark.serializer.JavaSerializer")
+conf
+  }
+
+  private def withExternalArray(spillThreshold: Int)
+   (f: ExternalAppendOnlyUnsafeRowArray => 
Unit): Unit = {
+sc = new SparkContext("local", "test", createSparkConf())
+val taskContext = MemoryTestingUtils.fakeTaskContext(SparkEnv.get)
+TaskContext.setTaskContext(taskContext)
+val array = new ExternalAppendOnlyUnsafeRowArray(spillThreshold)
+
+try f(array) finally {
+  array.clear()
+  sc.stop()
+}
+  }
+
+  private def insertRow(array: ExternalAppendOnlyUnsafeRowArray): Long = {
+val valueInserted = random.nextLong()
+
+val row = new UnsafeRow(1)
+row.pointTo(new Array[Byte](64), 16)
+row.setLong(0, valueInserted)
+array.add(row)
+valueInserted
+  }
+
+  private def checkIfValueExits(iterator: Iterator[UnsafeRow], 
expectedValue: Long): Unit = {
--- End diff --

fixed the typo


[GitHub] spark pull request #16909: [SPARK-13450] Introduce ExternalAppendOnlyUnsafeR...

2017-02-14 Thread tejasapatil
Github user tejasapatil commented on a diff in the pull request:

https://github.com/apache/spark/pull/16909#discussion_r100818273
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArray.scala
 ---
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import java.util.ConcurrentModificationException
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.{SparkEnv, TaskContext}
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.util.collection.unsafe.sort.{UnsafeExternalSorter, 
UnsafeSorterIterator}
+
+/**
+ * An append-only array for [[UnsafeRow]]s that spills content to disk 
when there is insufficient
+ * space for it to grow.
+ *
+ * Setting spill threshold faces following trade-off:
+ *
+ * - If the spill threshold is too high, the in-memory array may occupy 
more memory than is
+ *   available, resulting in OOM.
+ * - If the spill threshold is too low, we spill frequently and incur 
unnecessary disk writes.
+ *   This may lead to a performance regression compared to the normal case 
of using an
+ *   [[ArrayBuffer]] or [[Array]].
+ */
+private[sql] class ExternalAppendOnlyUnsafeRowArray(numRowsSpillThreshold: 
Int) extends Logging {
+  private val inMemoryBuffer: ArrayBuffer[UnsafeRow] = 
ArrayBuffer.empty[UnsafeRow]
+
+  private var spillableArray: UnsafeExternalSorter = null
+  private var numElements = 0
+
+  // A counter to keep track of total additions made to this array since 
its creation.
+  // This helps to invalidate iterators when there are changes done to the 
backing array.
+  private var modCount: Long = 0
+
+  private var numFieldPerRow = 0
+
+  def length: Int = numElements
+
+  def isEmpty: Boolean = numElements == 0
+
+  /**
+   * Clears up resources (eg. memory) held by the backing storage
+   */
+  def clear(): Unit = {
+if (spillableArray != null) {
+  // The last `spillableArray` of this task will be cleaned up via 
task completion listener
+  // inside `UnsafeExternalSorter`
+  spillableArray.cleanupResources()
+  spillableArray = null
+} else {
+  inMemoryBuffer.clear()
+}
+numFieldPerRow = 0
+numElements = 0
+modCount += 1
+  }
+
+  def add(entry: InternalRow): Unit = {
+val unsafeRow = entry.asInstanceOf[UnsafeRow]
--- End diff --

Moved cast to client


[GitHub] spark pull request #16909: [SPARK-13450] Introduce ExternalAppendOnlyUnsafeR...

2017-02-14 Thread tejasapatil
Github user tejasapatil commented on a diff in the pull request:

https://github.com/apache/spark/pull/16909#discussion_r100899498
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArray.scala
 ---
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import java.util.ConcurrentModificationException
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.{SparkEnv, TaskContext}
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.util.collection.unsafe.sort.{UnsafeExternalSorter, 
UnsafeSorterIterator}
+
+/**
+ * An append-only array for [[UnsafeRow]]s that spills content to disk 
when there is insufficient
+ * space for it to grow.
+ *
+ * Setting spill threshold faces following trade-off:
+ *
+ * - If the spill threshold is too high, the in-memory array may occupy 
more memory than is
+ *   available, resulting in OOM.
+ * - If the spill threshold is too low, we spill frequently and incur 
unnecessary disk writes.
+ *   This may lead to a performance regression compared to the normal case 
of using an
+ *   [[ArrayBuffer]] or [[Array]].
+ */
+private[sql] class ExternalAppendOnlyUnsafeRowArray(numRowsSpillThreshold: 
Int) extends Logging {
+  private val inMemoryBuffer: ArrayBuffer[UnsafeRow] = 
ArrayBuffer.empty[UnsafeRow]
+
+  private var spillableArray: UnsafeExternalSorter = null
+  private var numElements = 0
+
+  // A counter to keep track of total additions made to this array since 
its creation.
+  // This helps to invalidate iterators when there are changes done to the 
backing array.
+  private var modCount: Long = 0
+
+  private var numFieldPerRow = 0
--- End diff --

renamed


[GitHub] spark pull request #16909: [SPARK-13450] Introduce ExternalAppendOnlyUnsafeR...

2017-02-14 Thread tejasapatil
Github user tejasapatil commented on a diff in the pull request:

https://github.com/apache/spark/pull/16909#discussion_r101150862
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/window/RowBuffer.scala 
---
@@ -17,99 +17,33 @@
 
 package org.apache.spark.sql.execution.window
 
-import scala.collection.mutable.ArrayBuffer
-
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.UnsafeRow
-import org.apache.spark.util.collection.unsafe.sort.{UnsafeExternalSorter, 
UnsafeSorterIterator}
-
+import org.apache.spark.sql.execution.ExternalAppendOnlyUnsafeRowArray
 
 /**
- * The interface of row buffer for a partition. In absence of a buffer 
pool (with locking), the
+ * Represents row buffer for a partition. In absence of a buffer pool 
(with locking), the
  * row buffer is used to materialize a partition of rows since we need to 
repeatedly scan these
  * rows in window function processing.
  */
-private[window] abstract class RowBuffer {
-
-  /** Number of rows. */
-  def size: Int
-
-  /** Return next row in the buffer, null if no more left. */
-  def next(): InternalRow
-
-  /** Skip the next `n` rows. */
-  def skip(n: Int): Unit
-
-  /** Return a new RowBuffer that has the same rows. */
-  def copy(): RowBuffer
-}
-
-/**
- * A row buffer based on ArrayBuffer (the number of rows is limited).
- */
-private[window] class ArrayRowBuffer(buffer: ArrayBuffer[UnsafeRow]) 
extends RowBuffer {
-
-  private[this] var cursor: Int = -1
-
-  /** Number of rows. */
-  override def size: Int = buffer.length
-
-  /** Return next row in the buffer, null if no more left. */
-  override def next(): InternalRow = {
-cursor += 1
-if (cursor < buffer.length) {
-  buffer(cursor)
-} else {
-  null
-}
-  }
-
-  /** Skip the next `n` rows. */
-  override def skip(n: Int): Unit = {
-cursor += n
-  }
-
-  /** Return a new RowBuffer that has the same rows. */
-  override def copy(): RowBuffer = {
-new ArrayRowBuffer(buffer)
-  }
-}
-
-/**
- * An external buffer of rows based on UnsafeExternalSorter.
- */
-private[window] class ExternalRowBuffer(sorter: UnsafeExternalSorter, 
numFields: Int)
-  extends RowBuffer {
-
-  private[this] val iter: UnsafeSorterIterator = sorter.getIterator
-
-  private[this] val currentRow = new UnsafeRow(numFields)
+private[window] class RowBuffer(appendOnlyExternalArray: 
ExternalAppendOnlyUnsafeRowArray) {
--- End diff --

Removed `RowBuffer`


[GitHub] spark pull request #16909: [SPARK-13450] Introduce ExternalAppendOnlyUnsafeR...

2017-02-14 Thread tejasapatil
Github user tejasapatil commented on a diff in the pull request:

https://github.com/apache/spark/pull/16909#discussion_r101150777
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala
 ---
@@ -97,6 +98,11 @@ case class SortMergeJoinExec(
 
   protected override def doExecute(): RDD[InternalRow] = {
 val numOutputRows = longMetric("numOutputRows")
+val spillThreshold =
+  sqlContext.conf.getConfString(
+"spark.sql.sortMergeJoinExec.buffer.spill.threshold",
--- End diff --

done


[GitHub] spark pull request #16909: [SPARK-13450] Introduce ExternalAppendOnlyUnsafeR...

2017-02-14 Thread tejasapatil
Github user tejasapatil commented on a diff in the pull request:

https://github.com/apache/spark/pull/16909#discussion_r100899601
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExec.scala 
---
@@ -285,6 +283,9 @@ case class WindowExec(
 val expressions = windowFrameExpressionFactoryPairs.flatMap(_._1)
 val factories = windowFrameExpressionFactoryPairs.map(_._2).toArray
 
+var spillThreshold =
--- End diff --

changed


[GitHub] spark pull request #16909: [SPARK-13450] Introduce ExternalAppendOnlyUnsafeR...

2017-02-14 Thread tejasapatil
Github user tejasapatil commented on a diff in the pull request:

https://github.com/apache/spark/pull/16909#discussion_r100821291
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArray.scala
 ---
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import java.util.ConcurrentModificationException
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.{SparkEnv, TaskContext}
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.util.collection.unsafe.sort.{UnsafeExternalSorter, 
UnsafeSorterIterator}
+
+/**
+ * An append-only array for [[UnsafeRow]]s that spills content to disk 
when there is insufficient
+ * space for it to grow.
+ *
+ * Setting spill threshold faces following trade-off:
+ *
+ * - If the spill threshold is too high, the in-memory array may occupy 
more memory than is
+ *   available, resulting in OOM.
+ * - If the spill threshold is too low, we spill frequently and incur 
unnecessary disk writes.
+ *   This may lead to a performance regression compared to the normal case 
of using an
+ *   [[ArrayBuffer]] or [[Array]].
+ */
+private[sql] class ExternalAppendOnlyUnsafeRowArray(numRowsSpillThreshold: 
Int) extends Logging {
+  private val inMemoryBuffer: ArrayBuffer[UnsafeRow] = 
ArrayBuffer.empty[UnsafeRow]
--- End diff --

My intention behind not using a raw `Array` is to avoid holding on to that memory 
(if we went that route, one would have to set the spill threshold to a relatively 
low value to avoid wasting memory).

Before this PR:
- Initial size:
   - `SortMergeJoin` started off with an `ArrayBuffer` of default size (i.e. 16)
   - `WindowExec` started off with an empty `ArrayBuffer`
- In both cases there was no shrinking of the array, so memory is not 
reclaimed until the operator finishes.

Proposed change:
- I am switching to `new ArrayBuffer(128)` for both cases in order to start with a 
decent initial size instead of an empty array; allocating space for 128 entries 
upfront is a trivial memory footprint (see the sketch below).
- Keeping the "no shrinking" behavior the same. Part of me thinks I could do 
something smarter, such as shrinking based on a running average of the actual 
lengths of the array, but that might be over-optimization. I will first focus on 
getting the basic stuff in.
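
A minimal sketch of the proposed pre-sizing (assuming Scala's 
`ArrayBuffer(initialSize)` auxiliary constructor):

```
import scala.collection.mutable.ArrayBuffer

import org.apache.spark.sql.catalyst.expressions.UnsafeRow

// Start with room for 128 rows: large enough to skip the first few resizes,
// small enough that the upfront allocation (128 object references) is trivial.
val inMemoryBuffer = new ArrayBuffer[UnsafeRow](128)
```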


[GitHub] spark pull request #16909: [SPARK-13450] Introduce ExternalAppendOnlyUnsafeR...

2017-02-14 Thread tejasapatil
Github user tejasapatil commented on a diff in the pull request:

https://github.com/apache/spark/pull/16909#discussion_r100818119
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArray.scala
 ---
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import java.util.ConcurrentModificationException
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.{SparkEnv, TaskContext}
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.util.collection.unsafe.sort.{UnsafeExternalSorter, 
UnsafeSorterIterator}
+
+/**
+ * An append-only array for [[UnsafeRow]]s that spills content to disk 
when there is insufficient
+ * space for it to grow.
+ *
+ * Setting spill threshold faces following trade-off:
+ *
+ * - If the spill threshold is too high, the in-memory array may occupy 
more memory than is
+ *   available, resulting in OOM.
+ * - If the spill threshold is too low, we spill frequently and incur 
unnecessary disk writes.
+ *   This may lead to a performance regression compared to the normal case 
of using an
+ *   [[ArrayBuffer]] or [[Array]].
+ */
+private[sql] class ExternalAppendOnlyUnsafeRowArray(numRowsSpillThreshold: 
Int) extends Logging {
+  private val inMemoryBuffer: ArrayBuffer[UnsafeRow] = 
ArrayBuffer.empty[UnsafeRow]
+
+  private var spillableArray: UnsafeExternalSorter = null
+  private var numElements = 0
+
+  // A counter to keep track of total additions made to this array since 
its creation.
+  // This helps to invalidate iterators when there are changes done to the 
backing array.
+  private var modCount: Long = 0
--- End diff --

No, there is more to it than that: apart from `clear()`, even `add()` is 
regarded as a modification.

The built-in iterators (from both `ArrayBuffer` and `UnsafeExternalSorter`), 
once created, do not see any newly added data. Clients of 
`ExternalAppendOnlyUnsafeRowArray` may not realise this and think that they 
have read all the data. To prevent that, I need a mechanism that 
invalidates existing iterators.

```
val buffer = ArrayBuffer.empty[Int]
buffer.append(1)

val iterator = buffer.iterator
assert(iterator.hasNext)
assert(iterator.next() == 1)

buffer.append(2)
assert(iterator.hasNext)   // <-- THIS FAILS
```

Also, when `add()` transparently switches the backing storage from 
`ArrayBuffer` => `UnsafeExternalSorter` while there is an open iterator over 
the `ArrayBuffer`, it will lead to an `IndexOutOfBoundsException`:

```
val buffer = ArrayBuffer.empty[Int]
buffer.append(1)
buffer.append(2)

val iterator = buffer.iterator
buffer.clear()
assert(iterator.hasNext)   // <-- THIS FAILS
```
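
A minimal sketch of the invalidation mechanism described above (hypothetical 
`InvalidatingIterator`; the actual class embeds this check inside its own 
iterators): the iterator snapshots the array's `modCount` when it is created 
and re-checks it on every access.

```
import java.util.ConcurrentModificationException

import org.apache.spark.sql.catalyst.expressions.UnsafeRow

// Fails fast on concurrent add()/clear() instead of silently returning
// stale or partial data.
class InvalidatingIterator(
    underlying: Iterator[UnsafeRow],
    currentModCount: () => Long) extends Iterator[UnsafeRow] {

  private val expectedModCount = currentModCount()

  private def throwIfModified(): Unit = {
    if (currentModCount() != expectedModCount) {
      throw new ConcurrentModificationException(
        "The backing array was modified after this iterator was created")
    }
  }

  override def hasNext: Boolean = { throwIfModified(); underlying.hasNext }
  override def next(): UnsafeRow = { throwIfModified(); underlying.next() }
}
```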


[GitHub] spark pull request #16909: [SPARK-13450] Introduce ExternalAppendOnlyUnsafeR...

2017-02-13 Thread dongjoon-hyun
Github user dongjoon-hyun commented on a diff in the pull request:

https://github.com/apache/spark/pull/16909#discussion_r100880685
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArray.scala
 ---
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import java.util.ConcurrentModificationException
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.{SparkEnv, TaskContext}
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.util.collection.unsafe.sort.{UnsafeExternalSorter, 
UnsafeSorterIterator}
+
+/**
+ * An append-only array for [[UnsafeRow]]s that spills content to disk 
when there is insufficient
+ * space for it to grow.
+ *
+ * Setting spill threshold faces following trade-off:
+ *
+ * - If the spill threshold is too high, the in-memory array may occupy 
more memory than is
+ *   available, resulting in OOM.
+ * - If the spill threshold is too low, we spill frequently and incur 
unnecessary disk writes.
+ *   This may lead to a performance regression compared to the normal case 
of using an
+ *   [[ArrayBuffer]] or [[Array]].
+ */
+private[sql] class ExternalAppendOnlyUnsafeRowArray(numRowsSpillThreshold: 
Int) extends Logging {
+  private val inMemoryBuffer: ArrayBuffer[UnsafeRow] = 
ArrayBuffer.empty[UnsafeRow]
+
+  private var spillableArray: UnsafeExternalSorter = null
+  private var numElements = 0
+
+  // A counter to keep track of total additions made to this array since 
its creation.
+  // This helps to invalidate iterators when there are changes done to the 
backing array.
+  private var modCount: Long = 0
+
+  private var numFieldPerRow = 0
--- End diff --

`numFieldsPerRow`, for consistency?


[GitHub] spark pull request #16909: [SPARK-13450] Introduce ExternalAppendOnlyUnsafeR...

2017-02-13 Thread dongjoon-hyun
Github user dongjoon-hyun commented on a diff in the pull request:

https://github.com/apache/spark/pull/16909#discussion_r100878983
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExec.scala 
---
@@ -285,6 +283,9 @@ case class WindowExec(
 val expressions = windowFrameExpressionFactoryPairs.flatMap(_._1)
 val factories = windowFrameExpressionFactoryPairs.map(_._2).toArray
 
+var spillThreshold =
--- End diff --

`var` -> `val`?


[GitHub] spark pull request #16909: [SPARK-13450] Introduce ExternalAppendOnlyUnsafeR...

2017-02-13 Thread dongjoon-hyun
Github user dongjoon-hyun commented on a diff in the pull request:

https://github.com/apache/spark/pull/16909#discussion_r100870912
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArray.scala
 ---
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import java.util.ConcurrentModificationException
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.{SparkEnv, TaskContext}
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.util.collection.unsafe.sort.{UnsafeExternalSorter, 
UnsafeSorterIterator}
+
+/**
+ * An append-only array for [[UnsafeRow]]s that spills content to disk 
when there is insufficient
+ * space for it to grow.
+ *
+ * Setting spill threshold faces following trade-off:
+ *
+ * - If the spill threshold is too high, the in-memory array may occupy 
more memory than is
+ *   available, resulting in OOM.
+ * - If the spill threshold is too low, we spill frequently and incur 
unnecessary disk writes.
+ *   This may lead to a performance regression compared to the normal case 
of using an
+ *   [[ArrayBuffer]] or [[Array]].
+ */
+private[sql] class ExternalAppendOnlyUnsafeRowArray(numRowsSpillThreshold: 
Int) extends Logging {
+  private val inMemoryBuffer: ArrayBuffer[UnsafeRow] = 
ArrayBuffer.empty[UnsafeRow]
+
+  private var spillableArray: UnsafeExternalSorter = null
+  private var numElements = 0
+
+  // A counter to keep track of total additions made to this array since 
its creation.
+  // This helps to invalidate iterators when there are changes done to the 
backing array.
+  private var modCount: Long = 0
+
+  private var numFieldPerRow = 0
+
+  def length: Int = numElements
+
+  def isEmpty: Boolean = numElements == 0
+
+  /**
+   * Clears up resources (eg. memory) held by the backing storage
+   */
+  def clear(): Unit = {
+if (spillableArray != null) {
+  // The last `spillableArray` of this task will be cleaned up via 
task completion listener
+  // inside `UnsafeExternalSorter`
+  spillableArray.cleanupResources()
+  spillableArray = null
+} else {
+  inMemoryBuffer.clear()
+}
+numFieldPerRow = 0
+numElements = 0
+modCount += 1
+  }
+
+  def add(entry: InternalRow): Unit = {
+val unsafeRow = entry.asInstanceOf[UnsafeRow]
+
+if (numElements < numRowsSpillThreshold) {
+  inMemoryBuffer += unsafeRow.copy()
+} else {
+  if (spillableArray == null) {
+logInfo(s"Reached spill threshold of $numRowsSpillThreshold rows, 
switching to " +
+  
s"${classOf[org.apache.spark.sql.execution.ExternalAppendOnlyUnsafeRowArray].getName}")
--- End diff --

I understand the intention here, but the log message looks a little 
misleading to me because we are already using 
`ExternalAppendOnlyUnsafeRowArray`. Also, technically, we are switching to 
`UnsafeExternalSorter` under the hood.
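
One possible rewording, as a sketch (the exact message is the author's call; 
the hypothetical `SpillSwitchLog` wrapper only exists to make the snippet 
compile, and `Logging` is `private[spark]`, so this assumes compilation inside 
an `org.apache.spark` package):

```
import org.apache.spark.internal.Logging
import org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter

// Name the storage we actually switch to, rather than the wrapper class
// that is already in use.
object SpillSwitchLog extends Logging {
  def logSwitch(numRowsSpillThreshold: Int): Unit = {
    logInfo(s"Reached spill threshold of $numRowsSpillThreshold rows, switching from " +
      s"in-memory buffer to ${classOf[UnsafeExternalSorter].getName}")
  }
}
```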


[GitHub] spark pull request #16909: [SPARK-13450] Introduce ExternalAppendOnlyUnsafeR...

2017-02-13 Thread dongjoon-hyun
Github user dongjoon-hyun commented on a diff in the pull request:

https://github.com/apache/spark/pull/16909#discussion_r100868235
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArray.scala
 ---
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import java.util.ConcurrentModificationException
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.{SparkEnv, TaskContext}
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.util.collection.unsafe.sort.{UnsafeExternalSorter, 
UnsafeSorterIterator}
+
+/**
+ * An append-only array for [[UnsafeRow]]s that spills content to disk 
when there is insufficient
+ * space for it to grow.
+ *
+ * Setting spill threshold faces following trade-off:
+ *
+ * - If the spill threshold is too high, the in-memory array may occupy 
more memory than is
+ *   available, resulting in OOM.
+ * - If the spill threshold is too low, we spill frequently and incur 
unnecessary disk writes.
+ *   This may lead to a performance regression compared to the normal case 
of using an
+ *   [[ArrayBuffer]] or [[Array]].
+ */
+private[sql] class ExternalAppendOnlyUnsafeRowArray(numRowsSpillThreshold: 
Int) extends Logging {
+  private val inMemoryBuffer: ArrayBuffer[UnsafeRow] = 
ArrayBuffer.empty[UnsafeRow]
+
+  private var spillableArray: UnsafeExternalSorter = null
+  private var numElements = 0
+
+  // A counter to keep track of total additions made to this array since 
its creation.
--- End diff --

According to 
[here](https://github.com/apache/spark/pull/16909/files#diff-a17b7c6b3521a3e93f958a634501b7f6R72),
 `total additions` -> `total modifications`?



[GitHub] spark pull request #16909: [SPARK-13450] Introduce ExternalAppendOnlyUnsafeR...

2017-02-13 Thread dongjoon-hyun
Github user dongjoon-hyun commented on a diff in the pull request:

https://github.com/apache/spark/pull/16909#discussion_r100864950
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArraySuite.scala
 ---
@@ -0,0 +1,300 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import java.util.ConcurrentModificationException
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark._
+import org.apache.spark.memory.MemoryTestingUtils
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+
+class ExternalAppendOnlyUnsafeRowArraySuite extends SparkFunSuite with 
LocalSparkContext {
+  private val random = new java.util.Random()
+
+  private def createSparkConf(): SparkConf = {
+val conf = new SparkConf(false)
+// Make the Java serializer write a reset instruction (TC_RESET) after 
each object to test
+// for a bug we had with bytes written past the last object in a batch 
(SPARK-2792)
+conf.set("spark.serializer.objectStreamReset", "1")
+conf.set("spark.serializer", 
"org.apache.spark.serializer.JavaSerializer")
+conf
+  }
+
+  private def withExternalArray(spillThreshold: Int)
+   (f: ExternalAppendOnlyUnsafeRowArray => 
Unit): Unit = {
+sc = new SparkContext("local", "test", createSparkConf())
+val taskContext = MemoryTestingUtils.fakeTaskContext(SparkEnv.get)
+TaskContext.setTaskContext(taskContext)
+val array = new ExternalAppendOnlyUnsafeRowArray(spillThreshold)
+
+try f(array) finally {
+  array.clear()
+  sc.stop()
+}
+  }
+
+  private def insertRow(array: ExternalAppendOnlyUnsafeRowArray): Long = {
+val valueInserted = random.nextLong()
+
+val row = new UnsafeRow(1)
+row.pointTo(new Array[Byte](64), 16)
+row.setLong(0, valueInserted)
+array.add(row)
+valueInserted
+  }
+
+  private def checkIfValueExits(iterator: Iterator[UnsafeRow], 
expectedValue: Long): Unit = {
--- End diff --

Hi, @tejasapatil .
`checkIfValueExists`?


[GitHub] spark pull request #16909: [SPARK-13450] Introduce ExternalAppendOnlyUnsafeR...

2017-02-13 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/16909#discussion_r100757937
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/window/RowBuffer.scala 
---
@@ -17,99 +17,33 @@
 
 package org.apache.spark.sql.execution.window
 
-import scala.collection.mutable.ArrayBuffer
-
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.UnsafeRow
-import org.apache.spark.util.collection.unsafe.sort.{UnsafeExternalSorter, 
UnsafeSorterIterator}
-
+import org.apache.spark.sql.execution.ExternalAppendOnlyUnsafeRowArray
 
 /**
- * The interface of row buffer for a partition. In absence of a buffer 
pool (with locking), the
+ * Represents row buffer for a partition. In absence of a buffer pool 
(with locking), the
  * row buffer is used to materialize a partition of rows since we need to 
repeatedly scan these
  * rows in window function processing.
  */
-private[window] abstract class RowBuffer {
-
-  /** Number of rows. */
-  def size: Int
-
-  /** Return next row in the buffer, null if no more left. */
-  def next(): InternalRow
-
-  /** Skip the next `n` rows. */
-  def skip(n: Int): Unit
-
-  /** Return a new RowBuffer that has the same rows. */
-  def copy(): RowBuffer
-}
-
-/**
- * A row buffer based on ArrayBuffer (the number of rows is limited).
- */
-private[window] class ArrayRowBuffer(buffer: ArrayBuffer[UnsafeRow]) 
extends RowBuffer {
-
-  private[this] var cursor: Int = -1
-
-  /** Number of rows. */
-  override def size: Int = buffer.length
-
-  /** Return next row in the buffer, null if no more left. */
-  override def next(): InternalRow = {
-cursor += 1
-if (cursor < buffer.length) {
-  buffer(cursor)
-} else {
-  null
-}
-  }
-
-  /** Skip the next `n` rows. */
-  override def skip(n: Int): Unit = {
-cursor += n
-  }
-
-  /** Return a new RowBuffer that has the same rows. */
-  override def copy(): RowBuffer = {
-new ArrayRowBuffer(buffer)
-  }
-}
-
-/**
- * An external buffer of rows based on UnsafeExternalSorter.
- */
-private[window] class ExternalRowBuffer(sorter: UnsafeExternalSorter, 
numFields: Int)
-  extends RowBuffer {
-
-  private[this] val iter: UnsafeSorterIterator = sorter.getIterator
-
-  private[this] val currentRow = new UnsafeRow(numFields)
+private[window] class RowBuffer(appendOnlyExternalArray: 
ExternalAppendOnlyUnsafeRowArray) {
--- End diff --

Let's just drop `RowBuffer` in favor of `ExternalAppendOnlyUnsafeRowArray`; it 
doesn't make a lot of sense to keep it around. We just need a 
`generateIterator(offset)` for the unbounded-following case.
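
A sketch of the suggested API shape (hypothetical trait and parameter names; 
only the `generateIterator(offset)` idea comes from the comment above):

```
import org.apache.spark.sql.catalyst.expressions.UnsafeRow

// Each call hands out a fresh iterator over the buffered rows; the offset
// variant supports frames such as UNBOUNDED FOLLOWING that need to start
// scanning from the middle of the partition.
trait RowArrayIteration {
  def generateIterator(startIndex: Int): Iterator[UnsafeRow]
  def generateIterator(): Iterator[UnsafeRow] = generateIterator(startIndex = 0)
}
```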


[GitHub] spark pull request #16909: [SPARK-13450] Introduce ExternalAppendOnlyUnsafeR...

2017-02-13 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/16909#discussion_r100756809
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArray.scala
 ---
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import java.util.ConcurrentModificationException
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.{SparkEnv, TaskContext}
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.util.collection.unsafe.sort.{UnsafeExternalSorter, 
UnsafeSorterIterator}
+
+/**
+ * An append-only array for [[UnsafeRow]]s that spills content to disk 
when there is insufficient
+ * space for it to grow.
+ *
+ * Setting spill threshold faces following trade-off:
+ *
+ * - If the spill threshold is too high, the in-memory array may occupy 
more memory than is
+ *   available, resulting in OOM.
+ * - If the spill threshold is too low, we spill frequently and incur 
unnecessary disk writes.
+ *   This may lead to a performance regression compared to the normal case 
of using an
+ *   [[ArrayBuffer]] or [[Array]].
+ */
+private[sql] class ExternalAppendOnlyUnsafeRowArray(numRowsSpillThreshold: 
Int) extends Logging {
+  private val inMemoryBuffer: ArrayBuffer[UnsafeRow] = 
ArrayBuffer.empty[UnsafeRow]
--- End diff --

Perhaps it is better just to allocate an array, but that might be overkill.
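
A sketch of the raw-`Array` alternative (hypothetical `FixedRowArray`), which 
makes the trade-off concrete: one slot is eagerly reserved per row up to the 
spill threshold, for the lifetime of the operator, which is the memory-holding 
concern raised elsewhere in this thread.

```
import org.apache.spark.sql.catalyst.expressions.UnsafeRow

// A fixed-size array avoids ArrayBuffer's incremental resizing, but eagerly
// reserves numRowsSpillThreshold references upfront.
class FixedRowArray(numRowsSpillThreshold: Int) {
  private val slots = new Array[UnsafeRow](numRowsSpillThreshold)
  private var cursor = 0

  def add(row: UnsafeRow): Unit = {
    slots(cursor) = row.copy() // copy, since upstream operators reuse row objects
    cursor += 1
  }

  def length: Int = cursor
}
```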


[GitHub] spark pull request #16909: [SPARK-13450] Introduce ExternalAppendOnlyUnsafeR...

2017-02-13 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/16909#discussion_r100755730
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArray.scala
 ---
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import java.util.ConcurrentModificationException
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.{SparkEnv, TaskContext}
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.util.collection.unsafe.sort.{UnsafeExternalSorter, 
UnsafeSorterIterator}
+
+/**
+ * An append-only array for [[UnsafeRow]]s that spills content to disk 
when there is insufficient
+ * space for it to grow.
+ *
+ * Setting spill threshold faces following trade-off:
+ *
+ * - If the spill threshold is too high, the in-memory array may occupy 
more memory than is
+ *   available, resulting in OOM.
+ * - If the spill threshold is too low, we spill frequently and incur 
unnecessary disk writes.
+ *   This may lead to a performance regression compared to the normal case 
of using an
+ *   [[ArrayBuffer]] or [[Array]].
+ */
+private[sql] class ExternalAppendOnlyUnsafeRowArray(numRowsSpillThreshold: 
Int) extends Logging {
+  private val inMemoryBuffer: ArrayBuffer[UnsafeRow] = 
ArrayBuffer.empty[UnsafeRow]
+
+  private var spillableArray: UnsafeExternalSorter = null
+  private var numElements = 0
+
+  // A counter to keep track of total additions made to this array since 
its creation.
+  // This helps to invalidate iterators when there are changes done to the 
backing array.
+  private var modCount: Long = 0
+
+  private var numFieldPerRow = 0
+
+  def length: Int = numElements
+
+  def isEmpty: Boolean = numElements == 0
+
+  /**
+   * Clears up resources (eg. memory) held by the backing storage
+   */
+  def clear(): Unit = {
+if (spillableArray != null) {
+  // The last `spillableArray` of this task will be cleaned up via 
task completion listener
+  // inside `UnsafeExternalSorter`
+  spillableArray.cleanupResources()
+  spillableArray = null
+} else {
+  inMemoryBuffer.clear()
+}
+numFieldPerRow = 0
+numElements = 0
+modCount += 1
+  }
+
+  def add(entry: InternalRow): Unit = {
+val unsafeRow = entry.asInstanceOf[UnsafeRow]
+
+if (numElements < numRowsSpillThreshold) {
+  inMemoryBuffer += unsafeRow.copy()
+} else {
+  if (spillableArray == null) {
+logInfo(s"Reached spill threshold of $numRowsSpillThreshold rows, 
switching to " +
+  
s"${classOf[org.apache.spark.sql.execution.ExternalAppendOnlyUnsafeRowArray].getName}")
+
+// We will not sort the rows, so prefixComparator and 
recordComparator are null
+spillableArray = UnsafeExternalSorter.create(
+  TaskContext.get().taskMemoryManager(),
+  SparkEnv.get.blockManager,
+  SparkEnv.get.serializerManager,
+  TaskContext.get(),
+  null,
+  null,
+  if (numRowsSpillThreshold > 2) numRowsSpillThreshold / 2 else 1,
+  SparkEnv.get.memoryManager.pageSizeBytes,
+  numRowsSpillThreshold,
+  false)
+
+inMemoryBuffer.foreach(existingUnsafeRow =>
+  spillableArray.insertRecord(
+existingUnsafeRow.getBaseObject,
+existingUnsafeRow.getBaseOffset,
+existingUnsafeRow.getSizeInBytes,
+0,
+false)
+)
+inMemoryBuffer.clear()
+numFieldPerRow = unsafeRow.numFields()
+  }
+
+  spillableArray.insertRecord(
+unsafeRow.getBaseObject,
+unsafeRow.getBaseOffset,
+unsafeRow.getSizeInBytes,
+0,
+false)

[GitHub] spark pull request #16909: [SPARK-13450] Introduce ExternalAppendOnlyUnsafeR...

2017-02-13 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/16909#discussion_r100755136
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArray.scala
 ---
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import java.util.ConcurrentModificationException
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.{SparkEnv, TaskContext}
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.util.collection.unsafe.sort.{UnsafeExternalSorter, 
UnsafeSorterIterator}
+
+/**
+ * An append-only array for [[UnsafeRow]]s that spills content to disk 
when there is insufficient
+ * space for it to grow.
+ *
+ * Setting spill threshold faces following trade-off:
+ *
+ * - If the spill threshold is too high, the in-memory array may occupy 
more memory than is
+ *   available, resulting in OOM.
+ * - If the spill threshold is too low, we spill frequently and incur 
unnecessary disk writes.
+ *   This may lead to a performance regression compared to the normal case 
of using an
+ *   [[ArrayBuffer]] or [[Array]].
+ */
+private[sql] class ExternalAppendOnlyUnsafeRowArray(numRowsSpillThreshold: 
Int) extends Logging {
+  private val inMemoryBuffer: ArrayBuffer[UnsafeRow] = 
ArrayBuffer.empty[UnsafeRow]
+
+  private var spillableArray: UnsafeExternalSorter = null
+  private var numElements = 0
+
+  // A counter to keep track of total additions made to this array since 
its creation.
+  // This helps to invalidate iterators when there are changes done to the 
backing array.
+  private var modCount: Long = 0
+
+  private var numFieldPerRow = 0
+
+  def length: Int = numElements
+
+  def isEmpty: Boolean = numElements == 0
+
+  /**
+   * Clears up resources (eg. memory) held by the backing storage
+   */
+  def clear(): Unit = {
+if (spillableArray != null) {
+  // The last `spillableArray` of this task will be cleaned up via 
task completion listener
+  // inside `UnsafeExternalSorter`
+  spillableArray.cleanupResources()
+  spillableArray = null
+} else {
+  inMemoryBuffer.clear()
+}
+numFieldPerRow = 0
+numElements = 0
+modCount += 1
+  }
+
+  def add(entry: InternalRow): Unit = {
+val unsafeRow = entry.asInstanceOf[UnsafeRow]
--- End diff --

This seems tricky. Let's move this cast to the call site (and preferably 
avoid it altogether).
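
A sketch of the tightened signature (hypothetical `TypedRowArray`; the final 
shape is up to the PR):

```
import scala.collection.mutable.ArrayBuffer

import org.apache.spark.sql.catalyst.expressions.UnsafeRow

// Accepting UnsafeRow directly removes the runtime cast from add(); a caller
// that only holds an InternalRow must now cast visibly at the call site,
// e.g. array.add(row.asInstanceOf[UnsafeRow]).
class TypedRowArray {
  private val inMemoryBuffer = ArrayBuffer.empty[UnsafeRow]

  def add(unsafeRow: UnsafeRow): Unit = {
    // Copy, because upstream operators reuse the same UnsafeRow instance.
    inMemoryBuffer += unsafeRow.copy()
  }
}
```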


[GitHub] spark pull request #16909: [SPARK-13450] Introduce ExternalAppendOnlyUnsafeR...

2017-02-13 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/16909#discussion_r100756696
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArray.scala ---
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import java.util.ConcurrentModificationException
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.{SparkEnv, TaskContext}
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.util.collection.unsafe.sort.{UnsafeExternalSorter, UnsafeSorterIterator}
+
+/**
+ * An append-only array for [[UnsafeRow]]s that spills content to disk when there is insufficient
+ * space for it to grow.
+ *
+ * Setting spill threshold faces following trade-off:
+ *
+ * - If the spill threshold is too high, the in-memory array may occupy more memory than is
+ *   available, resulting in OOM.
+ * - If the spill threshold is too low, we spill frequently and incur unnecessary disk writes.
+ *   This may lead to a performance regression compared to the normal case of using an
+ *   [[ArrayBuffer]] or [[Array]].
+ */
+private[sql] class ExternalAppendOnlyUnsafeRowArray(numRowsSpillThreshold: Int) extends Logging {
+  private val inMemoryBuffer: ArrayBuffer[UnsafeRow] = ArrayBuffer.empty[UnsafeRow]
+
+  private var spillableArray: UnsafeExternalSorter = null
+  private var numElements = 0
+
+  // A counter to keep track of total additions made to this array since its creation.
+  // This helps to invalidate iterators when there are changes done to the backing array.
+  private var modCount: Long = 0
--- End diff --

This is only an issue after we have cleared the buffer, right?
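
As context for the question: the field comment in the quoted diff says `modCount` tracks total additions, so an iterator would be invalidated by an `add` as well as by a `clear`. A generic, self-contained sketch of the fail-fast pattern (independent of the PR's actual implementation):

    import java.util.ConcurrentModificationException

    import scala.collection.mutable.ArrayBuffer

    // Generic sketch of the modCount pattern: each iterator snapshots the
    // counter at creation time and fails fast once the buffer mutates under it.
    class ModCountedBuffer[T] {
      private val data = ArrayBuffer.empty[T]
      private var modCount: Long = 0

      def add(elem: T): Unit = { data += elem; modCount += 1 }
      def clear(): Unit = { data.clear(); modCount += 1 }

      def iterator: Iterator[T] = new Iterator[T] {
        private val expectedModCount = modCount
        private var index = 0

        private def checkNotModified(): Unit = {
          if (modCount != expectedModCount) {
            throw new ConcurrentModificationException("buffer changed while iterating")
          }
        }

        override def hasNext: Boolean = { checkNotModified(); index < data.length }
        override def next(): T = { checkNotModified(); val t = data(index); index += 1; t }
      }
    }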




[GitHub] spark pull request #16909: [SPARK-13450] Introduce ExternalAppendOnlyUnsafeR...

2017-02-13 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/16909#discussion_r100756976
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala ---
@@ -97,6 +98,11 @@ case class SortMergeJoinExec(
 
   protected override def doExecute(): RDD[InternalRow] = {
     val numOutputRows = longMetric("numOutputRows")
+    val spillThreshold =
+      sqlContext.conf.getConfString(
+        "spark.sql.sortMergeJoinExec.buffer.spill.threshold",
--- End diff --

Let's also move this configuration into `SQLConf`.
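
For reference, a sketch of what such an entry could look like in `SQLConf`, following the `buildConf` pattern used elsewhere in that file (the `.internal()` flag and the 4096 default are assumptions here, not the committed values):

    val SORT_MERGE_JOIN_EXEC_BUFFER_SPILL_THRESHOLD =
      buildConf("spark.sql.sortMergeJoinExec.buffer.spill.threshold")
        .internal()
        .doc("Threshold for number of rows buffered in sort merge join operator")
        .intConf
        .createWithDefault(4096)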




[GitHub] spark pull request #16909: [SPARK-13450] Introduce ExternalAppendOnlyUnsafeR...

2017-02-13 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/16909#discussion_r100757130
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala ---
@@ -376,8 +386,15 @@ case class SortMergeJoinExec(
 
     // A list to hold all matched rows from right side.
     val matches = ctx.freshName("matches")
-    val clsName = classOf[java.util.ArrayList[InternalRow]].getName
-    ctx.addMutableState(clsName, matches, s"$matches = new $clsName();")
+    val clsName = classOf[ExternalAppendOnlyUnsafeRowArray].getName
+
+    val spillThreshold =
--- End diff --

Place it in a def/lazy val if we need it more than once?
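
A small, self-contained sketch of the lazy-val idea (the `conf` handle below is a plain `Map` stand-in for illustration, not the real `SparkPlan` API):

    // Sketch: cache the threshold in a lazy val so the lookup (and string
    // parsing) happens at most once per operator instance, no matter how
    // many code paths consume it.
    trait SpillThresholdSketch {
      def conf: Map[String, String]

      lazy val spillThreshold: Int =
        conf.getOrElse("spark.sql.sortMergeJoinExec.buffer.spill.threshold", "4096").toInt
    }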




[GitHub] spark pull request #16909: [SPARK-13450] Introduce ExternalAppendOnlyUnsafeR...

2017-02-13 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/16909#discussion_r100756129
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExec.scala ---
@@ -285,6 +283,9 @@ case class WindowExec(
     val expressions = windowFrameExpressionFactoryPairs.flatMap(_._1)
     val factories = windowFrameExpressionFactoryPairs.map(_._2).toArray
 
+    var spillThreshold =
+      sqlContext.conf.getConfString("spark.sql.windowExec.buffer.spill.threshold", "4096").toInt
--- End diff --

Please make an internal configuration in `SQLConf` for this.
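
Once such an entry exists, the `getConfString` call above could become a typed lookup; a sketch only, with the accessor name assumed rather than taken from the PR:

    // In SQLConf (sketch): a typed accessor over the config entry.
    def windowExecBufferSpillThreshold: Int = getConf(WINDOW_EXEC_BUFFER_SPILL_THRESHOLD)

    // In WindowExec: no string literal at the call site, no manual .toInt.
    val spillThreshold = sqlContext.conf.windowExecBufferSpillThreshold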




[GitHub] spark pull request #16909: [SPARK-13450] Introduce ExternalAppendOnlyUnsafeR...

2017-02-13 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/16909#discussion_r100756047
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExec.scala ---
@@ -310,10 +311,15 @@ case class WindowExec(
     fetchNextRow()
 
     // Manage the current partition.
-    val rows = ArrayBuffer.empty[UnsafeRow]
     val inputFields = child.output.length
-    var sorter: UnsafeExternalSorter = null
     var rowBuffer: RowBuffer = null
+    if (sqlContext == null) {
--- End diff --

???

