Repository: spark
Updated Branches:
  refs/heads/master f470df2fc -> 52e00f706


[SPARK-23280][SQL] add map type support to ColumnVector

## What changes were proposed in this pull request?

Fill the last missing piece of `ColumnVector`: the map type support.

The idea is similar to the array type support. A map is basically 2 arrays: 
keys and values. We ask the implementations to provide a key array, a value 
array, and an offset and length to specify the range of this map in the 
key/value array.

In `WritableColumnVector`, we put the key array in the first child vector, the 
value array in the second child vector, and the offsets and lengths in the 
current vector, which is very similar to how the array type is implemented here.

## How was this patch tested?

a new test

Author: Wenchen Fan <wenc...@databricks.com>

Closes #20450 from cloud-fan/map.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/52e00f70
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/52e00f70
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/52e00f70

Branch: refs/heads/master
Commit: 52e00f70663a87b5837235bdf72a3e6f84e11411
Parents: f470df2
Author: Wenchen Fan <wenc...@databricks.com>
Authored: Thu Feb 1 11:56:06 2018 +0800
Committer: Wenchen Fan <wenc...@databricks.com>
Committed: Thu Feb 1 11:56:06 2018 +0800

----------------------------------------------------------------------
 .../datasources/orc/OrcColumnVector.java        |  6 ++
 .../execution/vectorized/ColumnVectorUtils.java | 15 +++++
 .../vectorized/OffHeapColumnVector.java         |  4 +-
 .../vectorized/OnHeapColumnVector.java          |  4 +-
 .../vectorized/WritableColumnVector.java        | 13 ++++
 .../spark/sql/vectorized/ArrowColumnVector.java |  5 ++
 .../spark/sql/vectorized/ColumnVector.java      | 14 +++-
 .../spark/sql/vectorized/ColumnarArray.java     |  4 +-
 .../spark/sql/vectorized/ColumnarMap.java       | 53 +++++++++++++++
 .../spark/sql/vectorized/ColumnarRow.java       |  5 +-
 .../vectorized/ColumnarBatchSuite.scala         | 70 +++++++++++++++-----
 11 files changed, 166 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/52e00f70/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnVector.java
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnVector.java
 
b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnVector.java
index 78203e3..c8add4c 100644
--- 
a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnVector.java
+++ 
b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnVector.java
@@ -25,6 +25,7 @@ import org.apache.spark.sql.types.DataType;
 import org.apache.spark.sql.types.Decimal;
 import org.apache.spark.sql.types.TimestampType;
 import org.apache.spark.sql.vectorized.ColumnarArray;
