Repository: spark
Updated Branches:
  refs/heads/branch-2.2 c0a34a9ff -> 1a829df94


[SPARK-22092] Reallocation in OffHeapColumnVector.reserveInternal corrupts 
struct and array data

`OffHeapColumnVector.reserveInternal()` will only copy already inserted values 
during reallocation if `data != null`. In vectors containing arrays or structs 
this is incorrect, since the field `data` is not used there at all. We need to 
check `nulls` instead.

Adds new tests to `ColumnVectorSuite` that reproduce the errors.

Author: Ala Luszczak <a...@databricks.com>

Closes #19323 from ala/port-vector-realloc.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1a829df9
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1a829df9
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1a829df9

Branch: refs/heads/branch-2.2
Commit: 1a829df94a9cfee4395353b0f93fb5bcd628dce4
Parents: c0a34a9
Author: Ala Luszczak <a...@databricks.com>
Authored: Sat Sep 23 16:09:47 2017 +0200
Committer: Herman van Hovell <hvanhov...@databricks.com>
Committed: Sat Sep 23 16:09:47 2017 +0200

----------------------------------------------------------------------
 .../vectorized/OffHeapColumnVector.java         |   2 +-
 .../vectorized/ColumnVectorSuite.scala          | 227 +++++++++++++++++++
 2 files changed, 228 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/1a829df9/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
