Repository: spark
Updated Branches:
  refs/heads/master 70221903f -> e3fd93f14


[SPARK-22604][SQL] remove the get address methods from ColumnVector

## What changes were proposed in this pull request?

`nullsNativeAddress` and `valuesNativeAddress` are only used in tests and
benchmarks, so they do not need to be part of the top-level `ColumnVector` API.

## How was this patch tested?

existing tests

Author: Wenchen Fan <wenc...@databricks.com>

Closes #19818 from cloud-fan/minor.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e3fd93f1
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e3fd93f1
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e3fd93f1

Branch: refs/heads/master
Commit: e3fd93f149ff0ff1caff28a5191215e2a29749a9
Parents: 7022190
Author: Wenchen Fan <wenc...@databricks.com>
Authored: Fri Nov 24 22:43:47 2017 -0800
Committer: gatorsmile <gatorsm...@gmail.com>
Committed: Fri Nov 24 22:43:47 2017 -0800

----------------------------------------------------------------------
 .../execution/vectorized/ArrowColumnVector.java | 10 ---
 .../sql/execution/vectorized/ColumnVector.java  |  7 --
 .../vectorized/OffHeapColumnVector.java         |  6 +-
 .../vectorized/OnHeapColumnVector.java          |  9 ---
 .../vectorized/ColumnarBatchBenchmark.scala     | 32 ++++----
 .../vectorized/ColumnarBatchSuite.scala         | 82 +++++++-------------
 6 files changed, 47 insertions(+), 99 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/e3fd93f1/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ArrowColumnVector.java
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ArrowColumnVector.java
 
b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ArrowColumnVector.java
index 949035b..3a10e98 100644
--- 
a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ArrowColumnVector.java
+++ 
b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ArrowColumnVector.java
@@ -60,16 +60,6 @@ public final class ArrowColumnVector extends ColumnVector {
   }
 
   @Override