+import org.apache.spark.sql.vectorized.ColumnarMap;
 import org.apache.spark.unsafe.types.UTF8String;
 
 /**
@@ -178,6 +179,11 @@ public class OrcColumnVector extends 
org.apache.spark.sql.vectorized.ColumnVecto
   }
 
   @Override
+  public ColumnarMap getMap(int rowId) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
   public org.apache.spark.sql.vectorized.ColumnVector getChild(int ordinal) {
     throw new UnsupportedOperationException();
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/52e00f70/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVectorUtils.java
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVectorUtils.java
 
b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVectorUtils.java
index a2853bb..829f3ce 100644
--- 
a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVectorUtils.java
+++ 
b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVectorUtils.java
@@ -20,8 +20,10 @@ import java.math.BigDecimal;
 import java.math.BigInteger;
 import java.nio.charset.StandardCharsets;
 import java.sql.Date;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.spark.memory.MemoryMode;
 import org.apache.spark.sql.Row;
@@ -30,6 +32,7 @@ import org.apache.spark.sql.catalyst.util.DateTimeUtils;
 import org.apache.spark.sql.types.*;
 import org.apache.spark.sql.vectorized.ColumnarArray;
 import org.apache.spark.sql.vectorized.ColumnarBatch;
+import org.apache.spark.sql.vectorized.ColumnarMap;
 import org.apache.spark.unsafe.types.CalendarInterval;
 import org.apache.spark.unsafe.types.UTF8String;
 
@@ -109,6 +112,18 @@ public class ColumnVectorUtils {
     return array.toIntArray();
   }
 
+  public static Map<Integer, Integer> toJavaIntMap(ColumnarMap map) {
+    int[] keys = toJavaIntArray(map.keyArray());
+    int[] values = toJavaIntArray(map.valueArray());
+    assert keys.length == values.length;
+
+    Map<Integer, Integer> result = new HashMap<>();
+    for (int i = 0; i < keys.length; i++) {
+      result.put(keys[i], values[i]);
+    }
+    return result;
+  }
+
   private static void appendValue(WritableColumnVector dst, DataType t, Object 
o) {
     if (o == null) {
       if (t instanceof CalendarIntervalType) {

http://git-wip-us.apache.org/repos/asf/spark/blob/52e00f70/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
 
b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
index fa52e4a..754c265 100644
--- 
a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
+++ 
b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
@@ -60,7 +60,7 @@ public final class OffHeapColumnVector extends 
WritableColumnVector {
   private long nulls;
   private long data;
 
-  // Set iff the type is array.
+  // Only set if type is Array or Map.
   private long lengthData;
   private long offsetData;
 
@@ -530,7 +530,7 @@ public final class OffHeapColumnVector extends 
WritableColumnVector {
   @Override
   protected void reserveInternal(int newCapacity) {
     int oldCapacity = (nulls == 0L) ? 0 : capacity;
-    if (isArray()) {
+    if (isArray() || type instanceof MapType) {
       this.lengthData =
           Platform.reallocateMemory(lengthData, oldCapacity * 4, newCapacity * 
4);
       this.offsetData =

http://git-wip-us.apache.org/repos/asf/spark/blob/52e00f70/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java
 
b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java
index cccef78..23dcc10 100644
--- 
a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java
+++ 
b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java
@@ -69,7 +69,7 @@ public final class OnHeapColumnVector extends 
WritableColumnVector {
   private float[] floatData;
   private double[] doubleData;
 
-  // Only set if type is Array.
+  // Only set if type is Array or Map.
   private int[] arrayLengths;
   private int[] arrayOffsets;
 
@@ -503,7 +503,7 @@ public final class OnHeapColumnVector extends 
WritableColumnVector {
   // Spilt this function out since it is the slow path.
   @Override
   protected void reserveInternal(int newCapacity) {
-    if (isArray()) {
+    if (isArray() || type instanceof MapType) {
       int[] newLengths = new int[newCapacity];
       int[] newOffsets = new int[newCapacity];
       if (this.arrayLengths != null) {

http://git-wip-us.apache.org/repos/asf/spark/blob/52e00f70/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java
 
b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java
index 8ebc1ad..c2e5954 100644
--- 
a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java
+++ 
b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java
@@ -25,6 +25,7 @@ import org.apache.spark.sql.internal.SQLConf;
 import org.apache.spark.sql.types.*;
 import org.apache.spark.sql.vectorized.ColumnVector;
 import org.apache.spark.sql.vectorized.ColumnarArray;
+import org.apache.spark.sql.vectorized.ColumnarMap;
 import org.apache.spark.unsafe.array.ByteArrayMethods;
 import org.apache.spark.unsafe.types.UTF8String;
 
@@ -612,6 +613,13 @@ public abstract class WritableColumnVector extends 
ColumnVector {
     return new ColumnarArray(arrayData(), getArrayOffset(rowId), 
getArrayLength(rowId));
   }
 
+  // `WritableColumnVector` puts the key array in the first child column 
vector, value array in the
+  // second child column vector, and puts the offsets and lengths in the 
current column vector.
+  @Override
+  public final ColumnarMap getMap(int rowId) {
+    return new ColumnarMap(getChild(0), getChild(1), getArrayOffset(rowId), 
getArrayLength(rowId));
+  }
+
   public WritableColumnVector arrayData() {
     return childColumns[0];
   }
@@ -705,6 +713,11 @@ public abstract class WritableColumnVector extends 
ColumnVector {
       for (int i = 0; i < childColumns.length; ++i) {
         this.childColumns[i] = reserveNewColumn(capacity, 
st.fields()[i].dataType());
       }
+    } else if (type instanceof MapType) {
+      MapType mapType = (MapType) type;
+      this.childColumns = new WritableColumnVector[2];
+      this.childColumns[0] = reserveNewColumn(capacity, mapType.keyType());
+      this.childColumns[1] = reserveNewColumn(capacity, mapType.valueType());
     } else if (type instanceof CalendarIntervalType) {
       // Two columns. Months as int. Microseconds as Long.
       this.childColumns = new WritableColumnVector[2];

http://git-wip-us.apache.org/repos/asf/spark/blob/52e00f70/sql/core/src/main/java/org/apache/spark/sql/vectorized/ArrowColumnVector.java
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/vectorized/ArrowColumnVector.java 
b/sql/core/src/main/java/org/apache/spark/sql/vectorized/ArrowColumnVector.java
index 5ff6474..f3ece53 100644
--- 
a/sql/core/src/main/java/org/apache/spark/sql/vectorized/ArrowColumnVector.java
+++ 
b/sql/core/src/main/java/org/apache/spark/sql/vectorized/ArrowColumnVector.java
@@ -120,6 +120,11 @@ public final class ArrowColumnVector extends ColumnVector {
   }
 
   @Override
+  public ColumnarMap getMap(int rowId) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
   public ArrowColumnVector getChild(int ordinal) { return 
childColumns[ordinal]; }
 
   public ArrowColumnVector(ValueVector vector) {

http://git-wip-us.apache.org/repos/asf/spark/blob/52e00f70/sql/core/src/main/java/org/apache/spark/sql/vectorized/ColumnVector.java
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/vectorized/ColumnVector.java 
b/sql/core/src/main/java/org/apache/spark/sql/vectorized/ColumnVector.java
index d588956..05271ec 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/vectorized/ColumnVector.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/vectorized/ColumnVector.java
@@ -220,10 +220,18 @@ public abstract class ColumnVector implements 
AutoCloseable {
 
   /**
    * Returns the map type value for rowId.
+   *
+   * In Spark, map type value is basically a key data array and a value data 
array. A key from the
+   * key array with an index and a value from the value array with the same 
index contribute to
+   * an entry of this map type value.
+   *
+   * To support map type, implementations must construct a {@link 
ColumnarMap} and return it in
+   * this method. {@link ColumnarMap} requires a {@link ColumnVector} that 
stores the data of all
+   * the keys of all the maps in this vector, and another {@link ColumnVector} 
that stores the data
+   * of all the values of all the maps in this vector, and a pair of offset 
and length which
+   * specify the range of the key/value array that belongs to the map type 
value at rowId.
    */
