Repository: spark Updated Branches: refs/heads/master 90d77e971 -> afb21bf22
[SPARK-20537][CORE] Fixing OffHeapColumnVector reallocation ## What changes were proposed in this pull request? As #17773 revealed `OnHeapColumnVector` may copy a part of the original storage. `OffHeapColumnVector` reallocation also copies to the new storage data up to 'elementsAppended'. This variable is only updated when using the `ColumnVector.appendX` API, while `ColumnVector.putX` is more commonly used. This PR copies the new storage data up to the previously-allocated size in`OffHeapColumnVector`. ## How was this patch tested? Existing test suites Author: Kazuaki Ishizaki <ishiz...@jp.ibm.com> Closes #17811 from kiszk/SPARK-20537. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/afb21bf2 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/afb21bf2 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/afb21bf2 Branch: refs/heads/master Commit: afb21bf22a59c9416c04637412fb69d1442e6826 Parents: 90d77e9 Author: Kazuaki Ishizaki <ishiz...@jp.ibm.com> Authored: Tue May 2 13:56:41 2017 +0800 Committer: Wenchen Fan <wenc...@databricks.com> Committed: Tue May 2 13:56:41 2017 +0800 ---------------------------------------------------------------------- .../execution/vectorized/OffHeapColumnVector.java | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/afb21bf2/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java ---------------------------------------------------------------------- diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java index e988c07..a7d3744 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java @@ -436,28 +436,29 @@ public final class OffHeapColumnVector extends ColumnVector { // Split out the slow path. @Override protected void reserveInternal(int newCapacity) { + int oldCapacity = (this.data == 0L) ? 0 : capacity; if (this.resultArray != null) { this.lengthData = - Platform.reallocateMemory(lengthData, elementsAppended * 4, newCapacity * 4); + Platform.reallocateMemory(lengthData, oldCapacity * 4, newCapacity * 4); this.offsetData = - Platform.reallocateMemory(offsetData, elementsAppended * 4, newCapacity * 4); + Platform.reallocateMemory(offsetData, oldCapacity * 4, newCapacity * 4); } else if (type instanceof ByteType || type instanceof BooleanType) { - this.data = Platform.reallocateMemory(data, elementsAppended, newCapacity); + this.data = Platform.reallocateMemory(data, oldCapacity, newCapacity); } else if (type instanceof ShortType) { - this.data = Platform.reallocateMemory(data, elementsAppended * 2, newCapacity * 2); + this.data = Platform.reallocateMemory(data, oldCapacity * 2, newCapacity * 2); } else if (type instanceof IntegerType || type instanceof FloatType || type instanceof DateType || DecimalType.is32BitDecimalType(type)) { - this.data = Platform.reallocateMemory(data, elementsAppended * 4, newCapacity * 4); + this.data = Platform.reallocateMemory(data, oldCapacity * 4, newCapacity * 4); } else if (type instanceof LongType || type instanceof DoubleType || DecimalType.is64BitDecimalType(type) || type instanceof TimestampType) { - this.data = Platform.reallocateMemory(data, elementsAppended * 8, newCapacity * 8); + this.data = Platform.reallocateMemory(data, oldCapacity * 8, newCapacity * 8); } else if (resultStruct != null) { // Nothing to store. } else { throw new RuntimeException("Unhandled " + type); } - this.nulls = Platform.reallocateMemory(nulls, elementsAppended, newCapacity); - Platform.setMemory(nulls + elementsAppended, (byte)0, newCapacity - elementsAppended); + this.nulls = Platform.reallocateMemory(nulls, oldCapacity, newCapacity); + Platform.setMemory(nulls + oldCapacity, (byte)0, newCapacity - oldCapacity); capacity = newCapacity; } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org