This is an automated email from the ASF dual-hosted git repository.

zhztheplayer pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 7bb00d1632 [VL] Delta: Add native Delta DV reader support (#12040)
7bb00d1632 is described below

commit 7bb00d1632519681e19e6b3cae9700b974ac42ad
Author: Mohammad Linjawi <[email protected]>
AuthorDate: Fri May 22 10:17:15 2026 +0300

    [VL] Delta: Add native Delta DV reader support (#12040)
---
 cpp/core/compute/Runtime.h                         |   5 +
 cpp/velox/CMakeLists.txt                           |   7 +-
 cpp/velox/compute/VeloxBackend.cc                  |   7 +
 cpp/velox/compute/VeloxBackend.h                   |   4 +
 cpp/velox/compute/VeloxConnectorIds.h              |   2 +
 cpp/velox/compute/VeloxRuntime.cc                  |  13 ++
 cpp/velox/compute/WholeStageResultIterator.cc      |   1 +
 cpp/velox/compute/delta/DeltaConnector.cpp         |  48 +++++
 cpp/velox/compute/delta/DeltaConnector.h           |  70 +++++++
 cpp/velox/compute/delta/DeltaDataSource.cpp        |  98 +++++++++
 cpp/velox/compute/delta/DeltaDataSource.h          |  78 ++++++++
 .../compute/delta/DeltaDeletionVectorReader.cpp    | 209 ++++++++++++++++++++
 .../compute/delta/DeltaDeletionVectorReader.h      | 110 +++++++++++
 cpp/velox/compute/delta/DeltaSplit.cpp             |  75 +++++++
 cpp/velox/compute/delta/DeltaSplit.h               | 122 ++++++++++++
 cpp/velox/compute/delta/DeltaSplitReader.cpp       | 208 +++++++++++++++++++
 cpp/velox/compute/delta/DeltaSplitReader.h         | 118 +++++++++++
 cpp/velox/compute/delta/tests/CMakeLists.txt       |  12 ++
 .../compute/delta/tests/DeltaConnectorTest.cpp     | 180 +++++++++++++++++
 .../delta/tests/DeltaDeletionVectorReaderTest.cpp  | 219 +++++++++++++++++++++
 cpp/velox/compute/delta/tests/DeltaSplitTest.cpp   |  95 +++++++++
 21 files changed, 1680 insertions(+), 1 deletion(-)

diff --git a/cpp/core/compute/Runtime.h b/cpp/core/compute/Runtime.h
index 9d8315731f..4ab944898b 100644
--- a/cpp/core/compute/Runtime.h
+++ b/cpp/core/compute/Runtime.h
@@ -56,6 +56,11 @@ struct SparkTaskInfo {
   }
 };
 
+struct SplitPayloadBufferView { // Pointer + length pair; declares no destructor, so it never frees `data`.
+  const uint8_t* data{nullptr}; // Null by default so a default-constructed view is well defined.
+  int32_t size{0}; // Presumably the byte length of `data` -- TODO confirm against the producer.
+};
+
 class Runtime : public std::enable_shared_from_this<Runtime> {
  public:
   using Factory = std::function<Runtime*(
diff --git a/cpp/velox/CMakeLists.txt b/cpp/velox/CMakeLists.txt
index a8b0d668ca..a97c6e60b3 100644
--- a/cpp/velox/CMakeLists.txt
+++ b/cpp/velox/CMakeLists.txt
@@ -160,6 +160,11 @@ set(VELOX_SRCS
     compute/VeloxRuntime.cc
     compute/VeloxPlanConverter.cc
     compute/WholeStageResultIterator.cc
+    compute/delta/DeltaConnector.cpp
+    compute/delta/DeltaDataSource.cpp
+    compute/delta/DeltaDeletionVectorReader.cpp
+    compute/delta/DeltaSplit.cpp
+    compute/delta/DeltaSplitReader.cpp
     compute/delta/RoaringBitmapArray.cpp
     compute/iceberg/IcebergPlanConverter.cc
     jni/JniFileSystem.cc
@@ -403,8 +408,8 @@ find_package(
 target_link_libraries(velox PUBLIC ICU::i18n ICU::uc ICU::data)
 
 if(BUILD_TESTS)
-  add_subdirectory(tests)
   add_subdirectory(compute/delta/tests)
+  add_subdirectory(tests)
 endif()
 
 if(BUILD_BENCHMARKS)
diff --git a/cpp/velox/compute/VeloxBackend.cc 
b/cpp/velox/compute/VeloxBackend.cc
index 85a8622508..801fc9d835 100644
--- a/cpp/velox/compute/VeloxBackend.cc
+++ b/cpp/velox/compute/VeloxBackend.cc
@@ -21,6 +21,7 @@
 #include <folly/executors/CPUThreadPoolExecutor.h>
 #include <folly/executors/task_queue/UnboundedBlockingQueue.h>
 
+#include "compute/delta/DeltaConnector.h"
 #include "operators/functions/RegistrationAllFunctions.h"
 #include "operators/plannodes/RowVectorStream.h"
 #include "utils/ConfigExtractor.h"
@@ -323,6 +324,12 @@ std::shared_ptr<facebook::velox::connector::Connector> 
VeloxBackend::createHiveC
   return std::make_shared<velox::connector::hive::HiveConnector>(connectorId, 
hiveConnectorConfig_, ioExecutor);
 }
 
+// Builds a DeltaConnector for `connectorId`, handing it the backend's
+// hiveConnectorConfig_ and the supplied IO executor.
+std::shared_ptr<facebook::velox::connector::Connector> VeloxBackend::createDeltaConnector(
+    const std::string& connectorId,
+    folly::Executor* ioExecutor) const {
+  std::shared_ptr<facebook::velox::connector::Connector> deltaConnector =
+      std::make_shared<delta::DeltaConnector>(connectorId, hiveConnectorConfig_, ioExecutor);
+  return deltaConnector;
+}
+
 std::shared_ptr<facebook::velox::connector::Connector> 
VeloxBackend::createValueStreamConnector(
     const std::string& connectorId,
     bool dynamicFilterEnabled) const {
diff --git a/cpp/velox/compute/VeloxBackend.h b/cpp/velox/compute/VeloxBackend.h
index 68791ec0f9..2796ce20c3 100644
--- a/cpp/velox/compute/VeloxBackend.h
+++ b/cpp/velox/compute/VeloxBackend.h
@@ -74,6 +74,10 @@ class VeloxBackend {
       const std::string& connectorId,
       folly::Executor* ioExecutor) const;
 
+  std::shared_ptr<facebook::velox::connector::Connector> createDeltaConnector(
+      const std::string& connectorId,
+      folly::Executor* ioExecutor) const;
+
   std::shared_ptr<facebook::velox::connector::Connector> 
createValueStreamConnector(
       const std::string& connectorId,
       bool dynamicFilterEnabled) const;
diff --git a/cpp/velox/compute/VeloxConnectorIds.h 
b/cpp/velox/compute/VeloxConnectorIds.h
index e6082bae8b..a0e37ba8b6 100644
--- a/cpp/velox/compute/VeloxConnectorIds.h
+++ b/cpp/velox/compute/VeloxConnectorIds.h
@@ -23,9 +23,11 @@ namespace gluten {
 
 struct VeloxConnectorIds {
   std::string hive;
+  std::string delta;
   std::string iterator;
   std::string cudfHive;
   bool hiveRegistered{false};
+  bool deltaRegistered{false};
   bool iteratorRegistered{false};
   bool cudfHiveRegistered{false};
 };
diff --git a/cpp/velox/compute/VeloxRuntime.cc 
b/cpp/velox/compute/VeloxRuntime.cc
index e3eac17a22..62e6820e9c 100644
--- a/cpp/velox/compute/VeloxRuntime.cc
+++ b/cpp/velox/compute/VeloxRuntime.cc
@@ -31,6 +31,7 @@
 #include "compute/ResultIterator.h"
 #include "compute/Runtime.h"
 #include "compute/VeloxPlanConverter.h"
+#include "compute/delta/DeltaConnector.h"
 #include "config/VeloxConfig.h"
 #include "operators/plannodes/IteratorSplit.h"
 #include "operators/serializer/VeloxRowToColumnarConverter.h"
@@ -213,6 +214,7 @@ std::string makeScopedConnectorId(const std::string& base, 
uint64_t runtimeId) {
 VeloxConnectorIds makeScopedConnectorIds(uint64_t runtimeId) {
   return VeloxConnectorIds{
       .hive = makeScopedConnectorId(kHiveConnectorId, runtimeId),
+      .delta = 
makeScopedConnectorId(delta::DeltaConnectorFactory::kDeltaConnectorName, 
runtimeId),
       .iterator = makeScopedConnectorId(kIteratorConnectorId, runtimeId),
       .cudfHive = makeScopedConnectorId(kCudfHiveConnectorId, runtimeId)};
 }
@@ -271,6 +273,13 @@ void VeloxRuntime::registerConnectors() {
       velox::connector::hasConnector(connectorIds_.hive),
       "Scoped hive connector not found after registration: " + 
connectorIds_.hive);
 
+  connectorIds_.deltaRegistered =
+      
velox::connector::registerConnector(backend->createDeltaConnector(connectorIds_.delta,
 ioExecutor_.get()));
+  GLUTEN_CHECK(connectorIds_.deltaRegistered, "Failed to register scoped delta 
connector: " + connectorIds_.delta);
+  GLUTEN_CHECK(
+      velox::connector::hasConnector(connectorIds_.delta),
+      "Scoped delta connector not found after registration: " + 
connectorIds_.delta);
+
   const auto valueStreamDynamicFilterEnabled =
       veloxCfg_->get<bool>(kValueStreamDynamicFilterEnabled, 
kValueStreamDynamicFilterEnabledDefault);
   connectorIds_.iteratorRegistered = velox::connector::registerConnector(
@@ -306,6 +315,10 @@ void VeloxRuntime::unregisterConnectors() {
     velox::connector::unregisterConnector(connectorIds_.iterator);
     connectorIds_.iteratorRegistered = false;
   }
+  if (connectorIds_.deltaRegistered) {
+    velox::connector::unregisterConnector(connectorIds_.delta);
+    connectorIds_.deltaRegistered = false;
+  }
   if (connectorIds_.hiveRegistered) {
     velox::connector::unregisterConnector(connectorIds_.hive);
     connectorIds_.hiveRegistered = false;
diff --git a/cpp/velox/compute/WholeStageResultIterator.cc 
b/cpp/velox/compute/WholeStageResultIterator.cc
index 2c1effba20..ccc1917f41 100644
--- a/cpp/velox/compute/WholeStageResultIterator.cc
+++ b/cpp/velox/compute/WholeStageResultIterator.cc
@@ -216,6 +216,7 @@ std::shared_ptr<velox::core::QueryCtx> 
WholeStageResultIterator::createNewVeloxQ
   std::unordered_map<std::string, std::shared_ptr<velox::config::ConfigBase>> 
connectorConfigs;
   auto hiveSessionConfig = createHiveConnectorSessionConfig(veloxCfg_);
   connectorConfigs[connectorIds_.hive] = hiveSessionConfig;
+  connectorConfigs[connectorIds_.delta] = hiveSessionConfig;
   connectorConfigs[connectorIds_.iterator] = hiveSessionConfig;
 #ifdef GLUTEN_ENABLE_GPU
   if (!connectorIds_.cudfHive.empty()) {
diff --git a/cpp/velox/compute/delta/DeltaConnector.cpp 
b/cpp/velox/compute/delta/DeltaConnector.cpp
new file mode 100644
index 0000000000..f25eb7714a
--- /dev/null
+++ b/cpp/velox/compute/delta/DeltaConnector.cpp
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/*
+ * Copyright (c) Facebook, Inc. and its affiliates.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "compute/delta/DeltaConnector.h"
+
+#include "compute/delta/DeltaDataSource.h"
+
+namespace gluten::delta {
+
+// Creates a DeltaDataSource wired to this connector's file-handle factory,
+// IO executor and Hive config, and returns it through the DataSource interface.
+std::unique_ptr<DataSource> DeltaConnector::createDataSource(
+    const RowTypePtr& outputType,
+    const ConnectorTableHandlePtr& tableHandle,
+    const ColumnHandleMap& columnHandles,
+    ConnectorQueryCtx* connectorQueryCtx) {
+  std::unique_ptr<DataSource> dataSource = std::make_unique<DeltaDataSource>(
+      outputType, tableHandle, columnHandles, &fileHandleFactory_, ioExecutor_, connectorQueryCtx, hiveConfig_);
+  return dataSource;
+}
+
+} // namespace gluten::delta
diff --git a/cpp/velox/compute/delta/DeltaConnector.h 
b/cpp/velox/compute/delta/DeltaConnector.h
new file mode 100644
index 0000000000..2d3a1d8df6
--- /dev/null
+++ b/cpp/velox/compute/delta/DeltaConnector.h
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/*
+ * Copyright (c) Facebook, Inc. and its affiliates.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include "velox/connectors/hive/HiveConnector.h"
+
+namespace gluten::delta {
+
+using namespace facebook::velox;
+using namespace facebook::velox::connector;
+using namespace facebook::velox::connector::hive;
+
+/// Connector for Delta tables built on HiveConnector. The only member it
+/// overrides is createDataSource(); construction forwards to the base class.
+class DeltaConnector final : public HiveConnector {
+ public:
+  /// Forwards `id`, `config` and `ioExecutor` unchanged to HiveConnector.
+  DeltaConnector(const std::string& id, std::shared_ptr<const 
config::ConfigBase> config, folly::Executor* ioExecutor)
+      : HiveConnector(id, std::move(config), ioExecutor) {}
+
+  /// Defined in DeltaConnector.cpp, where it returns a DeltaDataSource.
+  std::unique_ptr<DataSource> createDataSource(
+      const RowTypePtr& outputType,
+      const ConnectorTableHandlePtr& tableHandle,
+      const ColumnHandleMap& columnHandles,
+      ConnectorQueryCtx* connectorQueryCtx) override;
+};
+
+/// ConnectorFactory named kDeltaConnectorName that produces DeltaConnector
+/// instances.
+class DeltaConnectorFactory final : public ConnectorFactory {
+ public:
+  /// Factory name passed to the ConnectorFactory base; also used as the base
+  /// string for the scoped delta connector id in VeloxRuntime.cc.
+  static constexpr const char* kDeltaConnectorName = "delta";
+
+  DeltaConnectorFactory() : ConnectorFactory(kDeltaConnectorName) {}
+
+  /// Creates a DeltaConnector from `id`, `config` and `ioExecutor`.
+  /// `cpuExecutor` is accepted for the interface but is not forwarded.
+  std::shared_ptr<Connector> newConnector(
+      const std::string& id,
+      std::shared_ptr<const config::ConfigBase> config,
+      folly::Executor* ioExecutor = nullptr,
+      [[maybe_unused]] folly::Executor* cpuExecutor = nullptr) override {
+    return std::make_shared<DeltaConnector>(id, std::move(config), ioExecutor);
+  }
+};
+
+} // namespace gluten::delta
diff --git a/cpp/velox/compute/delta/DeltaDataSource.cpp 
b/cpp/velox/compute/delta/DeltaDataSource.cpp
new file mode 100644
index 0000000000..f8c00c2b07
--- /dev/null
+++ b/cpp/velox/compute/delta/DeltaDataSource.cpp
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/*
+ * Copyright (c) Facebook, Inc. and its affiliates.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "compute/delta/DeltaDataSource.h"
+
+#include "compute/delta/DeltaSplitReader.h"
+
+namespace gluten::delta {
+
+// Passes every argument straight through to the HiveDataSource constructor;
+// this constructor adds no Delta-specific state of its own.
+DeltaDataSource::DeltaDataSource(
+    const RowTypePtr& outputType,
+    const ConnectorTableHandlePtr& tableHandle,
+    const ColumnHandleMap& assignments,
+    FileHandleFactory* fileHandleFactory,
+    folly::Executor* ioExecutor,
+    const ConnectorQueryCtx* connectorQueryCtx,
+    const std::shared_ptr<HiveConfig>& hiveConfig)
+    : HiveDataSource(
+          outputType,
+          tableHandle,
+          assignments,
+          fileHandleFactory,
+          ioExecutor,
+          connectorQueryCtx,
+          hiveConfig) {}
+
+// createSplitReader() has two build variants, selected by
+// GLUTEN_VELOX_DELTA_USE_FILE_SPLIT_READER (DeltaDataSource.h derives it from
+// whether velox/connectors/hive/FileSplitReader.h can be included). Both cast
+// the current split to HiveDeltaSplit and return a DeltaSplitReader; they
+// differ only in the return type and in the members forwarded to it.
+#if GLUTEN_VELOX_DELTA_USE_FILE_SPLIT_READER
+std::unique_ptr<FileSplitReader> DeltaDataSource::createSplitReader() {
+  // prepareSplit() is called before the cast; its result is forwarded below
+  // as the reader's bucket channels.
+  auto bucketChannels = prepareSplit();
+  auto deltaSplit = checkedPointerCast<const HiveDeltaSplit>(split_);
+
+  return std::make_unique<DeltaSplitReader>(
+      deltaSplit,
+      tableHandle_,
+      &partitionKeys_,
+      connectorQueryCtx_,
+      fileConfig_,
+      readerOutputType_,
+      dataIoStats_,
+      metadataIoStats_,
+      ioStats_,
+      fileHandleFactory_,
+      ioExecutor_,
+      scanSpec_,
+      &infoColumns_,
+      std::move(bucketChannels),
+      /*subfieldFiltersForValidation=*/getFilters());
+}
+#else
+std::unique_ptr<SplitReader> DeltaDataSource::createSplitReader() {
+  auto deltaSplit = checkedPointerCast<const HiveDeltaSplit>(split_);
+
+  return std::make_unique<DeltaSplitReader>(
+      deltaSplit,
+      hiveTableHandle_,
+      &partitionKeys_,
+      connectorQueryCtx_,
+      hiveConfig_,
+      readerOutputType_,
+      ioStatistics_,
+      ioStats_,
+      fileHandleFactory_,
+      ioExecutor_,
+      scanSpec_,
+      /*subfieldFiltersForValidation=*/getFilters());
+}
+#endif
+
+} // namespace gluten::delta
diff --git a/cpp/velox/compute/delta/DeltaDataSource.h 
b/cpp/velox/compute/delta/DeltaDataSource.h
new file mode 100644
index 0000000000..a9fe1a4753
--- /dev/null
+++ b/cpp/velox/compute/delta/DeltaDataSource.h
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/*
+ * Copyright (c) Facebook, Inc. and its affiliates.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include "velox/connectors/hive/HiveDataSource.h"
+
+#ifndef GLUTEN_VELOX_DELTA_USE_FILE_SPLIT_READER
+#if __has_include("velox/connectors/hive/FileSplitReader.h")
+#define GLUTEN_VELOX_DELTA_USE_FILE_SPLIT_READER 1
+#else
+#define GLUTEN_VELOX_DELTA_USE_FILE_SPLIT_READER 0
+#endif
+#endif
+
+#if GLUTEN_VELOX_DELTA_USE_FILE_SPLIT_READER
+#include "velox/connectors/hive/FileSplitReader.h"
+#else
+namespace facebook::velox::connector::hive {
+class SplitReader;
+}
+#endif
+
+namespace gluten::delta {
+
+using namespace facebook::velox;
+using namespace facebook::velox::connector;
+using namespace facebook::velox::connector::hive;
+
+/// HiveDataSource subclass that overrides createSplitReader(); the definition
+/// in DeltaDataSource.cpp returns a DeltaSplitReader.
+class DeltaDataSource : public HiveDataSource {
+ public:
+  /// Takes the same arguments as the HiveDataSource constructor it forwards to.
+  DeltaDataSource(
+      const RowTypePtr& outputType,
+      const ConnectorTableHandlePtr& tableHandle,
+      const ColumnHandleMap& assignments,
+      FileHandleFactory* fileHandleFactory,
+      folly::Executor* ioExecutor,
+      const ConnectorQueryCtx* connectorQueryCtx,
+      const std::shared_ptr<HiveConfig>& hiveConfig);
+
+ protected:
+  // The override's return type depends on whether FileSplitReader.h was found
+  // by the __has_include probe earlier in this header.
+#if GLUTEN_VELOX_DELTA_USE_FILE_SPLIT_READER
+  std::unique_ptr<FileSplitReader> createSplitReader() override;
+#else
+  std::unique_ptr<SplitReader> createSplitReader() override;
+#endif
+};
+
+} // namespace gluten::delta
diff --git a/cpp/velox/compute/delta/DeltaDeletionVectorReader.cpp 
b/cpp/velox/compute/delta/DeltaDeletionVectorReader.cpp
new file mode 100644
index 0000000000..a48060d060
--- /dev/null
+++ b/cpp/velox/compute/delta/DeltaDeletionVectorReader.cpp
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/*
+ * Copyright (c) Facebook, Inc. and its affiliates.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "compute/delta/DeltaDeletionVectorReader.h"
+
+#include <cstring>
+#include <utility>
+#include "velox/common/base/BitUtil.h"
+#include "velox/common/base/Exceptions.h"
+
+namespace gluten::delta {
+
+namespace {
+
+constexpr uint64_t kDeltaBitmapArrayMagicBytes = 4;
+constexpr uint64_t kDeltaNativeBitmapArrayLengthBytes = 4;
+constexpr uint64_t kDeltaStoredPayloadLengthBytes = 4;
+constexpr uint32_t kDeltaPortableBitmapArrayMagicNumber = 1681511377;
+constexpr uint32_t kDeltaNativeBitmapArrayMagicNumber = 1681511376;
+
+// Decodes the four bytes at `data` as an unsigned 32-bit little-endian value.
+// The caller must guarantee that four bytes are readable.
+uint32_t readUint32LittleEndian(const char* data) {
+  uint32_t value = 0;
+  // Fold from the most significant byte (index 3) down to the least.
+  for (int byteIndex = 3; byteIndex >= 0; --byteIndex) {
+    value = (value << 8) | static_cast<uint8_t>(data[byteIndex]);
+  }
+  return value;
+}
+
+// Decodes the eight bytes at `data` as an unsigned 64-bit little-endian value.
+// The caller must guarantee that eight bytes are readable.
+// NOTE(review): no call site is visible in this file -- confirm it is needed.
+uint64_t readUint64LittleEndian(const char* data) {
+  uint64_t value = 0;
+  // Fold from the most significant byte (index 7) down to the least.
+  for (int byteIndex = 7; byteIndex >= 0; --byteIndex) {
+    value = (value << 8) | static_cast<uint8_t>(data[byteIndex]);
+  }
+  return value;
+}
+
+// Decodes a serialized Delta bitmap array into a Roaring64Map.
+//
+// The first four bytes are a little-endian magic number selecting the layout:
+//  - kDeltaPortableBitmapArrayMagicNumber: the remaining bytes are handed to
+//    roaring::Roaring64Map::readSafe as-is.
+//  - kDeltaNativeBitmapArrayMagicNumber: a 4-byte bitmap count follows, then
+//    for each bitmap a 4-byte length and that many bytes read with
+//    roaring::Roaring::readSafe. Bitmap i supplies the low 32 bits of each
+//    row position and i becomes the high 32 bits.
+//
+// `dvPath` is used only in error messages. Truncated input, a per-bitmap size
+// mismatch, trailing bytes after the last bitmap, or an unknown magic number
+// all raise a Velox user error.
+roaring::Roaring64Map deserializeDeltaBitmapArray(std::string_view 
serializedPayload, const std::string& dvPath) {
+  VELOX_USER_CHECK_GE(
+      serializedPayload.size(),
+      kDeltaBitmapArrayMagicBytes,
+      "Deletion vector payload is too small for Delta bitmap array: {}",
+      dvPath);
+
+  const auto magic = readUint32LittleEndian(serializedPayload.data());
+  if (magic == kDeltaPortableBitmapArrayMagicNumber) {
+    const auto portablePayload = 
serializedPayload.substr(kDeltaBitmapArrayMagicBytes);
+    return roaring::Roaring64Map::readSafe(portablePayload.data(), 
portablePayload.size());
+  }
+
+  if (magic == kDeltaNativeBitmapArrayMagicNumber) {
+    VELOX_USER_CHECK_GE(
+        serializedPayload.size(),
+        kDeltaBitmapArrayMagicBytes + kDeltaNativeBitmapArrayLengthBytes,
+        "Deletion vector payload is too small for Delta native bitmap array: 
{}",
+        dvPath);
+
+    const auto bitmapCount = readUint32LittleEndian(serializedPayload.data() + 
kDeltaBitmapArrayMagicBytes);
+    size_t offset = kDeltaBitmapArrayMagicBytes + 
kDeltaNativeBitmapArrayLengthBytes;
+    roaring::Roaring64Map result;
+
+    for (uint64_t bitmapIndex = 0; bitmapIndex < bitmapCount; ++bitmapIndex) {
+      // Bounds-check the 4-byte length prefix before reading it.
+      VELOX_USER_CHECK_LE(
+          offset + kDeltaStoredPayloadLengthBytes,
+          serializedPayload.size(),
+          "Deletion vector payload ended before bitmap {} size for {}",
+          bitmapIndex,
+          dvPath);
+
+      const auto bitmapSize = readUint32LittleEndian(serializedPayload.data() 
+ offset);
+      offset += kDeltaStoredPayloadLengthBytes;
+
+      // Bounds-check the bitmap body before handing it to the decoder.
+      VELOX_USER_CHECK_LE(
+          offset + bitmapSize,
+          serializedPayload.size(),
+          "Deletion vector bitmap {} range exceeds payload size for {}",
+          bitmapIndex,
+          dvPath);
+
+      auto bitmap = roaring::Roaring::readSafe(serializedPayload.data() + 
offset, bitmapSize);
+      // Require the decoded bitmap to account for exactly the declared length.
+      VELOX_USER_CHECK_EQ(
+          bitmap.getSizeInBytes(true),
+          bitmapSize,
+          "Deletion vector bitmap {} size mismatch for {}: expected {}, got 
{}",
+          bitmapIndex,
+          dvPath,
+          bitmapSize,
+          bitmap.getSizeInBytes(true));
+
+      // The bitmap's index supplies the upper 32 bits of every row position.
+      const uint64_t rowBase = bitmapIndex << 32;
+      for (auto it = bitmap.begin(); it != bitmap.end(); ++it) {
+        result.add(rowBase | static_cast<uint64_t>(*it));
+      }
+      offset += bitmapSize;
+    }
+
+    // Every byte must have been consumed by the declared bitmaps.
+    VELOX_USER_CHECK_EQ(
+        offset,
+        serializedPayload.size(),
+        "Deletion vector payload has {} unexpected trailing bytes for {}",
+        serializedPayload.size() - offset,
+        dvPath);
+    return result;
+  }
+
+  VELOX_USER_FAIL("Unexpected Delta bitmap array magic number {} for {}", 
magic, dvPath);
+}
+
+} // namespace
+
+// Decodes `serializedPayload` and, only if it passes validation, installs it
+// as the reader's deletion bitmap.
+//
+// @param serializedPayload Serialized Delta bitmap array; must be non-empty.
+// @param debugName Label used in error messages.
+// @param expectedCardinality When set, the decoded bitmap's cardinality must
+// equal this value.
+// Raises a Velox user error on an empty payload, a decode failure, or a
+// cardinality mismatch. On any failure deletionBitmap_ is left untouched, so
+// a rejected payload is never used for filtering.
+void DeltaDeletionVectorReader::loadSerializedDeletionVectorInternal(
+    std::string_view serializedPayload,
+    const std::string& debugName,
+    std::optional<uint64_t> expectedCardinality) {
+  VELOX_USER_CHECK_GT(serializedPayload.size(), 0, "Serialized deletion vector is empty: {}", debugName);
+
+  // Decode into a local first. Assigning to the member before the cardinality
+  // check would leave a rejected bitmap installed if the caller catches the
+  // error and keeps using this reader.
+  auto decodedBitmap = deserializeDeltaBitmapArray(serializedPayload, debugName);
+
+  if (expectedCardinality.has_value()) {
+    const auto actualCardinality = decodedBitmap.cardinality();
+    VELOX_USER_CHECK_EQ(
+        actualCardinality,
+        expectedCardinality.value(),
+        "Deletion vector cardinality mismatch for {}: expected {}, got {}",
+        debugName,
+        expectedCardinality.value(),
+        actualCardinality);
+  }
+
+  // Validation passed: commit the new bitmap.
+  deletionBitmap_ = std::move(decodedBitmap);
+}
+
+// Public entry point: runs the internal loader with a fixed debug name and
+// re-raises any std::exception it throws as a Velox user error whose message
+// embeds the original e.what().
+void DeltaDeletionVectorReader::loadSerializedDeletionVector(
+    std::string_view serializedPayload,
+    std::optional<uint64_t> expectedCardinality) {
+  try {
+    loadSerializedDeletionVectorInternal(serializedPayload, "serialized 
deletion vector", expectedCardinality);
+  } catch (const std::exception& e) {
+    VELOX_USER_FAIL("Failed to load serialized deletion vector: {}", e.what());
+  }
+}
+
+// Returns true only when a deletion vector is loaded and it contains
+// `rowPosition`; with nothing loaded, no row is reported as deleted.
+bool DeltaDeletionVectorReader::isRowDeleted(uint64_t rowPosition) {
+  return deletionBitmap_.has_value() && deletionBitmap_->contains(rowPosition);
+}
+
+// Fills `deleteBitmap` with one bit per row of the batch (1 = deleted).
+//
+// @param baseReadOffset Row position of the batch's first row.
+// @param size Number of rows in the batch.
+// @param deleteBitmap Caller-owned output buffer. It must be non-null and able
+// to hold bits::nbytes(size) bytes; both are checked before any write.
+//
+// The first bits::nbytes(size) bytes are always zeroed. The buffer's size is
+// then set to 0 when no row in the batch is deleted (or no deletion vector is
+// loaded), otherwise to the number of bytes up to and including the highest
+// deleted bit.
+void DeltaDeletionVectorReader::applyDeletionFilter(uint64_t baseReadOffset, uint64_t size, BufferPtr deleteBitmap) {
+  VELOX_CHECK_NOT_NULL(deleteBitmap, "Delete bitmap buffer is required");
+
+  // Reject undersized buffers up front instead of writing past their end.
+  const auto bitmapBytes = bits::nbytes(size);
+  VELOX_CHECK_GE(
+      deleteBitmap->capacity(), bitmapBytes, "Delete bitmap buffer is too small for {} rows", size);
+
+  auto* rawBitmap = deleteBitmap->asMutable<uint64_t>();
+  std::memset(rawBitmap, 0, bitmapBytes);
+
+  if (!deletionBitmap_.has_value()) {
+    deleteBitmap->setSize(0);
+    return;
+  }
+
+  bool hasDeletedRows = false;
+  uint64_t highestDeletedIndex = 0;
+  for (uint64_t i = 0; i < size; ++i) {
+    const uint64_t absoluteRowPos = baseReadOffset + i;
+    if (deletionBitmap_->contains(absoluteRowPos)) {
+      bits::setBit(rawBitmap, i);
+      hasDeletedRows = true;
+      highestDeletedIndex = i;
+    }
+  }
+
+  deleteBitmap->setSize(hasDeletedRows ? bits::nbytes(highestDeletedIndex + 1) : 0);
+}
+
+// Despite the name, this reports the loaded bitmap's cardinality() rather
+// than a size-based estimate; it is 0 when no deletion vector is loaded.
+uint64_t DeltaDeletionVectorReader::estimatedDeletedRowCount() const {
+  return deletionBitmap_.has_value() ? deletionBitmap_->cardinality() : 0;
+}
+
+} // namespace gluten::delta
diff --git a/cpp/velox/compute/delta/DeltaDeletionVectorReader.h 
b/cpp/velox/compute/delta/DeltaDeletionVectorReader.h
new file mode 100644
index 0000000000..7eac6ea660
--- /dev/null
+++ b/cpp/velox/compute/delta/DeltaDeletionVectorReader.h
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/*
+ * Copyright (c) Facebook, Inc. and its affiliates.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include "velox/common/base/BitUtil.h"
+#include "velox/vector/ComplexVector.h"
+
+#include <memory>
+#include <optional>
+#include <roaring/roaring64map.hh>
+#include <string>
+#include <string_view>
+
+namespace gluten::delta {
+
+using namespace facebook::velox;
+
+/// Reads and manages Delta Lake deletion vectors for filtering deleted rows
+/// during table scans.
+///
+/// The JVM Delta side materializes the deletion vector and hands the 
serialized
+/// bitmap payload to native. This reader only deserializes that payload and
+/// applies row filtering during scan.
+///
+/// Usage example:
+/// @code
+///   auto reader = std::make_unique<DeltaDeletionVectorReader>();
+///   reader->loadSerializedDeletionVector(serializedPayload, 
expectedCardinality);
+///   if (reader->isRowDeleted(42)) {
+///     // Skip this row during scan
+///   }
+/// @endcode
+class DeltaDeletionVectorReader {
+ public:
+  DeltaDeletionVectorReader() = default;
+
+  /// Loads a deletion vector from an already decoded serialized Delta payload.
+  /// @param serializedPayload Materialized Delta DV bitmap payload.
+  /// @param expectedCardinality Optional number of deleted row positions from
+  /// Delta metadata. When set, the reader verifies this value against the
+  /// deserialized bitmap cardinality and fails loading on mismatch.
+  void loadSerializedDeletionVector(
+      std::string_view serializedPayload,
+      std::optional<uint64_t> expectedCardinality = std::nullopt);
+
+  /// Checks if a specific row position is marked as deleted.
+  /// Note: Declared non-const, but the definition in
+  /// DeltaDeletionVectorReader.cpp only reads the loaded bitmap.
+  /// @param rowPosition 0-based row position in the data file
+  /// @return true if the row is deleted, false otherwise
+  bool isRowDeleted(uint64_t rowPosition);
+
+  /// Applies deletion filter to a batch of rows, updating the deletion bitmap.
+  /// This is called during scan to mark deleted rows in the output bitmap.
+  /// @param baseReadOffset Starting row position for this batch (absolute)
+  /// @param size Number of rows in the batch
+  /// @param deleteBitmap Output bitmap marking deleted rows (1 = deleted, 0 =
+  /// keep)
+  void applyDeletionFilter(uint64_t baseReadOffset, uint64_t size, BufferPtr 
deleteBitmap);
+
+  /// Returns true if no deletion vector is loaded.
+  bool empty() const {
+    return !deletionBitmap_.has_value();
+  }
+
+  /// Returns the number of deleted rows in the loaded DV, or 0 if none is
+  /// loaded. Note: Despite the name, this is the bitmap's cardinality().
+  uint64_t estimatedDeletedRowCount() const;
+
+ private:
+  void loadSerializedDeletionVectorInternal(
+      std::string_view serializedPayload,
+      const std::string& debugName,
+      std::optional<uint64_t> expectedCardinality);
+
+  // The loaded deletion vector bitmap
+  std::optional<roaring::Roaring64Map> deletionBitmap_;
+};
+
+} // namespace gluten::delta
diff --git a/cpp/velox/compute/delta/DeltaSplit.cpp 
b/cpp/velox/compute/delta/DeltaSplit.cpp
new file mode 100644
index 0000000000..830a326be1
--- /dev/null
+++ b/cpp/velox/compute/delta/DeltaSplit.cpp
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/*
+ * Copyright (c) Facebook, Inc. and its affiliates.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "compute/delta/DeltaSplit.h"
+
+namespace gluten::delta {
+
+/// Constructs a Delta split. The generic file-split arguments are forwarded
+/// to the HiveConnectorSplit base class; the Delta-specific deletion vector
+/// descriptor, file statistics and row-index filter type are stored on this
+/// object. The split weight handed to the base class is always 0.
+HiveDeltaSplit::HiveDeltaSplit(
+    const std::string& connectorId,
+    const std::string& filePath,
+    dwio::common::FileFormat fileFormat,
+    uint64_t start,
+    uint64_t length,
+    const std::unordered_map<std::string, std::optional<std::string>>& 
partitionKeys,
+    std::optional<int32_t> tableBucketNumber,
+    const std::unordered_map<std::string, std::string>& customSplitInfo,
+    const std::shared_ptr<std::string>& extraFileInfo,
+    const std::unordered_map<std::string, std::string>& serdeParameters,
+    bool cacheable,
+    std::optional<DeltaDeletionVectorDescriptor> deletionVector,
+    std::optional<DeltaFileStatistics> statistics,
+    DeltaRowIndexFilterType filterType,
+    const std::unordered_map<std::string, std::string>& infoColumns,
+    std::optional<FileProperties> fileProperties)
+    : HiveConnectorSplit(
+          connectorId,
+          filePath,
+          fileFormat,
+          start,
+          length,
+          partitionKeys,
+          tableBucketNumber,
+          customSplitInfo,
+          extraFileInfo,
+          serdeParameters,
+          /*splitWeight=*/0,
+          cacheable,
+          infoColumns,
+          fileProperties,
+          // NOTE(review): the two trailing base-class arguments are left unset;
+          // their meaning depends on the HiveConnectorSplit signature of the
+          // Velox version in use — confirm.
+          std::nullopt,
+          std::nullopt),
+      // Inside the parentheses the names refer to the by-value parameters,
+      // which are moved into the members of the same name.
+      deletionVector(std::move(deletionVector)),
+      statistics(std::move(statistics)),
+      filterType(filterType) {}
+
+} // namespace gluten::delta
diff --git a/cpp/velox/compute/delta/DeltaSplit.h 
b/cpp/velox/compute/delta/DeltaSplit.h
new file mode 100644
index 0000000000..df05e62fc3
--- /dev/null
+++ b/cpp/velox/compute/delta/DeltaSplit.h
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/*
+ * Copyright (c) Facebook, Inc. and its affiliates.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <limits>
+#include <optional>
+#include <vector>
+
+#include "compute/Runtime.h"
+#include "velox/connectors/hive/HiveConnectorSplit.h"
+
+namespace gluten::delta {
+
+using namespace facebook::velox;
+using namespace facebook::velox::connector;
+using namespace facebook::velox::connector::hive;
+
+/// Selects how a split's deletion vector is turned into the per-batch row
+/// mask built by DeltaSplitReader::next().
+enum class DeltaRowIndexFilterType {
+  /// Default value of HiveDeltaSplit::filterType.
+  /// NOTE(review): DeltaSplitReader::next() does not special-case this value,
+  /// so a loaded deletion vector is applied exactly as for kIfContained —
+  /// confirm this is intended.
+  kKeepAll,
+  /// The mask produced from the deletion vector is used as is.
+  kIfContained,
+  /// The mask produced from the deletion vector is negated before use.
+  kIfNotContained,
+};
+
+/// Describes the deletion vector attached to a Delta split: an optional
+/// cardinality and an optional view over the serialized payload bytes.
+struct DeltaDeletionVectorDescriptor {
+  /// Cardinality reported for the deletion vector, when known.
+  std::optional<uint64_t> cardinality;
+  /// View over the serialized deletion vector; unset when no payload was
+  /// supplied.
+  std::optional<SplitPayloadBufferView> serializedPayloadView;
+
+  /// Builds a descriptor from an optional cardinality and an optional payload
+  /// view; both default to unset.
+  static DeltaDeletionVectorDescriptor serialized(
+      std::optional<uint64_t> cardinality = std::nullopt,
+      std::optional<SplitPayloadBufferView> serializedPayloadView = std::nullopt) {
+    DeltaDeletionVectorDescriptor descriptor;
+    descriptor.cardinality = cardinality;
+    descriptor.serializedPayloadView = serializedPayloadView;
+    return descriptor;
+  }
+
+  /// True when a serialized payload view is present.
+  bool hasMaterializedPayload() const {
+    return serializedPayloadView != std::nullopt;
+  }
+};
+
+/// File-level statistics for a Delta data file.
+/// Used to validate consistency with deletion vectors and
+/// calculate logical row counts.
+struct DeltaFileStatistics {
+  /// Physical number of rows in the Parquet file.
+  /// Required when deletion vector is present (per Delta spec).
+  std::optional<int64_t> numRecords;
+
+  /// Whether column statistics (min/max) are tight bounds.
+  /// - true: min/max values exist in the valid (non-deleted) rows
+  /// - false: min/max are bounds only, may not exist in valid rows
+  /// When false with a DV, statistics may be stale and unsuitable
+  /// for aggregations like max(column).
+  std::optional<bool> tightBounds;
+
+  /// Calculate the logical row count accounting for deletion vectors.
+  /// @param dv Deletion vector descriptor of the file, if any.
+  /// @return The number of valid (non-deleted) rows. Returns -1 when the
+  /// count cannot be determined: numRecords is not available, or a deletion
+  /// vector cardinality is given that is inconsistent with numRecords
+  /// (numRecords is negative or smaller than the cardinality).
+  int64_t logicalRowCount(const std::optional<DeltaDeletionVectorDescriptor>& dv) const {
+    if (!numRecords.has_value()) {
+      return -1; // Unknown
+    }
+    if (!dv.has_value() || !dv->cardinality.has_value()) {
+      return *numRecords; // No deletions
+    }
+    // Compare as unsigned before subtracting: casting a cardinality above
+    // INT64_MAX to int64_t would wrap, and an oversized cardinality would
+    // otherwise produce an arbitrary negative result that can be mistaken
+    // for the -1 "unknown" sentinel.
+    const uint64_t deletedRows = *dv->cardinality;
+    if (*numRecords < 0 || deletedRows > static_cast<uint64_t>(*numRecords)) {
+      return -1; // Inconsistent metadata
+    }
+    return *numRecords - static_cast<int64_t>(deletedRows);
+  }
+};
+
+/// Hive connector split extended with the Delta-specific inputs read by
+/// DeltaSplitReader: an optional deletion vector, optional file statistics
+/// and the row-index filter type.
+struct HiveDeltaSplit : public connector::hive::HiveConnectorSplit {
+  /// Deletion vector of the data file. When unset, DeltaSplitReader loads no
+  /// deletion vector for the split.
+  std::optional<DeltaDeletionVectorDescriptor> deletionVector;
+  /// File statistics. When set together with a deletion vector they are
+  /// validated in DeltaSplitReader::prepareSplit().
+  std::optional<DeltaFileStatistics> statistics;
+  /// How the deletion vector is applied while reading.
+  DeltaRowIndexFilterType filterType;
+
+  /// The arguments up to `cacheable`, plus `infoColumns` and
+  /// `fileProperties`, are forwarded to HiveConnectorSplit; `deletionVector`,
+  /// `statistics` and `filterType` initialize the members above.
+  HiveDeltaSplit(
+      const std::string& connectorId,
+      const std::string& filePath,
+      dwio::common::FileFormat fileFormat,
+      uint64_t start = 0,
+      uint64_t length = std::numeric_limits<uint64_t>::max(),
+      const std::unordered_map<std::string, std::optional<std::string>>& 
partitionKeys = {},
+      std::optional<int32_t> tableBucketNumber = std::nullopt,
+      const std::unordered_map<std::string, std::string>& customSplitInfo = {},
+      const std::shared_ptr<std::string>& extraFileInfo = {},
+      const std::unordered_map<std::string, std::string>& serdeParameters = {},
+      bool cacheable = true,
+      std::optional<DeltaDeletionVectorDescriptor> deletionVector = 
std::nullopt,
+      std::optional<DeltaFileStatistics> statistics = std::nullopt,
+      DeltaRowIndexFilterType filterType = DeltaRowIndexFilterType::kKeepAll,
+      const std::unordered_map<std::string, std::string>& infoColumns = {},
+      std::optional<FileProperties> fileProperties = std::nullopt);
+};
+
+} // namespace gluten::delta
diff --git a/cpp/velox/compute/delta/DeltaSplitReader.cpp 
b/cpp/velox/compute/delta/DeltaSplitReader.cpp
new file mode 100644
index 0000000000..0b71cf64cf
--- /dev/null
+++ b/cpp/velox/compute/delta/DeltaSplitReader.cpp
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/*
+ * Copyright (c) Facebook, Inc. and its affiliates.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "compute/delta/DeltaSplitReader.h"
+
+#include <string_view>
+
+#include "compute/delta/DeltaSplit.h"
+#include "velox/connectors/hive/HiveConfig.h"
+#include "velox/dwio/common/BufferUtil.h"
+
+using namespace facebook::velox::dwio::common;
+
+namespace gluten::delta {
+
+/// Forwards every argument to the base split reader selected by
+/// GLUTEN_VELOX_DELTA_USE_FILE_SPLIT_READER (HiveSplitReader when it is
+/// non-zero, SplitReader otherwise). The parameter list differs between the
+/// two variants: the HiveSplitReader one takes separate data/metadata IO
+/// statistics plus info columns and bucket channels. The reader starts with
+/// row number 0 and no delete bitmap.
+DeltaSplitReader::DeltaSplitReader(
+    const std::shared_ptr<const HiveDeltaSplit>& hiveSplit,
+    const DeltaTableHandlePtr& tableHandle,
+    const DeltaColumnHandleMap* partitionKeys,
+    const ConnectorQueryCtx* connectorQueryCtx,
+    const std::shared_ptr<const DeltaConfig>& fileConfig,
+    const RowTypePtr& readerOutputType,
+#if GLUTEN_VELOX_DELTA_USE_FILE_SPLIT_READER
+    const std::shared_ptr<io::IoStatistics>& dataIoStats,
+    const std::shared_ptr<io::IoStatistics>& metadataIoStats,
+#else
+    const std::shared_ptr<io::IoStatistics>& ioStatistics,
+#endif
+    const std::shared_ptr<IoStats>& ioStats,
+    FileHandleFactory* fileHandleFactory,
+    folly::Executor* executor,
+    const std::shared_ptr<common::ScanSpec>& scanSpec,
+#if GLUTEN_VELOX_DELTA_USE_FILE_SPLIT_READER
+    const std::unordered_map<std::string, FileColumnHandlePtr>* infoColumns,
+    std::vector<column_index_t> bucketChannels,
+#endif
+    const common::SubfieldFilters* subfieldFiltersForValidation)
+#if GLUTEN_VELOX_DELTA_USE_FILE_SPLIT_READER
+    : HiveSplitReader(
+          hiveSplit,
+          tableHandle,
+          partitionKeys,
+          connectorQueryCtx,
+          fileConfig,
+          readerOutputType,
+          dataIoStats,
+          metadataIoStats,
+          ioStats,
+          fileHandleFactory,
+          executor,
+          scanSpec,
+          infoColumns,
+          std::move(bucketChannels),
+          subfieldFiltersForValidation),
+#else
+    : SplitReader(
+          hiveSplit,
+          tableHandle,
+          partitionKeys,
+          connectorQueryCtx,
+          fileConfig,
+          readerOutputType,
+          ioStatistics,
+          ioStats,
+          fileHandleFactory,
+          executor,
+          scanSpec,
+          subfieldFiltersForValidation),
+#endif
+      // Deletion state is populated later by prepareSplit().
+      baseReadRowNumber_(0),
+      deleteBitmap_(nullptr) {
+}
+
+/// Prepares the base reader for the current split and, when the split carries
+/// a deletion vector, loads it so that next() can mask rows.
+///
+/// @param metadataFilter Forwarded to the base class prepareSplit().
+/// @param runtimeStats Forwarded to the base class prepareSplit().
+/// @param fileReadOps Forwarded to the base class prepareSplit().
+///
+/// Fails with a user error when the split has file statistics that are
+/// inconsistent with its deletion vector, or when the deletion vector
+/// descriptor carries no serialized payload.
+void DeltaSplitReader::prepareSplit(
+    std::shared_ptr<common::MetadataFilter> metadataFilter,
+    dwio::common::RuntimeStatistics& runtimeStats,
+    const folly::F14FastMap<std::string, std::string>& fileReadOps) {
+  // Drop the per-split deletion state first so that nothing from an earlier
+  // prepareSplit() call survives the early returns below.
+  baseReadRowNumber_ = 0;
+  deleteBitmap_.reset();
+  deletionVectorReader_.reset();
+
+#if GLUTEN_VELOX_DELTA_USE_FILE_SPLIT_READER
+  HiveSplitReader::prepareSplit(std::move(metadataFilter), runtimeStats, fileReadOps);
+  if (emptySplit() || !baseRowReader_) {
+    return;
+  }
+#else
+  SplitReader::prepareSplit(std::move(metadataFilter), runtimeStats, fileReadOps);
+  if (emptySplit_ || !baseRowReader_) {
+    return;
+  }
+#endif
+
+  auto deltaSplit = checkedPointerCast<const HiveDeltaSplit>(hiveSplit_);
+  if (!deltaSplit->deletionVector.has_value()) {
+    // No deletion vector: next() reads the file without a delete mask.
+    return;
+  }
+
+  const auto& descriptor = *deltaSplit->deletionVector;
+
+  // Validate statistics if provided
+  if (deltaSplit->statistics.has_value()) {
+    validateStatisticsForDeletionVectors(*deltaSplit->statistics, descriptor);
+  }
+
+  VELOX_USER_CHECK(
+      descriptor.hasMaterializedPayload(),
+      "Delta deletion vector payload was not materialized on the JVM side for split {}",
+      hiveSplit_->filePath);
+
+  deletionVectorReader_ = std::make_unique<DeltaDeletionVectorReader>();
+  const auto& payloadView = descriptor.serializedPayloadView.value();
+  // The descriptor cardinality is passed along as the expected cardinality of
+  // the deserialized bitmap.
+  deletionVectorReader_->loadSerializedDeletionVector(
+      std::string_view(reinterpret_cast<const char*>(payloadView.data), payloadView.size), descriptor.cardinality);
+}
+
+/// Reads the next batch from the base row reader. When a deletion vector is
+/// loaded, a per-batch delete mask is built from it (negated for
+/// kIfNotContained) and handed to the row reader through the mutation.
+///
+/// @param size Requested number of rows.
+/// @param output Receives the rows that were read.
+/// @return The value returned by the base row reader, or 0 at end of data.
+uint64_t DeltaSplitReader::next(uint64_t size, VectorPtr& output) {
+  Mutation rowMutation;
+  rowMutation.randomSkip = baseReaderOpts_.randomSkip().get();
+  rowMutation.deletedRows = nullptr;
+
+  const auto batchSize = baseRowReader_->nextReadSize(size);
+  // Row number of the first row in this batch, as reported by the row reader.
+  baseReadRowNumber_ = baseRowReader_->nextRowNumber();
+  if (batchSize == RowReader::kAtEnd) {
+    return 0;
+  }
+
+  const auto deltaSplit = checkedPointerCast<const HiveDeltaSplit>(hiveSplit_);
+  const bool hasDeletionVector = deletionVectorReader_ != nullptr && !deletionVectorReader_->empty();
+  if (hasDeletionVector) {
+    // One bit per row of the batch.
+    const auto bitmapBytes = bits::nbytes(batchSize);
+    ensureCapacity<int8_t>(deleteBitmap_, bitmapBytes, connectorQueryCtx_->memoryPool(), false, true);
+    deleteBitmap_->setSize(bitmapBytes);
+    deletionVectorReader_->applyDeletionFilter(baseReadRowNumber_, batchSize, deleteBitmap_);
+    if (deltaSplit->filterType == DeltaRowIndexFilterType::kIfNotContained) {
+      // Flip the mask so that rows absent from the deletion vector are masked.
+      bits::negate(deleteBitmap_->asMutable<uint64_t>(), batchSize);
+      deleteBitmap_->setSize(bitmapBytes);
+    }
+  } else if (deleteBitmap_) {
+    deleteBitmap_->setSize(0);
+  }
+
+  if (deleteBitmap_ && deleteBitmap_->size() > 0) {
+    rowMutation.deletedRows = deleteBitmap_->as<uint64_t>();
+  }
+
+  const auto rowsScanned = baseRowReader_->next(batchSize, output, &rowMutation);
+  const bool needsBucketConversion = rowsScanned > 0 && output->size() > 0 && !bucketChannels().empty();
+  if (needsBucketConversion) {
+    applyBucketConversion(output, bucketConversionRows(*output->asChecked<RowVector>()));
+  }
+  return rowsScanned;
+}
+
+/// Checks that the file statistics are consistent with the deletion vector.
+/// Fails with a user error when numRecords is missing, or when the deletion
+/// vector cardinality is known and larger than numRecords. Logs a warning when
+/// tightBounds is explicitly false.
+///
+/// @param stats File statistics of the split.
+/// @param dv Deletion vector descriptor of the split.
+void DeltaSplitReader::validateStatisticsForDeletionVectors(
+    const DeltaFileStatistics& stats,
+    const DeltaDeletionVectorDescriptor& dv) {
+  // Per Delta spec: numRecords is required when DV is present
+  if (!stats.numRecords.has_value()) {
+    VELOX_USER_FAIL(
+        "File statistics must include numRecords when deletion vector "
+        "is present. This is required by the Delta Lake protocol.");
+  }
+
+  // Validate cardinality doesn't exceed numRecords. The comparison is done on
+  // unsigned values: casting the uint64_t cardinality to int64_t would wrap
+  // values above INT64_MAX to negative numbers and let them pass the check.
+  // A negative numRecords can never cover a cardinality, so it fails as well.
+  if (dv.cardinality.has_value()) {
+    const int64_t numRecords = *stats.numRecords;
+    if (numRecords < 0 || *dv.cardinality > static_cast<uint64_t>(numRecords)) {
+      VELOX_USER_FAIL(
+          "Deletion vector cardinality ({}) exceeds file numRecords ({}). "
+          "This indicates data corruption or an invalid deletion vector.",
+          *dv.cardinality,
+          *stats.numRecords);
+    }
+  }
+
+  // Log warning if tightBounds is false (statistics may be stale)
+  if (stats.tightBounds.has_value() && !*stats.tightBounds) {
+    LOG(WARNING) << "File has deletion vector with loose bounds (tightBounds=false). "
+                 << "Column statistics (min/max) may not be accurate for aggregations. "
+                 << "Consider running OPTIMIZE to compact the deletion vector.";
+  }
+}
+
+} // namespace gluten::delta
diff --git a/cpp/velox/compute/delta/DeltaSplitReader.h 
b/cpp/velox/compute/delta/DeltaSplitReader.h
new file mode 100644
index 0000000000..a9d4d2940c
--- /dev/null
+++ b/cpp/velox/compute/delta/DeltaSplitReader.h
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/*
+ * Copyright (c) Facebook, Inc. and its affiliates.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include "compute/delta/DeltaDeletionVectorReader.h"
+#include "compute/delta/DeltaSplit.h"
+
+// Pick the base split reader API at compile time. Unless the build already
+// defines GLUTEN_VELOX_DELTA_USE_FILE_SPLIT_READER, it is set to 1 when
+// velox/connectors/hive/FileSplitReader.h can be included and to 0 otherwise.
+#ifndef GLUTEN_VELOX_DELTA_USE_FILE_SPLIT_READER
+#if __has_include("velox/connectors/hive/FileSplitReader.h")
+#define GLUTEN_VELOX_DELTA_USE_FILE_SPLIT_READER 1
+#else
+#define GLUTEN_VELOX_DELTA_USE_FILE_SPLIT_READER 0
+#endif
+#endif
+
+// Include the header that declares the selected base class.
+// NOTE(review): the HiveDataSource.h fallback presumably covers Velox versions
+// that have no standalone SplitReader.h — confirm.
+#if GLUTEN_VELOX_DELTA_USE_FILE_SPLIT_READER
+#include "velox/connectors/hive/HiveSplitReader.h"
+#elif __has_include("velox/connectors/hive/SplitReader.h")
+#include "velox/connectors/hive/SplitReader.h"
+#else
+#include "velox/connectors/hive/HiveDataSource.h"
+#endif
+#include "velox/connectors/hive/TableHandle.h"
+
+namespace gluten::delta {
+
+using namespace facebook::velox;
+using namespace facebook::velox::connector;
+using namespace facebook::velox::connector::hive;
+
+// Aliases that hide which base reader API is in use, so that DeltaSplitReader
+// can be declared once against DeltaSplitReaderBase, DeltaConfig,
+// DeltaTableHandlePtr and DeltaColumnHandleMap.
+#if GLUTEN_VELOX_DELTA_USE_FILE_SPLIT_READER
+using DeltaSplitReaderBase = HiveSplitReader;
+using DeltaConfig = FileConfig;
+using DeltaTableHandlePtr = FileTableHandlePtr;
+using DeltaColumnHandleMap = std::unordered_map<std::string, 
FileColumnHandlePtr>;
+#else
+using DeltaSplitReaderBase = SplitReader;
+using DeltaConfig = HiveConfig;
+using DeltaTableHandlePtr = HiveTableHandlePtr;
+using DeltaColumnHandleMap = HiveColumnHandleMap;
+#endif
+
+/// Split reader for Delta data files. It extends the base split reader by
+/// loading the deletion vector carried by a HiveDeltaSplit and turning it
+/// into a per-batch delete mask while reading.
+class DeltaSplitReader : public DeltaSplitReaderBase {
+ public:
+  /// All arguments are forwarded to the base split reader; the parameter list
+  /// depends on GLUTEN_VELOX_DELTA_USE_FILE_SPLIT_READER.
+  DeltaSplitReader(
+      const std::shared_ptr<const HiveDeltaSplit>& hiveSplit,
+      const DeltaTableHandlePtr& tableHandle,
+      const DeltaColumnHandleMap* partitionKeys,
+      const ConnectorQueryCtx* connectorQueryCtx,
+      const std::shared_ptr<const DeltaConfig>& fileConfig,
+      const RowTypePtr& readerOutputType,
+#if GLUTEN_VELOX_DELTA_USE_FILE_SPLIT_READER
+      const std::shared_ptr<io::IoStatistics>& dataIoStats,
+      const std::shared_ptr<io::IoStatistics>& metadataIoStats,
+#else
+      const std::shared_ptr<io::IoStatistics>& ioStatistics,
+#endif
+      const std::shared_ptr<IoStats>& ioStats,
+      FileHandleFactory* fileHandleFactory,
+      folly::Executor* executor,
+      const std::shared_ptr<common::ScanSpec>& scanSpec,
+#if GLUTEN_VELOX_DELTA_USE_FILE_SPLIT_READER
+      const std::unordered_map<std::string, FileColumnHandlePtr>* infoColumns,
+      std::vector<column_index_t> bucketChannels = {},
+#endif
+      const common::SubfieldFilters* subfieldFiltersForValidation = nullptr);
+
+  /// Prepares the base reader and loads the split's deletion vector, if any.
+  void prepareSplit(
+      std::shared_ptr<common::MetadataFilter> metadataFilter,
+      dwio::common::RuntimeStatistics& runtimeStats,
+      const folly::F14FastMap<std::string, std::string>& fileReadOps = {}) 
override;
+
+  /// Reads the next batch, applying the delete mask derived from the loaded
+  /// deletion vector. Returns 0 at end of data.
+  uint64_t next(uint64_t size, VectorPtr& output) override;
+
+ private:
+  /// Validate that file statistics are consistent with deletion vector.
+  /// Per Delta spec: numRecords is required when DV is present.
+  /// Also validates that cardinality doesn't exceed numRecords.
+  void validateStatisticsForDeletionVectors(const DeltaFileStatistics& stats, 
const DeltaDeletionVectorDescriptor& dv);
+
+  // Delta deletion vectors use file-global row positions, not split-relative
+  // row numbers.
+  uint64_t baseReadRowNumber_;
+  // Reader holding the deletion vector of the current split; null when the
+  // split has none.
+  std::unique_ptr<DeltaDeletionVectorReader> deletionVectorReader_;
+  // Per-batch delete mask handed to the row reader; reused across batches.
+  BufferPtr deleteBitmap_;
+};
+
+} // namespace gluten::delta
diff --git a/cpp/velox/compute/delta/tests/CMakeLists.txt 
b/cpp/velox/compute/delta/tests/CMakeLists.txt
index 9641cc341c..eabe94b722 100644
--- a/cpp/velox/compute/delta/tests/CMakeLists.txt
+++ b/cpp/velox/compute/delta/tests/CMakeLists.txt
@@ -23,3 +23,15 @@ add_test(
   NAME velox_roaring_bitmap_array_test
   COMMAND velox_roaring_bitmap_array_test
   WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR})
+
+# Delta read-path unit tests, built into a single executable.
+# DeltaConnectorTest.cpp defines main() for the whole binary.
+add_executable(
+  velox_delta_read_test DeltaConnectorTest.cpp
+                        DeltaDeletionVectorReaderTest.cpp DeltaSplitTest.cpp)
+
+target_link_libraries(velox_delta_read_test velox roaring
+                      facebook::velox::exec_test_lib GTest::gtest)
+
+# Register the binary with CTest; it runs from this source directory.
+add_test(
+  NAME velox_delta_read_test
+  COMMAND velox_delta_read_test
+  WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR})
diff --git a/cpp/velox/compute/delta/tests/DeltaConnectorTest.cpp 
b/cpp/velox/compute/delta/tests/DeltaConnectorTest.cpp
new file mode 100644
index 0000000000..3669f911d9
--- /dev/null
+++ b/cpp/velox/compute/delta/tests/DeltaConnectorTest.cpp
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/*
+ * Copyright (c) Facebook, Inc. and its affiliates.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <gtest/gtest.h>
+
+#include "compute/delta/DeltaConnector.h"
+#include "compute/delta/DeltaSplit.h"
+#include "compute/delta/RoaringBitmapArray.h"
+#include "folly/init/Init.h"
+#include "velox/connectors/Connector.h"
+#include "velox/connectors/hive/HiveConfig.h"
+#include "velox/exec/tests/utils/AssertQueryBuilder.h"
+#include "velox/exec/tests/utils/HiveConnectorTestBase.h"
+#include "velox/exec/tests/utils/PlanBuilder.h"
+
+#include <limits>
+
+namespace gluten::delta {
+
+namespace {
+
+/// Fixture that registers a Delta connector under kConnectorId and removes it
+/// again after each test.
+class DeltaConnectorTest : public ::testing::Test {
+ protected:
+  static constexpr const char* kConnectorId = "test-delta";
+
+  void TearDown() override {
+    unregisterConnector(kConnectorId);
+  }
+
+  /// (Re-)registers the Delta connector with `config`; any connector already
+  /// registered under kConnectorId is unregistered first.
+  void registerDeltaConnector(
+      std::shared_ptr<const config::ConfigBase> config =
+          std::make_shared<config::ConfigBase>(std::unordered_map<std::string, std::string>{})) {
+    unregisterConnector(kConnectorId);
+
+    DeltaConnectorFactory connectorFactory;
+    auto connector = connectorFactory.newConnector(kConnectorId, std::move(config));
+    registerConnector(std::move(connector));
+  }
+};
+
+// The configuration given at registration must be visible through the
+// registered connector.
+TEST_F(DeltaConnectorTest, connectorConfiguration) {
+  std::unordered_map<std::string, std::string> settings;
+  settings.emplace(hive::HiveConfig::kEnableFileHandleCache, "true");
+  settings.emplace(hive::HiveConfig::kNumCacheFileHandles, "1000");
+  registerDeltaConnector(std::make_shared<config::ConfigBase>(std::move(settings)));
+
+  auto registered = getConnector(kConnectorId);
+  ASSERT_NE(registered, nullptr);
+
+  hive::HiveConfig effectiveConfig(registered->connectorConfig());
+  ASSERT_TRUE(effectiveConfig.isFileHandleCacheEnabled());
+  ASSERT_EQ(effectiveConfig.numCacheFileHandles(), 1000);
+}
+
+// A connector registered with the default configuration reports support for
+// dynamic filters and split preloading.
+TEST_F(DeltaConnectorTest, connectorProperties) {
+  registerDeltaConnector();
+
+  const auto registered = getConnector(kConnectorId);
+  ASSERT_NE(registered, nullptr);
+
+  const bool dynamicFilters = registered->canAddDynamicFilter();
+  ASSERT_TRUE(dynamicFilters);
+  const bool splitPreload = registered->supportsSplitPreload();
+  ASSERT_TRUE(splitPreload);
+}
+
+/// Fixture for end-to-end scans through the Delta connector. It builds on
+/// HiveConnectorTestBase and registers a Delta connector under kConnectorId
+/// for every test.
+class DeltaConnectorExecutionTest : public facebook::velox::exec::test::HiveConnectorTestBase {
+ protected:
+  static constexpr const char* kConnectorId = "test-delta";
+
+  void SetUp() override {
+    facebook::velox::exec::test::HiveConnectorTestBase::SetUp();
+    registerDeltaConnector();
+  }
+
+  void TearDown() override {
+    unregisterConnector(kConnectorId);
+    facebook::velox::exec::test::HiveConnectorTestBase::TearDown();
+  }
+
+  /// (Re-)registers the Delta connector with `config`; any connector already
+  /// registered under kConnectorId is unregistered first.
+  void registerDeltaConnector(
+      std::shared_ptr<const config::ConfigBase> config =
+          std::make_shared<config::ConfigBase>(std::unordered_map<std::string, std::string>{})) {
+    unregisterConnector(kConnectorId);
+
+    DeltaConnectorFactory connectorFactory;
+    auto connector = connectorFactory.newConnector(kConnectorId, std::move(config));
+    registerConnector(std::move(connector));
+  }
+
+  /// Serializes a RoaringBitmapArray containing `deletedRows` and returns the
+  /// bytes as a string.
+  std::string createSerializedPayload(const std::vector<int64_t>& deletedRows) {
+    RoaringBitmapArray deletedPositions;
+    for (const auto position : deletedRows) {
+      deletedPositions.addSafe(position);
+    }
+
+    const auto numBytes = deletedPositions.serializedSizeInBytes();
+    auto scratch = AlignedBuffer::allocate<char>(numBytes, pool());
+    deletedPositions.serialize(scratch->asMutable<char>());
+
+    std::string payload;
+    payload.assign(scratch->as<char>(), numBytes);
+    return payload;
+  }
+
+  /// Builds a DWRF split over `filePath` whose deletion vector descriptor
+  /// views the bytes of `serializedPayload`. The view points into the caller's
+  /// string, so that string has to stay alive while the split is in use.
+  std::shared_ptr<HiveDeltaSplit>
+  makeDeltaSplit(const std::string& filePath, const std::string& serializedPayload, uint64_t cardinality) {
+    const auto* payloadBytes = reinterpret_cast<const uint8_t*>(serializedPayload.data());
+    const auto payloadSize = static_cast<int32_t>(serializedPayload.size());
+    SplitPayloadBufferView payloadView{payloadBytes, payloadSize};
+    auto descriptor = DeltaDeletionVectorDescriptor::serialized(cardinality, payloadView);
+
+    return std::make_shared<HiveDeltaSplit>(
+        kConnectorId,
+        filePath,
+        facebook::velox::dwio::common::FileFormat::DWRF,
+        0,
+        std::numeric_limits<uint64_t>::max(),
+        std::unordered_map<std::string, std::optional<std::string>>{},
+        std::nullopt,
+        std::unordered_map<std::string, std::string>{{"table_format", "delta"}},
+        nullptr,
+        std::unordered_map<std::string, std::string>{},
+        true,
+        std::move(descriptor));
+  }
+};
+
+// Scans a ten-row file through the Delta connector, once with a deletion
+// vector that hits rows of the file and once with one that does not.
+TEST_F(DeltaConnectorExecutionTest, filtersRowsUsingMaterializedDeletionVector) {
+  namespace vtest = facebook::velox::exec::test;
+
+  const auto inputRows = makeRowVector({"id"}, {makeFlatVector<int64_t>({10, 11, 12, 13, 14, 15, 16, 17, 18, 19})});
+  const auto dataFile = vtest::TempFilePath::create();
+  writeToFile(dataFile->getPath(), inputRows);
+
+  const auto scanType = ROW({"id"}, {BIGINT()});
+  const auto scanPlan =
+      vtest::PlanBuilder(pool()).startTableScan().connectorId(kConnectorId).outputType(scanType).endTableScan().planNode();
+
+  // Row positions 2, 5 and 8 hold ids 12, 15 and 18, which must be dropped.
+  const auto matchingPayload = createSerializedPayload({2, 5, 8});
+  const auto matchingSplit = makeDeltaSplit(dataFile->getPath(), matchingPayload, 3);
+  const auto survivingRows = makeRowVector({"id"}, {makeFlatVector<int64_t>({10, 11, 13, 14, 16, 17, 19})});
+  vtest::AssertQueryBuilder(scanPlan).split(matchingSplit).assertResults(survivingRows);
+
+  // Position 42 lies outside the ten-row file, so every row is returned.
+  const auto outOfRangePayload = createSerializedPayload({42});
+  const auto outOfRangeSplit = makeDeltaSplit(dataFile->getPath(), outOfRangePayload, 1);
+  vtest::AssertQueryBuilder(scanPlan).split(outOfRangeSplit).assertResults(inputRows);
+}
+
+} // namespace
+
+} // namespace gluten::delta
+
+// Entry point of the test binary: GoogleTest consumes its flags first, then
+// folly is initialized with the remaining arguments.
+int main(int argc, char** argv) {
+  ::testing::InitGoogleTest(&argc, argv);
+  folly::Init follyInit(&argc, &argv, false);
+  const int exitCode = RUN_ALL_TESTS();
+  return exitCode;
+}
diff --git a/cpp/velox/compute/delta/tests/DeltaDeletionVectorReaderTest.cpp 
b/cpp/velox/compute/delta/tests/DeltaDeletionVectorReaderTest.cpp
new file mode 100644
index 0000000000..f214e427e1
--- /dev/null
+++ b/cpp/velox/compute/delta/tests/DeltaDeletionVectorReaderTest.cpp
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/*
+ * Copyright (c) Facebook, Inc. and its affiliates.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "compute/delta/DeltaDeletionVectorReader.h"
+#include "compute/delta/RoaringBitmapArray.h"
+#include "velox/common/base/tests/GTestUtils.h"
+
+#include <gtest/gtest.h>
+
+using namespace facebook::velox;
+using namespace gluten::delta;
+
+// Fixture for the DeltaDeletionVectorReader tests. Owns a leaf memory pool and builds serialized
+// deletion-vector payloads from lists of row positions.
+class DeltaDeletionVectorReaderTest : public ::testing::Test {
+ protected:
+  // Installs a memory manager with default options once for the whole suite.
+  static void SetUpTestSuite() {
+    memory::MemoryManager::testingSetInstance(memory::MemoryManager::Options{});
+  }
+
+  // Gives every test a fresh leaf pool.
+  void SetUp() override {
+    pool_ = memory::memoryManager()->addLeafPool();
+  }
+
+  // Adds each entry of `deletedRows` to a RoaringBitmapArray and returns its serialized bytes.
+  std::string createSerializedPayload(const std::vector<int64_t>& deletedRows) {
+    RoaringBitmapArray deleted;
+    for (const auto position : deletedRows) {
+      deleted.addSafe(position);
+    }
+
+    // Serialize into a pool-backed scratch buffer, then copy the bytes out.
+    const auto numBytes = deleted.serializedSizeInBytes();
+    auto scratch = AlignedBuffer::allocate<char>(numBytes, pool_.get());
+    deleted.serialize(scratch->asMutable<char>());
+    return std::string(scratch->as<char>(), numBytes);
+  }
+
+  std::shared_ptr<memory::MemoryPool> pool_;
+};
+
+// Rows named in a loaded payload are reported as deleted; rows outside it are not.
+TEST_F(DeltaDeletionVectorReaderTest, LoadSerializedPayload) {
+  const auto serialized = createSerializedPayload({2, 7, 12});
+
+  DeltaDeletionVectorReader reader;
+  reader.loadSerializedDeletionVector(serialized, 3);
+
+  for (const auto row : {2, 7, 12}) {
+    EXPECT_TRUE(reader.isRowDeleted(row));
+  }
+  for (const auto row : {0, 3, 20}) {
+    EXPECT_FALSE(reader.isRowDeleted(row));
+  }
+}
+
+// Loads a payload produced by Delta itself and checks which rows it marks as deleted.
+TEST_F(DeltaDeletionVectorReaderTest, LoadPortablePayload) {
+  // Captured from a Delta 3.3.2 table after `DELETE WHERE id < 10`.
+  const std::vector<uint8_t> payloadBytes = {
+      0xd1, 0xd3, 0x39, 0x64, 0x01, 0x00, 0x00, 0x00,
+      0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
+      0x3b, 0x30, 0x00, 0x00, 0x01, 0x00, 0x00, 0x09,
+      0x00, 0x01, 0x00, 0x00, 0x00, 0x09, 0x00};
+  const std::string_view portable(reinterpret_cast<const char*>(payloadBytes.data()), payloadBytes.size());
+
+  DeltaDeletionVectorReader reader;
+  reader.loadSerializedDeletionVector(portable, 10);
+
+  // Rows 0..9 are deleted.
+  uint64_t row = 0;
+  while (row < 10) {
+    EXPECT_TRUE(reader.isRowDeleted(row));
+    ++row;
+  }
+  // Rows from 10 upwards are not.
+  EXPECT_FALSE(reader.isRowDeleted(10));
+  EXPECT_FALSE(reader.isRowDeleted(100));
+}
+
+// applyDeletionFilter() over the batch [0, 10) sets exactly the bits of the deleted rows 2, 5 and 8.
+TEST_F(DeltaDeletionVectorReaderTest, ApplyDeletionFilter) {
+  auto payload = createSerializedPayload({2, 5, 8});
+
+  DeltaDeletionVectorReader reader;
+  reader.loadSerializedDeletionVector(payload);
+
+  // Zero-fill the bitmap: AlignedBuffer::allocate() leaves memory uninitialized when no initial
+  // value is passed, and the EXPECT_FALSE checks below must not depend on leftover bits.
+  auto deleteBitmap = AlignedBuffer::allocate<uint64_t>(bits::nwords(10), pool_.get(), uint64_t{0});
+  reader.applyDeletionFilter(0, 10, deleteBitmap);
+
+  auto* rawBitmap = deleteBitmap->as<uint64_t>();
+  EXPECT_TRUE(bits::isBitSet(rawBitmap, 2));
+  EXPECT_TRUE(bits::isBitSet(rawBitmap, 5));
+  EXPECT_TRUE(bits::isBitSet(rawBitmap, 8));
+  EXPECT_FALSE(bits::isBitSet(rawBitmap, 1));
+  EXPECT_FALSE(bits::isBitSet(rawBitmap, 7));
+}
+
+// With a batch starting at row 10, bit positions in the output are relative to the batch start:
+// deleted rows 10, 15 and 20 appear as bits 0, 5 and 10.
+TEST_F(DeltaDeletionVectorReaderTest, ApplyDeletionFilterWithOffset) {
+  auto payload = createSerializedPayload({10, 15, 20});
+
+  DeltaDeletionVectorReader reader;
+  reader.loadSerializedDeletionVector(payload);
+
+  // Zero-fill the bitmap: AlignedBuffer::allocate() leaves memory uninitialized when no initial
+  // value is passed, and the EXPECT_FALSE checks below must not depend on leftover bits.
+  auto deleteBitmap = AlignedBuffer::allocate<uint64_t>(bits::nwords(15), pool_.get(), uint64_t{0});
+  reader.applyDeletionFilter(10, 15, deleteBitmap);
+
+  auto* rawBitmap = deleteBitmap->as<uint64_t>();
+  EXPECT_TRUE(bits::isBitSet(rawBitmap, 0));
+  EXPECT_TRUE(bits::isBitSet(rawBitmap, 5));
+  EXPECT_TRUE(bits::isBitSet(rawBitmap, 10));
+  EXPECT_FALSE(bits::isBitSet(rawBitmap, 1));
+  EXPECT_FALSE(bits::isBitSet(rawBitmap, 14));
+}
+
+// A reader with nothing loaded reports no deleted rows and leaves the batch bitmap clear.
+TEST_F(DeltaDeletionVectorReaderTest, EmptyReader) {
+  DeltaDeletionVectorReader reader;
+
+  EXPECT_TRUE(reader.empty());
+  EXPECT_FALSE(reader.isRowDeleted(0));
+
+  // Zero-fill the bitmap: AlignedBuffer::allocate() leaves memory uninitialized when no initial
+  // value is passed. An empty reader may not write to the buffer at all, so without this the
+  // loop below could observe arbitrary bits.
+  auto deleteBitmap = AlignedBuffer::allocate<uint64_t>(bits::nwords(10), pool_.get(), uint64_t{0});
+  reader.applyDeletionFilter(0, 10, deleteBitmap);
+
+  auto* rawBitmap = deleteBitmap->as<uint64_t>();
+  for (int i = 0; i < 10; ++i) {
+    EXPECT_FALSE(bits::isBitSet(rawBitmap, i));
+  }
+}
+
+// Loading a second payload replaces the first one instead of merging with it.
+TEST_F(DeltaDeletionVectorReaderTest, MultipleLoadsOverwrite) {
+  const auto firstPayload = createSerializedPayload({1, 2, 3});
+  const auto secondPayload = createSerializedPayload({10, 20, 30});
+
+  DeltaDeletionVectorReader reader;
+
+  reader.loadSerializedDeletionVector(firstPayload);
+  EXPECT_TRUE(reader.isRowDeleted(1));
+  EXPECT_FALSE(reader.isRowDeleted(10));
+
+  // After the reload, row 1 is no longer deleted and row 10 is.
+  reader.loadSerializedDeletionVector(secondPayload);
+  EXPECT_FALSE(reader.isRowDeleted(1));
+  EXPECT_TRUE(reader.isRowDeleted(10));
+}
+
+TEST_F(DeltaDeletionVectorReaderTest, EmptyPayloadThrows) {
+  DeltaDeletionVectorReader reader;
+  VELOX_ASSERT_THROW(reader.loadSerializedDeletionVector(std::string_view()), 
"Serialized deletion vector is empty");
+}
+
+// A payload consisting only of a wrong 4-byte magic number is rejected.
+TEST_F(DeltaDeletionVectorReaderTest, CorruptedMagicNumberThrows) {
+  const uint32_t bogusMagic = 12345678;
+  const auto* magicBytes = reinterpret_cast<const char*>(&bogusMagic);
+  const std::string payload(magicBytes, sizeof(bogusMagic));
+
+  DeltaDeletionVectorReader reader;
+  VELOX_ASSERT_THROW(reader.loadSerializedDeletionVector(payload), "Unexpected Delta bitmap array magic number");
+}
+
+// Loading succeeds when the declared cardinality matches the number of rows in the payload.
+TEST_F(DeltaDeletionVectorReaderTest, CardinalityValidationSuccess) {
+  constexpr int kDeclaredCardinality = 5;
+  const auto payload = createSerializedPayload({1, 2, 3, 4, 5});
+
+  DeltaDeletionVectorReader reader;
+  EXPECT_NO_THROW(reader.loadSerializedDeletionVector(payload, kDeclaredCardinality));
+  EXPECT_EQ(reader.estimatedDeletedRowCount(), kDeclaredCardinality);
+}
+
+// Declaring 3 deleted rows for a payload that holds 5 raises a VeloxUserError.
+TEST_F(DeltaDeletionVectorReaderTest, CardinalityValidationMismatchThrows) {
+  constexpr int kWrongCardinality = 3;
+  const auto payload = createSerializedPayload({1, 2, 3, 4, 5});
+
+  DeltaDeletionVectorReader reader;
+  EXPECT_THROW(reader.loadSerializedDeletionVector(payload, kWrongCardinality), VeloxUserError);
+}
+
+// Cardinality validation also holds for a payload with 1000 deleted rows (every tenth row below 10000).
+TEST_F(DeltaDeletionVectorReaderTest, LargeCardinalityValidation) {
+  constexpr int64_t kNumDeleted = 1000;
+  constexpr int64_t kStride = 10;
+
+  std::vector<int64_t> deletedRows;
+  deletedRows.reserve(kNumDeleted);
+  for (int64_t k = 0; k < kNumDeleted; ++k) {
+    deletedRows.push_back(k * kStride);
+  }
+
+  DeltaDeletionVectorReader reader;
+  reader.loadSerializedDeletionVector(createSerializedPayload(deletedRows), 1000);
+  EXPECT_EQ(reader.estimatedDeletedRowCount(), 1000);
+}
+
+// Deleted rows 45..55 overlap the batch [40, 60) in its middle: relative bits 5..15 must be set and
+// the bits on either side must stay clear.
+TEST_F(DeltaDeletionVectorReaderTest, BatchFilteringPartialOverlap) {
+  std::vector<int64_t> deletedRows;
+  for (int64_t i = 45; i <= 55; ++i) {
+    deletedRows.push_back(i);
+  }
+
+  DeltaDeletionVectorReader reader;
+  reader.loadSerializedDeletionVector(createSerializedPayload(deletedRows));
+
+  // Zero-fill the bitmap: AlignedBuffer::allocate() leaves memory uninitialized when no initial
+  // value is passed, and the EXPECT_FALSE ranges below must not depend on leftover bits.
+  auto deleteBitmap = AlignedBuffer::allocate<uint64_t>(bits::nwords(20), pool_.get(), uint64_t{0});
+  reader.applyDeletionFilter(40, 20, deleteBitmap);
+
+  auto* rawBitmap = deleteBitmap->as<uint64_t>();
+  for (int i = 0; i < 5; ++i) {
+    EXPECT_FALSE(bits::isBitSet(rawBitmap, i));
+  }
+  for (int i = 5; i <= 15; ++i) {
+    EXPECT_TRUE(bits::isBitSet(rawBitmap, i));
+  }
+  for (int i = 16; i < 20; ++i) {
+    EXPECT_FALSE(bits::isBitSet(rawBitmap, i));
+  }
+}
diff --git a/cpp/velox/compute/delta/tests/DeltaSplitTest.cpp 
b/cpp/velox/compute/delta/tests/DeltaSplitTest.cpp
new file mode 100644
index 0000000000..c081b278e2
--- /dev/null
+++ b/cpp/velox/compute/delta/tests/DeltaSplitTest.cpp
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/*
+ * Copyright (c) Facebook, Inc. and its affiliates.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <gtest/gtest.h>
+
+#include "compute/delta/DeltaSplit.h"
+
+using namespace gluten::delta;
+
+// A descriptor built by serialized() keeps the payload view and cardinality it was given.
+TEST(DeltaSplitTest, DescriptorCarriesPayloadView) {
+  const std::string payload = "payload";
+  const auto* payloadBytes = reinterpret_cast<const uint8_t*>(payload.data());
+  gluten::SplitPayloadBufferView payloadView{payloadBytes, static_cast<int32_t>(payload.size())};
+
+  auto descriptor = DeltaDeletionVectorDescriptor::serialized(3, payloadView);
+
+  // The remaining checks dereference the view, so stop here if it is missing.
+  ASSERT_TRUE(descriptor.serializedPayloadView.has_value());
+  EXPECT_EQ(descriptor.serializedPayloadView->size, payload.size());
+  EXPECT_EQ(descriptor.cardinality, 3);
+  EXPECT_TRUE(descriptor.hasMaterializedPayload());
+}
+
+// A HiveDeltaSplit exposes the deletion-vector descriptor and filter type it was constructed with.
+TEST(DeltaSplitTest, SplitCarriesDeletionVectorDescriptor) {
+  using StringMap = std::unordered_map<std::string, std::string>;
+  using OptionalStringMap = std::unordered_map<std::string, std::optional<std::string>>;
+
+  const std::string payload = "serialized";
+  const auto* payloadBytes = reinterpret_cast<const uint8_t*>(payload.data());
+  gluten::SplitPayloadBufferView payloadView{payloadBytes, static_cast<int32_t>(payload.size())};
+  auto descriptor = DeltaDeletionVectorDescriptor::serialized(2, payloadView);
+
+  auto split = std::make_shared<HiveDeltaSplit>(
+      "test-delta",
+      "/tmp/data.parquet",
+      facebook::velox::dwio::common::FileFormat::PARQUET,
+      0,
+      1024,
+      OptionalStringMap{},
+      std::nullopt,
+      StringMap{{"table_format", "delta"}},
+      nullptr,
+      StringMap{},
+      true,
+      descriptor,
+      std::nullopt,
+      DeltaRowIndexFilterType::kIfContained,
+      StringMap{},
+      std::nullopt);
+
+  // Each ASSERT guards the optional dereferenced by the checks that follow it.
+  ASSERT_TRUE(split->deletionVector.has_value());
+  EXPECT_EQ(split->deletionVector->cardinality, 2);
+  ASSERT_TRUE(split->deletionVector->serializedPayloadView.has_value());
+  EXPECT_EQ(split->deletionVector->serializedPayloadView->size, payload.size());
+  EXPECT_EQ(split->filterType, DeltaRowIndexFilterType::kIfContained);
+}
+
+// With 10 recorded rows and a deletion vector of cardinality 3, the logical row count is 7.
+TEST(DeltaSplitTest, LogicalRowCountSubtractsDeletionVectorCardinality) {
+  auto descriptor = DeltaDeletionVectorDescriptor::serialized(3);
+  DeltaFileStatistics stats{.numRecords = 10, .tightBounds = true};
+
+  const auto logicalRows = stats.logicalRowCount(descriptor);
+  EXPECT_EQ(logicalRows, 7);
+}
+
+// When numRecords is not set, logicalRowCount() yields -1 regardless of the deletion vector.
+TEST(DeltaSplitTest, LogicalRowCountPreservesUnknownCounts) {
+  auto descriptor = DeltaDeletionVectorDescriptor::serialized(3);
+  DeltaFileStatistics stats{.numRecords = std::nullopt, .tightBounds = std::nullopt};
+
+  const auto logicalRows = stats.logicalRowCount(descriptor);
+  EXPECT_EQ(logicalRows, -1);
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to