index a7d3744..cda7f2f 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
@@ -436,7 +436,7 @@ public final class OffHeapColumnVector extends ColumnVector {
   // Split out the slow path.
   @Override
   protected void reserveInternal(int newCapacity) {
-    int oldCapacity = (this.data == 0L) ? 0 : capacity;
+    int oldCapacity = (nulls == 0L) ? 0 : capacity;
     if (this.resultArray != null) {
       this.lengthData =
           Platform.reallocateMemory(lengthData, oldCapacity * 4, newCapacity * 
4);

http://git-wip-us.apache.org/repos/asf/spark/blob/1a829df9/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnVectorSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnVectorSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnVectorSuite.scala
new file mode 100644
index 0000000..19b93c9
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnVectorSuite.scala
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.vectorized
+
+import org.scalatest.BeforeAndAfterEach
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.catalyst.util.ArrayData
+import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.UTF8String
+
+class ColumnVectorSuite extends SparkFunSuite with BeforeAndAfterEach {
+
+  var testVector: ColumnVector = _
+
+  private def allocate(capacity: Int, dt: DataType): ColumnVector = {
+    new OnHeapColumnVector(capacity, dt)
+  }
+
+  override def afterEach(): Unit = {
+    testVector.close()
+  }
+
+  test("boolean") {
+    testVector = allocate(10, BooleanType)
+    (0 until 10).foreach { i =>
+      testVector.appendBoolean(i % 2 == 0)
+    }
+
+    val array = new ColumnVector.Array(testVector)
+
+    (0 until 10).foreach { i =>
+      assert(array.getBoolean(i) === (i % 2 == 0))
+    }
+  }
+
+  test("byte") {
+    testVector = allocate(10, ByteType)
+    (0 until 10).foreach { i =>
+      testVector.appendByte(i.toByte)
+    }
+
+    val array = new ColumnVector.Array(testVector)
+
+    (0 until 10).foreach { i =>
+      assert(array.getByte(i) === (i.toByte))
+    }
+  }
+
+  test("short") {
+    testVector = allocate(10, ShortType)
+    (0 until 10).foreach { i =>
+      testVector.appendShort(i.toShort)
+    }
+
+    val array = new ColumnVector.Array(testVector)
+
+    (0 until 10).foreach { i =>
+      assert(array.getShort(i) === (i.toShort))
+    }
+  }
+
+  test("int") {
+    testVector = allocate(10, IntegerType)
+    (0 until 10).foreach { i =>
+      testVector.appendInt(i)
+    }
+
+    val array = new ColumnVector.Array(testVector)
+
+    (0 until 10).foreach { i =>
+      assert(array.getInt(i) === i)
+    }
+  }
+
+  test("long") {
+    testVector = allocate(10, LongType)
+    (0 until 10).foreach { i =>
+      testVector.appendLong(i)
+    }
+
+    val array = new ColumnVector.Array(testVector)
+
+    (0 until 10).foreach { i =>
+      assert(array.getLong(i) === i)
+    }
+  }
+
+  test("float") {
+    testVector = allocate(10, FloatType)
+    (0 until 10).foreach { i =>
+      testVector.appendFloat(i.toFloat)
+    }
+
+    val array = new ColumnVector.Array(testVector)
+
+    (0 until 10).foreach { i =>
+      assert(array.getFloat(i) === i.toFloat)
+    }
+  }
+
+  test("double") {
+    testVector = allocate(10, DoubleType)
+    (0 until 10).foreach { i =>
+      testVector.appendDouble(i.toDouble)
+    }
+
+    val array = new ColumnVector.Array(testVector)
+
+    (0 until 10).foreach { i =>
+      assert(array.getDouble(i) === i.toDouble)
+    }
+  }
+
+  test("string") {
+    testVector = allocate(10, StringType)
+    (0 until 10).map { i =>
+      val utf8 = s"str$i".getBytes("utf8")
+      testVector.appendByteArray(utf8, 0, utf8.length)
+    }
+
+    val array = new ColumnVector.Array(testVector)
+
+    (0 until 10).foreach { i =>
+      assert(array.getUTF8String(i) === UTF8String.fromString(s"str$i"))
+    }
+  }
+
+  test("binary") {
+    testVector = allocate(10, BinaryType)
+    (0 until 10).map { i =>
+      val utf8 = s"str$i".getBytes("utf8")
+      testVector.appendByteArray(utf8, 0, utf8.length)
+    }
+
+    val array = new ColumnVector.Array(testVector)
+
+    (0 until 10).foreach { i =>
+      val utf8 = s"str$i".getBytes("utf8")
+      assert(array.getBinary(i) === utf8)
+    }
+  }
+
+  test("array") {
+    val arrayType = ArrayType(IntegerType, true)
+    testVector = allocate(10, arrayType)
+
+    val data = testVector.arrayData()
+    var i = 0
+    while (i < 6) {
+      data.putInt(i, i)
+      i += 1
+    }
+
+    // Populate it with arrays [0], [1, 2], [], [3, 4, 5]
+    testVector.putArray(0, 0, 1)
+    testVector.putArray(1, 1, 2)
+    testVector.putArray(2, 3, 0)
+    testVector.putArray(3, 3, 3)
+
+    val array = new ColumnVector.Array(testVector)
+
+    assert(array.getArray(0).toIntArray() === Array(0))
+    assert(array.getArray(1).asInstanceOf[ArrayData].toIntArray() === Array(1, 
2))
+    assert(array.getArray(2).asInstanceOf[ArrayData].toIntArray() === 
Array.empty[Int])
+    assert(array.getArray(3).asInstanceOf[ArrayData].toIntArray() === Array(3, 
4, 5))
+  }
+
+  test("struct") {
+    val schema = new StructType().add("int", IntegerType).add("double", 
DoubleType)
+    testVector = allocate(10, schema)
+    val c1 = testVector.getChildColumn(0)
+    val c2 = testVector.getChildColumn(1)
+    c1.putInt(0, 123)
+    c2.putDouble(0, 3.45)
+    c1.putInt(1, 456)
+    c2.putDouble(1, 5.67)
+
+    val array = new ColumnVector.Array(testVector)
+
+    assert(array.getStruct(0, 2).asInstanceOf[ColumnarBatch.Row].getInt(0) === 
123)
+    assert(array.getStruct(0, 2).asInstanceOf[ColumnarBatch.Row].getDouble(1) 
=== 3.45)
+    assert(array.getStruct(1, 2).asInstanceOf[ColumnarBatch.Row].getInt(0) === 
456)
+    assert(array.getStruct(1, 2).asInstanceOf[ColumnarBatch.Row].getDouble(1) 
=== 5.67)
+  }
+
+  test("[SPARK-22092] off-heap column vector reallocation corrupts array 
data") {
+    val arrayType = ArrayType(IntegerType, true)
+    testVector = new OffHeapColumnVector(8, arrayType)
+
+    val data = testVector.arrayData()
+    (0 until 8).foreach(i => data.putInt(i, i))
+    (0 until 8).foreach(i => testVector.putArray(i, i, 1))
+
+    // Increase vector's capacity and reallocate the data to new bigger 
buffers.
+    testVector.reserve(16)
+
+    // Check that none of the values got lost/overwritten.
+    val array = new ColumnVector.Array(testVector)
+    (0 until 8).foreach { i =>
+      assert(array.getArray(i).toIntArray() === Array(i))
+    }
+  }
+
+  test("[SPARK-22092] off-heap column vector reallocation corrupts struct 
nullability") {
+    val structType = new StructType().add("int", IntegerType).add("double", 
DoubleType)
+    testVector = new OffHeapColumnVector(8, structType)
+    (0 until 8).foreach(i => if (i % 2 == 0) testVector.putNull(i) else 
testVector.putNotNull(i))
+    testVector.reserve(16)
+    (0 until 8).foreach(i => assert(testVector.isNullAt(i) == (i % 2 == 0)))
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to