[GitHub] spark pull request #17436: [SPARK-20101][SQL] Use OffHeapColumnVector when "...

2017-11-22 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/17436#discussion_r152503817
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -140,6 +140,13 @@ object SQLConf {
   .booleanConf
   .createWithDefault(true)
 
+  val COLUMN_VECTOR_OFFHEAP_ENABLED =
+buildConf("spark.sql.columnVector.offheap.enable")
--- End diff --

Thanks, submitted [a PR](https://github.com/apache/spark/pull/19794)


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #17436: [SPARK-20101][SQL] Use OffHeapColumnVector when "...

2017-11-22 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/17436#discussion_r152493744
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -140,6 +140,13 @@ object SQLConf {
   .booleanConf
   .createWithDefault(true)
 
+  val COLUMN_VECTOR_OFFHEAP_ENABLED =
+buildConf("spark.sql.columnVector.offheap.enable")
--- End diff --

OK


---




[GitHub] spark pull request #17436: [SPARK-20101][SQL] Use OffHeapColumnVector when "...

2017-11-20 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/17436#discussion_r152192733
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -140,6 +140,13 @@ object SQLConf {
   .booleanConf
   .createWithDefault(true)
 
+  val COLUMN_VECTOR_OFFHEAP_ENABLED =
+buildConf("spark.sql.columnVector.offheap.enable")
--- End diff --

ah good catch! @kiszk can you send a minor PR to fix it? thanks!


---




[GitHub] spark pull request #17436: [SPARK-20101][SQL] Use OffHeapColumnVector when "...

2017-11-20 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/17436#discussion_r152189670
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -140,6 +140,13 @@ object SQLConf {
   .booleanConf
   .createWithDefault(true)
 
+  val COLUMN_VECTOR_OFFHEAP_ENABLED =
+buildConf("spark.sql.columnVector.offheap.enable")
--- End diff --

Sorry for the late comment but this should be 
`spark.sql.columnVector.offheap.enabled` instead of 
`spark.sql.columnVector.offheap.enable`?
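
Why the spelling matters can be sketched with a minimal, self-contained model. This is not Spark's `SQLConf`: `ConfKeySketch`, `getBoolean`, and the `Map`-backed settings are hypothetical stand-ins, and the default of `false` is an assumption for illustration. It only shows that a value stored under one spelling is not seen by a lookup under the other.

```scala
object ConfKeySketch {
  // The two spellings quoted in the review comment above.
  val RegisteredKey = "spark.sql.columnVector.offheap.enable"
  val ConventionalKey = "spark.sql.columnVector.offheap.enabled"

  // Look up a boolean setting, falling back to a default when the key is absent.
  def getBoolean(settings: Map[String, String], key: String, default: Boolean): Boolean =
    settings.get(key).map(_.toBoolean).getOrElse(default)
}
```

In this sketch, a user who sets `ConventionalKey` to `"true"` while the code reads `RegisteredKey` gets the default back, with no error to point at the mismatch.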


---




[GitHub] spark pull request #17436: [SPARK-20101][SQL] Use OffHeapColumnVector when "...

2017-11-20 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/17436


---




[GitHub] spark pull request #17436: [SPARK-20101][SQL] Use OffHeapColumnVector when "...

2017-11-17 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/17436#discussion_r151828392
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -140,6 +140,13 @@ object SQLConf {
   .booleanConf
   .createWithDefault(true)
 
+  val COLUMN_VECTOR_OFFHEAP_ENABLED =
+buildConf("spark.sql.columnVector.offheap.enable")
+  .internal()
+  .doc("When true, use OffHeapColumnVector in ColumnarBatch.")
+  .booleanConf
+  .createWithDefault(true)
--- End diff --

Sorry, this was test code in my local environment.


---




[GitHub] spark pull request #17436: [SPARK-20101][SQL] Use OffHeapColumnVector when "...

2017-11-17 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/17436#discussion_r151765263
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadBenchmark.scala
 ---
@@ -260,6 +261,7 @@ object ParquetReadBenchmark {
   def stringWithNullsScanBenchmark(values: Int, fractionOfNulls: Double): 
Unit = {
 withTempPath { dir =>
   withTempTable("t1", "tempTable") {
+val enableOffHeapColumnVector = 
spark.sqlContext.conf.offHeapColumnVectorEnabled
--- End diff --

nit: spark.sessionState.conf.offHeapColumnVectorEnabled


---




[GitHub] spark pull request #17436: [SPARK-20101][SQL] Use OffHeapColumnVector when "...

2017-11-17 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/17436#discussion_r151764870
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala
 ---
@@ -62,7 +69,11 @@ case class InMemoryTableScanExec(
 
   private def createAndDecompressColumn(cachedColumnarBatch: CachedBatch): 
ColumnarBatch = {
 val rowCount = cachedColumnarBatch.numRows
-val columnVectors = OnHeapColumnVector.allocateColumns(rowCount, 
columnarBatchSchema)
+val columnVectors = if (!conf.offHeapColumnVectorEnabled) {
--- End diff --

only enable it when `TaskContext.get != null`?
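
A rough sketch of the suggested guard, assuming the intent is to fall back to on-heap vectors whenever no task context is available (presumably because there would be nothing to attach cleanup to). `MemoryModeSketch`, `TaskCtx`, and `chooseMode` are hypothetical names, and `None` models `TaskContext.get == null`; none of this is Spark's API.

```scala
object MemoryModeSketch {
  sealed trait MemoryMode
  case object OnHeap extends MemoryMode
  case object OffHeap extends MemoryMode

  // Hypothetical stand-in for a task context.
  final case class TaskCtx(taskId: Long)

  // Use off-heap vectors only when the flag is on AND a task context exists.
  def chooseMode(offHeapEnabled: Boolean, ctx: Option[TaskCtx]): MemoryMode =
    if (offHeapEnabled && ctx.isDefined) OffHeap else OnHeap
}
```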


---




[GitHub] spark pull request #17436: [SPARK-20101][SQL] Use OffHeapColumnVector when "...

2017-11-17 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/17436#discussion_r151764498
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java
 ---
@@ -101,9 +101,13 @@
   private boolean returnColumnarBatch;
 
   /**
-   * The default config on whether columnarBatch should be offheap.
+   * The config on whether columnarBatch should be offheap.
--- End diff --

nit: the memory mode of the columnarBatch


---




[GitHub] spark pull request #17436: [SPARK-20101][SQL] Use OffHeapColumnVector when "...

2017-11-17 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/17436#discussion_r151764317
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -140,6 +140,13 @@ object SQLConf {
   .booleanConf
   .createWithDefault(true)
 
+  val COLUMN_VECTOR_OFFHEAP_ENABLED =
+buildConf("spark.sql.columnVector.offheap.enable")
+  .internal()
+  .doc("When true, use OffHeapColumnVector in ColumnarBatch.")
+  .booleanConf
+  .createWithDefault(true)
--- End diff --

hey let's not change the existing behavior.


---




[GitHub] spark pull request #17436: [SPARK-20101][SQL] Use OffHeapColumnVector when "...

2017-11-17 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/17436#discussion_r151725539
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/VectorizedHashMapGenerator.scala
 ---
@@ -75,9 +77,14 @@ class VectorizedHashMapGenerator(
   }
 }.mkString("\n").concat(";")
 
+val columnVector = if (!conf.offHeapColumnVectorEnabled) {
+  "OnHeapColumnVector"
+} else {
+  "OffHeapColumnVector"
+}
 s"""
-   |  private 
org.apache.spark.sql.execution.vectorized.OnHeapColumnVector[] batchVectors;
-   |  private 
org.apache.spark.sql.execution.vectorized.OnHeapColumnVector[] bufferVectors;
--- End diff --

I see. `spark.sql.codegen.aggregate.map.vectorized.enable` is set to `true` 
only in benchmarks or test cases.


---




[GitHub] spark pull request #17436: [SPARK-20101][SQL] Use OffHeapColumnVector when "...

2017-11-16 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/17436#discussion_r151479595
  
--- Diff: 
core/src/main/scala/org/apache/spark/internal/config/package.scala ---
@@ -80,6 +80,22 @@ package object config {
 .bytesConf(ByteUnit.MiB)
 .createWithDefaultString("1g")
 
+  private[spark] val MEMORY_OFFHEAP_ENABLED = 
ConfigBuilder("spark.memory.offHeap.enabled")
+.doc("If true, Spark will attempt to use off-heap memory for certain 
operations. " +
+  "If off-heap memory use is enabled, then spark.memory.offHeap.size 
must be positive.")
+.withAlternative("spark.unsafe.offHeap")
--- End diff --

Yes, it is necessary.  
Without this, we see [this 
failure](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/83933/testReport/)
 
`org.apache.spark.memory.TaskMemoryManagerSuite.offHeapConfigurationBackwardsCompatibility`
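
The backwards-compatibility behaviour that the test name refers to can be sketched as a lookup that tries the current key first and then an older alternative. This is a simplified model, not Spark's `ConfigBuilder`: `BoolEntry` and `read` are hypothetical, and only the two key names and the `false` default are taken from the diffs in this thread.

```scala
object AlternativeKeySketch {
  // Hypothetical entry: a primary key plus older keys kept for compatibility.
  final case class BoolEntry(key: String, alternatives: List[String], default: Boolean)

  val MemoryOffHeapEnabled =
    BoolEntry("spark.memory.offHeap.enabled", List("spark.unsafe.offHeap"), default = false)

  // Try the primary key, then each alternative in order, then the default.
  def read(settings: Map[String, String], entry: BoolEntry): Boolean =
    (entry.key :: entry.alternatives)
      .find(settings.contains)
      .map(k => settings(k).toBoolean)
      .getOrElse(entry.default)
}
```

With this shape, a configuration that only sets the old key still enables the feature, while an explicit value under the new key takes precedence.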


---




[GitHub] spark pull request #17436: [SPARK-20101][SQL] Use OffHeapColumnVector when "...

2017-11-16 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/17436#discussion_r151473749
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadBenchmark.scala
 ---
@@ -75,6 +75,7 @@ object ParquetReadBenchmark {
 
 withTempPath { dir =>
   withTempTable("t1", "tempTable") {
+val enableOffHeapColumnVector = 
spark.sqlContext.conf.offHeapColumnVectorEnabled
--- End diff --

Use `spark.sessionState.conf.xxx`; `sqlContext` is deprecated.


---




[GitHub] spark pull request #17436: [SPARK-20101][SQL] Use OffHeapColumnVector when "...

2017-11-16 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/17436#discussion_r151473518
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
 ---
@@ -364,8 +372,10 @@ class ParquetFileFormat
   if (pushed.isDefined) {
 
ParquetInputFormat.setFilterPredicate(hadoopAttemptContext.getConfiguration, 
pushed.get)
   }
+  val taskContext = Option(TaskContext.get())
   val parquetReader = if (enableVectorizedReader) {
-val vectorizedReader = new VectorizedParquetRecordReader()
+val vectorizedReader =
+  new VectorizedParquetRecordReader(enableOffHeapColumnVector)
--- End diff --

only enable it when taskContext exists?


---




[GitHub] spark pull request #17436: [SPARK-20101][SQL] Use OffHeapColumnVector when "...

2017-11-16 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/17436#discussion_r151473122
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/VectorizedHashMapGenerator.scala
 ---
@@ -75,9 +77,14 @@ class VectorizedHashMapGenerator(
   }
 }.mkString("\n").concat(";")
 
+val columnVector = if (!conf.offHeapColumnVectorEnabled) {
+  "OnHeapColumnVector"
+} else {
+  "OffHeapColumnVector"
+}
 s"""
-   |  private 
org.apache.spark.sql.execution.vectorized.OnHeapColumnVector[] batchVectors;
-   |  private 
org.apache.spark.sql.execution.vectorized.OnHeapColumnVector[] bufferVectors;
--- End diff --

Let's not bother with this either; this class is only used in benchmarks AFAIK.


---




[GitHub] spark pull request #17436: [SPARK-20101][SQL] Use OffHeapColumnVector when "...

2017-11-16 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/17436#discussion_r151472844
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java
 ---
@@ -103,7 +103,11 @@
   /**
* The default config on whether columnarBatch should be offheap.
*/
-  private static final MemoryMode DEFAULT_MEMORY_MODE = MemoryMode.ON_HEAP;
+  private final MemoryMode DEFAULT_MEMORY_MODE;
--- End diff --

do we still need to call it DEFAULT?


---




[GitHub] spark pull request #17436: [SPARK-20101][SQL] Use OffHeapColumnVector when "...

2017-11-16 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/17436#discussion_r151472648
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -140,6 +140,13 @@ object SQLConf {
   .booleanConf
   .createWithDefault(true)
 
+  val COLUMNVECTOR_OFFHEAP_ENABLED =
--- End diff --

nit `COLUMN_VECTOR`


---




[GitHub] spark pull request #17436: [SPARK-20101][SQL] Use OffHeapColumnVector when "...

2017-11-16 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/17436#discussion_r151472401
  
--- Diff: 
core/src/main/scala/org/apache/spark/internal/config/package.scala ---
@@ -80,6 +80,22 @@ package object config {
 .bytesConf(ByteUnit.MiB)
 .createWithDefaultString("1g")
 
+  private[spark] val MEMORY_OFFHEAP_ENABLED = 
ConfigBuilder("spark.memory.offHeap.enabled")
+.doc("If true, Spark will attempt to use off-heap memory for certain 
operations. " +
+  "If off-heap memory use is enabled, then spark.memory.offHeap.size 
must be positive.")
+.withAlternative("spark.unsafe.offHeap")
--- End diff --

do we still need this as we already did it in 
https://github.com/apache/spark/pull/17436/files#diff-529fc5c06b9731c1fbda6f3db60b16aaR658
 ?


---




[GitHub] spark pull request #17436: [SPARK-20101][SQL] Use OffHeapColumnVector when "...

2017-11-16 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/17436#discussion_r151352379
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala
 ---
@@ -60,9 +62,15 @@ case class InMemoryTableScanExec(
 
   private lazy val columnarBatchSchema = new 
StructType(columnIndices.map(i => relationSchema(i)))
 
+  private lazy val memoryMode = 
SparkEnv.get.memoryManager.tungstenMemoryMode
+
   private def createAndDecompressColumn(cachedColumnarBatch: CachedBatch): 
ColumnarBatch = {
 val rowCount = cachedColumnarBatch.numRows
-val columnVectors = OnHeapColumnVector.allocateColumns(rowCount, 
columnarBatchSchema)
+val columnVectors = if (memoryMode == MemoryMode.ON_HEAP) {
+  OnHeapColumnVector.allocateColumns(rowCount, columnarBatchSchema)
+} else {
+  OffHeapColumnVector.allocateColumns(rowCount, columnarBatchSchema)
--- End diff --

For now, I added `context.addTaskCompletionListener`, but I did not add 
`batch.close` in generated code.
This is because ParquetReader reuses a `ColumnarBatch`. Thus, calling 
`batch.close` causes a runtime exception during the reuse.
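
The trade-off can be sketched with a self-contained model: a batch that is reused across reads must not be closed between reads, so the release is deferred to task completion. `Batch`, `TaskCtx`, `addCompletionListener`, and `runTask` below are hypothetical stand-ins, not Spark classes, and the exception type is an assumption.

```scala
import scala.collection.mutable.ArrayBuffer

object CleanupSketch {
  // Hypothetical stand-in for an off-heap batch that must be released once.
  final class Batch {
    private var closed = false
    def isClosed: Boolean = closed
    def fill(rows: Int): Int = {
      if (closed) throw new IllegalStateException("batch used after close")
      rows
    }
    def close(): Unit = { closed = true }
  }

  // Hypothetical stand-in for a task context that runs listeners at task end.
  final class TaskCtx {
    private val listeners = ArrayBuffer.empty[() => Unit]
    def addCompletionListener(f: () => Unit): Unit = { listeners += f }
    def markCompleted(): Unit = listeners.foreach(f => f())
  }

  // Reuse one batch for every read; release it only when the task completes.
  def runTask(ctx: TaskCtx, batch: Batch, reads: Seq[Int]): Int = {
    ctx.addCompletionListener(() => batch.close())
    reads.map(n => batch.fill(n)).sum
  }
}
```

Closing the batch inside the read loop instead would make the second `fill` throw in this model, which mirrors the runtime exception described above.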


---




[GitHub] spark pull request #17436: [SPARK-20101][SQL] Use OffHeapColumnVector when "...

2017-11-15 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/17436#discussion_r151340713
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/VectorizedHashMapGenerator.scala
 ---
@@ -75,9 +77,14 @@ class VectorizedHashMapGenerator(
   }
 }.mkString("\n").concat(";")
 
+val columnVector = if (SparkEnv.get.memoryManager.tungstenMemoryMode 
== MemoryMode.ON_HEAP) {
+  "OnHeapColumnVector"
+} else {
+  "OffHeapColumnVector"
+}
 s"""
-   |  private 
org.apache.spark.sql.execution.vectorized.OnHeapColumnVector[] batchVectors;
-   |  private 
org.apache.spark.sql.execution.vectorized.OnHeapColumnVector[] bufferVectors;
+   |  private 
org.apache.spark.sql.execution.vectorized.WritableColumnVector[] batchVectors;
+   |  private 
org.apache.spark.sql.execution.vectorized.WritableColumnVector[] bufferVectors;
--- End diff --

Good catch, thanks


---




[GitHub] spark pull request #17436: [SPARK-20101][SQL] Use OffHeapColumnVector when "...

2017-11-15 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/17436#discussion_r151333181
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/VectorizedHashMapGenerator.scala
 ---
@@ -75,9 +77,14 @@ class VectorizedHashMapGenerator(
   }
 }.mkString("\n").concat(";")
 
+val columnVector = if (SparkEnv.get.memoryManager.tungstenMemoryMode 
== MemoryMode.ON_HEAP) {
+  "OnHeapColumnVector"
+} else {
+  "OffHeapColumnVector"
+}
 s"""
-   |  private 
org.apache.spark.sql.execution.vectorized.OnHeapColumnVector[] batchVectors;
-   |  private 
org.apache.spark.sql.execution.vectorized.OnHeapColumnVector[] bufferVectors;
+   |  private 
org.apache.spark.sql.execution.vectorized.WritableColumnVector[] batchVectors;
+   |  private 
org.apache.spark.sql.execution.vectorized.WritableColumnVector[] bufferVectors;
--- End diff --

We can use `$columnVector` instead of `WritableColumnVector`?
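
A minimal sketch of the suggestion, assuming the generated field declarations look like the ones in the diff above. `CodegenSketch` and `fieldDecls` are hypothetical names; only the `columnVector` selection and the two field names come from the diff.

```scala
object CodegenSketch {
  // Pick the concrete vector class name once, then splice it into the generated Java.
  def fieldDecls(offHeapEnabled: Boolean): String = {
    val columnVector = if (!offHeapEnabled) "OnHeapColumnVector" else "OffHeapColumnVector"
    s"""
       |  private org.apache.spark.sql.execution.vectorized.$columnVector[] batchVectors;
       |  private org.apache.spark.sql.execution.vectorized.$columnVector[] bufferVectors;
     """.stripMargin
  }
}
```

Declaring the fields with the concrete class rather than the abstract `WritableColumnVector` keeps the generated code's static types as specific as the allocation that follows.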


---




[GitHub] spark pull request #17436: [SPARK-20101][SQL] Use OffHeapColumnVector when "...

2017-11-14 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/17436#discussion_r151037948
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/AggregateHashMap.java
 ---
@@ -40,7 +42,7 @@
  */
 public class AggregateHashMap {
 
-  private OnHeapColumnVector[] columnVectors;
+  private WritableColumnVector[] columnVectors;
--- End diff --

Thanks, I realized it this morning. I will revert changes in 
`AggregateHashMap`.


---




[GitHub] spark pull request #17436: [SPARK-20101][SQL] Use OffHeapColumnVector when "...

2017-11-14 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/17436#discussion_r150929464
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/AggregateHashMap.java
 ---
@@ -40,7 +42,7 @@
  */
 public class AggregateHashMap {
 
-  private OnHeapColumnVector[] columnVectors;
+  private WritableColumnVector[] columnVectors;
--- End diff --

I mean the `AggregateHashMap` seems only used in the benchmark.


---




[GitHub] spark pull request #17436: [SPARK-20101][SQL] Use OffHeapColumnVector when "...

2017-11-14 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/17436#discussion_r150889421
  
--- Diff: 
core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala ---
@@ -40,10 +42,12 @@ class UnifiedMemoryManagerSuite extends 
MemoryManagerSuite with PrivateMethodTes
   override protected def createMemoryManager(
   maxOnHeapExecutionMemory: Long,
   maxOffHeapExecutionMemory: Long): UnifiedMemoryManager = {
+val offHeapSize = sc.get(MEMORY_OFFHEAP_SIZE)
--- End diff --

You are right.


---




[GitHub] spark pull request #17436: [SPARK-20101][SQL] Use OffHeapColumnVector when "...

2017-11-14 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/17436#discussion_r150889105
  
--- Diff: 
core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala ---
@@ -43,12 +44,14 @@ class StaticMemoryManagerSuite extends 
MemoryManagerSuite {
 
   override protected def createMemoryManager(
   maxOnHeapExecutionMemory: Long,
-  maxOffHeapExecutionMemory: Long): StaticMemoryManager = {
+  maxOffHeapExecutionMemory: Long = 1000): StaticMemoryManager = {
+val offHeapSize = conf.get(MEMORY_OFFHEAP_SIZE)
--- End diff --

Yeah, I made a mistake.


---




[GitHub] spark pull request #17436: [SPARK-20101][SQL] Use OffHeapColumnVector when "...

2017-11-14 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/17436#discussion_r150859426
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala
 ---
@@ -60,9 +62,15 @@ case class InMemoryTableScanExec(
 
   private lazy val columnarBatchSchema = new 
StructType(columnIndices.map(i => relationSchema(i)))
 
+  private lazy val memoryMode = 
SparkEnv.get.memoryManager.tungstenMemoryMode
+
   private def createAndDecompressColumn(cachedColumnarBatch: CachedBatch): 
ColumnarBatch = {
 val rowCount = cachedColumnarBatch.numRows
-val columnVectors = OnHeapColumnVector.allocateColumns(rowCount, 
columnarBatchSchema)
+val columnVectors = if (memoryMode == MemoryMode.ON_HEAP) {
+  OnHeapColumnVector.allocateColumns(rowCount, columnarBatchSchema)
+} else {
+  OffHeapColumnVector.allocateColumns(rowCount, columnarBatchSchema)
--- End diff --

Great catch, we need to use `context.addTaskCompletionListener` to close 
this.


---




[GitHub] spark pull request #17436: [SPARK-20101][SQL] Use OffHeapColumnVector when "...

2017-11-14 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/17436#discussion_r150852190
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/AggregateHashMap.java
 ---
@@ -40,7 +42,7 @@
  */
 public class AggregateHashMap {
 
-  private OnHeapColumnVector[] columnVectors;
+  private WritableColumnVector[] columnVectors;
--- End diff --

`columnVectors` is actually referenced in several places (e.g. 
[here](https://github.com/apache/spark/pull/17436/files/9ce6fc0b0ad2c4c97236f0519db07b5a3600bb81#diff-47d4756845a7561110301320621d7f3cR72))
 in this class.


---




[GitHub] spark pull request #17436: [SPARK-20101][SQL] Use OffHeapColumnVector when "...

2017-11-14 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/17436#discussion_r150848938
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala
 ---
@@ -60,9 +62,15 @@ case class InMemoryTableScanExec(
 
   private lazy val columnarBatchSchema = new 
StructType(columnIndices.map(i => relationSchema(i)))
 
+  private lazy val memoryMode = 
SparkEnv.get.memoryManager.tungstenMemoryMode
+
   private def createAndDecompressColumn(cachedColumnarBatch: CachedBatch): 
ColumnarBatch = {
 val rowCount = cachedColumnarBatch.numRows
-val columnVectors = OnHeapColumnVector.allocateColumns(rowCount, 
columnarBatchSchema)
+val columnVectors = if (memoryMode == MemoryMode.ON_HEAP) {
+  OnHeapColumnVector.allocateColumns(rowCount, columnarBatchSchema)
+} else {
+  OffHeapColumnVector.allocateColumns(rowCount, columnarBatchSchema)
--- End diff --

where do we close it?


---




[GitHub] spark pull request #17436: [SPARK-20101][SQL] Use OffHeapColumnVector when "...

2017-11-14 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/17436#discussion_r150847402
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/AggregateHashMap.java
 ---
@@ -40,7 +42,7 @@
  */
 public class AggregateHashMap {
 
-  private OnHeapColumnVector[] columnVectors;
+  private WritableColumnVector[] columnVectors;
--- End diff --

do we need to change it? I don't think this class is actually used.


---




[GitHub] spark pull request #17436: [SPARK-20101][SQL] Use OffHeapColumnVector when "...

2017-11-14 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/17436#discussion_r150846694
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
 ---
@@ -351,11 +351,12 @@ class ParquetFileFormat
   if (pushed.isDefined) {
 
ParquetInputFormat.setFilterPredicate(hadoopAttemptContext.getConfiguration, 
pushed.get)
   }
+  val taskContext = Option(TaskContext.get())
--- End diff --

Why can't we make this assumption?


---




[GitHub] spark pull request #17436: [SPARK-20101][SQL] Use OffHeapColumnVector when "...

2017-11-14 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/17436#discussion_r150845551
  
--- Diff: 
core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala ---
@@ -40,10 +42,12 @@ class UnifiedMemoryManagerSuite extends 
MemoryManagerSuite with PrivateMethodTes
   override protected def createMemoryManager(
   maxOnHeapExecutionMemory: Long,
   maxOffHeapExecutionMemory: Long): UnifiedMemoryManager = {
+val offHeapSize = sc.get(MEMORY_OFFHEAP_SIZE)
--- End diff --

I think we always get the default value here.


---




[GitHub] spark pull request #17436: [SPARK-20101][SQL] Use OffHeapColumnVector when "...

2017-11-14 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/17436#discussion_r150845467
  
--- Diff: 
core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala ---
@@ -43,12 +44,14 @@ class StaticMemoryManagerSuite extends 
MemoryManagerSuite {
 
   override protected def createMemoryManager(
   maxOnHeapExecutionMemory: Long,
-  maxOffHeapExecutionMemory: Long): StaticMemoryManager = {
+  maxOffHeapExecutionMemory: Long = 1000): StaticMemoryManager = {
+val offHeapSize = conf.get(MEMORY_OFFHEAP_SIZE)
--- End diff --

Why read this conf? We always get the default value here, don't we?


---




[GitHub] spark pull request #17436: [SPARK-20101][SQL] Use OffHeapColumnVector when "...

2017-11-14 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/17436#discussion_r150840518
  
--- Diff: 
core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala ---
@@ -43,12 +43,13 @@ class StaticMemoryManagerSuite extends 
MemoryManagerSuite {
 
   override protected def createMemoryManager(
   maxOnHeapExecutionMemory: Long,
-  maxOffHeapExecutionMemory: Long): StaticMemoryManager = {
+  maxOffHeapExecutionMemory: Long = 1000): StaticMemoryManager = {
 new StaticMemoryManager(
   conf.clone
 .set("spark.memory.fraction", "1")
 .set("spark.testing.memory", maxOnHeapExecutionMemory.toString)
-.set("spark.memory.offHeap.size", 
maxOffHeapExecutionMemory.toString),
+.set("spark.memory.offHeap.size",
--- End diff --

we can use `config.MEMORY_OFFHEAP_SIZE.key` here.
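
As a sketch of the idea: referring to the entry's `key` keeps the string in one place, so the test cannot drift from the definition. `Entry` and `testSettings` are hypothetical and much simpler than Spark's config entries; only the key strings are taken from the diff above.

```scala
object ConfigKeySketch {
  // Hypothetical, simplified entry; only the `key` field matters here.
  final case class Entry(key: String)
  val MEMORY_OFFHEAP_SIZE = Entry("spark.memory.offHeap.size")

  // Build test settings by referring to the entry instead of repeating the literal.
  def testSettings(maxOnHeap: Long, maxOffHeap: Long): Map[String, String] = Map(
    "spark.memory.fraction" -> "1",
    "spark.testing.memory" -> maxOnHeap.toString,
    MEMORY_OFFHEAP_SIZE.key -> maxOffHeap.toString
  )
}
```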


---




[GitHub] spark pull request #17436: [SPARK-20101][SQL] Use OffHeapColumnVector when "...

2017-11-14 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/17436#discussion_r150840015
  
--- Diff: core/src/main/scala/org/apache/spark/memory/MemoryManager.scala 
---
@@ -195,7 +196,7 @@ private[spark] abstract class MemoryManager(
*/
   final val tungstenMemoryMode: MemoryMode = {
 if (conf.getBoolean("spark.memory.offHeap.enabled", false)) {
--- End diff --

we can move this to the `object config` too


---




[GitHub] spark pull request #17436: [SPARK-20101][SQL] Use OffHeapColumnVector when "...

2017-11-14 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/17436#discussion_r150839810
  
--- Diff: 
core/src/main/scala/org/apache/spark/internal/config/package.scala ---
@@ -80,6 +80,11 @@ package object config {
 .bytesConf(ByteUnit.MiB)
 .createWithDefaultString("1g")
 
+  private[spark] val MEMORY_OFFHEAP_SIZE = 
ConfigBuilder("spark.memory.offHeap.size")
+.bytesConf(ByteUnit.BYTE)
--- End diff --

document please


---




[GitHub] spark pull request #17436: [SPARK-20101][SQL] Use OffHeapColumnVector when "...

2017-11-07 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/17436#discussion_r149418567
  
--- Diff: 
core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala ---
@@ -43,12 +43,13 @@ class StaticMemoryManagerSuite extends 
MemoryManagerSuite {
 
   override protected def createMemoryManager(
   maxOnHeapExecutionMemory: Long,
-  maxOffHeapExecutionMemory: Long): StaticMemoryManager = {
+  maxOffHeapExecutionMemory: Long = 1000): StaticMemoryManager = {
 new StaticMemoryManager(
   conf.clone
 .set("spark.memory.fraction", "1")
 .set("spark.testing.memory", maxOnHeapExecutionMemory.toString)
-.set("spark.memory.offHeap.size", 
maxOffHeapExecutionMemory.toString),
+.set("spark.memory.offHeap.size",
--- End diff --

Adding `= 1000` is intentional and necessary.  
If the original line is used, `spark.memory.offHeap.size` is zero even when 
`spark.memory.offHeap.enabled == true`. This is because the call sites do not 
pass the second argument, so the default value (`= 0`) is used.
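
The mechanism can be sketched in isolation, assuming a base method whose second parameter defaults to 0 and an override that raises the default. `BaseSuite`, `OriginalSuite`, `PatchedSuite`, and `offHeapSize` are hypothetical names chosen for illustration; they are not the real suite classes.

```scala
object DefaultArgSketch {
  // Hypothetical base suite: the off-heap limit defaults to 0 when callers omit it.
  trait BaseSuite {
    def offHeapSize(maxOnHeap: Long, maxOffHeap: Long = 0L): Long = maxOffHeap
  }

  // Keeps the inherited default.
  object OriginalSuite extends BaseSuite

  // Overrides the default so one-argument callers get a positive off-heap size.
  object PatchedSuite extends BaseSuite {
    override def offHeapSize(maxOnHeap: Long, maxOffHeap: Long = 1000L): Long = maxOffHeap
  }
}
```

Callers that pass only the first argument see 0 from `OriginalSuite` and 1000 from `PatchedSuite`, while an explicitly passed second argument is used unchanged in both.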


---




[GitHub] spark pull request #17436: [SPARK-20101][SQL] Use OffHeapColumnVector when "...

2017-11-07 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/17436#discussion_r149415162
  
--- Diff: 
core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala ---
@@ -43,12 +43,13 @@ class StaticMemoryManagerSuite extends 
MemoryManagerSuite {
 
   override protected def createMemoryManager(
   maxOnHeapExecutionMemory: Long,
-  maxOffHeapExecutionMemory: Long): StaticMemoryManager = {
+  maxOffHeapExecutionMemory: Long = 1000): StaticMemoryManager = {
 new StaticMemoryManager(
   conf.clone
 .set("spark.memory.fraction", "1")
 .set("spark.testing.memory", maxOnHeapExecutionMemory.toString)
-.set("spark.memory.offHeap.size", 
maxOffHeapExecutionMemory.toString),
+.set("spark.memory.offHeap.size",
--- End diff --

I see. I will move `"spark.memory.offHeap.size"` to 
`org.apache.spark.internal`.





[GitHub] spark pull request #17436: [SPARK-20101][SQL] Use OffHeapColumnVector when "...

2017-11-07 Thread jiangxb1987
Github user jiangxb1987 commented on a diff in the pull request:

https://github.com/apache/spark/pull/17436#discussion_r149382213
  
--- Diff: 
core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala ---
@@ -43,12 +43,13 @@ class StaticMemoryManagerSuite extends 
MemoryManagerSuite {
 
   override protected def createMemoryManager(
   maxOnHeapExecutionMemory: Long,
-  maxOffHeapExecutionMemory: Long): StaticMemoryManager = {
+  maxOffHeapExecutionMemory: Long = 1000): StaticMemoryManager = {
--- End diff --

Please update this line.





[GitHub] spark pull request #17436: [SPARK-20101][SQL] Use OffHeapColumnVector when "...

2017-11-07 Thread jiangxb1987
Github user jiangxb1987 commented on a diff in the pull request:

https://github.com/apache/spark/pull/17436#discussion_r149382131
  
--- Diff: 
core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala ---
@@ -43,12 +43,13 @@ class StaticMemoryManagerSuite extends 
MemoryManagerSuite {
 
   override protected def createMemoryManager(
   maxOnHeapExecutionMemory: Long,
-  maxOffHeapExecutionMemory: Long): StaticMemoryManager = {
+  maxOffHeapExecutionMemory: Long = 1000): StaticMemoryManager = {
 new StaticMemoryManager(
   conf.clone
 .set("spark.memory.fraction", "1")
 .set("spark.testing.memory", maxOnHeapExecutionMemory.toString)
-.set("spark.memory.offHeap.size", 
maxOffHeapExecutionMemory.toString),
+.set("spark.memory.offHeap.size",
--- End diff --

How about moving the configuration to `internal/config`?





[GitHub] spark pull request #17436: [SPARK-20101][SQL] Use OffHeapColumnVector when "...

2017-04-13 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/17436#discussion_r111534878
  
--- Diff: 
core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala ---
@@ -43,12 +43,13 @@ class StaticMemoryManagerSuite extends 
MemoryManagerSuite {
 
   override protected def createMemoryManager(
   maxOnHeapExecutionMemory: Long,
-  maxOffHeapExecutionMemory: Long): StaticMemoryManager = {
+  maxOffHeapExecutionMemory: Long = 1000): StaticMemoryManager = {
--- End diff --

Yeah, I will do that at `MemoryManagerSuite`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---




[GitHub] spark pull request #17436: [SPARK-20101][SQL] Use OffHeapColumnVector when "...

2017-04-13 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/17436#discussion_r111534779
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
 ---
@@ -351,11 +351,12 @@ class ParquetFileFormat
   if (pushed.isDefined) {
 
ParquetInputFormat.setFilterPredicate(hadoopAttemptContext.getConfiguration, 
pushed.get)
   }
+  val taskContext = Option(TaskContext.get())
--- End diff --

Yes, you are right. 

We have to ensure that `close()` is called 
[here](https://github.com/apache/spark/pull/17436/files#diff-ee26d4c4be21e92e92a02e9f16dbc285R380)
 to use `OffHeapColumnVector`.
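
A rough Java sketch of why the `close()` call matters: off-heap memory is not reclaimed by the garbage collector, so something has to release it when the task finishes. The classes `VectorSketch` and `TaskSketch` are hypothetical stand-ins, not Spark's `OffHeapColumnVector` or `TaskContext`; the sketch only shows the pattern of registering `close()` as a completion callback.

```java
import java.util.ArrayList;
import java.util.List;

class CloseOnCompletionSketch {
    // Hypothetical resource that must be released explicitly.
    static class VectorSketch implements AutoCloseable {
        private boolean closed = false;

        boolean isClosed() {
            return closed;
        }

        @Override
        public void close() {
            // A real off-heap vector would free its native memory here.
            closed = true;
        }
    }

    // Hypothetical task that runs registered callbacks when it completes.
    static class TaskSketch {
        private final List<Runnable> listeners = new ArrayList<>();

        void addCompletionListener(Runnable listener) {
            listeners.add(listener);
        }

        void markCompleted() {
            for (Runnable listener : listeners) {
                listener.run();
            }
        }
    }

    public static void main(String[] args) {
        TaskSketch task = new TaskSketch();
        VectorSketch vector = new VectorSketch();
        task.addCompletionListener(vector::close);
        task.markCompleted();
        System.out.println("closed after completion: " + vector.isClosed());
    }
}
```

Without a task to register the callback on, nothing in this pattern would ever call `close()`.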





[GitHub] spark pull request #17436: [SPARK-20101][SQL] Use OffHeapColumnVector when "...

2017-04-13 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/17436#discussion_r111530070
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
 ---
@@ -351,11 +351,12 @@ class ParquetFileFormat
   if (pushed.isDefined) {
 
ParquetInputFormat.setFilterPredicate(hadoopAttemptContext.getConfiguration, 
pushed.get)
   }
+  val taskContext = Option(TaskContext.get())
--- End diff --

If we require that the `TaskContext` exist here, can we remove the 
`forceUseOnHeap` parameter in `initBatch`?





[GitHub] spark pull request #17436: [SPARK-20101][SQL] Use OffHeapColumnVector when "...

2017-04-13 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/17436#discussion_r111528495
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java
 ---
@@ -197,11 +200,17 @@ public void initBatch(MemoryMode memMode, StructType 
partitionColumns,
   }
 
   public void initBatch() {
-initBatch(DEFAULT_MEMORY_MODE, null, null);
+assert(memoryMode != null);
+initBatch(memoryMode,null, null);
--- End diff --

[Here](https://github.com/apache/spark/pull/17436/files/e8c598097573ff469eddf2b2a8f273b6ea33fc86#diff-ee26d4c4be21e92e92a02e9f16dbc285R359)
 is a place to pass the 3rd argument.





[GitHub] spark pull request #17436: [SPARK-20101][SQL] Use OffHeapColumnVector when "...

2017-04-13 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/17436#discussion_r111522359
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java
 ---
@@ -197,11 +200,17 @@ public void initBatch(MemoryMode memMode, StructType 
partitionColumns,
   }
 
   public void initBatch() {
-initBatch(DEFAULT_MEMORY_MODE, null, null);
+assert(memoryMode != null);
+initBatch(memoryMode,null, null);
--- End diff --

where do we call `initBatch` like this?





[GitHub] spark pull request #17436: [SPARK-20101][SQL] Use OffHeapColumnVector when "...

2017-04-13 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/17436#discussion_r111522248
  
--- Diff: 
core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala ---
@@ -43,12 +43,13 @@ class StaticMemoryManagerSuite extends 
MemoryManagerSuite {
 
   override protected def createMemoryManager(
   maxOnHeapExecutionMemory: Long,
-  maxOffHeapExecutionMemory: Long): StaticMemoryManager = {
+  maxOffHeapExecutionMemory: Long = 1000): StaticMemoryManager = {
--- End diff --

Can't we do this in the parent class?





[GitHub] spark pull request #17436: [SPARK-20101][SQL] Use OffHeapColumnVector when "...

2017-04-13 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/17436#discussion_r111421994
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java
 ---
@@ -197,11 +200,17 @@ public void initBatch(MemoryMode memMode, StructType 
partitionColumns,
   }
 
   public void initBatch() {
-initBatch(DEFAULT_MEMORY_MODE, null, null);
+assert(memoryMode != null);
+initBatch(memoryMode,null, null);
--- End diff --

If `initBatch()` is called after calling `initBatch(..., ..., true)`, we 
have to use `MemoryMode.ON_HEAP` instead of 
`SparkEnv.get().memoryManager().tungstenMemoryMode()`.





[GitHub] spark pull request #17436: [SPARK-20101][SQL] Use OffHeapColumnVector when "...

2017-04-13 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/17436#discussion_r111421428
  
--- Diff: 
core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala ---
@@ -48,7 +48,10 @@ class StaticMemoryManagerSuite extends 
MemoryManagerSuite {
   conf.clone
 .set("spark.memory.fraction", "1")
 .set("spark.testing.memory", maxOnHeapExecutionMemory.toString)
-.set("spark.memory.offHeap.size", 
maxOffHeapExecutionMemory.toString),
+.set("spark.memory.offHeap.size",
+  if (maxOffHeapExecutionMemory != 0L) { 
maxOffHeapExecutionMemory.toString } else {
+conf.get("spark.memory.offHeap.size", 
maxOffHeapExecutionMemory.toString)
--- End diff --

Sounds good





[GitHub] spark pull request #17436: [SPARK-20101][SQL] Use OffHeapColumnVector when "...

2017-04-13 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/17436#discussion_r111420510
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
 ---
@@ -351,11 +351,12 @@ class ParquetFileFormat
   if (pushed.isDefined) {
 
ParquetInputFormat.setFilterPredicate(hadoopAttemptContext.getConfiguration, 
pushed.get)
   }
+  val taskContext = Option(TaskContext.get())
   val parquetReader = if (enableVectorizedReader) {
 val vectorizedReader = new VectorizedParquetRecordReader()
 vectorizedReader.initialize(split, hadoopAttemptContext)
 logDebug(s"Appending $partitionSchema ${file.partitionValues}")
-vectorizedReader.initBatch(partitionSchema, file.partitionValues)
+vectorizedReader.initBatch(partitionSchema, file.partitionValues, 
taskContext.isDefined)
--- End diff --

Oh, you are right. Thank you very much.





[GitHub] spark pull request #17436: [SPARK-20101][SQL] Use OffHeapColumnVector when "...

2017-04-13 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/17436#discussion_r111420155
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java
 ---
@@ -197,11 +200,17 @@ public void initBatch(MemoryMode memMode, StructType 
partitionColumns,
   }
 
   public void initBatch() {
-initBatch(DEFAULT_MEMORY_MODE, null, null);
+assert(memoryMode != null);
+initBatch(memoryMode,null, null);
--- End diff --

shall we use `SparkEnv.get().memoryManager().tungstenMemoryMode()` here?





[GitHub] spark pull request #17436: [SPARK-20101][SQL] Use OffHeapColumnVector when "...

2017-04-13 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/17436#discussion_r111419534
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
 ---
@@ -351,11 +351,12 @@ class ParquetFileFormat
   if (pushed.isDefined) {
 
ParquetInputFormat.setFilterPredicate(hadoopAttemptContext.getConfiguration, 
pushed.get)
   }
+  val taskContext = Option(TaskContext.get())
   val parquetReader = if (enableVectorizedReader) {
 val vectorizedReader = new VectorizedParquetRecordReader()
 vectorizedReader.initialize(split, hadoopAttemptContext)
 logDebug(s"Appending $partitionSchema ${file.partitionValues}")
-vectorizedReader.initBatch(partitionSchema, file.partitionValues)
+vectorizedReader.initBatch(partitionSchema, file.partitionValues, 
taskContext.isDefined)
--- End diff --

shouldn't it be `taskContext.isEmpty`?





[GitHub] spark pull request #17436: [SPARK-20101][SQL] Use OffHeapColumnVector when "...

2017-04-13 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/17436#discussion_r111411947
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java
 ---
@@ -90,6 +91,8 @@
*/
   private ColumnarBatch columnarBatch;
 
+  private MemoryMode memoryMode = null;
--- End diff --

Sure, done





[GitHub] spark pull request #17436: [SPARK-20101][SQL] Use OffHeapColumnVector when "...

2017-04-13 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/17436#discussion_r111397563
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java
 ---
@@ -90,6 +91,8 @@
*/
   private ColumnarBatch columnarBatch;
 
+  private MemoryMode memoryMode = null;
--- End diff --

if it's only about test suite, can't we just change tests?





[GitHub] spark pull request #17436: [SPARK-20101][SQL] Use OffHeapColumnVector when "...

2017-04-13 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/17436#discussion_r111372491
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java
 ---
@@ -90,6 +91,8 @@
*/
   private ColumnarBatch columnarBatch;
 
+  private MemoryMode memoryMode = null;
--- End diff --

I agree. IIUC, only [these three test 
suites](https://github.com/apache/spark/blob/master/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetEncodingSuite.scala#L37)
 require case 1, where `initBatch()` is called without first calling 
`initBatch(partitionColumns, partitionValues, isEnabledOffHeap)`.
If we fix those suites, we can remove `(memoryMode != null) ?` at line 203.

Case 2 is still required since line 213 calls `initBatch()` after calling 
`initBatch(partitionColumns, partitionValues, isEnabledOffHeap)`.





[GitHub] spark pull request #17436: [SPARK-20101][SQL] Use OffHeapColumnVector when "...

2017-04-13 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/17436#discussion_r111366006
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java
 ---
@@ -90,6 +91,8 @@
*/
   private ColumnarBatch columnarBatch;
 
+  private MemoryMode memoryMode = null;
--- End diff --

This semantic is a bit weird, where do we require this semantic?





[GitHub] spark pull request #17436: [SPARK-20101][SQL] Use OffHeapColumnVector when "...

2017-04-12 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/17436#discussion_r31172
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java
 ---
@@ -90,6 +91,8 @@
*/
   private ColumnarBatch columnarBatch;
 
+  private MemoryMode memoryMode = null;
--- End diff --

This is because `initBatch()` has to distinguish two cases. 
1. `initBatch()` is called without calling `initBatch(partitionColumns, 
partitionValues, isEnabledOffHeap)`
2. `initBatch()` is called after `initBatch(partitionColumns, 
partitionValues, isEnabledOffHeap)` was called.

In case 2, we have to keep the memory mode that has been set by 
`initBatch(partitionColumns, partitionValues, isEnabledOffHeap)`.
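
The two cases can be sketched in Java as follows. This is an illustrative simplification, not the actual `VectorizedParquetRecordReader`: the class `BatchReaderSketch`, its nested `MemoryMode` enum, and the `defaultMode` constructor parameter are assumptions made so the example is self-contained. A `null` field marks "no mode recorded yet", which is what lets the no-argument overload tell the cases apart.

```java
class BatchReaderSketch {
    enum MemoryMode { ON_HEAP, OFF_HEAP }

    // Hypothetical fallback used when no mode has been recorded (case 1).
    private final MemoryMode defaultMode;

    // null means the parameterized initBatch has not been called yet.
    private MemoryMode memoryMode = null;

    BatchReaderSketch(MemoryMode defaultMode) {
        this.defaultMode = defaultMode;
    }

    // Parameterized variant: records the mode chosen by the caller.
    void initBatch(boolean isEnabledOffHeap) {
        memoryMode = isEnabledOffHeap ? MemoryMode.OFF_HEAP : MemoryMode.ON_HEAP;
    }

    // No-argument variant: keeps a previously recorded mode (case 2),
    // otherwise falls back to the default (case 1).
    MemoryMode initBatch() {
        if (memoryMode == null) {
            memoryMode = defaultMode;
        }
        return memoryMode;
    }

    public static void main(String[] args) {
        BatchReaderSketch fresh = new BatchReaderSketch(MemoryMode.OFF_HEAP);
        System.out.println("case 1: " + fresh.initBatch());

        BatchReaderSketch configured = new BatchReaderSketch(MemoryMode.OFF_HEAP);
        configured.initBatch(false);
        System.out.println("case 2: " + configured.initBatch());
    }
}
```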






[GitHub] spark pull request #17436: [SPARK-20101][SQL] Use OffHeapColumnVector when "...

2017-04-12 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/17436#discussion_r01490
  
--- Diff: 
core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala ---
@@ -48,7 +48,10 @@ class StaticMemoryManagerSuite extends 
MemoryManagerSuite {
   conf.clone
 .set("spark.memory.fraction", "1")
 .set("spark.testing.memory", maxOnHeapExecutionMemory.toString)
-.set("spark.memory.offHeap.size", 
maxOffHeapExecutionMemory.toString),
+.set("spark.memory.offHeap.size",
+  if (maxOffHeapExecutionMemory != 0L) { 
maxOffHeapExecutionMemory.toString } else {
+conf.get("spark.memory.offHeap.size", 
maxOffHeapExecutionMemory.toString)
--- End diff --

can we just change the default value of `maxOffHeapExecutionMemory` from 0 
to something like 1000?





[GitHub] spark pull request #17436: [SPARK-20101][SQL] Use OffHeapColumnVector when "...

2017-04-12 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/17436#discussion_r00556
  
--- Diff: 
core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala ---
@@ -48,7 +48,10 @@ class StaticMemoryManagerSuite extends 
MemoryManagerSuite {
   conf.clone
 .set("spark.memory.fraction", "1")
 .set("spark.testing.memory", maxOnHeapExecutionMemory.toString)
-.set("spark.memory.offHeap.size", 
maxOffHeapExecutionMemory.toString),
+.set("spark.memory.offHeap.size",
+  if (maxOffHeapExecutionMemory != 0L) { 
maxOffHeapExecutionMemory.toString } else {
+conf.get("spark.memory.offHeap.size", 
maxOffHeapExecutionMemory.toString)
--- End diff --

If `maxOffHeapExecutionMemory` is not explicitly specified (as in [this 
code](https://github.com/apache/spark/blob/master/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala#L162)),
 we want to use the value in `"spark.memory.offHeap.size"` when 
`spark.memory.offHeap.enabled` is `true`.
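
The fallback in the diff above can be sketched in Java like this. The helper `resolveOffHeapSize` and the plain `Map` standing in for `SparkConf` are hypothetical; the logic mirrors the quoted `if`/`else`: a non-zero argument wins, otherwise the value already present in the configuration is used.

```java
import java.util.HashMap;
import java.util.Map;

class OffHeapSizeFallbackSketch {
    static final String KEY = "spark.memory.offHeap.size";

    // Returns the string that would be written back into the configuration.
    static String resolveOffHeapSize(Map<String, String> conf, long maxOffHeapExecutionMemory) {
        String explicit = Long.toString(maxOffHeapExecutionMemory);
        if (maxOffHeapExecutionMemory != 0L) {
            return explicit;
        }
        // Zero is treated as "not specified": prefer the existing setting.
        return conf.getOrDefault(KEY, explicit);
    }

    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        conf.put(KEY, "2048");
        System.out.println(resolveOffHeapSize(conf, 0L));
        System.out.println(resolveOffHeapSize(conf, 512L));
    }
}
```

One consequence of this design is that a caller cannot request an off-heap size of exactly zero when the configuration already holds a value, which may be part of why the reviewers preferred changing the default instead.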





[GitHub] spark pull request #17436: [SPARK-20101][SQL] Use OffHeapColumnVector when "...

2017-04-12 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/17436#discussion_r111098912
  
--- Diff: 
core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala ---
@@ -210,7 +210,7 @@ object UnifiedMemoryManager {
   private def getMaxMemory(conf: SparkConf): Long = {
 val systemMemory = conf.getLong("spark.testing.memory", 
Runtime.getRuntime.maxMemory)
 val reservedMemory = conf.getLong("spark.testing.reservedMemory",
-  if (conf.contains("spark.testing")) 0 else 
RESERVED_SYSTEM_MEMORY_BYTES)
+  if (conf.contains("spark.testing") || true) 0 else 
RESERVED_SYSTEM_MEMORY_BYTES)
--- End diff --

Sorry, it is my mistake.





[GitHub] spark pull request #17436: [SPARK-20101][SQL] Use OffHeapColumnVector when "...

2017-04-12 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/17436#discussion_r111095838
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
 ---
@@ -351,11 +351,12 @@ class ParquetFileFormat
   if (pushed.isDefined) {
 
ParquetInputFormat.setFilterPredicate(hadoopAttemptContext.getConfiguration, 
pushed.get)
   }
+  val taskContext = Option(TaskContext.get())
   val parquetReader = if (enableVectorizedReader) {
 val vectorizedReader = new VectorizedParquetRecordReader()
 vectorizedReader.initialize(split, hadoopAttemptContext)
 logDebug(s"Appending $partitionSchema ${file.partitionValues}")
-vectorizedReader.initBatch(partitionSchema, file.partitionValues)
+vectorizedReader.initBatch(partitionSchema, file.partitionValues, 
taskContext.isDefined)
--- End diff --

When `taskContext.isDefined` is false, we force on-heap mode.
This is because we cannot register `iter.close` 
[here](https://github.com/apache/spark/pull/17436/files#diff-ee26d4c4be21e92e92a02e9f16dbc285R380)
 to release the off-heap memory.
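
As a hedged illustration of that rule, the Java sketch below selects the memory mode from whether a task context is available. `TaskContextSketch`, the nested `MemoryMode` enum, and `chooseMode` are hypothetical names; the real code passes a boolean into `initBatch` rather than calling a helper like this.

```java
import java.util.Optional;

class OnHeapFallbackSketch {
    enum MemoryMode { ON_HEAP, OFF_HEAP }

    // Hypothetical placeholder for a task context; only its presence matters here.
    static class TaskContextSketch { }

    // Without a task context there is nowhere to register cleanup,
    // so the configured mode is ignored and on-heap is used.
    static MemoryMode chooseMode(Optional<TaskContextSketch> taskContext, MemoryMode configured) {
        return taskContext.isPresent() ? configured : MemoryMode.ON_HEAP;
    }

    public static void main(String[] args) {
        Optional<TaskContextSketch> none = Optional.empty();
        Optional<TaskContextSketch> some = Optional.of(new TaskContextSketch());
        System.out.println(chooseMode(none, MemoryMode.OFF_HEAP));
        System.out.println(chooseMode(some, MemoryMode.OFF_HEAP));
    }
}
```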





[GitHub] spark pull request #17436: [SPARK-20101][SQL] Use OffHeapColumnVector when "...

2017-04-12 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/17436#discussion_r111094256
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java
 ---
@@ -90,6 +91,8 @@
*/
   private ColumnarBatch columnarBatch;
 
+  private MemoryMode memoryMode = null;
--- End diff --

why do we need to record the memory mode?





[GitHub] spark pull request #17436: [SPARK-20101][SQL] Use OffHeapColumnVector when "...

2017-04-11 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/17436#discussion_r111062746
  
--- Diff: 
core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala ---
@@ -210,7 +210,7 @@ object UnifiedMemoryManager {
   private def getMaxMemory(conf: SparkConf): Long = {
 val systemMemory = conf.getLong("spark.testing.memory", 
Runtime.getRuntime.maxMemory)
 val reservedMemory = conf.getLong("spark.testing.reservedMemory",
-  if (conf.contains("spark.testing")) 0 else 
RESERVED_SYSTEM_MEMORY_BYTES)
+  if (conf.contains("spark.testing") || true) 0 else 
RESERVED_SYSTEM_MEMORY_BYTES)
--- End diff --

?





[GitHub] spark pull request #17436: [SPARK-20101][SQL] Use OffHeapColumnVector when "...

2017-04-11 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/17436#discussion_r110923167
  
--- Diff: core/src/main/java/org/apache/spark/memory/MemoryConsumer.java ---
@@ -41,7 +41,7 @@ protected MemoryConsumer(TaskMemoryManager 
taskMemoryManager, long pageSize, Mem
   }
 
   protected MemoryConsumer(TaskMemoryManager taskMemoryManager) {
--- End diff --

[This test 
code](https://github.com/apache/spark/blob/master/core/src/test/java/org/apache/spark/memory/TestMemoryConsumer.java#L24)
 is the only case that specifies a memory mode different from 
`TaskMemoryManager.getTungstenMemoryMode()`.

To simplify the code, I have just replaced 
`MemoryConsumer(taskMemoryManager, pageSize, 
taskMemoryManager.getTungstenMemoryMode())` with 
`MemoryConsumer(taskMemoryManager, pageSize)`.
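
The simplification amounts to constructor chaining, sketched below in Java. `ConsumerSketch` and `ManagerSketch` are hypothetical stand-ins for `MemoryConsumer` and `TaskMemoryManager`; only the method name `getTungstenMemoryMode()` is taken from the thread.

```java
class ConsumerSketch {
    enum MemoryMode { ON_HEAP, OFF_HEAP }

    // Hypothetical manager that knows the process-wide memory mode.
    static class ManagerSketch {
        private final MemoryMode mode;

        ManagerSketch(MemoryMode mode) {
            this.mode = mode;
        }

        MemoryMode getTungstenMemoryMode() {
            return mode;
        }
    }

    final long pageSize;
    final MemoryMode mode;

    // Full constructor: the caller chooses the mode explicitly.
    ConsumerSketch(ManagerSketch manager, long pageSize, MemoryMode mode) {
        this.pageSize = pageSize;
        this.mode = mode;
    }

    // Shorter constructor: the mode comes from the manager, so callers
    // no longer repeat manager.getTungstenMemoryMode() at every call site.
    ConsumerSketch(ManagerSketch manager, long pageSize) {
        this(manager, pageSize, manager.getTungstenMemoryMode());
    }

    public static void main(String[] args) {
        ManagerSketch manager = new ManagerSketch(MemoryMode.OFF_HEAP);
        System.out.println(new ConsumerSketch(manager, 4096L).mode);
    }
}
```

Callers that genuinely need a different mode, such as the test consumer mentioned above, can still use the three-argument form.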





[GitHub] spark pull request #17436: [SPARK-20101][SQL] Use OffHeapColumnVector when "...

2017-04-11 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/17436#discussion_r110891992
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
 ---
@@ -351,11 +351,12 @@ class ParquetFileFormat
   if (pushed.isDefined) {
 
ParquetInputFormat.setFilterPredicate(hadoopAttemptContext.getConfiguration, 
pushed.get)
   }
+  val taskContext = Option(TaskContext.get())
   val parquetReader = if (enableVectorizedReader) {
 val vectorizedReader = new VectorizedParquetRecordReader()
 vectorizedReader.initialize(split, hadoopAttemptContext)
 logDebug(s"Appending $partitionSchema ${file.partitionValues}")
-vectorizedReader.initBatch(partitionSchema, file.partitionValues)
+vectorizedReader.initBatch(partitionSchema, file.partitionValues, 
taskContext.isDefined)
--- End diff --

Does `taskContext.isDefined` mean that off-heap is enabled?





[GitHub] spark pull request #17436: [SPARK-20101][SQL] Use OffHeapColumnVector when "...

2017-04-11 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/17436#discussion_r110889293
  
--- Diff: 
core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala ---
@@ -48,7 +48,10 @@ class StaticMemoryManagerSuite extends 
MemoryManagerSuite {
   conf.clone
 .set("spark.memory.fraction", "1")
 .set("spark.testing.memory", maxOnHeapExecutionMemory.toString)
-.set("spark.memory.offHeap.size", 
maxOffHeapExecutionMemory.toString),
+.set("spark.memory.offHeap.size",
+  if (maxOffHeapExecutionMemory != 0L) { 
maxOffHeapExecutionMemory.toString } else {
+conf.get("spark.memory.offHeap.size", 
maxOffHeapExecutionMemory.toString)
--- End diff --

why this change?





[GitHub] spark pull request #17436: [SPARK-20101][SQL] Use OffHeapColumnVector when "...

2017-04-11 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/17436#discussion_r110838855
  
--- Diff: core/src/main/java/org/apache/spark/memory/MemoryConsumer.java ---
@@ -41,7 +41,7 @@ protected MemoryConsumer(TaskMemoryManager 
taskMemoryManager, long pageSize, Mem
   }
 
   protected MemoryConsumer(TaskMemoryManager taskMemoryManager) {
--- End diff --

so, do we still need `MemoryConsumer(TaskMemoryManager taskMemoryManager, 
long pageSize, MemoryMode mode)`? I think it can just be 
`MemoryConsumer(TaskMemoryManager taskMemoryManager, long pageSize)`





[GitHub] spark pull request #17436: [SPARK-20101][SQL] Use OffHeapColumnVector when "...

2017-04-10 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/17436#discussion_r110801228
  
--- Diff: core/src/main/java/org/apache/spark/memory/MemoryConsumer.java ---
@@ -41,7 +41,7 @@ protected MemoryConsumer(TaskMemoryManager 
taskMemoryManager, long pageSize, Mem
   }
 
   protected MemoryConsumer(TaskMemoryManager taskMemoryManager) {
--- End diff --

Yes, you are right.





[GitHub] spark pull request #17436: [SPARK-20101][SQL] Use OffHeapColumnVector when "...

2017-04-10 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/17436#discussion_r110714213
  
--- Diff: core/src/main/scala/org/apache/spark/SparkConf.scala ---
@@ -67,6 +67,9 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable 
with Logging with Seria
   if (loadDefaults) {
 loadFromSystemProperties(false)
   }
+  // tentatively enable offHeap for test
+  set("spark.memory.offHeap.enabled", "true")
+  set("spark.memory.offHeap.size", "1gb")
--- End diff --

Yes, it should be removed before merging this PR.
This code was added tentatively to enable off-heap mode.





[GitHub] spark pull request #17436: [SPARK-20101][SQL] Use OffHeapColumnVector when "...

2017-04-09 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/17436#discussion_r110576977
  
--- Diff: core/src/main/scala/org/apache/spark/SparkConf.scala ---
@@ -67,6 +67,9 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable 
with Logging with Seria
   if (loadDefaults) {
 loadFromSystemProperties(false)
   }
+  // tentatively enable offHeap for test
+  set("spark.memory.offHeap.enabled", "true")
+  set("spark.memory.offHeap.size", "1gb")
--- End diff --

remove it?





[GitHub] spark pull request #17436: [SPARK-20101][SQL] Use OffHeapColumnVector when "...

2017-04-09 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/17436#discussion_r110576873
  
--- Diff: core/src/main/java/org/apache/spark/memory/MemoryConsumer.java ---
@@ -41,7 +41,7 @@ protected MemoryConsumer(TaskMemoryManager 
taskMemoryManager, long pageSize, Mem
   }
 
   protected MemoryConsumer(TaskMemoryManager taskMemoryManager) {
--- End diff --

yea, but this PR fixed it, so after this PR, the memory mode is always the same 
as `TaskMemoryManager`'s memory mode, right?





[GitHub] spark pull request #17436: [SPARK-20101][SQL] Use OffHeapColumnVector when "...

2017-03-30 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/17436#discussion_r108994791
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java
 ---
@@ -430,12 +430,15 @@ public int numValidRows() {
   /**
* Sets (replaces) the column at `ordinal` with column. This can be used 
to do very efficient
* projections.
+   * Note: Caller must ensure that ColumnVector in column[ordinal] is not 
used in other places
*/
   public void setColumn(int ordinal, ColumnVector column) {
-if (column instanceof OffHeapColumnVector) {
-  throw new UnsupportedOperationException("Need to ref count 
columns.");
-}
+ColumnVector oldColumn = columns[ordinal];
 columns[ordinal] = column;
+if (oldColumn instanceof OffHeapColumnVector) {
--- End diff --

Sure, done





[GitHub] spark pull request #17436: [SPARK-20101][SQL] Use OffHeapColumnVector when "...

2017-03-30 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/17436#discussion_r108989432
  
--- Diff: core/src/main/java/org/apache/spark/memory/MemoryConsumer.java ---
@@ -41,7 +41,7 @@ protected MemoryConsumer(TaskMemoryManager 
taskMemoryManager, long pageSize, Mem
   }
 
   protected MemoryConsumer(TaskMemoryManager taskMemoryManager) {
--- End diff --

[This constructor](https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/util/collection/Spillable.scala#L29)
calls this constructor, which specifies `ON_HEAP` regardless of 
`TaskMemoryManager`'s memory mode.





[GitHub] spark pull request #17436: [SPARK-20101][SQL] Use OffHeapColumnVector when "...

2017-03-30 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/17436#discussion_r108947566
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java
 ---
@@ -430,12 +430,15 @@ public int numValidRows() {
   /**
* Sets (replaces) the column at `ordinal` with column. This can be used 
to do very efficient
* projections.
+   * Note: Caller must ensure that ColumnVector in column[ordinal] is not 
used in other places
*/
   public void setColumn(int ordinal, ColumnVector column) {
-if (column instanceof OffHeapColumnVector) {
-  throw new UnsupportedOperationException("Need to ref count 
columns.");
-}
+ColumnVector oldColumn = columns[ordinal];
 columns[ordinal] = column;
+if (oldColumn instanceof OffHeapColumnVector) {
--- End diff --

we can call `oldColumn.close()` directly, as `OnHeapColumnVector.close` is 
a no-op
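
A rough sketch of that suggestion, under stated assumptions: the classes below are hypothetical stand-ins rather than Spark's `ColumnarBatch` and `ColumnVector`, and the `null` check is an added assumption, not something taken from the diff. The point is only that the `instanceof` test becomes unnecessary once every vector type has a safe `close()`.

```java
// Hypothetical stand-ins for illustration only; these are not Spark's classes.
abstract class SketchColumnVector implements AutoCloseable {
  @Override
  public abstract void close();
}

class SketchOnHeapVector extends SketchColumnVector {
  @Override
  public void close() {
    // No-op: heap arrays are reclaimed by the garbage collector.
  }
}

class SketchOffHeapVector extends SketchColumnVector {
  boolean freed = false;

  @Override
  public void close() {
    // Stand-in for releasing native memory; here we only record the call.
    freed = true;
  }
}

class SketchBatch {
  private final SketchColumnVector[] columns;

  SketchBatch(SketchColumnVector[] columns) {
    this.columns = columns;
  }

  SketchColumnVector column(int ordinal) {
    return columns[ordinal];
  }

  // Replaces the column and closes the old one unconditionally.
  // As the Javadoc note in the diff says, the caller must ensure that the
  // old vector is not used anywhere else.
  void setColumn(int ordinal, SketchColumnVector column) {
    SketchColumnVector oldColumn = columns[ordinal];
    columns[ordinal] = column;
    if (oldColumn != null) { // null check is an assumption of this sketch
      oldColumn.close();
    }
  }
}
```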


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #17436: [SPARK-20101][SQL] Use OffHeapColumnVector when "...

2017-03-30 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/17436#discussion_r108946488
  
--- Diff: core/src/main/java/org/apache/spark/memory/MemoryConsumer.java ---
@@ -41,7 +41,7 @@ protected MemoryConsumer(TaskMemoryManager 
taskMemoryManager, long pageSize, Mem
   }
 
   protected MemoryConsumer(TaskMemoryManager taskMemoryManager) {
--- End diff --

When will we construct a `MemoryConsumer` while the memory mode is 
different from `TaskMemoryManager`'s memory mode?





[GitHub] spark pull request #17436: [SPARK-20101][SQL] Use OffHeapColumnVector when "...

2017-03-26 Thread kiszk
GitHub user kiszk opened a pull request:

https://github.com/apache/spark/pull/17436

[SPARK-20101][SQL] Use OffHeapColumnVector when 
"spark.memory.offHeap.enabled" is set to "true"

## What changes were proposed in this pull request?

This PR enables the use of ``OffHeapColumnVector`` when 
``spark.memory.offHeap.enabled`` is set to ``true``. ``ColumnVector`` has two 
implementations, ``OnHeapColumnVector`` and ``OffHeapColumnVector``, but 
currently only ``OnHeapColumnVector`` is ever used.

This PR implements the following:
- Pass ``OFF_HEAP`` to ``ColumnarBatch.allocate()`` when 
``spark.memory.offHeap.enabled`` is set to ``true``
- Free all off-heap memory regions in ``OffHeapColumnVector.close()``
- Ensure that ``OffHeapColumnVector.close()`` is called
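
The three points above can be sketched as follows. This is a minimal illustration under assumptions, not the PR's implementation: `SketchMemoryMode`, `SketchVector`, and `OffHeapSelection` are hypothetical names, the configuration is modelled as a plain `Map`, and the key is spelled `spark.memory.offHeap.enabled` as in the PR title.

```java
import java.util.Map;

// Hypothetical stand-ins for illustration only; these are not Spark's classes.
enum SketchMemoryMode { ON_HEAP, OFF_HEAP }

class SketchVector implements AutoCloseable {
  final SketchMemoryMode mode;
  boolean closed = false;

  private SketchVector(SketchMemoryMode mode) {
    this.mode = mode;
  }

  // Stand-in for an allocate() call that takes the memory mode.
  static SketchVector allocate(SketchMemoryMode mode) {
    return new SketchVector(mode);
  }

  @Override
  public void close() {
    // For an off-heap vector this is where native memory would be freed.
    closed = true;
  }
}

class OffHeapSelection {
  // Chooses OFF_HEAP only when the flag is explicitly "true"; defaults to ON_HEAP.
  static SketchMemoryMode modeFor(Map<String, String> conf) {
    boolean offHeap =
        Boolean.parseBoolean(conf.getOrDefault("spark.memory.offHeap.enabled", "false"));
    return offHeap ? SketchMemoryMode.OFF_HEAP : SketchMemoryMode.ON_HEAP;
  }
}
```

In this sketch a caller would wrap the vector in try-with-resources, for example `try (SketchVector v = SketchVector.allocate(OffHeapSelection.modeFor(conf))) { ... }`, so that `close()` runs even if the body throws.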

## How was this patch tested?

Use existing tests

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/kiszk/spark SPARK-20101

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/17436.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #17436


commit 6cf5a8d98d65a5c354f80dd99f9990f1a8a3ad98
Author: Kazuaki Ishizaki 
Date:   2017-03-26T17:50:55Z

initial commit

commit 099a257675741aa281d15db4b3a0d1f4898632e8
Author: Kazuaki Ishizaki 
Date:   2017-03-26T17:51:51Z

tentatively set true into spark.memory.offHeap.enabled



