This is an automated email from the ASF dual-hosted git repository.
felixybw pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new ce09169f8 [GLUTEN-7205] [VL] Optimize row to column for scalar type
(#7206)
ce09169f8 is described below
commit ce09169f87940aa16d2289e8db7cae09dfea9d31
Author: Jin Chengcheng <[email protected]>
AuthorDate: Fri Sep 13 05:39:24 2024 +0800
[GLUTEN-7205] [VL] Optimize row to column for scalar type (#7206)
Optimize current deserialize data one by one to batch deserialization, the
performance will improve more than 3 times.
---
.../serializer/VeloxRowToColumnarConverter.cc | 260 +++++++++++++++++++++
.../serializer/VeloxRowToColumnarConverter.h | 3 +-
2 files changed, 262 insertions(+), 1 deletion(-)
diff --git a/cpp/velox/operators/serializer/VeloxRowToColumnarConverter.cc
b/cpp/velox/operators/serializer/VeloxRowToColumnarConverter.cc
index 75fa3c3d3..5dec7db23 100644
--- a/cpp/velox/operators/serializer/VeloxRowToColumnarConverter.cc
+++ b/cpp/velox/operators/serializer/VeloxRowToColumnarConverter.cc
@@ -22,6 +22,239 @@
using namespace facebook::velox;
namespace gluten {
+namespace {
+
+inline int64_t calculateBitSetWidthInBytes(int32_t numFields) {
+ return ((numFields + 63) / 64) * 8;
+}
+
+inline int64_t getFieldOffset(int64_t nullBitsetWidthInBytes, int32_t index) {
+ return nullBitsetWidthInBytes + 8L * index;
+}
+
+inline bool isNull(uint8_t* buffer_address, int32_t index) {
+ int64_t mask = 1L << (index & 0x3f); // mod 64 and shift
+ int64_t wordOffset = (index >> 6) * 8;
+ int64_t value = *((int64_t*)(buffer_address + wordOffset));
+ return (value & mask) != 0;
+}
+
+int32_t getTotalStringSize(
+ int32_t columnIdx,
+ int32_t numRows,
+ int64_t fieldOffset,
+ std::vector<int64_t>& offsets,
+ uint8_t* memoryAddress) {
+ size_t size = 0;
+ for (auto pos = 0; pos < numRows; pos++) {
+ if (isNull(memoryAddress + offsets[pos], columnIdx)) {
+ continue;
+ }
+
+ int64_t offsetAndSize = *(int64_t*)(memoryAddress + offsets[pos] +
fieldOffset);
+ int32_t length = static_cast<int32_t>(offsetAndSize);
+ if (!StringView::isInline(length)) {
+ size += length;
+ }
+ }
+ return size;
+}
+
+template <TypeKind Kind>
+VectorPtr createFlatVector(
+ const TypePtr& type,
+ int32_t columnIdx,
+ int32_t numRows,
+ int64_t fieldOffset,
+ std::vector<int64_t>& offsets,
+ uint8_t* memoryAddress,
+ memory::MemoryPool* pool) {
+ using T = typename TypeTraits<Kind>::NativeType;
+ auto typeWidth = sizeof(T);
+ auto column = BaseVector::create<FlatVector<T>>(type, numRows, pool);
+ auto rawValues = column->template mutableRawValues<uint8_t>();
+ auto shift = __builtin_ctz((uint32_t)typeWidth);
+ for (auto pos = 0; pos < numRows; pos++) {
+ if (!isNull(memoryAddress + offsets[pos], columnIdx)) {
+ const uint8_t* srcptr = (memoryAddress + offsets[pos] + fieldOffset);
+ uint8_t* destptr = rawValues + (pos << shift);
+ memcpy(destptr, srcptr, typeWidth);
+ } else {
+ column->setNull(pos, true);
+ }
+ }
+ return column;
+}
+
+template <>
+VectorPtr createFlatVector<TypeKind::HUGEINT>(
+ const TypePtr& type,
+ int32_t columnIdx,
+ int32_t numRows,
+ int64_t fieldOffset,
+ std::vector<int64_t>& offsets,
+ uint8_t* memoryAddress,
+ memory::MemoryPool* pool) {
+ auto column = BaseVector::create<FlatVector<int128_t>>(type, numRows, pool);
+ auto rawValues = column->mutableRawValues<uint8_t>();
+ auto typeWidth = sizeof(int128_t);
+ auto shift = __builtin_ctz((uint32_t)typeWidth);
+ for (auto pos = 0; pos < numRows; pos++) {
+ if (!isNull(memoryAddress + offsets[pos], columnIdx)) {
+ uint8_t* destptr = rawValues + (pos << shift);
+ int64_t offsetAndSize = *(int64_t*)(memoryAddress + offsets[pos] +
fieldOffset);
+ int32_t length = static_cast<int32_t>(offsetAndSize);
+ int32_t wordoffset = static_cast<int32_t>(offsetAndSize >> 32);
+ uint8_t bytesValue[length];
+ memcpy(bytesValue, memoryAddress + offsets[pos] + wordoffset, length);
+ uint8_t bytesValue2[16]{};
+ for (int k = length - 1; k >= 0; k--) {
+ bytesValue2[length - 1 - k] = bytesValue[k];
+ }
+ if (int8_t(bytesValue[0]) < 0) {
+ memset(bytesValue2 + length, 255, 16 - length);
+ }
+ memcpy(destptr, bytesValue2, typeWidth);
+ } else {
+ column->setNull(pos, true);
+ }
+ }
+ return column;
+}
+
+template <>
+VectorPtr createFlatVector<TypeKind::BOOLEAN>(
+ const TypePtr& type,
+ int32_t columnIdx,
+ int32_t numRows,
+ int64_t fieldOffset,
+ std::vector<int64_t>& offsets,
+ uint8_t* memoryAddress,
+ memory::MemoryPool* pool) {
+ auto column = BaseVector::create<FlatVector<bool>>(type, numRows, pool);
+ auto rawValues = column->mutableRawValues<uint64_t>();
+ for (auto pos = 0; pos < numRows; pos++) {
+ if (!isNull(memoryAddress + offsets[pos], columnIdx)) {
+ bool value = *(bool*)(memoryAddress + offsets[pos] + fieldOffset);
+ bits::setBit(rawValues, pos, value);
+ } else {
+ column->setNull(pos, true);
+ }
+ }
+ return column;
+}
+
+template <>
+VectorPtr createFlatVector<TypeKind::TIMESTAMP>(
+ const TypePtr& type,
+ int32_t columnIdx,
+ int32_t numRows,
+ int64_t fieldOffset,
+ std::vector<int64_t>& offsets,
+ uint8_t* memoryAddress,
+ memory::MemoryPool* pool) {
+ auto column = BaseVector::create<FlatVector<Timestamp>>(type, numRows, pool);
+ for (auto pos = 0; pos < numRows; pos++) {
+ if (!isNull(memoryAddress + offsets[pos], columnIdx)) {
+ int64_t value = *(int64_t*)(memoryAddress + offsets[pos] + fieldOffset);
+ column->set(pos, Timestamp::fromMicros(value));
+ } else {
+ column->setNull(pos, true);
+ }
+ }
+ return column;
+}
+
+VectorPtr createFlatVectorStringView(
+ const TypePtr& type,
+ int32_t columnIdx,
+ int32_t numRows,
+ int64_t fieldOffset,
+ std::vector<int64_t>& offsets,
+ uint8_t* memoryAddress,
+ memory::MemoryPool* pool) {
+ auto column = BaseVector::create<FlatVector<StringView>>(type, numRows,
pool);
+ auto size = getTotalStringSize(columnIdx, numRows, fieldOffset, offsets,
memoryAddress);
+ char* rawBuffer = column->getRawStringBufferWithSpace(size, true);
+ for (auto pos = 0; pos < numRows; pos++) {
+ if (!isNull(memoryAddress + offsets[pos], columnIdx)) {
+ int64_t offsetAndSize = *(int64_t*)(memoryAddress + offsets[pos] +
fieldOffset);
+ int32_t length = static_cast<int32_t>(offsetAndSize);
+ int32_t wordoffset = static_cast<int32_t>(offsetAndSize >> 32);
+ auto valueSrcPtr = memoryAddress + offsets[pos] + wordoffset;
+ if (StringView::isInline(length)) {
+ column->set(pos, StringView(reinterpret_cast<char*>(valueSrcPtr),
length));
+ } else {
+ memcpy(rawBuffer, valueSrcPtr, length);
+ column->setNoCopy(pos, StringView(rawBuffer, length));
+ rawBuffer += length;
+ }
+ } else {
+ column->setNull(pos, true);
+ }
+ }
+ return column;
+}
+
+template <>
+VectorPtr createFlatVector<TypeKind::VARCHAR>(
+ const TypePtr& type,
+ int32_t columnIdx,
+ int32_t numRows,
+ int64_t fieldOffset,
+ std::vector<int64_t>& offsets,
+ uint8_t* memoryAddress,
+ memory::MemoryPool* pool) {
+ return createFlatVectorStringView(type, columnIdx, numRows, fieldOffset,
offsets, memoryAddress, pool);
+}
+
+template <>
+VectorPtr createFlatVector<TypeKind::VARBINARY>(
+ const TypePtr& type,
+ int32_t columnIdx,
+ int32_t numRows,
+ int64_t fieldOffset,
+ std::vector<int64_t>& offsets,
+ uint8_t* memoryAddress,
+ memory::MemoryPool* pool) {
+ return createFlatVectorStringView(type, columnIdx, numRows, fieldOffset,
offsets, memoryAddress, pool);
+}
+
+template <>
+VectorPtr createFlatVector<TypeKind::UNKNOWN>(
+ const TypePtr& /*type*/,
+ int32_t /*columnIdx*/,
+ int32_t numRows,
+ int64_t /*fieldOffset*/,
+ std::vector<int64_t>& /*offsets*/,
+ uint8_t* /*memoryAddress*/,
+ memory::MemoryPool* pool) {
+ auto nulls = allocateNulls(numRows, pool, bits::kNull);
+ return std::make_shared<FlatVector<UnknownValue>>(
+ pool,
+ UNKNOWN(),
+ nulls,
+ numRows,
+ nullptr, // values
+ std::vector<BufferPtr>{}); // stringBuffers
+}
+
+bool supporteType(const RowTypePtr rowType) {
+ for (auto i = 0; i < rowType->size(); i++) {
+ auto kind = rowType->childAt(i)->kind();
+ switch (kind) {
+ case TypeKind::ARRAY:
+ case TypeKind::MAP:
+ case TypeKind::ROW:
+ return false;
+ default:
+ break;
+ }
+ }
+ return true;
+}
+
+} // namespace
VeloxRowToColumnarConverter::VeloxRowToColumnarConverter(
struct ArrowSchema* cSchema,
std::shared_ptr<memory::MemoryPool> memoryPool)
@@ -32,6 +265,9 @@ VeloxRowToColumnarConverter::VeloxRowToColumnarConverter(
std::shared_ptr<ColumnarBatch>
VeloxRowToColumnarConverter::convert(int64_t numRows, int64_t* rowLength,
uint8_t* memoryAddress) {
+ if (supporteType(asRowType(rowType_))) {
+ return convertPrimitive(numRows, rowLength, memoryAddress);
+ }
std::vector<std::optional<std::string_view>> data;
int64_t offset = 0;
for (auto i = 0; i < numRows; i++) {
@@ -41,4 +277,28 @@ VeloxRowToColumnarConverter::convert(int64_t numRows,
int64_t* rowLength, uint8_
auto vp = row::UnsafeRowDeserializer::deserialize(data, rowType_,
pool_.get());
return
std::make_shared<VeloxColumnarBatch>(std::dynamic_pointer_cast<RowVector>(vp));
}
+
+std::shared_ptr<ColumnarBatch>
+VeloxRowToColumnarConverter::convertPrimitive(int64_t numRows, int64_t*
rowLength, uint8_t* memoryAddress) {
+ auto numFields = rowType_->size();
+ int64_t nullBitsetWidthInBytes = calculateBitSetWidthInBytes(numFields);
+ std::vector<int64_t> offsets;
+ offsets.resize(numRows);
+ for (auto i = 1; i < numRows; i++) {
+ offsets[i] = offsets[i - 1] + rowLength[i - 1];
+ }
+
+ std::vector<VectorPtr> columns;
+ columns.resize(numFields);
+
+ for (auto i = 0; i < numFields; i++) {
+ auto fieldOffset = getFieldOffset(nullBitsetWidthInBytes, i);
+ auto& type = rowType_->childAt(i);
+ columns[i] = VELOX_DYNAMIC_SCALAR_TYPE_DISPATCH_ALL(
+ createFlatVector, type->kind(), type, i, numRows, fieldOffset,
offsets, memoryAddress, pool_.get());
+ }
+
+ auto rowVector = std::make_shared<RowVector>(pool_.get(), rowType_,
BufferPtr(nullptr), numRows, std::move(columns));
+ return std::make_shared<VeloxColumnarBatch>(rowVector);
+}
} // namespace gluten
diff --git a/cpp/velox/operators/serializer/VeloxRowToColumnarConverter.h
b/cpp/velox/operators/serializer/VeloxRowToColumnarConverter.h
index 30006c4f0..c064b9d3c 100644
--- a/cpp/velox/operators/serializer/VeloxRowToColumnarConverter.h
+++ b/cpp/velox/operators/serializer/VeloxRowToColumnarConverter.h
@@ -33,7 +33,8 @@ class VeloxRowToColumnarConverter final : public
RowToColumnarConverter {
std::shared_ptr<ColumnarBatch> convert(int64_t numRows, int64_t* rowLength,
uint8_t* memoryAddress);
- protected:
+ private:
+ std::shared_ptr<ColumnarBatch> convertPrimitive(int64_t numRows, int64_t*
rowLength, uint8_t* memoryAddress);
facebook::velox::TypePtr rowType_;
std::shared_ptr<facebook::velox::memory::MemoryPool> pool_;
};
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]