Repository: spark
Updated Branches:
  refs/heads/master 10e37f6eb -> d2b2932d8


[SPARK-22092] Reallocation in OffHeapColumnVector.reserveInternal corrupts 
struct and array data

## What changes were proposed in this pull request?

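`OffHeapColumnVector.reserveInternal()` only copies already-inserted values
during reallocation if the `data` buffer has been allocated (`data != 0L`).
In vectors containing arrays or structs this check is wrong, because in those
vectors the field `data` is not used at all. `oldCapacity` is therefore
computed as 0, and the existing contents (e.g. array lengths/offsets and null
flags) are not copied into the newly allocated buffers. We need to check
`nulls` instead.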

## How was this patch tested?

Adds two new tests to `ColumnVectorSuite` that reproduce the corruption: one
for array data and one for struct nullability after reallocation.

Author: Ala Luszczak <a...@databricks.com>

Closes #19308 from ala/vector-realloc.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d2b2932d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d2b2932d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d2b2932d

Branch: refs/heads/master
Commit: d2b2932d8be01dee31983121f6fffd16177bf48a
Parents: 10e37f6
Author: Ala Luszczak <a...@databricks.com>
Authored: Fri Sep 22 15:31:43 2017 +0200
Committer: Herman van Hovell <hvanhov...@databricks.com>
Committed: Fri Sep 22 15:31:43 2017 +0200

----------------------------------------------------------------------
 .../vectorized/OffHeapColumnVector.java         |  2 +-
 .../vectorized/ColumnVectorSuite.scala          | 26 ++++++++++++++++++++
 2 files changed, 27 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/d2b2932d/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
 
b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
index 3568275..e1d3685 100644
--- 
a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
+++ 
b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
@@ -515,7 +515,7 @@ public final class OffHeapColumnVector extends 
WritableColumnVector {
   // Split out the slow path.
   @Override
   protected void reserveInternal(int newCapacity) {
-    int oldCapacity = (this.data == 0L) ? 0 : capacity;
+    int oldCapacity = (nulls == 0L) ? 0 : capacity;
     if (this.resultArray != null) {
       this.lengthData =
           Platform.reallocateMemory(lengthData, oldCapacity * 4, newCapacity * 
4);

http://git-wip-us.apache.org/repos/asf/spark/blob/d2b2932d/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnVectorSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnVectorSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnVectorSuite.scala
index 998067a..f7b06c9 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnVectorSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnVectorSuite.scala
@@ -198,4 +198,30 @@ class ColumnVectorSuite extends SparkFunSuite with 
BeforeAndAfterEach {
     assert(array.get(1, schema).asInstanceOf[ColumnarBatch.Row].get(0, 
IntegerType) === 456)
     assert(array.get(1, schema).asInstanceOf[ColumnarBatch.Row].get(1, 
DoubleType) === 5.67)
   }
+
+  test("[SPARK-22092] off-heap column vector reallocation corrupts array 
data") {
+    val arrayType = ArrayType(IntegerType, true)
+    testVector = new OffHeapColumnVector(8, arrayType)
+
+    val data = testVector.arrayData()
+    (0 until 8).foreach(i => data.putInt(i, i))
+    (0 until 8).foreach(i => testVector.putArray(i, i, 1))
+
+    // Increase vector's capacity and reallocate the data to new bigger 
buffers.
+    testVector.reserve(16)
+
+    // Check that none of the values got lost/overwritten.
+    val array = new ColumnVector.Array(testVector)
+    (0 until 8).foreach { i =>
+      assert(array.get(i, arrayType).asInstanceOf[ArrayData].toIntArray() === 
Array(i))
+    }
+  }
+
+  test("[SPARK-22092] off-heap column vector reallocation corrupts struct 
nullability") {
+    val structType = new StructType().add("int", IntegerType).add("double", 
DoubleType)
+    testVector = new OffHeapColumnVector(8, structType)
+    (0 until 8).foreach(i => if (i % 2 == 0) testVector.putNull(i) else 
testVector.putNotNull(i))
+    testVector.reserve(16)
+    (0 until 8).foreach(i => assert(testVector.isNullAt(i) == (i % 2 == 0)))
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to