-  public MapData getMap(int ordinal) {
-    throw new UnsupportedOperationException();
-  }
+  public abstract ColumnarMap getMap(int ordinal);
 
   /**
    * Returns the decimal type value for rowId.

http://git-wip-us.apache.org/repos/asf/spark/blob/52e00f70/sql/core/src/main/java/org/apache/spark/sql/vectorized/ColumnarArray.java
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/vectorized/ColumnarArray.java 
b/sql/core/src/main/java/org/apache/spark/sql/vectorized/ColumnarArray.java
index 72c07ee..7c7a1c8 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/vectorized/ColumnarArray.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/vectorized/ColumnarArray.java
@@ -149,8 +149,8 @@ public final class ColumnarArray extends ArrayData {
   }
 
   @Override
-  public MapData getMap(int ordinal) {
-    throw new UnsupportedOperationException();
+  public ColumnarMap getMap(int ordinal) {
+    return data.getMap(offset + ordinal);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/spark/blob/52e00f70/sql/core/src/main/java/org/apache/spark/sql/vectorized/ColumnarMap.java
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/vectorized/ColumnarMap.java 
b/sql/core/src/main/java/org/apache/spark/sql/vectorized/ColumnarMap.java
new file mode 100644
index 0000000..35648e3
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/vectorized/ColumnarMap.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.vectorized;
+
+import org.apache.spark.sql.catalyst.util.MapData;
+
+/**
+ * Map abstraction in {@link ColumnVector}.
+ */
+public final class ColumnarMap extends MapData {
+  private final ColumnarArray keys;
+  private final ColumnarArray values;
+  private final int length;
+
+  public ColumnarMap(ColumnVector keys, ColumnVector values, int offset, int 
length) {
+    this.length = length;
+    this.keys = new ColumnarArray(keys, offset, length);
+    this.values = new ColumnarArray(values, offset, length);
+  }
+
+  @Override
+  public int numElements() { return length; }
+
+  @Override
+  public ColumnarArray keyArray() {
+    return keys;
+  }
+
+  @Override
+  public ColumnarArray valueArray() {
+    return values;
+  }
+
+  @Override
+  public ColumnarMap copy() {
+    throw new UnsupportedOperationException();
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/52e00f70/sql/core/src/main/java/org/apache/spark/sql/vectorized/ColumnarRow.java
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/vectorized/ColumnarRow.java 
b/sql/core/src/main/java/org/apache/spark/sql/vectorized/ColumnarRow.java
index 6ca749d..0c9e92e 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/vectorized/ColumnarRow.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/vectorized/ColumnarRow.java
@@ -155,8 +155,9 @@ public final class ColumnarRow extends InternalRow {
   }
 
   @Override
-  public MapData getMap(int ordinal) {
-    throw new UnsupportedOperationException();
+  public ColumnarMap getMap(int ordinal) {
+    if (data.getChild(ordinal).isNullAt(rowId)) return null;
+    return data.getChild(ordinal).getMap(rowId);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/spark/blob/52e00f70/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala
index 168bc5e..8fe2985 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala
@@ -673,35 +673,37 @@ class ColumnarBatchSuite extends SparkFunSuite {
         i += 1
       }
 
-      // Populate it with arrays [0], [1, 2], [], [3, 4, 5]
+      // Populate it with arrays [0], [1, 2], null, [], [3, 4, 5]
       column.putArray(0, 0, 1)
       column.putArray(1, 1, 2)
-      column.putArray(2, 2, 0)
-      column.putArray(3, 3, 3)
+      column.putNull(2)
+      column.putArray(3, 3, 0)
+      column.putArray(4, 3, 3)
+
+      assert(column.getArray(0).numElements == 1)
+      assert(column.getArray(1).numElements == 2)
+      assert(column.isNullAt(2))
+      assert(column.getArray(3).numElements == 0)
+      assert(column.getArray(4).numElements == 3)
 
       val a1 = ColumnVectorUtils.toJavaIntArray(column.getArray(0))
       val a2 = ColumnVectorUtils.toJavaIntArray(column.getArray(1))
-      val a3 = ColumnVectorUtils.toJavaIntArray(column.getArray(2))
-      val a4 = ColumnVectorUtils.toJavaIntArray(column.getArray(3))
+      val a3 = ColumnVectorUtils.toJavaIntArray(column.getArray(3))
+      val a4 = ColumnVectorUtils.toJavaIntArray(column.getArray(4))
       assert(a1 === Array(0))
       assert(a2 === Array(1, 2))
       assert(a3 === Array.empty[Int])
       assert(a4 === Array(3, 4, 5))
 
-      // Verify the ArrayData APIs
-      assert(column.getArray(0).numElements() == 1)
+      // Verify the ArrayData get APIs
       assert(column.getArray(0).getInt(0) == 0)
 
-      assert(column.getArray(1).numElements() == 2)
       assert(column.getArray(1).getInt(0) == 1)
       assert(column.getArray(1).getInt(1) == 2)
 
-      assert(column.getArray(2).numElements() == 0)
-
-      assert(column.getArray(3).numElements() == 3)
-      assert(column.getArray(3).getInt(0) == 3)
-      assert(column.getArray(3).getInt(1) == 4)
-      assert(column.getArray(3).getInt(2) == 5)
+      assert(column.getArray(4).getInt(0) == 3)
+      assert(column.getArray(4).getInt(1) == 4)
+      assert(column.getArray(4).getInt(2) == 5)
 
       // Add a longer array which requires resizing
       column.reset()
@@ -711,8 +713,7 @@ class ColumnarBatchSuite extends SparkFunSuite {
       assert(data.capacity == array.length * 2)
       data.putInts(0, array.length, array, 0)
       column.putArray(0, 0, array.length)
-      assert(ColumnVectorUtils.toJavaIntArray(column.getArray(0))
-        === array)
+      assert(ColumnVectorUtils.toJavaIntArray(column.getArray(0)) === array)
   }
 
   test("toArray for primitive types") {
@@ -770,6 +771,43 @@ class ColumnarBatchSuite extends SparkFunSuite {
     }
   }
 
+  test("Int Map") {
+    (MemoryMode.ON_HEAP :: MemoryMode.OFF_HEAP :: Nil).foreach { memMode =>
+      val column = allocate(10, new MapType(IntegerType, IntegerType, false), 
memMode)
+      (0 to 1).foreach { colIndex =>
+        val data = column.getChild(colIndex)
+        (0 to 5).foreach {i =>
+          data.putInt(i, i * (colIndex + 1))
+        }
+      }
+
+      // Populate it with maps [0->0], [1->2, 2->4], null, [], [3->6, 4->8, 
5->10]
+      column.putArray(0, 0, 1)
+      column.putArray(1, 1, 2)
+      column.putNull(2)
+      column.putArray(3, 3, 0)
+      column.putArray(4, 3, 3)
+
+      assert(column.getMap(0).numElements == 1)
+      assert(column.getMap(1).numElements == 2)
+      assert(column.isNullAt(2))
+      assert(column.getMap(3).numElements == 0)
+      assert(column.getMap(4).numElements == 3)
+
+      val a1 = ColumnVectorUtils.toJavaIntMap(column.getMap(0))
+      val a2 = ColumnVectorUtils.toJavaIntMap(column.getMap(1))
+      val a4 = ColumnVectorUtils.toJavaIntMap(column.getMap(3))
+      val a5 = ColumnVectorUtils.toJavaIntMap(column.getMap(4))
+
+      assert(a1.asScala == Map(0 -> 0))
+      assert(a2.asScala == Map(1 -> 2, 2 -> 4))
+      assert(a4.asScala == Map())
+      assert(a5.asScala == Map(3 -> 6, 4 -> 8, 5 -> 10))
+
+      column.close()
+    }
+  }
+
   testVector(
     "Struct Column",
     10,


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to