Repository: spark Updated Branches: refs/heads/master dc5d34d8d -> 9e33954dd
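This change splits the write path out of ColumnVector into a new abstract WritableColumnVector, with OnHeapColumnVector and OffHeapColumnVector as the concrete implementations; call sites that previously went through ColumnVector.allocate(...) now construct the concrete vector directly, as the updated benchmark and test helpers below show. A minimal sketch of the resulting write/read cycle, using only constructors and methods that appear in this diff (the wrapper class name and values are illustrative, and it assumes the OnHeapColumnVector constructor is reachable from the calling code):

import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector;
import org.apache.spark.sql.execution.vectorized.WritableColumnVector;
import org.apache.spark.sql.types.DataTypes;

public class WritableColumnVectorSketch {
  public static void main(String[] args) {
    // Start deliberately small: the append APIs call reserve() internally,
    // so the vector grows as elements are appended.
    WritableColumnVector col = new OnHeapColumnVector(4, DataTypes.IntegerType);
    for (int i = 0; i < 100; i++) {
      col.appendInt(i);
    }
    long sum = 0;
    for (int i = 0; i < col.getElementsAppended(); i++) {
      sum += col.getInt(i);  // reads go through the inherited ColumnVector API
    }
    System.out.println(sum);  // 4950
    col.reset();  // prior values become inaccessible; the vector is writable again
  }
}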
http://git-wip-us.apache.org/repos/asf/spark/blob/9e33954d/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java ---------------------------------------------------------------------- diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java new file mode 100644 index 0000000..b4f753c --- /dev/null +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java @@ -0,0 +1,674 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.execution.vectorized; + +import java.math.BigDecimal; +import java.math.BigInteger; + +import com.google.common.annotations.VisibleForTesting; + +import org.apache.spark.sql.internal.SQLConf; +import org.apache.spark.sql.types.*; +import org.apache.spark.unsafe.types.UTF8String; + +/** + * This class adds write APIs to ColumnVector. + * It supports all the types and contains put APIs as well as their batched versions. + * The batched versions are preferable whenever possible. + * + * Capacity: The data stored is dense but the arrays are not fixed capacity. It is the + * responsibility of the caller to call reserve() to ensure there is enough room before adding + * elements. This means that the put() APIs do not check as in common cases (i.e. flat schemas), + * the lengths are known up front. + * + * A ColumnVector should be considered immutable once originally created. In other words, it is not + * valid to call put APIs after reads until reset() is called. + */ +public abstract class WritableColumnVector extends ColumnVector { + + /** + * Resets this column for writing. The currently stored values are no longer accessible. + */ + public void reset() { + if (isConstant) return; + + if (childColumns != null) { + for (ColumnVector c: childColumns) { + ((WritableColumnVector) c).reset(); + } + } + numNulls = 0; + elementsAppended = 0; + if (anyNullsSet) { + putNotNulls(0, capacity); + anyNullsSet = false; + } + } + + public void reserve(int requiredCapacity) { + if (requiredCapacity > capacity) { + int newCapacity = (int) Math.min(MAX_CAPACITY, requiredCapacity * 2L); + if (requiredCapacity <= newCapacity) { + try { + reserveInternal(newCapacity); + } catch (OutOfMemoryError outOfMemoryError) { + throwUnsupportedException(requiredCapacity, outOfMemoryError); + } + } else { + throwUnsupportedException(requiredCapacity, null); + } + } + } + + private void throwUnsupportedException(int requiredCapacity, Throwable cause) { + String message = "Cannot reserve additional contiguous bytes in the vectorized reader " + + "(requested = " + requiredCapacity + " bytes). 
As a workaround, you can disable the " + + "vectorized reader by setting " + SQLConf.PARQUET_VECTORIZED_READER_ENABLED().key() + + " to false."; + throw new RuntimeException(message, cause); + } + + @Override + public int numNulls() { return numNulls; } + + @Override + public boolean anyNullsSet() { return anyNullsSet; } + + /** + * Ensures that there is enough storage to store capacity elements. That is, the put() APIs + * must work for all rowIds < capacity. + */ + protected abstract void reserveInternal(int capacity); + + /** + * Sets the value at rowId to null/not null. + */ + public abstract void putNotNull(int rowId); + public abstract void putNull(int rowId); + + /** + * Sets the values from [rowId, rowId + count) to null/not null. + */ + public abstract void putNulls(int rowId, int count); + public abstract void putNotNulls(int rowId, int count); + + /** + * Sets the value at rowId to `value`. + */ + public abstract void putBoolean(int rowId, boolean value); + + /** + * Sets values from [rowId, rowId + count) to value. + */ + public abstract void putBooleans(int rowId, int count, boolean value); + + /** + * Sets the value at rowId to `value`. + */ + public abstract void putByte(int rowId, byte value); + + /** + * Sets values from [rowId, rowId + count) to value. + */ + public abstract void putBytes(int rowId, int count, byte value); + + /** + * Sets values from [rowId, rowId + count) to [src + srcIndex, src + srcIndex + count) + */ + public abstract void putBytes(int rowId, int count, byte[] src, int srcIndex); + + /** + * Sets the value at rowId to `value`. + */ + public abstract void putShort(int rowId, short value); + + /** + * Sets values from [rowId, rowId + count) to value. + */ + public abstract void putShorts(int rowId, int count, short value); + + /** + * Sets values from [rowId, rowId + count) to [src + srcIndex, src + srcIndex + count) + */ + public abstract void putShorts(int rowId, int count, short[] src, int srcIndex); + + /** + * Sets the value at rowId to `value`. + */ + public abstract void putInt(int rowId, int value); + + /** + * Sets values from [rowId, rowId + count) to value. + */ + public abstract void putInts(int rowId, int count, int value); + + /** + * Sets values from [rowId, rowId + count) to [src + srcIndex, src + srcIndex + count) + */ + public abstract void putInts(int rowId, int count, int[] src, int srcIndex); + + /** + * Sets values from [rowId, rowId + count) to [src[srcIndex], src[srcIndex + count]) + * The data in src must be 4-byte little endian ints. + */ + public abstract void putIntsLittleEndian(int rowId, int count, byte[] src, int srcIndex); + + /** + * Sets the value at rowId to `value`. + */ + public abstract void putLong(int rowId, long value); + + /** + * Sets values from [rowId, rowId + count) to value. + */ + public abstract void putLongs(int rowId, int count, long value); + + /** + * Sets values from [rowId, rowId + count) to [src + srcIndex, src + srcIndex + count) + */ + public abstract void putLongs(int rowId, int count, long[] src, int srcIndex); + + /** + * Sets values from [rowId, rowId + count) to [src[srcIndex], src[srcIndex + count]) + * The data in src must be 8-byte little endian longs. + */ + public abstract void putLongsLittleEndian(int rowId, int count, byte[] src, int srcIndex); + + /** + * Sets the value at rowId to `value`. + */ + public abstract void putFloat(int rowId, float value); + + /** + * Sets values from [rowId, rowId + count) to value. 
+ */ + public abstract void putFloats(int rowId, int count, float value); + + /** + * Sets values from [rowId, rowId + count) to [src + srcIndex, src + srcIndex + count) + */ + public abstract void putFloats(int rowId, int count, float[] src, int srcIndex); + + /** + * Sets values from [rowId, rowId + count) to [src[srcIndex], src[srcIndex + count]) + * The data in src must be ieee formatted floats. + */ + public abstract void putFloats(int rowId, int count, byte[] src, int srcIndex); + + /** + * Sets the value at rowId to `value`. + */ + public abstract void putDouble(int rowId, double value); + + /** + * Sets values from [rowId, rowId + count) to value. + */ + public abstract void putDoubles(int rowId, int count, double value); + + /** + * Sets values from [rowId, rowId + count) to [src + srcIndex, src + srcIndex + count) + */ + public abstract void putDoubles(int rowId, int count, double[] src, int srcIndex); + + /** + * Sets values from [rowId, rowId + count) to [src[srcIndex], src[srcIndex + count]) + * The data in src must be ieee formatted doubles. + */ + public abstract void putDoubles(int rowId, int count, byte[] src, int srcIndex); + + /** + * Puts a byte array that already exists in this column. + */ + public abstract void putArray(int rowId, int offset, int length); + + /** + * Sets the value at rowId to `value`. + */ + public abstract int putByteArray(int rowId, byte[] value, int offset, int count); + public final int putByteArray(int rowId, byte[] value) { + return putByteArray(rowId, value, 0, value.length); + } + + /** + * Returns the value for rowId. + */ + private ColumnVector.Array getByteArray(int rowId) { + ColumnVector.Array array = getArray(rowId); + array.data.loadBytes(array); + return array; + } + + /** + * Returns the decimal for rowId. + */ + @Override + public Decimal getDecimal(int rowId, int precision, int scale) { + if (precision <= Decimal.MAX_INT_DIGITS()) { + return Decimal.createUnsafe(getInt(rowId), precision, scale); + } else if (precision <= Decimal.MAX_LONG_DIGITS()) { + return Decimal.createUnsafe(getLong(rowId), precision, scale); + } else { + // TODO: best perf? + byte[] bytes = getBinary(rowId); + BigInteger bigInteger = new BigInteger(bytes); + BigDecimal javaDecimal = new BigDecimal(bigInteger, scale); + return Decimal.apply(javaDecimal, precision, scale); + } + } + + public void putDecimal(int rowId, Decimal value, int precision) { + if (precision <= Decimal.MAX_INT_DIGITS()) { + putInt(rowId, (int) value.toUnscaledLong()); + } else if (precision <= Decimal.MAX_LONG_DIGITS()) { + putLong(rowId, value.toUnscaledLong()); + } else { + BigInteger bigInteger = value.toJavaBigDecimal().unscaledValue(); + putByteArray(rowId, bigInteger.toByteArray()); + } + } + + /** + * Returns the UTF8String for rowId. + */ + @Override + public UTF8String getUTF8String(int rowId) { + if (dictionary == null) { + ColumnVector.Array a = getByteArray(rowId); + return UTF8String.fromBytes(a.byteArray, a.byteArrayOffset, a.length); + } else { + byte[] bytes = dictionary.decodeToBinary(dictionaryIds.getDictId(rowId)); + return UTF8String.fromBytes(bytes); + } + } + + /** + * Returns the byte array for rowId. 
+ */ + @Override + public byte[] getBinary(int rowId) { + if (dictionary == null) { + ColumnVector.Array array = getByteArray(rowId); + byte[] bytes = new byte[array.length]; + System.arraycopy(array.byteArray, array.byteArrayOffset, bytes, 0, bytes.length); + return bytes; + } else { + return dictionary.decodeToBinary(dictionaryIds.getDictId(rowId)); + } + } + + /** + * Append APIs. These APIs all behave similarly and will append data to the current vector. It + * is not valid to mix the put and append APIs. The append APIs are slower and should only be + * used if the sizes are not known up front. + * In all these cases, the return value is the rowId for the first appended element. + */ + public final int appendNull() { + assert (!(dataType() instanceof StructType)); // Use appendStruct() + reserve(elementsAppended + 1); + putNull(elementsAppended); + return elementsAppended++; + } + + public final int appendNotNull() { + reserve(elementsAppended + 1); + putNotNull(elementsAppended); + return elementsAppended++; + } + + public final int appendNulls(int count) { + assert (!(dataType() instanceof StructType)); + reserve(elementsAppended + count); + int result = elementsAppended; + putNulls(elementsAppended, count); + elementsAppended += count; + return result; + } + + public final int appendNotNulls(int count) { + assert (!(dataType() instanceof StructType)); + reserve(elementsAppended + count); + int result = elementsAppended; + putNotNulls(elementsAppended, count); + elementsAppended += count; + return result; + } + + public final int appendBoolean(boolean v) { + reserve(elementsAppended + 1); + putBoolean(elementsAppended, v); + return elementsAppended++; + } + + public final int appendBooleans(int count, boolean v) { + reserve(elementsAppended + count); + int result = elementsAppended; + putBooleans(elementsAppended, count, v); + elementsAppended += count; + return result; + } + + public final int appendByte(byte v) { + reserve(elementsAppended + 1); + putByte(elementsAppended, v); + return elementsAppended++; + } + + public final int appendBytes(int count, byte v) { + reserve(elementsAppended + count); + int result = elementsAppended; + putBytes(elementsAppended, count, v); + elementsAppended += count; + return result; + } + + public final int appendBytes(int length, byte[] src, int offset) { + reserve(elementsAppended + length); + int result = elementsAppended; + putBytes(elementsAppended, length, src, offset); + elementsAppended += length; + return result; + } + + public final int appendShort(short v) { + reserve(elementsAppended + 1); + putShort(elementsAppended, v); + return elementsAppended++; + } + + public final int appendShorts(int count, short v) { + reserve(elementsAppended + count); + int result = elementsAppended; + putShorts(elementsAppended, count, v); + elementsAppended += count; + return result; + } + + public final int appendShorts(int length, short[] src, int offset) { + reserve(elementsAppended + length); + int result = elementsAppended; + putShorts(elementsAppended, length, src, offset); + elementsAppended += length; + return result; + } + + public final int appendInt(int v) { + reserve(elementsAppended + 1); + putInt(elementsAppended, v); + return elementsAppended++; + } + + public final int appendInts(int count, int v) { + reserve(elementsAppended + count); + int result = elementsAppended; + putInts(elementsAppended, count, v); + elementsAppended += count; + return result; + } + + public final int appendInts(int length, int[] src, int offset) { + 
reserve(elementsAppended + length); + int result = elementsAppended; + putInts(elementsAppended, length, src, offset); + elementsAppended += length; + return result; + } + + public final int appendLong(long v) { + reserve(elementsAppended + 1); + putLong(elementsAppended, v); + return elementsAppended++; + } + + public final int appendLongs(int count, long v) { + reserve(elementsAppended + count); + int result = elementsAppended; + putLongs(elementsAppended, count, v); + elementsAppended += count; + return result; + } + + public final int appendLongs(int length, long[] src, int offset) { + reserve(elementsAppended + length); + int result = elementsAppended; + putLongs(elementsAppended, length, src, offset); + elementsAppended += length; + return result; + } + + public final int appendFloat(float v) { + reserve(elementsAppended + 1); + putFloat(elementsAppended, v); + return elementsAppended++; + } + + public final int appendFloats(int count, float v) { + reserve(elementsAppended + count); + int result = elementsAppended; + putFloats(elementsAppended, count, v); + elementsAppended += count; + return result; + } + + public final int appendFloats(int length, float[] src, int offset) { + reserve(elementsAppended + length); + int result = elementsAppended; + putFloats(elementsAppended, length, src, offset); + elementsAppended += length; + return result; + } + + public final int appendDouble(double v) { + reserve(elementsAppended + 1); + putDouble(elementsAppended, v); + return elementsAppended++; + } + + public final int appendDoubles(int count, double v) { + reserve(elementsAppended + count); + int result = elementsAppended; + putDoubles(elementsAppended, count, v); + elementsAppended += count; + return result; + } + + public final int appendDoubles(int length, double[] src, int offset) { + reserve(elementsAppended + length); + int result = elementsAppended; + putDoubles(elementsAppended, length, src, offset); + elementsAppended += length; + return result; + } + + public final int appendByteArray(byte[] value, int offset, int length) { + int copiedOffset = arrayData().appendBytes(length, value, offset); + reserve(elementsAppended + 1); + putArray(elementsAppended, copiedOffset, length); + return elementsAppended++; + } + + public final int appendArray(int length) { + reserve(elementsAppended + 1); + putArray(elementsAppended, arrayData().elementsAppended, length); + return elementsAppended++; + } + + /** + * Appends a NULL struct. This *has* to be used for structs instead of appendNull() as this + * recursively appends a NULL to its children. + * This logic is not part of the general appendNull() implementation, to keep the more + * common non-struct case fast. + */ + public final int appendStruct(boolean isNull) { + if (isNull) { + appendNull(); + for (ColumnVector c: childColumns) { + if (c.type instanceof StructType) { + ((WritableColumnVector) c).appendStruct(true); + } else { + ((WritableColumnVector) c).appendNull(); + } + } + } else { + appendNotNull(); + } + return elementsAppended; + } + + /** + * Returns the data for the underlying array. + */ + @Override + public WritableColumnVector arrayData() { return childColumns[0]; } + + /** + * Returns the ordinal's child data column. + */ + @Override + public WritableColumnVector getChildColumn(int ordinal) { return childColumns[ordinal]; } + + /** + * Returns the elements appended. + */ + public final int getElementsAppended() { return elementsAppended; } + + /** + * Marks this column as being constant.
+ */ + public final void setIsConstant() { isConstant = true; } + + /** + * Maximum number of rows that can be stored in this column. + */ + protected int capacity; + + /** + * Upper limit for the maximum capacity for this column. + */ + @VisibleForTesting + protected int MAX_CAPACITY = Integer.MAX_VALUE; + + /** + * Number of nulls in this column. This is an optimization for the reader, to skip NULL checks. + */ + protected int numNulls; + + /** + * True if there is at least one NULL byte set. This is an optimization for the writer, to skip + * having to clear NULL bits. + */ + protected boolean anyNullsSet; + + /** + * True if this column's values are fixed. This means the column values never change, even + * across resets. + */ + protected boolean isConstant; + + /** + * Default size of each array length value. This grows as necessary. + */ + protected static final int DEFAULT_ARRAY_LENGTH = 4; + + /** + * Current write cursor (row index) when appending data. + */ + protected int elementsAppended; + + /** + * If this is a nested type (array or struct), the column for the child data. + */ + protected WritableColumnVector[] childColumns; + + /** + * Updates the dictionary. + */ + public void setDictionary(Dictionary dictionary) { + this.dictionary = dictionary; + } + + /** + * Reserves an integer column for dictionary ids. + */ + public WritableColumnVector reserveDictionaryIds(int capacity) { + WritableColumnVector dictionaryIds = (WritableColumnVector) this.dictionaryIds; + if (dictionaryIds == null) { + dictionaryIds = reserveNewColumn(capacity, DataTypes.IntegerType); + this.dictionaryIds = dictionaryIds; + } else { + dictionaryIds.reset(); + dictionaryIds.reserve(capacity); + } + return dictionaryIds; + } + + /** + * Returns the underlying integer column for dictionary ids. + */ + @Override + public WritableColumnVector getDictionaryIds() { + return (WritableColumnVector) dictionaryIds; + } + + /** + * Reserves a new column. + */ + protected abstract WritableColumnVector reserveNewColumn(int capacity, DataType type); + + /** + * Sets up the common state and also handles creating the child columns if this is a nested + * type. + */ + protected WritableColumnVector(int capacity, DataType type) { + super(type); + this.capacity = capacity; + + if (type instanceof ArrayType || type instanceof BinaryType || type instanceof StringType + || DecimalType.isByteArrayDecimalType(type)) { + DataType childType; + int childCapacity = capacity; + if (type instanceof ArrayType) { + childType = ((ArrayType)type).elementType(); + } else { + childType = DataTypes.ByteType; + childCapacity *= DEFAULT_ARRAY_LENGTH; + } + this.childColumns = new WritableColumnVector[1]; + this.childColumns[0] = reserveNewColumn(childCapacity, childType); + this.resultArray = new ColumnVector.Array(this.childColumns[0]); + this.resultStruct = null; + } else if (type instanceof StructType) { + StructType st = (StructType)type; + this.childColumns = new WritableColumnVector[st.fields().length]; + for (int i = 0; i < childColumns.length; ++i) { + this.childColumns[i] = reserveNewColumn(capacity, st.fields()[i].dataType()); + } + this.resultArray = null; + this.resultStruct = new ColumnarBatch.Row(this.childColumns); + } else if (type instanceof CalendarIntervalType) { + // Two columns. Months as int. Microseconds as Long.
+ this.childColumns = new WritableColumnVector[2]; + this.childColumns[0] = reserveNewColumn(capacity, DataTypes.IntegerType); + this.childColumns[1] = reserveNewColumn(capacity, DataTypes.LongType); + this.resultArray = null; + this.resultStruct = new ColumnarBatch.Row(this.childColumns); + } else { + this.childColumns = null; + this.resultArray = null; + this.resultStruct = null; + } + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/9e33954d/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/VectorizedHashMapGenerator.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/VectorizedHashMapGenerator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/VectorizedHashMapGenerator.scala index 0c40417..13f7927 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/VectorizedHashMapGenerator.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/VectorizedHashMapGenerator.scala @@ -76,6 +76,8 @@ class VectorizedHashMapGenerator( }.mkString("\n").concat(";") s""" + | private org.apache.spark.sql.execution.vectorized.OnHeapColumnVector[] batchVectors; + | private org.apache.spark.sql.execution.vectorized.OnHeapColumnVector[] bufferVectors; | private org.apache.spark.sql.execution.vectorized.ColumnarBatch batch; | private org.apache.spark.sql.execution.vectorized.ColumnarBatch aggregateBufferBatch; | private int[] buckets; @@ -89,14 +91,19 @@ class VectorizedHashMapGenerator( | $generatedAggBufferSchema | | public $generatedClassName() { - | batch = org.apache.spark.sql.execution.vectorized.ColumnarBatch.allocate(schema, - | org.apache.spark.memory.MemoryMode.ON_HEAP, capacity); - | // TODO: Possibly generate this projection in HashAggregate directly - | aggregateBufferBatch = org.apache.spark.sql.execution.vectorized.ColumnarBatch.allocate( - | aggregateBufferSchema, org.apache.spark.memory.MemoryMode.ON_HEAP, capacity); - | for (int i = 0 ; i < aggregateBufferBatch.numCols(); i++) { - | aggregateBufferBatch.setColumn(i, batch.column(i+${groupingKeys.length})); + | batchVectors = org.apache.spark.sql.execution.vectorized + | .OnHeapColumnVector.allocateColumns(capacity, schema); + | batch = new org.apache.spark.sql.execution.vectorized.ColumnarBatch( + | schema, batchVectors, capacity); + | + | bufferVectors = new org.apache.spark.sql.execution.vectorized + | .OnHeapColumnVector[aggregateBufferSchema.fields().length]; + | for (int i = 0; i < aggregateBufferSchema.fields().length; i++) { + | bufferVectors[i] = batchVectors[i + ${groupingKeys.length}]; | } + | // TODO: Possibly generate this projection in HashAggregate directly + | aggregateBufferBatch = new org.apache.spark.sql.execution.vectorized.ColumnarBatch( + | aggregateBufferSchema, bufferVectors, capacity); | | buckets = new int[numBuckets]; | java.util.Arrays.fill(buckets, -1); @@ -112,8 +119,8 @@ class VectorizedHashMapGenerator( * * {{{ * private boolean equals(int idx, long agg_key, long agg_key1) { - * return batch.column(0).getLong(buckets[idx]) == agg_key && - * batch.column(1).getLong(buckets[idx]) == agg_key1; + * return batchVectors[0].getLong(buckets[idx]) == agg_key && + * batchVectors[1].getLong(buckets[idx]) == agg_key1; * } * }}} */ @@ -121,8 +128,8 @@ class VectorizedHashMapGenerator( def genEqualsForKeys(groupingKeys: Seq[Buffer]): String = { groupingKeys.zipWithIndex.map { case (key: Buffer, ordinal: Int) => - 
s"""(${ctx.genEqual(key.dataType, ctx.getValue("batch", "buckets[idx]", - key.dataType, ordinal), key.name)})""" + s"""(${ctx.genEqual(key.dataType, ctx.getValue(s"batchVectors[$ordinal]", "buckets[idx]", + key.dataType), key.name)})""" }.mkString(" && ") } @@ -150,9 +157,9 @@ class VectorizedHashMapGenerator( * while (step < maxSteps) { * // Return bucket index if it's either an empty slot or already contains the key * if (buckets[idx] == -1) { - * batch.column(0).putLong(numRows, agg_key); - * batch.column(1).putLong(numRows, agg_key1); - * batch.column(2).putLong(numRows, 0); + * batchVectors[0].putLong(numRows, agg_key); + * batchVectors[1].putLong(numRows, agg_key1); + * batchVectors[2].putLong(numRows, 0); * buckets[idx] = numRows++; * return batch.getRow(buckets[idx]); * } else if (equals(idx, agg_key, agg_key1)) { @@ -170,13 +177,13 @@ class VectorizedHashMapGenerator( def genCodeToSetKeys(groupingKeys: Seq[Buffer]): Seq[String] = { groupingKeys.zipWithIndex.map { case (key: Buffer, ordinal: Int) => - ctx.setValue("batch", "numRows", key.dataType, ordinal, key.name) + ctx.setValue(s"batchVectors[$ordinal]", "numRows", key.dataType, key.name) } } def genCodeToSetAggBuffers(bufferValues: Seq[Buffer]): Seq[String] = { bufferValues.zipWithIndex.map { case (key: Buffer, ordinal: Int) => - ctx.updateColumn("batch", "numRows", key.dataType, groupingKeys.length + ordinal, + ctx.updateColumn(s"batchVectors[${groupingKeys.length + ordinal}]", "numRows", key.dataType, buffVars(ordinal), nullable = true) } } http://git-wip-us.apache.org/repos/asf/spark/blob/9e33954d/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchBenchmark.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchBenchmark.scala index 67b3d98..1331f15 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchBenchmark.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchBenchmark.scala @@ -24,7 +24,10 @@ import scala.util.Random import org.apache.spark.memory.MemoryMode import org.apache.spark.sql.catalyst.expressions.UnsafeRow import org.apache.spark.sql.execution.vectorized.ColumnVector -import org.apache.spark.sql.types.{BinaryType, IntegerType} +import org.apache.spark.sql.execution.vectorized.OffHeapColumnVector +import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector +import org.apache.spark.sql.execution.vectorized.WritableColumnVector +import org.apache.spark.sql.types.{BinaryType, DataType, IntegerType} import org.apache.spark.unsafe.Platform import org.apache.spark.util.Benchmark import org.apache.spark.util.collection.BitSet @@ -34,6 +37,14 @@ import org.apache.spark.util.collection.BitSet */ object ColumnarBatchBenchmark { + def allocate(capacity: Int, dt: DataType, memMode: MemoryMode): WritableColumnVector = { + if (memMode == MemoryMode.OFF_HEAP) { + new OffHeapColumnVector(capacity, dt) + } else { + new OnHeapColumnVector(capacity, dt) + } + } + // This benchmark reads and writes an array of ints. // TODO: there is a big (2x) penalty for a random access API for off heap. // Note: carefully if modifying this code. It's hard to reason about the JIT. 
@@ -140,7 +151,7 @@ object ColumnarBatchBenchmark { // Access through the column API with on heap memory val columnOnHeap = { i: Int => - val col = ColumnVector.allocate(count, IntegerType, MemoryMode.ON_HEAP) + val col = allocate(count, IntegerType, MemoryMode.ON_HEAP) var sum = 0L for (n <- 0L until iters) { var i = 0 @@ -159,7 +170,7 @@ object ColumnarBatchBenchmark { // Access through the column API with off heap memory def columnOffHeap = { i: Int => { - val col = ColumnVector.allocate(count, IntegerType, MemoryMode.OFF_HEAP) + val col = allocate(count, IntegerType, MemoryMode.OFF_HEAP) var sum = 0L for (n <- 0L until iters) { var i = 0 @@ -178,7 +189,7 @@ object ColumnarBatchBenchmark { // Access by directly getting the buffer backing the column. val columnOffheapDirect = { i: Int => - val col = ColumnVector.allocate(count, IntegerType, MemoryMode.OFF_HEAP) + val col = allocate(count, IntegerType, MemoryMode.OFF_HEAP) var sum = 0L for (n <- 0L until iters) { var addr = col.valuesNativeAddress() @@ -244,7 +255,7 @@ object ColumnarBatchBenchmark { // Adding values by appending, instead of putting. val onHeapAppend = { i: Int => - val col = ColumnVector.allocate(count, IntegerType, MemoryMode.ON_HEAP) + val col = allocate(count, IntegerType, MemoryMode.ON_HEAP) var sum = 0L for (n <- 0L until iters) { var i = 0 @@ -362,7 +373,7 @@ object ColumnarBatchBenchmark { .map(_.getBytes(StandardCharsets.UTF_8)).toArray def column(memoryMode: MemoryMode) = { i: Int => - val column = ColumnVector.allocate(count, BinaryType, memoryMode) + val column = allocate(count, BinaryType, memoryMode) var sum = 0L for (n <- 0L until iters) { var i = 0 http://git-wip-us.apache.org/repos/asf/spark/blob/9e33954d/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala index c8461dc..08ccbd6 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala @@ -34,11 +34,20 @@ import org.apache.spark.unsafe.Platform import org.apache.spark.unsafe.types.CalendarInterval class ColumnarBatchSuite extends SparkFunSuite { + + def allocate(capacity: Int, dt: DataType, memMode: MemoryMode): WritableColumnVector = { + if (memMode == MemoryMode.OFF_HEAP) { + new OffHeapColumnVector(capacity, dt) + } else { + new OnHeapColumnVector(capacity, dt) + } + } + test("Null Apis") { (MemoryMode.ON_HEAP :: MemoryMode.OFF_HEAP :: Nil).foreach { memMode => { val reference = mutable.ArrayBuffer.empty[Boolean] - val column = ColumnVector.allocate(1024, IntegerType, memMode) + val column = allocate(1024, IntegerType, memMode) var idx = 0 assert(column.anyNullsSet() == false) assert(column.numNulls() == 0) @@ -109,7 +118,7 @@ class ColumnarBatchSuite extends SparkFunSuite { (MemoryMode.ON_HEAP :: MemoryMode.OFF_HEAP :: Nil).foreach { memMode => { val reference = mutable.ArrayBuffer.empty[Byte] - val column = ColumnVector.allocate(1024, ByteType, memMode) + val column = allocate(1024, ByteType, memMode) var values = (10 :: 20 :: 30 :: 40 :: 50 :: Nil).map(_.toByte).toArray column.appendBytes(2, values, 0) @@ -167,7 +176,7 @@ class ColumnarBatchSuite extends SparkFunSuite { val random = new Random(seed) val 
reference = mutable.ArrayBuffer.empty[Short] - val column = ColumnVector.allocate(1024, ShortType, memMode) + val column = allocate(1024, ShortType, memMode) var values = (10 :: 20 :: 30 :: 40 :: 50 :: Nil).map(_.toShort).toArray column.appendShorts(2, values, 0) @@ -247,7 +256,7 @@ class ColumnarBatchSuite extends SparkFunSuite { val random = new Random(seed) val reference = mutable.ArrayBuffer.empty[Int] - val column = ColumnVector.allocate(1024, IntegerType, memMode) + val column = allocate(1024, IntegerType, memMode) var values = (10 :: 20 :: 30 :: 40 :: 50 :: Nil).toArray column.appendInts(2, values, 0) @@ -332,7 +341,7 @@ class ColumnarBatchSuite extends SparkFunSuite { val random = new Random(seed) val reference = mutable.ArrayBuffer.empty[Long] - val column = ColumnVector.allocate(1024, LongType, memMode) + val column = allocate(1024, LongType, memMode) var values = (10L :: 20L :: 30L :: 40L :: 50L :: Nil).toArray column.appendLongs(2, values, 0) @@ -419,7 +428,7 @@ class ColumnarBatchSuite extends SparkFunSuite { val random = new Random(seed) val reference = mutable.ArrayBuffer.empty[Float] - val column = ColumnVector.allocate(1024, FloatType, memMode) + val column = allocate(1024, FloatType, memMode) var values = (.1f :: .2f :: .3f :: .4f :: .5f :: Nil).toArray column.appendFloats(2, values, 0) @@ -510,7 +519,7 @@ class ColumnarBatchSuite extends SparkFunSuite { val random = new Random(seed) val reference = mutable.ArrayBuffer.empty[Double] - val column = ColumnVector.allocate(1024, DoubleType, memMode) + val column = allocate(1024, DoubleType, memMode) var values = (.1 :: .2 :: .3 :: .4 :: .5 :: Nil).toArray column.appendDoubles(2, values, 0) @@ -599,7 +608,7 @@ class ColumnarBatchSuite extends SparkFunSuite { (MemoryMode.ON_HEAP :: MemoryMode.OFF_HEAP :: Nil).foreach { memMode => { val reference = mutable.ArrayBuffer.empty[String] - val column = ColumnVector.allocate(6, BinaryType, memMode) + val column = allocate(6, BinaryType, memMode) assert(column.arrayData().elementsAppended == 0) val str = "string" @@ -656,7 +665,7 @@ class ColumnarBatchSuite extends SparkFunSuite { test("Int Array") { (MemoryMode.ON_HEAP :: MemoryMode.OFF_HEAP :: Nil).foreach { memMode => { - val column = ColumnVector.allocate(10, new ArrayType(IntegerType, true), memMode) + val column = allocate(10, new ArrayType(IntegerType, true), memMode) // Fill the underlying data with all the arrays back to back. 
val data = column.arrayData(); @@ -714,43 +723,43 @@ class ColumnarBatchSuite extends SparkFunSuite { (MemoryMode.ON_HEAP :: Nil).foreach { memMode => { val len = 4 - val columnBool = ColumnVector.allocate(len, new ArrayType(BooleanType, false), memMode) + val columnBool = allocate(len, new ArrayType(BooleanType, false), memMode) val boolArray = Array(false, true, false, true) boolArray.zipWithIndex.map { case (v, i) => columnBool.arrayData.putBoolean(i, v) } columnBool.putArray(0, 0, len) assert(columnBool.getArray(0).toBooleanArray === boolArray) - val columnByte = ColumnVector.allocate(len, new ArrayType(ByteType, false), memMode) + val columnByte = allocate(len, new ArrayType(ByteType, false), memMode) val byteArray = Array[Byte](0, 1, 2, 3) byteArray.zipWithIndex.map { case (v, i) => columnByte.arrayData.putByte(i, v) } columnByte.putArray(0, 0, len) assert(columnByte.getArray(0).toByteArray === byteArray) - val columnShort = ColumnVector.allocate(len, new ArrayType(ShortType, false), memMode) + val columnShort = allocate(len, new ArrayType(ShortType, false), memMode) val shortArray = Array[Short](0, 1, 2, 3) shortArray.zipWithIndex.map { case (v, i) => columnShort.arrayData.putShort(i, v) } columnShort.putArray(0, 0, len) assert(columnShort.getArray(0).toShortArray === shortArray) - val columnInt = ColumnVector.allocate(len, new ArrayType(IntegerType, false), memMode) + val columnInt = allocate(len, new ArrayType(IntegerType, false), memMode) val intArray = Array(0, 1, 2, 3) intArray.zipWithIndex.map { case (v, i) => columnInt.arrayData.putInt(i, v) } columnInt.putArray(0, 0, len) assert(columnInt.getArray(0).toIntArray === intArray) - val columnLong = ColumnVector.allocate(len, new ArrayType(LongType, false), memMode) + val columnLong = allocate(len, new ArrayType(LongType, false), memMode) val longArray = Array[Long](0, 1, 2, 3) longArray.zipWithIndex.map { case (v, i) => columnLong.arrayData.putLong(i, v) } columnLong.putArray(0, 0, len) assert(columnLong.getArray(0).toLongArray === longArray) - val columnFloat = ColumnVector.allocate(len, new ArrayType(FloatType, false), memMode) + val columnFloat = allocate(len, new ArrayType(FloatType, false), memMode) val floatArray = Array(0.0F, 1.1F, 2.2F, 3.3F) floatArray.zipWithIndex.map { case (v, i) => columnFloat.arrayData.putFloat(i, v) } columnFloat.putArray(0, 0, len) assert(columnFloat.getArray(0).toFloatArray === floatArray) - val columnDouble = ColumnVector.allocate(len, new ArrayType(DoubleType, false), memMode) + val columnDouble = allocate(len, new ArrayType(DoubleType, false), memMode) val doubleArray = Array(0.0, 1.1, 2.2, 3.3) doubleArray.zipWithIndex.map { case (v, i) => columnDouble.arrayData.putDouble(i, v) } columnDouble.putArray(0, 0, len) @@ -761,7 +770,7 @@ class ColumnarBatchSuite extends SparkFunSuite { test("Struct Column") { (MemoryMode.ON_HEAP :: MemoryMode.OFF_HEAP :: Nil).foreach { memMode => { val schema = new StructType().add("int", IntegerType).add("double", DoubleType) - val column = ColumnVector.allocate(1024, schema, memMode) + val column = allocate(1024, schema, memMode) val c1 = column.getChildColumn(0) val c2 = column.getChildColumn(1) @@ -790,7 +799,7 @@ class ColumnarBatchSuite extends SparkFunSuite { test("Nest Array in Array.") { (MemoryMode.ON_HEAP :: MemoryMode.OFF_HEAP :: Nil).foreach { memMode => - val column = ColumnVector.allocate(10, new ArrayType(new ArrayType(IntegerType, true), true), + val column = allocate(10, new ArrayType(new ArrayType(IntegerType, true), true), memMode) val 
childColumn = column.arrayData() val data = column.arrayData().arrayData() @@ -823,7 +832,7 @@ class ColumnarBatchSuite extends SparkFunSuite { test("Nest Struct in Array.") { (MemoryMode.ON_HEAP :: MemoryMode.OFF_HEAP :: Nil).foreach { memMode => val schema = new StructType().add("int", IntegerType).add("long", LongType) - val column = ColumnVector.allocate(10, new ArrayType(schema, true), memMode) + val column = allocate(10, new ArrayType(schema, true), memMode) val data = column.arrayData() val c0 = data.getChildColumn(0) val c1 = data.getChildColumn(1) @@ -853,7 +862,7 @@ class ColumnarBatchSuite extends SparkFunSuite { val schema = new StructType() .add("int", IntegerType) .add("array", new ArrayType(IntegerType, true)) - val column = ColumnVector.allocate(10, schema, memMode) + val column = allocate(10, schema, memMode) val c0 = column.getChildColumn(0) val c1 = column.getChildColumn(1) c0.putInt(0, 0) @@ -885,7 +894,7 @@ class ColumnarBatchSuite extends SparkFunSuite { val schema = new StructType() .add("int", IntegerType) .add("struct", subSchema) - val column = ColumnVector.allocate(10, schema, memMode) + val column = allocate(10, schema, memMode) val c0 = column.getChildColumn(0) val c1 = column.getChildColumn(1) c0.putInt(0, 0) @@ -918,7 +927,11 @@ class ColumnarBatchSuite extends SparkFunSuite { .add("intCol2", IntegerType) .add("string", BinaryType) - val batch = ColumnarBatch.allocate(schema, memMode) + val capacity = ColumnarBatch.DEFAULT_BATCH_SIZE + val columns = schema.fields.map { field => + allocate(capacity, field.dataType, memMode) + } + val batch = new ColumnarBatch(schema, columns.toArray, ColumnarBatch.DEFAULT_BATCH_SIZE) assert(batch.numCols() == 4) assert(batch.numRows() == 0) assert(batch.numValidRows() == 0) @@ -926,10 +939,10 @@ class ColumnarBatchSuite extends SparkFunSuite { assert(batch.rowIterator().hasNext == false) // Add a row [1, 1.1, NULL] - batch.column(0).putInt(0, 1) - batch.column(1).putDouble(0, 1.1) - batch.column(2).putNull(0) - batch.column(3).putByteArray(0, "Hello".getBytes(StandardCharsets.UTF_8)) + columns(0).putInt(0, 1) + columns(1).putDouble(0, 1.1) + columns(2).putNull(0) + columns(3).putByteArray(0, "Hello".getBytes(StandardCharsets.UTF_8)) batch.setNumRows(1) // Verify the results of the row. @@ -939,12 +952,12 @@ class ColumnarBatchSuite extends SparkFunSuite { assert(batch.rowIterator().hasNext == true) assert(batch.rowIterator().hasNext == true) - assert(batch.column(0).getInt(0) == 1) - assert(batch.column(0).isNullAt(0) == false) - assert(batch.column(1).getDouble(0) == 1.1) - assert(batch.column(1).isNullAt(0) == false) - assert(batch.column(2).isNullAt(0) == true) - assert(batch.column(3).getUTF8String(0).toString == "Hello") + assert(columns(0).getInt(0) == 1) + assert(columns(0).isNullAt(0) == false) + assert(columns(1).getDouble(0) == 1.1) + assert(columns(1).isNullAt(0) == false) + assert(columns(2).isNullAt(0) == true) + assert(columns(3).getUTF8String(0).toString == "Hello") // Verify the iterator works correctly. 
val it = batch.rowIterator() @@ -955,7 +968,7 @@ assert(row.getDouble(1) == 1.1) assert(row.isNullAt(1) == false) assert(row.isNullAt(2) == true) - assert(batch.column(3).getUTF8String(0).toString == "Hello") + assert(columns(3).getUTF8String(0).toString == "Hello") assert(it.hasNext == false) assert(it.hasNext == false) @@ -972,20 +985,20 @@ assert(batch.rowIterator().hasNext == false) // Add rows [NULL, 2.2, 2, "abc"], [3, NULL, 3, ""], [4, 4.4, 4, "world"] - batch.column(0).putNull(0) - batch.column(1).putDouble(0, 2.2) - batch.column(2).putInt(0, 2) - batch.column(3).putByteArray(0, "abc".getBytes(StandardCharsets.UTF_8)) - - batch.column(0).putInt(1, 3) - batch.column(1).putNull(1) - batch.column(2).putInt(1, 3) - batch.column(3).putByteArray(1, "".getBytes(StandardCharsets.UTF_8)) - - batch.column(0).putInt(2, 4) - batch.column(1).putDouble(2, 4.4) - batch.column(2).putInt(2, 4) - batch.column(3).putByteArray(2, "world".getBytes(StandardCharsets.UTF_8)) + columns(0).putNull(0) + columns(1).putDouble(0, 2.2) + columns(2).putInt(0, 2) + columns(3).putByteArray(0, "abc".getBytes(StandardCharsets.UTF_8)) + + columns(0).putInt(1, 3) + columns(1).putNull(1) + columns(2).putInt(1, 3) + columns(3).putByteArray(1, "".getBytes(StandardCharsets.UTF_8)) + + columns(0).putInt(2, 4) + columns(1).putDouble(2, 4.4) + columns(2).putInt(2, 4) + columns(3).putByteArray(2, "world".getBytes(StandardCharsets.UTF_8)) batch.setNumRows(3) def rowEquals(x: InternalRow, y: Row): Unit = { @@ -1232,7 +1245,7 @@ test("exceeding maximum capacity should throw an error") { (MemoryMode.ON_HEAP :: MemoryMode.OFF_HEAP :: Nil).foreach { memMode => - val column = ColumnVector.allocate(1, ByteType, memMode) + val column = allocate(1, ByteType, memMode) column.MAX_CAPACITY = 15 column.appendBytes(5, 0.toByte) // Successfully allocate twice the requested capacity
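With ColumnarBatch.allocate(schema, memMode) removed from the test path, a batch is now assembled by allocating one writable vector per schema field and handing the array to the ColumnarBatch constructor, as the updated suite does above. A minimal sketch of that wiring under the same assumptions (on-heap vectors; the wrapper class name, schema, and values are illustrative):

import org.apache.spark.sql.execution.vectorized.ColumnarBatch;
import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

public class ColumnarBatchWiringSketch {
  public static void main(String[] args) {
    StructType schema = new StructType()
        .add("id", DataTypes.IntegerType)
        .add("score", DataTypes.DoubleType);
    int capacity = 1024;

    // The caller now owns allocation: one writable vector per schema field.
    OnHeapColumnVector[] columns = new OnHeapColumnVector[schema.fields().length];
    for (int i = 0; i < columns.length; i++) {
      columns[i] = new OnHeapColumnVector(capacity, schema.fields()[i].dataType());
    }

    // The batch wraps the caller-supplied vectors instead of allocating its own.
    ColumnarBatch batch = new ColumnarBatch(schema, columns, capacity);
    columns[0].putInt(0, 1);
    columns[1].putDouble(0, 1.1);
    batch.setNumRows(1);
    System.out.println(batch.getRow(0).getInt(0));  // 1
  }
}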