-  public long nullsNativeAddress() {
-    throw new RuntimeException("Cannot get native address for arrow column");
-  }
-
-  @Override
-  public long valuesNativeAddress() {
-    throw new RuntimeException("Cannot get native address for arrow column");
-  }
-
-  @Override
   public void close() {
     if (childColumns != null) {
       for (int i = 0; i < childColumns.length; i++) {

http://git-wip-us.apache.org/repos/asf/spark/blob/e3fd93f1/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java
 
b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java
index 666fd63..360ed83e 100644
--- 
a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java
+++ 
b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java
@@ -63,13 +63,6 @@ public abstract class ColumnVector implements AutoCloseable {
   public abstract boolean anyNullsSet();
 
   /**
-   * Returns the off heap ptr for the arrays backing the NULLs and values 
buffer. Only valid
-   * to call for off heap columns.
-   */
-  public abstract long nullsNativeAddress();
-  public abstract long valuesNativeAddress();
-
-  /**
    * Returns whether the value at rowId is NULL.
    */
   public abstract boolean isNullAt(int rowId);

http://git-wip-us.apache.org/repos/asf/spark/blob/e3fd93f1/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
 
b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
index 2bf523b..6b5c783 100644
--- 
a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
+++ 
b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
@@ -19,6 +19,8 @@ package org.apache.spark.sql.execution.vectorized;
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
 
+import com.google.common.annotations.VisibleForTesting;
+
 import org.apache.spark.sql.types.*;
 import org.apache.spark.unsafe.Platform;
 
@@ -73,12 +75,12 @@ public final class OffHeapColumnVector extends 
WritableColumnVector {
     reset();
   }
 
-  @Override
+  @VisibleForTesting
   public long valuesNativeAddress() {
     return data;
   }
 
-  @Override
+  @VisibleForTesting
   public long nullsNativeAddress() {
     return nulls;
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/e3fd93f1/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java
 
b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java
index d699d29..a7b103a 100644
--- 
a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java
+++ 
b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java
@@ -80,15 +80,6 @@ public final class OnHeapColumnVector extends 
WritableColumnVector {
   }
 
   @Override
-  public long valuesNativeAddress() {
-    throw new RuntimeException("Cannot get native address for on heap column");
-  }
-  @Override
-  public long nullsNativeAddress() {
-    throw new RuntimeException("Cannot get native address for on heap column");
-  }
-
-  @Override
   public void close() {
     super.close();
     nulls = null;

http://git-wip-us.apache.org/repos/asf/spark/blob/e3fd93f1/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchBenchmark.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchBenchmark.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchBenchmark.scala
index 1331f15..705b26b 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchBenchmark.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchBenchmark.scala
@@ -36,15 +36,6 @@ import org.apache.spark.util.collection.BitSet
  * Benchmark to low level memory access using different ways to manage buffers.
  */
 object ColumnarBatchBenchmark {
-
-  def allocate(capacity: Int, dt: DataType, memMode: MemoryMode): 
WritableColumnVector = {
-    if (memMode == MemoryMode.OFF_HEAP) {
-      new OffHeapColumnVector(capacity, dt)
-    } else {
-      new OnHeapColumnVector(capacity, dt)
-    }
-  }
-
   // This benchmark reads and writes an array of ints.
   // TODO: there is a big (2x) penalty for a random access API for off heap.
   // Note: carefully if modifying this code. It's hard to reason about the JIT.
@@ -151,7 +142,7 @@ object ColumnarBatchBenchmark {
 
     // Access through the column API with on heap memory
     val columnOnHeap = { i: Int =>
-      val col = allocate(count, IntegerType, MemoryMode.ON_HEAP)
+      val col = new OnHeapColumnVector(count, IntegerType)
       var sum = 0L
       for (n <- 0L until iters) {
         var i = 0
@@ -170,7 +161,7 @@ object ColumnarBatchBenchmark {
 
     // Access through the column API with off heap memory
     def columnOffHeap = { i: Int => {
-      val col = allocate(count, IntegerType, MemoryMode.OFF_HEAP)
+      val col = new OffHeapColumnVector(count, IntegerType)
       var sum = 0L
       for (n <- 0L until iters) {
         var i = 0
@@ -189,7 +180,7 @@ object ColumnarBatchBenchmark {
 
     // Access by directly getting the buffer backing the column.
     val columnOffheapDirect = { i: Int =>
-      val col = allocate(count, IntegerType, MemoryMode.OFF_HEAP)
+      val col = new OffHeapColumnVector(count, IntegerType)
       var sum = 0L
       for (n <- 0L until iters) {
         var addr = col.valuesNativeAddress()
@@ -255,7 +246,7 @@ object ColumnarBatchBenchmark {
 
     // Adding values by appending, instead of putting.
     val onHeapAppend = { i: Int =>
-      val col = allocate(count, IntegerType, MemoryMode.ON_HEAP)
+      val col = new OnHeapColumnVector(count, IntegerType)
       var sum = 0L
       for (n <- 0L until iters) {
         var i = 0
@@ -330,7 +321,7 @@ object ColumnarBatchBenchmark {
       for (n <- 0L until iters) {
         var i = 0
         while (i < count) {
-          if (i % 2 == 0) b(i) = 1;
+          if (i % 2 == 0) b(i) = 1
           i += 1
         }
         i = 0
@@ -351,7 +342,7 @@ object ColumnarBatchBenchmark {
   }
 
   def stringAccess(iters: Long): Unit = {
-    val chars = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ";
+    val chars = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ"
     val random = new Random(0)
 
     def randomString(min: Int, max: Int): String = {
@@ -359,10 +350,10 @@ object ColumnarBatchBenchmark {
       val sb = new StringBuilder(len)
       var i = 0
       while (i < len) {
-        sb.append(chars.charAt(random.nextInt(chars.length())));
+        sb.append(chars.charAt(random.nextInt(chars.length())))
         i += 1
       }
-      return sb.toString
+      sb.toString
     }
 
     val minString = 3
@@ -373,7 +364,12 @@ object ColumnarBatchBenchmark {
       .map(_.getBytes(StandardCharsets.UTF_8)).toArray
 
     def column(memoryMode: MemoryMode) = { i: Int =>
-      val column = allocate(count, BinaryType, memoryMode)
+      val column = if (memoryMode == MemoryMode.OFF_HEAP) {
+        new OffHeapColumnVector(count, BinaryType)
+      } else {
+        new OnHeapColumnVector(count, BinaryType)
+      }
+
       var sum = 0L
       for (n <- 0L until iters) {
         var i = 0

http://git-wip-us.apache.org/repos/asf/spark/blob/e3fd93f1/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala
index 4a6c8f5..80a5086 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala
@@ -50,11 +50,11 @@ class ColumnarBatchSuite extends SparkFunSuite {
       name: String,
       size: Int,
       dt: DataType)(
-      block: (WritableColumnVector, MemoryMode) => Unit): Unit = {
+      block: WritableColumnVector => Unit): Unit = {
     test(name) {
       Seq(MemoryMode.ON_HEAP, MemoryMode.OFF_HEAP).foreach { mode =>
         val vector = allocate(size, dt, mode)
-        try block(vector, mode) finally {
+        try block(vector) finally {
           vector.close()
         }
       }
@@ -62,7 +62,7 @@ class ColumnarBatchSuite extends SparkFunSuite {
   }
 
   testVector("Null APIs", 1024, IntegerType) {
-    (column, memMode) =>
+    column =>
       val reference = mutable.ArrayBuffer.empty[Boolean]
       var idx = 0
       assert(!column.anyNullsSet())
@@ -121,15 +121,11 @@ class ColumnarBatchSuite extends SparkFunSuite {
 
       reference.zipWithIndex.foreach { v =>
         assert(v._1 == column.isNullAt(v._2))
-        if (memMode == MemoryMode.OFF_HEAP) {
-          val addr = column.nullsNativeAddress()
-          assert(v._1 == (Platform.getByte(null, addr + v._2) == 1), "index=" 
+ v._2)
-        }
       }
   }
 
   testVector("Byte APIs", 1024, ByteType) {
-    (column, memMode) =>
+    column =>
       val reference = mutable.ArrayBuffer.empty[Byte]
 
       var values = (10 :: 20 :: 30 :: 40 :: 50 :: Nil).map(_.toByte).toArray
@@ -173,16 +169,12 @@ class ColumnarBatchSuite extends SparkFunSuite {
       idx += 3
 
       reference.zipWithIndex.foreach { v =>
-        assert(v._1 == column.getByte(v._2), "MemoryMode" + memMode)
-        if (memMode == MemoryMode.OFF_HEAP) {
-          val addr = column.valuesNativeAddress()
-          assert(v._1 == Platform.getByte(null, addr + v._2))
-        }
+        assert(v._1 == column.getByte(v._2), "VectorType=" + 
column.getClass.getSimpleName)
       }
   }
 
   testVector("Short APIs", 1024, ShortType) {
-    (column, memMode) =>
+    column =>
       val seed = System.currentTimeMillis()
       val random = new Random(seed)
       val reference = mutable.ArrayBuffer.empty[Short]
@@ -248,16 +240,13 @@ class ColumnarBatchSuite extends SparkFunSuite {
       }
 
       reference.zipWithIndex.foreach { v =>
-        assert(v._1 == column.getShort(v._2), "Seed = " + seed + " Mem Mode=" 
+ memMode)
-        if (memMode == MemoryMode.OFF_HEAP) {
-          val addr = column.valuesNativeAddress()
-          assert(v._1 == Platform.getShort(null, addr + 2 * v._2))
-        }
+        assert(v._1 == column.getShort(v._2),
+          "Seed = " + seed + " VectorType=" + column.getClass.getSimpleName)
       }
   }
 
   testVector("Int APIs", 1024, IntegerType) {
-    (column, memMode) =>
+    column =>
       val seed = System.currentTimeMillis()
       val random = new Random(seed)
       val reference = mutable.ArrayBuffer.empty[Int]
@@ -329,16 +318,13 @@ class ColumnarBatchSuite extends SparkFunSuite {
       }
 
       reference.zipWithIndex.foreach { v =>
-        assert(v._1 == column.getInt(v._2), "Seed = " + seed + " Mem Mode=" + 
memMode)
-        if (memMode == MemoryMode.OFF_HEAP) {
-          val addr = column.valuesNativeAddress()
-          assert(v._1 == Platform.getInt(null, addr + 4 * v._2))
-        }
+        assert(v._1 == column.getInt(v._2),
+          "Seed = " + seed + " VectorType=" + column.getClass.getSimpleName)
       }
   }
 
   testVector("Long APIs", 1024, LongType) {
-    (column, memMode) =>
+    column =>
       val seed = System.currentTimeMillis()
       val random = new Random(seed)
       val reference = mutable.ArrayBuffer.empty[Long]
@@ -413,16 +399,12 @@ class ColumnarBatchSuite extends SparkFunSuite {
 
       reference.zipWithIndex.foreach { v =>
         assert(v._1 == column.getLong(v._2), "idx=" + v._2 +
-          " Seed = " + seed + " MemMode=" + memMode)
-        if (memMode == MemoryMode.OFF_HEAP) {
-          val addr = column.valuesNativeAddress()
-          assert(v._1 == Platform.getLong(null, addr + 8 * v._2))
-        }
+          " Seed = " + seed + " VectorType=" + column.getClass.getSimpleName)
       }
   }
 
   testVector("Float APIs", 1024, FloatType) {
-    (column, memMode) =>
+    column =>
       val seed = System.currentTimeMillis()
       val random = new Random(seed)
       val reference = mutable.ArrayBuffer.empty[Float]
@@ -500,16 +482,13 @@ class ColumnarBatchSuite extends SparkFunSuite {
       }
 
       reference.zipWithIndex.foreach { v =>
-        assert(v._1 == column.getFloat(v._2), "Seed = " + seed + " MemMode=" + 
memMode)
-        if (memMode == MemoryMode.OFF_HEAP) {
-          val addr = column.valuesNativeAddress()
-          assert(v._1 == Platform.getFloat(null, addr + 4 * v._2))
-        }
+        assert(v._1 == column.getFloat(v._2),
+          "Seed = " + seed + " VectorType=" + column.getClass.getSimpleName)
       }
   }
 
   testVector("Double APIs", 1024, DoubleType) {
-    (column, memMode) =>
+    column =>
       val seed = System.currentTimeMillis()
       val random = new Random(seed)
       val reference = mutable.ArrayBuffer.empty[Double]
@@ -587,16 +566,13 @@ class ColumnarBatchSuite extends SparkFunSuite {
       }
 
       reference.zipWithIndex.foreach { v =>
-        assert(v._1 == column.getDouble(v._2), "Seed = " + seed + " MemMode=" 
+ memMode)
-        if (memMode == MemoryMode.OFF_HEAP) {
-          val addr = column.valuesNativeAddress()
-          assert(v._1 == Platform.getDouble(null, addr + 8 * v._2))
-        }
+        assert(v._1 == column.getDouble(v._2),
+          "Seed = " + seed + " VectorType=" + column.getClass.getSimpleName)
       }
   }
 
   testVector("String APIs", 6, StringType) {
-    (column, memMode) =>
+    column =>
       val reference = mutable.ArrayBuffer.empty[String]
 
       assert(column.arrayData().elementsAppended == 0)
@@ -643,9 +619,9 @@ class ColumnarBatchSuite extends SparkFunSuite {
       assert(column.arrayData().elementsAppended == 17 + (s + s).length)
 
       reference.zipWithIndex.foreach { v =>
-        assert(v._1.length == column.getArrayLength(v._2), "MemoryMode=" + 
memMode)
-        assert(v._1 == column.getUTF8String(v._2).toString,
-          "MemoryMode" + memMode)
+        val errMsg = "VectorType=" + column.getClass.getSimpleName
+        assert(v._1.length == column.getArrayLength(v._2), errMsg)
+        assert(v._1 == column.getUTF8String(v._2).toString, errMsg)
       }
 
       column.reset()
@@ -653,7 +629,7 @@ class ColumnarBatchSuite extends SparkFunSuite {
   }
 
   testVector("Int Array", 10, new ArrayType(IntegerType, true)) {
-    (column, _) =>
+    column =>
 
       // Fill the underlying data with all the arrays back to back.
       val data = column.arrayData()
@@ -763,7 +739,7 @@ class ColumnarBatchSuite extends SparkFunSuite {
   testVector(
     "Struct Column",
     10,
-    new StructType().add("int", IntegerType).add("double", DoubleType)) { 
(column, _) =>
+    new StructType().add("int", IntegerType).add("double", DoubleType)) { 
column =>
       val c1 = column.getChildColumn(0)
       val c2 = column.getChildColumn(1)
       assert(c1.dataType() == IntegerType)
@@ -789,7 +765,7 @@ class ColumnarBatchSuite extends SparkFunSuite {
   }
 
   testVector("Nest Array in Array", 10, new ArrayType(new 
ArrayType(IntegerType, true), true)) {
-    (column, _) =>
+    column =>
       val childColumn = column.arrayData()
       val data = column.arrayData().arrayData()
       (0 until 6).foreach {
@@ -822,7 +798,7 @@ class ColumnarBatchSuite extends SparkFunSuite {
   testVector(
     "Nest Struct in Array",
     10,
-    new ArrayType(structType, true)) { (column, _) =>
+    new ArrayType(structType, true)) { column =>
       val data = column.arrayData()
       val c0 = data.getChildColumn(0)
       val c1 = data.getChildColumn(1)
@@ -851,7 +827,7 @@ class ColumnarBatchSuite extends SparkFunSuite {
     10,
     new StructType()
       .add("int", IntegerType)
-      .add("array", new ArrayType(IntegerType, true))) { (column, _) =>
+      .add("array", new ArrayType(IntegerType, true))) { column =>
       val c0 = column.getChildColumn(0)
       val c1 = column.getChildColumn(1)
       c0.putInt(0, 0)
@@ -880,7 +856,7 @@ class ColumnarBatchSuite extends SparkFunSuite {
   testVector(
     "Nest Struct in Struct",
     10,
-    new StructType().add("int", IntegerType).add("struct", subSchema)) { 
(column, _) =>
+    new StructType().add("int", IntegerType).add("struct", subSchema)) { 
column =>
       val c0 = column.getChildColumn(0)
       val c1 = column.getChildColumn(1)
       c0.putInt(0, 0)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to