This is an automated email from the ASF dual-hosted git repository.

felixybw pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new ce09169f8 [GLUTEN-7205] [VL] Optimize row to column for scalar type 
(#7206)
ce09169f8 is described below

commit ce09169f87940aa16d2289e8db7cae09dfea9d31
Author: Jin Chengcheng <[email protected]>
AuthorDate: Fri Sep 13 05:39:24 2024 +0800

    [GLUTEN-7205] [VL] Optimize row to column for scalar type (#7206)
    
    Replace the current one-by-one deserialization of data with batch
    deserialization; performance improves by more than 3 times.
---
 .../serializer/VeloxRowToColumnarConverter.cc      | 260 +++++++++++++++++++++
 .../serializer/VeloxRowToColumnarConverter.h       |   3 +-
 2 files changed, 262 insertions(+), 1 deletion(-)

diff --git a/cpp/velox/operators/serializer/VeloxRowToColumnarConverter.cc 
b/cpp/velox/operators/serializer/VeloxRowToColumnarConverter.cc
index 75fa3c3d3..5dec7db23 100644
--- a/cpp/velox/operators/serializer/VeloxRowToColumnarConverter.cc
+++ b/cpp/velox/operators/serializer/VeloxRowToColumnarConverter.cc
@@ -22,6 +22,239 @@
 
 using namespace facebook::velox;
 namespace gluten {
+namespace {
+
+// Returns the width in bytes of a null bitset that holds one bit per field,
+// rounded up to a whole number of 8-byte (64-bit) words.
+inline int64_t calculateBitSetWidthInBytes(int32_t numFields) {
+  const int32_t wordCount = (numFields + 63) / 64;
+  return wordCount * 8;
+}
+
+// Returns the byte offset, relative to the start of a row, of the 8-byte slot
+// of field `index`: the slots follow a null bitset that is
+// `nullBitsetWidthInBytes` bytes wide.
+inline int64_t getFieldOffset(int64_t nullBitsetWidthInBytes, int32_t index) {
+  const int64_t slotOffset = 8L * index;
+  return nullBitsetWidthInBytes + slotOffset;
+}
+
+// Reports whether field `index` is flagged as null in the bitset that starts
+// at `buffer_address`.
+//
+// The bitset is read as 64-bit words in host byte order: the flag is bit
+// (index % 64) of word (index / 64). The word is fetched with memcpy rather
+// than through a casted pointer because `buffer_address` need not be 8-byte
+// aligned, and the mask is built from an unsigned 64-bit one so the shift is
+// well defined for every bit position (including bit 63) and on platforms
+// where `long` is only 32 bits wide.
+inline bool isNull(uint8_t* buffer_address, int32_t index) {
+  const uint64_t mask = uint64_t{1} << (index & 0x3f); // mod 64 and shift
+  const int64_t wordOffset = static_cast<int64_t>(index >> 6) * 8;
+  uint64_t word;
+  memcpy(&word, buffer_address + wordOffset, sizeof(word));
+  return (word & mask) != 0;
+}
+
+// Sums the byte lengths of all non-null values of column `columnIdx` that are
+// too long to be stored inline in a StringView, i.e. the amount of
+// out-of-line string buffer space the column needs.
+//
+// `offsets[pos]` is the byte offset of row `pos` from `memoryAddress` and
+// `fieldOffset` is the offset of the column's 8-byte slot inside each row; the
+// low 32 bits of that slot are taken as the value's length.
+//
+// The total is returned as size_t (the accumulator's type) so that a large
+// batch is not silently truncated to 32 bits on return.
+size_t getTotalStringSize(
+    int32_t columnIdx,
+    int32_t numRows,
+    int64_t fieldOffset,
+    std::vector<int64_t>& offsets,
+    uint8_t* memoryAddress) {
+  size_t totalSize = 0;
+  for (int32_t pos = 0; pos < numRows; ++pos) {
+    uint8_t* rowStart = memoryAddress + offsets[pos];
+    if (isNull(rowStart, columnIdx)) {
+      continue;
+    }
+
+    // memcpy instead of a pointer cast: the slot need not be suitably aligned
+    // for a direct int64_t load.
+    int64_t offsetAndSize;
+    memcpy(&offsetAndSize, rowStart + fieldOffset, sizeof(offsetAndSize));
+    const int32_t length = static_cast<int32_t>(offsetAndSize);
+    if (!StringView::isInline(length)) {
+      totalSize += length;
+    }
+  }
+  return totalSize;
+}
+
+// Builds a FlatVector for column `columnIdx` by copying, for every non-null
+// row, sizeof(NativeType) bytes from the start of the column's slot into the
+// vector's values buffer. Rows whose null bit is set are marked null and their
+// value bytes are left untouched.
+//
+// `offsets[pos]` is the byte offset of row `pos` from `memoryAddress`;
+// `fieldOffset` is the offset of the column's slot inside each row.
+template <TypeKind Kind>
+VectorPtr createFlatVector(
+    const TypePtr& type,
+    int32_t columnIdx,
+    int32_t numRows,
+    int64_t fieldOffset,
+    std::vector<int64_t>& offsets,
+    uint8_t* memoryAddress,
+    memory::MemoryPool* pool) {
+  using T = typename TypeTraits<Kind>::NativeType;
+  constexpr size_t kTypeWidth = sizeof(T);
+  auto column = BaseVector::create<FlatVector<T>>(type, numRows, pool);
+  auto rawValues = column->template mutableRawValues<uint8_t>();
+  for (int32_t pos = 0; pos < numRows; ++pos) {
+    uint8_t* rowStart = memoryAddress + offsets[pos];
+    if (isNull(rowStart, columnIdx)) {
+      column->setNull(pos, true);
+      continue;
+    }
+    // The destination offset is computed in size_t so it cannot overflow `int`
+    // for large batches; a plain multiply also avoids the compiler-specific
+    // count-trailing-zeros builtin.
+    uint8_t* destptr = rawValues + static_cast<size_t>(pos) * kTypeWidth;
+    memcpy(destptr, rowStart + fieldOffset, kTypeWidth);
+  }
+  return column;
+}
+
+// HUGEINT specialization: the column's slot holds (offset << 32 | length)
+// pointing at a variable-length byte sequence inside the row. The bytes are
+// treated as a two's-complement integer with the most significant byte first;
+// they are reversed and sign-extended into a 16-byte buffer that is then
+// copied into the int128_t values buffer (this assumes a little-endian host,
+// as the original code did).
+//
+// The byte length is validated against the 16-byte destination before use, so
+// malformed row data raises an error instead of overrunning a stack buffer,
+// and a zero-length value decodes as 0 instead of reading indeterminate
+// memory.
+template <>
+VectorPtr createFlatVector<TypeKind::HUGEINT>(
+    const TypePtr& type,
+    int32_t columnIdx,
+    int32_t numRows,
+    int64_t fieldOffset,
+    std::vector<int64_t>& offsets,
+    uint8_t* memoryAddress,
+    memory::MemoryPool* pool) {
+  constexpr int32_t kTypeWidth = sizeof(int128_t);
+  auto column = BaseVector::create<FlatVector<int128_t>>(type, numRows, pool);
+  auto rawValues = column->mutableRawValues<uint8_t>();
+  for (int32_t pos = 0; pos < numRows; ++pos) {
+    uint8_t* rowStart = memoryAddress + offsets[pos];
+    if (isNull(rowStart, columnIdx)) {
+      column->setNull(pos, true);
+      continue;
+    }
+
+    int64_t offsetAndSize;
+    memcpy(&offsetAndSize, rowStart + fieldOffset, sizeof(offsetAndSize));
+    const int32_t length = static_cast<int32_t>(offsetAndSize);
+    const int32_t wordoffset = static_cast<int32_t>(offsetAndSize >> 32);
+    VELOX_CHECK(length >= 0 && length <= kTypeWidth, "Invalid HUGEINT byte length in row data: {}", length);
+
+    const uint8_t* src = rowStart + wordoffset;
+    // Pre-fill with the sign byte, then overwrite the low `length` bytes with
+    // the reversed source bytes.
+    const bool negative = length > 0 && (src[0] & 0x80) != 0;
+    uint8_t littleEndian[kTypeWidth];
+    memset(littleEndian, negative ? 0xFF : 0x00, sizeof(littleEndian));
+    for (int32_t k = 0; k < length; ++k) {
+      littleEndian[k] = src[length - 1 - k];
+    }
+    memcpy(rawValues + static_cast<size_t>(pos) * kTypeWidth, littleEndian, kTypeWidth);
+  }
+  return column;
+}
+
+// BOOLEAN specialization: FlatVector<bool> keeps its values bit-packed, so
+// each non-null row's flag is read from the first byte of the column's slot
+// and written with bits::setBit instead of being copied byte-wise.
+template <>
+VectorPtr createFlatVector<TypeKind::BOOLEAN>(
+    const TypePtr& type,
+    int32_t columnIdx,
+    int32_t numRows,
+    int64_t fieldOffset,
+    std::vector<int64_t>& offsets,
+    uint8_t* memoryAddress,
+    memory::MemoryPool* pool) {
+  auto column = BaseVector::create<FlatVector<bool>>(type, numRows, pool);
+  auto packedValues = column->mutableRawValues<uint64_t>();
+  for (int32_t row = 0; row < numRows; ++row) {
+    uint8_t* rowStart = memoryAddress + offsets[row];
+    if (isNull(rowStart, columnIdx)) {
+      column->setNull(row, true);
+      continue;
+    }
+    const bool flag = *(bool*)(rowStart + fieldOffset);
+    bits::setBit(packedValues, row, flag);
+  }
+  return column;
+}
+
+// TIMESTAMP specialization: the column's slot holds a 64-bit value that is
+// converted with Timestamp::fromMicros before being stored in the vector.
+template <>
+VectorPtr createFlatVector<TypeKind::TIMESTAMP>(
+    const TypePtr& type,
+    int32_t columnIdx,
+    int32_t numRows,
+    int64_t fieldOffset,
+    std::vector<int64_t>& offsets,
+    uint8_t* memoryAddress,
+    memory::MemoryPool* pool) {
+  auto column = BaseVector::create<FlatVector<Timestamp>>(type, numRows, pool);
+  for (int32_t row = 0; row < numRows; ++row) {
+    uint8_t* rowStart = memoryAddress + offsets[row];
+    if (isNull(rowStart, columnIdx)) {
+      column->setNull(row, true);
+      continue;
+    }
+    const int64_t micros = *(int64_t*)(rowStart + fieldOffset);
+    column->set(row, Timestamp::fromMicros(micros));
+  }
+  return column;
+}
+
+// Builds a FlatVector<StringView> for column `columnIdx`.
+//
+// A single string buffer large enough for every non-inline value is requested
+// up front (sized by getTotalStringSize). Each non-inline value is copied to
+// the current write position of that buffer and referenced with setNoCopy;
+// values short enough to be inlined are stored through set(). The slot of a
+// non-null row holds (offset << 32 | length), with the offset relative to the
+// start of the row.
+VectorPtr createFlatVectorStringView(
+    const TypePtr& type,
+    int32_t columnIdx,
+    int32_t numRows,
+    int64_t fieldOffset,
+    std::vector<int64_t>& offsets,
+    uint8_t* memoryAddress,
+    memory::MemoryPool* pool) {
+  auto column = BaseVector::create<FlatVector<StringView>>(type, numRows, pool);
+  const auto bufferSize = getTotalStringSize(columnIdx, numRows, fieldOffset, offsets, memoryAddress);
+  char* writeCursor = column->getRawStringBufferWithSpace(bufferSize, true);
+  for (int32_t row = 0; row < numRows; ++row) {
+    uint8_t* rowStart = memoryAddress + offsets[row];
+    if (isNull(rowStart, columnIdx)) {
+      column->setNull(row, true);
+      continue;
+    }
+
+    const int64_t offsetAndSize = *(int64_t*)(rowStart + fieldOffset);
+    const int32_t length = static_cast<int32_t>(offsetAndSize);
+    const int32_t wordoffset = static_cast<int32_t>(offsetAndSize >> 32);
+    uint8_t* valueSrcPtr = rowStart + wordoffset;
+    if (!StringView::isInline(length)) {
+      memcpy(writeCursor, valueSrcPtr, length);
+      column->setNoCopy(row, StringView(writeCursor, length));
+      writeCursor += length;
+    } else {
+      column->set(row, StringView(reinterpret_cast<char*>(valueSrcPtr), length));
+    }
+  }
+  return column;
+}
+
+// VARCHAR specialization: forwards unchanged to the shared StringView
+// builder.
+template <>
+VectorPtr createFlatVector<TypeKind::VARCHAR>(
+    const TypePtr& type,
+    int32_t columnIdx,
+    int32_t numRows,
+    int64_t fieldOffset,
+    std::vector<int64_t>& offsets,
+    uint8_t* memoryAddress,
+    memory::MemoryPool* pool) {
+  return createFlatVectorStringView(
+      type, columnIdx, numRows, fieldOffset, offsets, memoryAddress, pool);
+}
+
+// VARBINARY specialization: forwards unchanged to the shared StringView
+// builder.
+template <>
+VectorPtr createFlatVector<TypeKind::VARBINARY>(
+    const TypePtr& type,
+    int32_t columnIdx,
+    int32_t numRows,
+    int64_t fieldOffset,
+    std::vector<int64_t>& offsets,
+    uint8_t* memoryAddress,
+    memory::MemoryPool* pool) {
+  return createFlatVectorStringView(
+      type, columnIdx, numRows, fieldOffset, offsets, memoryAddress, pool);
+}
+
+// UNKNOWN specialization: the row data is never inspected. The result is a
+// FlatVector<UnknownValue> of `numRows` rows whose nulls buffer is initialised
+// with bits::kNull and which has no values buffer and no string buffers.
+template <>
+VectorPtr createFlatVector<TypeKind::UNKNOWN>(
+    const TypePtr& /*type*/,
+    int32_t /*columnIdx*/,
+    int32_t numRows,
+    int64_t /*fieldOffset*/,
+    std::vector<int64_t>& /*offsets*/,
+    uint8_t* /*memoryAddress*/,
+    memory::MemoryPool* pool) {
+  BufferPtr allNulls = allocateNulls(numRows, pool, bits::kNull);
+  return std::make_shared<FlatVector<UnknownValue>>(
+      pool, UNKNOWN(), allNulls, numRows, /*values=*/nullptr, /*stringBuffers=*/std::vector<BufferPtr>{});
+}
+
+// Returns true when no top-level child of `rowType` is an ARRAY, MAP or ROW,
+// i.e. when every column has a kind other than those three. Only direct
+// children are inspected.
+//
+// The row type is taken by const reference to avoid copying the shared
+// pointer on every call, and the index is unsigned to match size().
+bool supporteType(const RowTypePtr& rowType) {
+  for (uint32_t i = 0; i < rowType->size(); ++i) {
+    switch (rowType->childAt(i)->kind()) {
+      case TypeKind::ARRAY:
+      case TypeKind::MAP:
+      case TypeKind::ROW:
+        return false;
+      default:
+        break;
+    }
+  }
+  return true;
+}
+
+} // namespace
 VeloxRowToColumnarConverter::VeloxRowToColumnarConverter(
     struct ArrowSchema* cSchema,
     std::shared_ptr<memory::MemoryPool> memoryPool)
@@ -32,6 +265,9 @@ VeloxRowToColumnarConverter::VeloxRowToColumnarConverter(
 
 std::shared_ptr<ColumnarBatch>
 VeloxRowToColumnarConverter::convert(int64_t numRows, int64_t* rowLength, 
uint8_t* memoryAddress) {
+  if (supporteType(asRowType(rowType_))) {
+    return convertPrimitive(numRows, rowLength, memoryAddress);
+  }
   std::vector<std::optional<std::string_view>> data;
   int64_t offset = 0;
   for (auto i = 0; i < numRows; i++) {
@@ -41,4 +277,28 @@ VeloxRowToColumnarConverter::convert(int64_t numRows, 
int64_t* rowLength, uint8_
   auto vp = row::UnsafeRowDeserializer::deserialize(data, rowType_, 
pool_.get());
   return 
std::make_shared<VeloxColumnarBatch>(std::dynamic_pointer_cast<RowVector>(vp));
 }
+
+// Converts `numRows` serialized rows, laid out back to back starting at
+// `memoryAddress`, into a columnar batch by building one flat vector per
+// column.
+//
+// `rowLength[i]` is the byte length of row i; row start offsets are the
+// running sum of those lengths. Each column is then materialised by the
+// createFlatVector specialization selected from the column's type kind.
+// The caller (convert) only takes this path when supporteType() accepts the
+// row type.
+std::shared_ptr<ColumnarBatch>
+VeloxRowToColumnarConverter::convertPrimitive(int64_t numRows, int64_t* rowLength, uint8_t* memoryAddress) {
+  const auto numFields = rowType_->size();
+  const int64_t nullBitsetWidthInBytes = calculateBitSetWidthInBytes(numFields);
+
+  // offsets[0] stays 0; the index is int64_t to match numRows so the
+  // comparison cannot overflow a narrower counter.
+  std::vector<int64_t> offsets(numRows);
+  for (int64_t i = 1; i < numRows; ++i) {
+    offsets[i] = offsets[i - 1] + rowLength[i - 1];
+  }
+
+  std::vector<VectorPtr> columns(numFields);
+  for (uint32_t i = 0; i < numFields; ++i) {
+    auto fieldOffset = getFieldOffset(nullBitsetWidthInBytes, i);
+    auto& type = rowType_->childAt(i);
+    columns[i] = VELOX_DYNAMIC_SCALAR_TYPE_DISPATCH_ALL(
+        createFlatVector, type->kind(), type, i, numRows, fieldOffset, offsets, memoryAddress, pool_.get());
+  }
+
+  auto rowVector =
+      std::make_shared<RowVector>(pool_.get(), rowType_, BufferPtr(nullptr), numRows, std::move(columns));
+  return std::make_shared<VeloxColumnarBatch>(rowVector);
+}
 } // namespace gluten
diff --git a/cpp/velox/operators/serializer/VeloxRowToColumnarConverter.h 
b/cpp/velox/operators/serializer/VeloxRowToColumnarConverter.h
index 30006c4f0..c064b9d3c 100644
--- a/cpp/velox/operators/serializer/VeloxRowToColumnarConverter.h
+++ b/cpp/velox/operators/serializer/VeloxRowToColumnarConverter.h
@@ -33,7 +33,8 @@ class VeloxRowToColumnarConverter final : public 
RowToColumnarConverter {
 
   std::shared_ptr<ColumnarBatch> convert(int64_t numRows, int64_t* rowLength, 
uint8_t* memoryAddress);
 
- protected:
+ private:
+  std::shared_ptr<ColumnarBatch> convertPrimitive(int64_t numRows, int64_t* 
rowLength, uint8_t* memoryAddress);
   facebook::velox::TypePtr rowType_;
   std::shared_ptr<facebook::velox::memory::MemoryPool> pool_;
 };


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to