Repository: spark
Updated Branches:
  refs/heads/master 90d77e971 -> afb21bf22


[SPARK-20537][CORE] Fixing OffHeapColumnVector reallocation

## What changes were proposed in this pull request?

As #17773 revealed, `OnHeapColumnVector` may copy only part of the original storage when it reallocates.

`OffHeapColumnVector` reallocation likewise copies data into the new storage only up to
`elementsAppended`. This variable is updated only by the `ColumnVector.appendX` APIs,
whereas `ColumnVector.putX` is more commonly used, so values written with `putX` beyond
`elementsAppended` are lost on reallocation. This PR instead copies data up to the
previously allocated capacity in `OffHeapColumnVector`.

## How was this patch tested?

Existing test suites.

Author: Kazuaki Ishizaki <ishiz...@jp.ibm.com>

Closes #17811 from kiszk/SPARK-20537.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/afb21bf2
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/afb21bf2
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/afb21bf2

Branch: refs/heads/master
Commit: afb21bf22a59c9416c04637412fb69d1442e6826
Parents: 90d77e9
Author: Kazuaki Ishizaki <ishiz...@jp.ibm.com>
Authored: Tue May 2 13:56:41 2017 +0800
Committer: Wenchen Fan <wenc...@databricks.com>
Committed: Tue May 2 13:56:41 2017 +0800

----------------------------------------------------------------------
 .../execution/vectorized/OffHeapColumnVector.java  | 17 +++++++++--------
 1 file changed, 9 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/afb21bf2/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
index e988c07..a7d3744 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
@@ -436,28 +436,29 @@ public final class OffHeapColumnVector extends ColumnVector {
   // Split out the slow path.
   @Override
   protected void reserveInternal(int newCapacity) {
+    int oldCapacity = (this.data == 0L) ? 0 : capacity;
     if (this.resultArray != null) {
       this.lengthData =
-          Platform.reallocateMemory(lengthData, elementsAppended * 4, 
newCapacity * 4);
+          Platform.reallocateMemory(lengthData, oldCapacity * 4, newCapacity * 
4);
       this.offsetData =
-          Platform.reallocateMemory(offsetData, elementsAppended * 4, 
newCapacity * 4);
+          Platform.reallocateMemory(offsetData, oldCapacity * 4, newCapacity * 
4);
     } else if (type instanceof ByteType || type instanceof BooleanType) {
-      this.data = Platform.reallocateMemory(data, elementsAppended, 
newCapacity);
+      this.data = Platform.reallocateMemory(data, oldCapacity, newCapacity);
     } else if (type instanceof ShortType) {
-      this.data = Platform.reallocateMemory(data, elementsAppended * 2, 
newCapacity * 2);
+      this.data = Platform.reallocateMemory(data, oldCapacity * 2, newCapacity 
* 2);
     } else if (type instanceof IntegerType || type instanceof FloatType ||
         type instanceof DateType || DecimalType.is32BitDecimalType(type)) {
-      this.data = Platform.reallocateMemory(data, elementsAppended * 4, 
newCapacity * 4);
+      this.data = Platform.reallocateMemory(data, oldCapacity * 4, newCapacity 
* 4);
     } else if (type instanceof LongType || type instanceof DoubleType ||
         DecimalType.is64BitDecimalType(type) || type instanceof TimestampType) 
{
-      this.data = Platform.reallocateMemory(data, elementsAppended * 8, 
newCapacity * 8);
+      this.data = Platform.reallocateMemory(data, oldCapacity * 8, newCapacity 
* 8);
     } else if (resultStruct != null) {
       // Nothing to store.
     } else {
       throw new RuntimeException("Unhandled " + type);
     }
-    this.nulls = Platform.reallocateMemory(nulls, elementsAppended, 
newCapacity);
-    Platform.setMemory(nulls + elementsAppended, (byte)0, newCapacity - 
elementsAppended);
+    this.nulls = Platform.reallocateMemory(nulls, oldCapacity, newCapacity);
+    Platform.setMemory(nulls + oldCapacity, (byte)0, newCapacity - 
oldCapacity);
     capacity = newCapacity;
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to