Repository: spark Updated Branches: refs/heads/master d49d9e403 -> 5a02e3a2a
[SPARK-22602][SQL] remove ColumnVector#loadBytes ## What changes were proposed in this pull request? `ColumnVector#loadBytes` is only used as an optimization for reading UTF8String in `WritableColumnVector`; this PR moves this optimization to `WritableColumnVector` and simplifies it. ## How was this patch tested? existing tests Author: Wenchen Fan <wenc...@databricks.com> Closes #19815 from cloud-fan/load-bytes. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5a02e3a2 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5a02e3a2 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5a02e3a2 Branch: refs/heads/master Commit: 5a02e3a2ac8a25d92d98d3b3b0d1173dddb9cc91 Parents: d49d9e4 Author: Wenchen Fan <wenc...@databricks.com> Authored: Sun Nov 26 21:49:09 2017 -0800 Committer: gatorsmile <gatorsm...@gmail.com> Committed: Sun Nov 26 21:49:09 2017 -0800 ---------------------------------------------------------------------- .../execution/vectorized/ArrowColumnVector.java | 5 ---- .../sql/execution/vectorized/ColumnVector.java | 8 ++---- .../sql/execution/vectorized/ColumnarArray.java | 8 ------ .../vectorized/OffHeapColumnVector.java | 23 ++++++---------- .../vectorized/OnHeapColumnVector.java | 12 ++++---- .../vectorized/WritableColumnVector.java | 29 +++++--------------- 6 files changed, 24 insertions(+), 61 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/5a02e3a2/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ArrowColumnVector.java ---------------------------------------------------------------------- diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ArrowColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ArrowColumnVector.java index 3a10e98..5c502c9 100644 --- 
a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ArrowColumnVector.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ArrowColumnVector.java @@ -240,11 +240,6 @@ public final class ArrowColumnVector extends ColumnVector { return accessor.getArrayOffset(rowId); } - @Override - public void loadBytes(ColumnarArray array) { - throw new UnsupportedOperationException(); - } - // // APIs dealing with Decimals // http://git-wip-us.apache.org/repos/asf/spark/blob/5a02e3a2/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java ---------------------------------------------------------------------- diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java index 360ed83e..940457f 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java @@ -181,11 +181,6 @@ public abstract class ColumnVector implements AutoCloseable { } /** - * Loads the data into array.byteArray. - */ - public abstract void loadBytes(ColumnarArray array); - - /** * Returns the value for rowId. */ public MapData getMap(int ordinal) { @@ -198,7 +193,8 @@ public abstract class ColumnVector implements AutoCloseable { public abstract Decimal getDecimal(int rowId, int precision, int scale); /** - * Returns the UTF8String for rowId. + * Returns the UTF8String for rowId. Note that the returned UTF8String may point to the data of + * this column vector, please copy it if you want to keep it after this column vector is freed. 
*/ public abstract UTF8String getUTF8String(int rowId); http://git-wip-us.apache.org/repos/asf/spark/blob/5a02e3a2/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarArray.java ---------------------------------------------------------------------- diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarArray.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarArray.java index 34bde3e..b9da641 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarArray.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarArray.java @@ -33,14 +33,6 @@ public final class ColumnarArray extends ArrayData { public int length; public int offset; - // Populate if binary data is required for the Array. This is stored here as an optimization - // for string data. - public byte[] byteArray; - public int byteArrayOffset; - - // Reused staging buffer, used for loading from offheap. - protected byte[] tmpByteArray = new byte[1]; - ColumnarArray(ColumnVector data) { this.data = data; } http://git-wip-us.apache.org/repos/asf/spark/blob/5a02e3a2/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java ---------------------------------------------------------------------- diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java index 6b5c783..1cbaf08 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java @@ -23,6 +23,7 @@ import com.google.common.annotations.VisibleForTesting; import org.apache.spark.sql.types.*; import org.apache.spark.unsafe.Platform; +import org.apache.spark.unsafe.types.UTF8String; /** * Column data backed using offheap memory. 
@@ -75,16 +76,14 @@ public final class OffHeapColumnVector extends WritableColumnVector { reset(); } + /** + * Returns the off heap pointer for the values buffer. + */ @VisibleForTesting public long valuesNativeAddress() { return data; } - @VisibleForTesting - public long nullsNativeAddress() { - return nulls; - } - @Override public void close() { super.close(); @@ -207,6 +206,11 @@ public final class OffHeapColumnVector extends WritableColumnVector { return array; } + @Override + protected UTF8String getBytesAsUTF8String(int rowId, int count) { + return UTF8String.fromAddress(null, data + rowId, count); + } + // // APIs dealing with shorts // @@ -524,15 +528,6 @@ public final class OffHeapColumnVector extends WritableColumnVector { return result; } - @Override - public void loadBytes(ColumnarArray array) { - if (array.tmpByteArray.length < array.length) array.tmpByteArray = new byte[array.length]; - Platform.copyMemory( - null, data + array.offset, array.tmpByteArray, Platform.BYTE_ARRAY_OFFSET, array.length); - array.byteArray = array.tmpByteArray; - array.byteArrayOffset = 0; - } - // Split out the slow path. 
@Override protected void reserveInternal(int newCapacity) { http://git-wip-us.apache.org/repos/asf/spark/blob/5a02e3a2/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java ---------------------------------------------------------------------- diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java index a7b103a..85d7229 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java @@ -22,6 +22,7 @@ import java.util.Arrays; import org.apache.spark.sql.types.*; import org.apache.spark.unsafe.Platform; +import org.apache.spark.unsafe.types.UTF8String; /** * A column backed by an in memory JVM array. This stores the NULLs as a byte per value @@ -203,6 +204,11 @@ public final class OnHeapColumnVector extends WritableColumnVector { return array; } + @Override + protected UTF8String getBytesAsUTF8String(int rowId, int count) { + return UTF8String.fromBytes(byteData, rowId, count); + } + // // APIs dealing with Shorts // @@ -484,12 +490,6 @@ public final class OnHeapColumnVector extends WritableColumnVector { arrayLengths[rowId] = length; } - @Override - public void loadBytes(ColumnarArray array) { - array.byteArray = byteData; - array.byteArrayOffset = array.offset; - } - // // APIs dealing with Byte Arrays // http://git-wip-us.apache.org/repos/asf/spark/blob/5a02e3a2/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java ---------------------------------------------------------------------- diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java index 96cfeed..e7653f0 100644 --- 
a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java @@ -280,18 +280,6 @@ public abstract class WritableColumnVector extends ColumnVector { return putByteArray(rowId, value, 0, value.length); } - /** - * Returns the value for rowId. - */ - private ColumnarArray getByteArray(int rowId) { - ColumnarArray array = getArray(rowId); - array.data.loadBytes(array); - return array; - } - - /** - * Returns the decimal for rowId. - */ @Override public Decimal getDecimal(int rowId, int precision, int scale) { if (precision <= Decimal.MAX_INT_DIGITS()) { @@ -318,14 +306,10 @@ public abstract class WritableColumnVector extends ColumnVector { } } - /** - * Returns the UTF8String for rowId. - */ @Override public UTF8String getUTF8String(int rowId) { if (dictionary == null) { - ColumnarArray a = getByteArray(rowId); - return UTF8String.fromBytes(a.byteArray, a.byteArrayOffset, a.length); + return arrayData().getBytesAsUTF8String(getArrayOffset(rowId), getArrayLength(rowId)); } else { byte[] bytes = dictionary.decodeToBinary(dictionaryIds.getDictId(rowId)); return UTF8String.fromBytes(bytes); @@ -333,15 +317,16 @@ public abstract class WritableColumnVector extends ColumnVector { } /** - * Returns the byte array for rowId. + * Gets the values of bytes from [rowId, rowId + count), as a UTF8String. + * This method is similar to {@link ColumnVector#getBytes(int, int)}, but can save data copy as + * UTF8String is used as a pointer. 
*/ + protected abstract UTF8String getBytesAsUTF8String(int rowId, int count); + @Override public byte[] getBinary(int rowId) { if (dictionary == null) { - ColumnarArray array = getByteArray(rowId); - byte[] bytes = new byte[array.length]; - System.arraycopy(array.byteArray, array.byteArrayOffset, bytes, 0, bytes.length); - return bytes; + return arrayData().getBytes(getArrayOffset(rowId), getArrayLength(rowId)); } else { return dictionary.decodeToBinary(dictionaryIds.getDictId(rowId)); } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org