This is an automated email from the ASF dual-hosted git repository.

gongxun pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/cloudberry.git


The following commit(s) were added to refs/heads/main by this push:
     new 29d2a2a824d feat: use ColumnEncoding_Kind_DIRECT_DELTA as default in offset stream (#1337)
29d2a2a824d is described below

commit 29d2a2a824d85b9580f10622cd7770d82ab0d341
Author: Xun Gong <[email protected]>
AuthorDate: Wed Sep 3 11:29:41 2025 +0800

    feat: use ColumnEncoding_Kind_DIRECT_DELTA as default in offset stream (#1337)
    
    * feat: use ColumnEncoding_Kind_DIRECT_DELTA as default in offset stream
    
    Optimize performance of variable-length column offsets by switching from
    Zstd to delta encoding. This approach better compresses incremental integer
    sequences, cutting disk space by more than half while maintaining performance.
    
    The following is a comparison of file sizes for different encoding methods on TPC-DS 20G:
    
    Name                   PAX(ZSTD)    AOCS_SIZE    PAX(Delta)    PAX(Delta) / AOCS * 100%
    call_center               12 kB       231 kB      10185 bytes        4.31%
    catalog_page             499 kB       653 kB       393 kB           60.18%
    catalog_returns          240 MB       171 MB       178 MB          104.09%
    catalog_sales           3033 MB      1837 MB      1977 MB          107.63%
    customer                  16 MB        12 MB        12 MB          100.00%
    customer_address        7008 kB      3161 kB      3115 kB           98.54%
    customer_demographics     28 MB      8164 kB      9292 kB          113.82%
    date_dim                3193 kB      1406 kB      1249 kB           88.85%
    household_demographics    42 kB       248 kB        28 kB           11.29%
    income_band            1239 bytes     225 kB      1239 bytes         0.54%
    inventory                 36 MB        71 MB        36 MB           50.70%
    item                    3084 kB      2479 kB      2227 kB           89.84%
    promotion                 27 kB       239 kB        18 kB            7.53%
    reason                 2730 bytes     226 kB      2280 bytes         0.99%
    ship_mode              3894 bytes     227 kB      3315 bytes         1.43%
    store                     23 kB       239 kB        18 kB            7.53%
    store_returns            400 MB       265 MB       277 MB          104.53%
    store_sales             4173 MB      2384 MB      2554 MB          107.12%
    time_dim                1702 kB       819 kB       627 kB           76.56%
    warehouse              5394 bytes     227 kB      4698 bytes         2.02%
    web_page                  21 kB       236 kB        14 kB            5.93%
    web_returns              116 MB        83 MB        85 MB          102.41%
    web_sales               1513 MB       908 MB       982 MB          108.15%
---
 contrib/pax_storage/.gitignore                     |   1 +
 contrib/pax_storage/src/cpp/cmake/pax.cmake        |   1 +
 contrib/pax_storage/src/cpp/cmake/pax_format.cmake |   1 +
 contrib/pax_storage/src/cpp/pax_gbench.cc          | 302 +++++++++++-
 contrib/pax_storage/src/cpp/pax_gbench.h           |  72 +++
 .../src/cpp/storage/columns/pax_column_test.cc     |  15 +-
 .../src/cpp/storage/columns/pax_compress_bench.cc  | 421 +++++++++++++++++
 .../src/cpp/storage/columns/pax_decoding.cc        |   3 +-
 .../src/cpp/storage/columns/pax_delta_encoding.cc  | 511 +++++++++++++++++++++
 .../src/cpp/storage/columns/pax_delta_encoding.h   | 135 ++++++
 .../cpp/storage/columns/pax_delta_encoding_test.cc | 339 ++++++++++++++
 .../src/cpp/storage/columns/pax_dict_encoding.h    |  15 +-
 .../src/cpp/storage/columns/pax_encoding.cc        |   4 +-
 .../src/cpp/storage/columns/pax_encoding.h         |   2 +
 .../columns/pax_encoding_non_fixed_column.cc       |  88 +++-
 .../columns/pax_encoding_non_fixed_column.h        |   3 +
 .../src/cpp/storage/columns/pax_encoding_test.cc   |  92 ++++
 .../src/cpp/storage/columns/pax_rlev2_encoding.h   |   4 +
 .../cpp/storage/columns/pax_vec_encoding_column.cc |   2 +-
 .../cpp/storage/columns/pax_vec_encoding_column.h  |   3 +
 .../pax_storage/src/cpp/storage/micro_partition.h  |   1 -
 .../pax_storage/src/cpp/storage/orc/orc_writer.cc  |  18 +-
 contrib/pax_storage/src/cpp/storage/pax.cc         |  22 +-
 contrib/pax_storage/src/cpp/storage/pax_defined.h  |   2 +-
 .../src/test/regress/expected/gp_toolkit.out       |   2 +-
 25 files changed, 1989 insertions(+), 70 deletions(-)

diff --git a/contrib/pax_storage/.gitignore b/contrib/pax_storage/.gitignore
index 51a328f84e0..87aa2a4a742 100644
--- a/contrib/pax_storage/.gitignore
+++ b/contrib/pax_storage/.gitignore
@@ -12,6 +12,7 @@
 Thumbs.db
 
 # Temp files dir
+bench_data
 .tmp/**
 build*/**
 results/**
diff --git a/contrib/pax_storage/src/cpp/cmake/pax.cmake b/contrib/pax_storage/src/cpp/cmake/pax.cmake
index 71775bac2dd..099a66f30d8 100644
--- a/contrib/pax_storage/src/cpp/cmake/pax.cmake
+++ b/contrib/pax_storage/src/cpp/cmake/pax.cmake
@@ -51,6 +51,7 @@ set(pax_storage_src
     storage/columns/pax_dict_encoding.cc
     storage/columns/pax_decoding.cc
     storage/columns/pax_encoding.cc
+    storage/columns/pax_delta_encoding.cc
     storage/columns/pax_rlev2_decoding.cc
     storage/columns/pax_rlev2_encoding.cc
     storage/columns/pax_vec_bitpacked_column.cc
diff --git a/contrib/pax_storage/src/cpp/cmake/pax_format.cmake b/contrib/pax_storage/src/cpp/cmake/pax_format.cmake
index 4bdc25671f9..5a12185a0e6 100644
--- a/contrib/pax_storage/src/cpp/cmake/pax_format.cmake
+++ b/contrib/pax_storage/src/cpp/cmake/pax_format.cmake
@@ -41,6 +41,7 @@ set(pax_storage_src
     storage/columns/pax_dict_encoding.cc
     storage/columns/pax_decoding.cc
     storage/columns/pax_encoding.cc
+    storage/columns/pax_delta_encoding.cc
     storage/columns/pax_rlev2_decoding.cc
     storage/columns/pax_rlev2_encoding.cc
     storage/columns/pax_vec_column.cc
diff --git a/contrib/pax_storage/src/cpp/pax_gbench.cc b/contrib/pax_storage/src/cpp/pax_gbench.cc
index 82dbaaa7bb2..b6a0ecb0c76 100644
--- a/contrib/pax_storage/src/cpp/pax_gbench.cc
+++ b/contrib/pax_storage/src/cpp/pax_gbench.cc
@@ -25,12 +25,310 @@
  *-------------------------------------------------------------------------
  */
 
+#include "pax_gbench.h"
+
+#include "comm/cbdb_api.h"
+
 #include <benchmark/benchmark.h>
 
-static void example_benchmark(benchmark::State &state) {
+#include <functional>
+#include <memory>
+#include <vector>
+
+#include "access/paxc_rel_options.h"
+#include "comm/cbdb_wrappers.h"
+#include "cpp-stub/src/stub.h"
+#include "storage/micro_partition_iterator.h"
+#include "storage/pax.h"
+#include "storage/strategy.h"
+
+namespace pax::bench {
+
+// Create memory context for benchmark
+void CreateMemoryContext() {
+  MemoryContext test_memory_context = AllocSetContextCreate(
+      (MemoryContext)NULL, "TestMemoryContext", 80 * 1024 * 1024,
+      80 * 1024 * 1024, 80 * 1024 * 1024);
+  MemoryContextSwitchTo(test_memory_context);
+}
+
+// Global registry
+class BenchmarkRegistry {
+ private:
+  std::vector<InitFunction> init_functions_;
+  std::vector<CleanupFunction> cleanup_functions_;
+  bool initialized_ = false;
+
+ public:
+  void RegisterInitFunction(InitFunction func) {
+    init_functions_.push_back(func);
+  }
+
+  void RegisterCleanupFunction(CleanupFunction func) {
+    cleanup_functions_.push_back(func);
+  }
+
+  void RunAllInitFunctions() {
+    if (initialized_) return;
+
+    printf("Running PAX Benchmark Suite...\n");
+    printf("Initializing all benchmark modules...\n\n");
+
+    for (const auto &func : init_functions_) {
+      func();
+    }
+    initialized_ = true;
+  }
+
+  void RunAllCleanupFunctions() {
+    if (!initialized_) return;
+
+    printf("\nCleaning up all benchmark modules...\n");
+
+    // Cleanup functions executed in reverse order
+    for (auto it = cleanup_functions_.rbegin(); it != cleanup_functions_.rend();
+         ++it) {
+      (*it)();
+    }
+    initialized_ = false;
+  }
+};
+
+// Global registry access function
+BenchmarkRegistry &GetBenchmarkRegistry() {
+  static BenchmarkRegistry instance;
+  return instance;
+}
+
+// Registration functions
+void RegisterBenchmarkInit(InitFunction func) {
+  GetBenchmarkRegistry().RegisterInitFunction(func);
+}
+
+void RegisterBenchmarkCleanup(CleanupFunction func) {
+  GetBenchmarkRegistry().RegisterCleanupFunction(func);
+}
+
+// Global Mock functions for benchmark framework
+bool MockMinMaxGetStrategyProcinfo(Oid, Oid, Oid *, FmgrInfo *,
+                                   StrategyNumber) {
+  return false;
+}
+
+int32 MockGetFastSequences(Oid) {
+  static int32 mock_id = 0;
+  return mock_id++;
+}
+
+void MockInsertMicroPartitionPlaceHolder(Oid, int) {}
+void MockDeleteMicroPartitionEntry(Oid, Snapshot, int) {}
+void MockExecStoreVirtualTuple(TupleTableSlot *) {}
+
+std::string MockBuildPaxDirectoryPath(RelFileNode rnode, BackendId backend_id) {
+  // Create a simple file path for benchmarks
+  return std::string("./bench_data");
+}
+
+std::vector<int> MockGetMinMaxColumnIndexes(Relation) {
+  return std::vector<int>();
+}
+
+std::vector<int> MockBloomFilterColumnIndexes(Relation) {
+  return std::vector<int>();
+}
+
+std::vector<std::tuple<ColumnEncoding_Kind, int>> MockGetRelEncodingOptions(
+    Relation relation) {
+  std::vector<std::tuple<ColumnEncoding_Kind, int>> encoding_opts;
+
+  // Get number of columns from relation
+  int num_columns = 10;  // default for benchmark
+  if (relation && relation->rd_att) {
+    num_columns = relation->rd_att->natts;
+  }
+
+  // Create encoding options for each column (NO_ENCODED, 0)
+  for (int i = 0; i < num_columns; i++) {
+    encoding_opts.emplace_back(
+        std::make_tuple(ColumnEncoding_Kind_NO_ENCODED, 0));
+  }
+
+  return encoding_opts;
+}
+
+// Mock TupleDescInitEntry that doesn't rely on SYSCACHE
+void MockTupleDescInitEntry(TupleDesc desc, AttrNumber attributeNumber,
+                            const char *attributeName, Oid oidtypeid,
+                            int32 typmod, int attdim) {
+  // Basic validation
+  if (attributeNumber < 1 || attributeNumber > desc->natts) {
+    return;
+  }
+
+  Form_pg_attribute att = TupleDescAttr(desc, attributeNumber - 1);
+
+  // Set basic attribute properties
+  namestrcpy(&(att->attname), attributeName);
+  att->atttypid = oidtypeid;
+  att->atttypmod = typmod;
+  att->attndims = attdim;
+  att->attnum = attributeNumber;
+  att->attnotnull = false;
+  att->atthasdef = false;
+  att->attidentity = '\0';
+  att->attgenerated = '\0';
+  att->attisdropped = false;
+  att->attislocal = true;
+  att->attinhcount = 0;
+  att->attcollation = InvalidOid;
+
+  // Set type-specific properties based on OID (hardcoded for common types)
+  switch (oidtypeid) {
+    case INT2OID:  // smallint
+      att->attlen = 2;
+      att->attalign = 's';
+      att->attstorage = 'p';
+      att->attbyval = true;
+      break;
+    case INT4OID:  // integer
+      att->attlen = 4;
+      att->attalign = 'i';
+      att->attstorage = TYPSTORAGE_PLAIN;
+      att->attbyval = true;
+      break;
+    case INT8OID:  // bigint
+      att->attlen = 8;
+      att->attalign = 'd';
+      att->attstorage = TYPSTORAGE_PLAIN;
+      att->attbyval = FLOAT8PASSBYVAL;
+      break;
+    case FLOAT8OID:  // double precision
+      att->attlen = 8;
+      att->attalign = 'd';
+      att->attstorage = 'p';
+      att->attbyval = FLOAT8PASSBYVAL;
+      break;
+    case BOOLOID:  // boolean
+      att->attlen = 1;
+      att->attalign = 'c';
+      att->attstorage = 'p';
+      att->attbyval = true;
+      break;
+    case TEXTOID:  // text
+      att->attlen = -1;
+      att->attalign = 'i';
+      att->attstorage = TYPSTORAGE_PLAIN;
+      att->attbyval = false;
+      att->attcollation = DEFAULT_COLLATION_OID;
+      break;
+    case NUMERICOID:  // numeric
+      att->attlen = -1;
+      att->attalign = TYPALIGN_INT;
+      att->attstorage = TYPSTORAGE_PLAIN;
+      att->attbyval = false;
+      break;
+    case TIMESTAMPOID:  // timestamp
+      att->attlen = 8;
+      att->attalign = 'd';
+      att->attstorage = TYPSTORAGE_PLAIN;
+      att->attbyval = FLOAT8PASSBYVAL;
+      break;
+    default:
+      // Default values for unknown types
+      att->attlen = -1;
+      att->attalign = 'i';
+      att->attstorage = 'p';
+      att->attbyval = false;
+      break;
+  }
+}
+
+// Global initialization function for general benchmark framework
+void GlobalBenchmarkInit() {
+  static bool global_initialized = false;
+  if (global_initialized) return;
+
+  printf("Initializing PAX benchmark framework...\n");
+
+  // Initialize memory context
+  MemoryContextInit();
+
+  // Setup global Mock functions
+  static std::unique_ptr<Stub> stub_global = std::make_unique<Stub>();
+
+  stub_global->set(MinMaxGetPgStrategyProcinfo, MockMinMaxGetStrategyProcinfo);
+  stub_global->set(CPaxGetFastSequences, MockGetFastSequences);
+  stub_global->set(cbdb::BuildPaxDirectoryPath, MockBuildPaxDirectoryPath);
+  stub_global->set(cbdb::InsertMicroPartitionPlaceHolder,
+                   MockInsertMicroPartitionPlaceHolder);
+  stub_global->set(cbdb::DeleteMicroPartitionEntry,
+                   MockDeleteMicroPartitionEntry);
+  stub_global->set(cbdb::GetMinMaxColumnIndexes, MockGetMinMaxColumnIndexes);
+  stub_global->set(cbdb::GetBloomFilterColumnIndexes,
+                   MockBloomFilterColumnIndexes);
+  stub_global->set(cbdb::GetRelEncodingOptions, MockGetRelEncodingOptions);
+  stub_global->set(ExecStoreVirtualTuple, MockExecStoreVirtualTuple);
+  stub_global->set(TupleDescInitEntry, MockTupleDescInitEntry);
+
+  // Create basic test directory
+  system("mkdir -p ./bench_data");
+
+  global_initialized = true;
+  printf("PAX benchmark framework initialized.\n");
+}
+
+// Global cleanup function for general benchmark framework
+void GlobalBenchmarkCleanup() {
+  printf("Cleaning up PAX benchmark framework...\n");
+
+  // Clean up test directory
+  // system("rm -rf ./bench_data");
+
+  // Reset memory context
+  if (TopMemoryContext) {
+    MemoryContextReset(TopMemoryContext);
+  }
+
+  printf("PAX benchmark framework cleaned up.\n");
+}
+
+// Example benchmark test
+static void example_benchmark(::benchmark::State &state) {
   for (auto _ : state) {
+    // Empty example test
   }
 }
 BENCHMARK(example_benchmark);
 
-BENCHMARK_MAIN();
\ No newline at end of file
+}  // namespace pax::bench
+
+// Global cleanup function (C-style for atexit)
+static void cleanup_all() {
+  pax::bench::GetBenchmarkRegistry().RunAllCleanupFunctions();
+  pax::bench::GlobalBenchmarkCleanup();
+}
+
+// Main entry function
+int main(int argc, char **argv) {
+  // Register global cleanup function
+  std::atexit(cleanup_all);
+
+  // Global initialization
+  pax::bench::GlobalBenchmarkInit();
+
+  // Run all registered initialization functions
+  pax::bench::GetBenchmarkRegistry().RunAllInitFunctions();
+
+  // Initialize benchmark framework
+  ::benchmark::Initialize(&argc, argv);
+  if (::benchmark::ReportUnrecognizedArguments(argc, argv)) return 1;
+
+  printf("\n=== Starting PAX Benchmark Suite ===\n");
+  printf("Use --benchmark_filter=<pattern> to run specific tests\n");
+  printf("Use --benchmark_list_tests to see all available tests\n\n");
+
+  // Run benchmark
+  ::benchmark::RunSpecifiedBenchmarks();
+
+  return 0;
+}
\ No newline at end of file
diff --git a/contrib/pax_storage/src/cpp/pax_gbench.h b/contrib/pax_storage/src/cpp/pax_gbench.h
new file mode 100644
index 00000000000..44376022693
--- /dev/null
+++ b/contrib/pax_storage/src/cpp/pax_gbench.h
@@ -0,0 +1,72 @@
+/*-------------------------------------------------------------------------
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ * pax_gbench.h
+ *
+ * IDENTIFICATION
+ *       contrib/pax_storage/src/cpp/pax_gbench.h
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#pragma once
+
+#include <functional>
+#include <benchmark/benchmark.h>
+
+namespace pax {
+
+namespace bench {
+
+// Generic initialization and cleanup function types
+using InitFunction = std::function<void()>;
+using CleanupFunction = std::function<void()>;
+
+// Create memory context for benchmark
+extern void CreateMemoryContext();
+
+// Forward declaration
+class BenchmarkRegistry;
+
+// Global registry access function
+BenchmarkRegistry &GetBenchmarkRegistry();
+
+// Global initialization and cleanup functions
+void GlobalBenchmarkInit();
+void GlobalBenchmarkCleanup();
+
+// Registration functions (implemented in pax_gbench.cc)
+void RegisterBenchmarkInit(InitFunction func);
+void RegisterBenchmarkCleanup(CleanupFunction func);
+
+}  // namespace bench
+}  // namespace pax
+
+// Convenient registration macros
+#define REGISTER_BENCHMARK_INIT(func)               \
+  static bool BENCHMARK_INIT_##__COUNTER__ = []() { \
+    pax::bench::RegisterBenchmarkInit(func);    \
+    return true;                                    \
+  }()
+
+#define REGISTER_BENCHMARK_CLEANUP(func)               \
+  static bool BENCHMARK_CLEANUP_##__COUNTER__ = []() { \
+    pax::bench::RegisterBenchmarkCleanup(func);    \
+    return true;                                       \
+  }()
diff --git a/contrib/pax_storage/src/cpp/storage/columns/pax_column_test.cc b/contrib/pax_storage/src/cpp/storage/columns/pax_column_test.cc
index dfd346ef615..b26fdff65bf 100644
--- a/contrib/pax_storage/src/cpp/storage/columns/pax_column_test.cc
+++ b/contrib/pax_storage/src/cpp/storage/columns/pax_column_test.cc
@@ -697,7 +697,6 @@ TEST_P(PaxNonFixedColumnCompressTest,
   auto number = ::testing::get<0>(GetParam());
   auto kind = ::testing::get<1>(GetParam());
   auto verify_range = ::testing::get<2>(GetParam());
-  auto enable_offsets_encoding = ::testing::get<2>(GetParam());
   const size_t number_of_rows = 1024;
 
   PaxEncoder::EncodingOption encoding_option;
@@ -705,10 +704,9 @@ TEST_P(PaxNonFixedColumnCompressTest,
   encoding_option.compress_level = 5;
   encoding_option.is_sign = true;
 
-  if (enable_offsets_encoding) {
-    encoding_option.offsets_encode_type = kind;
-    encoding_option.offsets_compress_level = 5;
-  }
+  encoding_option.offsets_encode_type =
+      ColumnEncoding_Kind::ColumnEncoding_Kind_DIRECT_DELTA;
+  encoding_option.offsets_compress_level = 5;
 
   non_fixed_column = new PaxNonFixedEncodingColumn(
       number_of_rows, number_of_rows, std::move(encoding_option));
@@ -744,10 +742,9 @@ TEST_P(PaxNonFixedColumnCompressTest,
   decoding_option.is_sign = true;
   decoding_option.compress_level = 5;
 
-  if (enable_offsets_encoding) {
-    decoding_option.offsets_encode_type = kind;
-    decoding_option.offsets_compress_level = 5;
-  }
+  decoding_option.offsets_encode_type =
+      ColumnEncoding_Kind::ColumnEncoding_Kind_DIRECT_DELTA;
+  decoding_option.offsets_compress_level = 5;
 
   auto non_fixed_column_for_read = new PaxNonFixedEncodingColumn(
       number_of_rows * number, sizeof(int32) * number_of_rows,
diff --git a/contrib/pax_storage/src/cpp/storage/columns/pax_compress_bench.cc b/contrib/pax_storage/src/cpp/storage/columns/pax_compress_bench.cc
new file mode 100644
index 00000000000..0a792601e99
--- /dev/null
+++ b/contrib/pax_storage/src/cpp/storage/columns/pax_compress_bench.cc
@@ -0,0 +1,421 @@
+/*-------------------------------------------------------------------------
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ * pax_compress_bench.cc
+ *
+ * IDENTIFICATION
+ *   contrib/pax_storage/src/cpp/storage/columns/pax_compress_bench.cc
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include <sys/stat.h>
+#include <sys/types.h>
+
+#include <algorithm>
+#include <cerrno>
+#include <chrono>
+#include <cstdint>
+#include <cstring>
+#include <fstream>
+#include <iostream>
+#include <memory>
+#include <random>
+#include <vector>
+
+#include <cstdio>
+#include <unistd.h>
+
+#include "comm/cbdb_wrappers.h"
+#include "comm/pax_memory.h"
+#include "pax_gbench.h"
+#include "storage/columns/pax_compress.h"
+#include "storage/columns/pax_decoding.h"
+#include "storage/columns/pax_delta_encoding.h"
+#include "storage/columns/pax_rlev2_encoding.h"
+#include "storage/pax_buffer.h"
+
+namespace pax::bench {
+
+namespace {
+
+// Test data and prebuilt buffers for decode/decompress benchmarks
+static const size_t kCount = 1024 * 1024;
+static std::vector<uint32_t> g_offsets;
+static std::unique_ptr<char[]> g_raw_bytes;
+static size_t g_raw_len = 0;
+
+static std::vector<char> g_rle_encoded;
+static size_t g_rle_len = 0;
+
+static std::vector<char> g_delta_encoded;
+static size_t g_delta_len = 0;
+
+static std::unique_ptr<char[]> g_zstd_compressed;
+static size_t g_zstd_len = 0;
+
+static std::shared_ptr<pax::PaxCompressor> g_zstd;
+
+// Simple helpers for bench data persistence
+static void EnsureDirExists(const char *dir_path) {
+  if (mkdir(dir_path, 0755) != 0) {
+    if (errno != EEXIST) {
+      std::cerr << "Failed to create directory: " << dir_path << std::endl;
+      std::abort();
+    }
+  }
+}
+
+static bool ReadWholeFile(const char *path, std::vector<char> &out) {
+  std::ifstream in(path, std::ios::binary);
+  if (!in.is_open()) return false;
+  in.seekg(0, std::ios::end);
+  std::streampos size = in.tellg();
+  if (size <= 0) return false;
+  out.resize(static_cast<size_t>(size));
+  in.seekg(0, std::ios::beg);
+  in.read(out.data(), size);
+  return static_cast<bool>(in);
+}
+
+static bool ReadWholeFile(const char *path, std::unique_ptr<char[]> &out,
+                          size_t &out_len) {
+  std::ifstream in(path, std::ios::binary);
+  if (!in.is_open()) return false;
+  in.seekg(0, std::ios::end);
+  std::streampos size = in.tellg();
+  if (size <= 0) return false;
+  out_len = static_cast<size_t>(size);
+  out = std::make_unique<char[]>(out_len);
+  in.seekg(0, std::ios::beg);
+  in.read(out.get(), size);
+  return static_cast<bool>(in);
+}
+
+static void WriteWholeFile(const char *path, const char *data, size_t len) {
+  std::ofstream out(path, std::ios::binary | std::ios::trunc);
+  if (!out.is_open()) {
+    std::cerr << "Failed to open file for write: " << path << std::endl;
+    std::abort();
+  }
+  out.write(data, static_cast<std::streamsize>(len));
+  if (!out) {
+    std::cerr << "Failed to write file: " << path << std::endl;
+    std::abort();
+  }
+}
+
+static const char *kBenchDataDir = "bench_data";
+static const char *kRLEV2Path = "bench_data/rle_v2_u32.bin";
+static const char *kDeltaPath = "bench_data/delta_u32.bin";
+static const char *kZSTDPath = "bench_data/zstd_u32.bin";
+static const char *kRawPath = "bench_data/raw_u32.bin";
+
+static std::vector<uint32_t> GenerateMonotonicOffsets(size_t n, uint32_t seed) {
+  std::vector<uint32_t> offsets;
+  offsets.resize(n);
+  offsets[0] = 0;
+  std::mt19937 rng(seed);
+  std::uniform_int_distribution<int> step_dist(1, 256);
+  for (size_t i = 1; i < n; ++i) {
+    offsets[i] = offsets[i - 1] + static_cast<uint32_t>(step_dist(rng));
+  }
+  return offsets;
+}
+
+// Lazily ensure raw bytes are available (prefer loading from disk)
+static void EnsureRawData() {
+  if (g_raw_len != 0 && g_raw_bytes) return;
+  EnsureDirExists(kBenchDataDir);
+  std::vector<char> raw_from_file;
+  if (ReadWholeFile(kRawPath, raw_from_file)) {
+    g_raw_len = raw_from_file.size();
+    g_raw_bytes = std::make_unique<char[]>(g_raw_len);
+    std::memcpy(g_raw_bytes.get(), raw_from_file.data(), g_raw_len);
+    return;
+  }
+  // Fallback: generate and persist
+  g_offsets = GenerateMonotonicOffsets(kCount, /*seed=*/12345);
+  g_raw_len = g_offsets.size() * sizeof(uint32_t);
+  g_raw_bytes = std::make_unique<char[]>(g_raw_len);
+  std::memcpy(g_raw_bytes.get(), g_offsets.data(), g_raw_len);
+  WriteWholeFile(kRawPath, g_raw_bytes.get(), g_raw_len);
+}
+
+// Lazily ensure RLEv2 encoded buffer exists (load or build from raw)
+static void EnsureRleEncoded() {
+  if (g_rle_len != 0 && !g_rle_encoded.empty()) return;
+  EnsureDirExists(kBenchDataDir);
+  if (ReadWholeFile(kRLEV2Path, g_rle_encoded)) {
+    g_rle_len = g_rle_encoded.size();
+    return;
+  }
+  EnsureRawData();
+  PaxEncoder::EncodingOption enc_opt;
+  enc_opt.column_encode_type = ColumnEncoding_Kind_RLE_V2;
+  enc_opt.is_sign = false;
+
+  PaxOrcEncoder rle_encoder(enc_opt);
+  auto rle_out = std::make_shared<DataBuffer<char>>(g_raw_len);
+  rle_encoder.SetDataBuffer(rle_out);
+  // encode directly from raw bytes to avoid depending on g_offsets
+  size_t count = g_raw_len / sizeof(uint32_t);
+  const uint32_t *vals = reinterpret_cast<const uint32_t *>(g_raw_bytes.get());
+  for (size_t i = 0; i < count; ++i) {
+    uint32_t v = vals[i];
+    rle_encoder.Append(reinterpret_cast<char *>(&v), sizeof(uint32_t));
+  }
+  rle_encoder.Flush();
+
+  g_rle_len = rle_encoder.GetBufferSize();
+  g_rle_encoded.assign(rle_encoder.GetBuffer(),
+                       rle_encoder.GetBuffer() + g_rle_len);
+  WriteWholeFile(kRLEV2Path, g_rle_encoded.data(), g_rle_len);
+}
+
+// Lazily ensure Delta encoded buffer exists (load or build from raw)
+static void EnsureDeltaEncoded() {
+  if (g_delta_len != 0 && !g_delta_encoded.empty()) return;
+  EnsureDirExists(kBenchDataDir);
+  if (ReadWholeFile(kDeltaPath, g_delta_encoded)) {
+    g_delta_len = g_delta_encoded.size();
+    return;
+  }
+  EnsureRawData();
+  PaxEncoder::EncodingOption enc_opt;
+  enc_opt.is_sign = false;
+  // type not used by PaxDeltaEncoder
+  PaxDeltaEncoder<uint32_t> delta_encoder(enc_opt);
+  auto delta_out = std::make_shared<DataBuffer<char>>(g_raw_len);
+  delta_encoder.SetDataBuffer(delta_out);
+  // Encode whole array in one shot
+  delta_encoder.Append(g_raw_bytes.get(), g_raw_len);
+  delta_encoder.Flush();
+
+  g_delta_len = delta_encoder.GetBufferSize();
+  g_delta_encoded.assign(delta_encoder.GetBuffer(),
+                         delta_encoder.GetBuffer() + g_delta_len);
+  WriteWholeFile(kDeltaPath, g_delta_encoded.data(), g_delta_len);
+}
+
+// Lazily ensure ZSTD compressed buffer exists (load or build from raw)
+static void EnsureZstdCompressed() {
+  EnsureDirExists(kBenchDataDir);
+  if (!g_zstd) {
+    g_zstd =
+        PaxCompressor::CreateBlockCompressor(ColumnEncoding_Kind_COMPRESS_ZSTD);
+    if (!g_zstd) {
+      std::cerr << "Failed to create ZSTD compressor" << std::endl;
+      std::abort();
+    }
+  }
+  if (g_zstd_len != 0 && g_zstd_compressed) return;
+  if (ReadWholeFile(kZSTDPath, g_zstd_compressed, g_zstd_len)) {
+    return;
+  }
+  EnsureRawData();
+  size_t bound = g_zstd->GetCompressBound(g_raw_len);
+  g_zstd_compressed = std::make_unique<char[]>(bound);
+  g_zstd_len = g_zstd->Compress(g_zstd_compressed.get(), bound,
+                                g_raw_bytes.get(), g_raw_len, /*lvl=*/5);
+  if (g_zstd->IsError(g_zstd_len) || g_zstd_len == 0) {
+    std::cerr << "ZSTD one-time compress failed" << std::endl;
+    std::abort();
+  }
+  WriteWholeFile(kZSTDPath, g_zstd_compressed.get(), g_zstd_len);
+}
+
+static void PrepareOnce() {
+  pax::bench::CreateMemoryContext();
+  EnsureDirExists(kBenchDataDir);
+}
+
+static void CleanupBenchData() {
+  const char *files[] = {kRLEV2Path, kDeltaPath, kZSTDPath, kRawPath};
+  for (const char *p : files) {
+    std::remove(p);
+  }
+
+  rmdir(kBenchDataDir);
+}
+
+}  // namespace
+
+// Register module init with gbench framework
+REGISTER_BENCHMARK_INIT(PrepareOnce);
+REGISTER_BENCHMARK_CLEANUP(CleanupBenchData);
+
+// RLEv2 encode benchmark
+static void BM_RLEV2_Encode(::benchmark::State &state) {
+  // Prepare raw data only; no encoded buffers are created here
+  EnsureRawData();
+  for (auto _ : state) {
+    PaxEncoder::EncodingOption enc_opt;
+    enc_opt.column_encode_type = ColumnEncoding_Kind_RLE_V2;
+    enc_opt.is_sign = false;
+
+    PaxOrcEncoder encoder(enc_opt);
+    auto out = std::make_shared<DataBuffer<char>>(g_raw_len);
+    encoder.SetDataBuffer(out);
+
+    size_t count = g_raw_len / sizeof(uint32_t);
+    const uint32_t *vals =
+        reinterpret_cast<const uint32_t *>(g_raw_bytes.get());
+    for (size_t i = 0; i < count; ++i) {
+      uint32_t v = vals[i];
+      encoder.Append(reinterpret_cast<char *>(&v), sizeof(uint32_t));
+    }
+    encoder.Flush();
+    g_rle_len = encoder.GetBufferSize();
+    benchmark::DoNotOptimize(encoder.GetBuffer());
+    benchmark::ClobberMemory();
+  }
+  state.SetBytesProcessed(static_cast<int64_t>(state.iterations()) *
+                          static_cast<int64_t>(g_raw_len));
+  state.counters["raw_kb"] =
+      benchmark::Counter(static_cast<double>(g_raw_len) / (1024.0));
+  state.counters["rle_kb"] =
+      benchmark::Counter(static_cast<double>(g_rle_len) / (1024.0));
+}
+BENCHMARK(BM_RLEV2_Encode);
+
+// RLEv2 decode benchmark
+static void BM_RLEV2_Decode(::benchmark::State &state) {
+  // Ensure we have raw size and encoded buffer ready (prefer from disk)
+  EnsureRawData();
+  EnsureRleEncoded();
+  for (auto _ : state) {
+    PaxDecoder::DecodingOption dec_opt;
+    dec_opt.column_encode_type = ColumnEncoding_Kind_RLE_V2;
+    dec_opt.is_sign = false;
+
+    auto decoder = PaxDecoder::CreateDecoder<int32>(dec_opt);
+    auto out = std::make_shared<DataBuffer<char>>(g_raw_len);
+    decoder->SetSrcBuffer(g_rle_encoded.data(), g_rle_len);
+    decoder->SetDataBuffer(out);
+    size_t n = decoder->Decoding();
+    benchmark::DoNotOptimize(n);
+    benchmark::ClobberMemory();
+  }
+  state.SetBytesProcessed(static_cast<int64_t>(state.iterations()) *
+                          static_cast<int64_t>(g_raw_len));
+}
+BENCHMARK(BM_RLEV2_Decode);
+
+// Delta encode benchmark
+static void BM_Delta_Encode(::benchmark::State &state) {
+  EnsureRawData();
+  for (auto _ : state) {
+    PaxEncoder::EncodingOption enc_opt;
+    enc_opt.is_sign = false;
+    PaxDeltaEncoder<uint32_t> encoder(enc_opt);
+    auto out = std::make_shared<DataBuffer<char>>(g_raw_len);
+    encoder.SetDataBuffer(out);
+    encoder.Append(g_raw_bytes.get(), g_raw_len);
+    encoder.Flush();
+    g_delta_len = encoder.GetBufferSize();
+    benchmark::DoNotOptimize(encoder.GetBuffer());
+    benchmark::ClobberMemory();
+  }
+  state.SetBytesProcessed(static_cast<int64_t>(state.iterations()) *
+                          static_cast<int64_t>(g_raw_len));
+  state.counters["delta_kb"] =
+      benchmark::Counter(static_cast<double>(g_delta_len) / (1024.0));
+}
+BENCHMARK(BM_Delta_Encode);
+
+// Delta decode benchmark
+static void BM_Delta_Decode(::benchmark::State &state) {
+  EnsureRawData();
+  EnsureDeltaEncoded();
+  for (auto _ : state) {
+    PaxDecoder::DecodingOption dec_opt;
+    dec_opt.is_sign = false;
+    dec_opt.column_encode_type = ColumnEncoding_Kind_DIRECT_DELTA;
+    PaxDeltaDecoder<int32> decoder(dec_opt);
+    auto out = std::make_shared<DataBuffer<char>>(g_raw_len);
+    decoder.SetSrcBuffer(g_delta_encoded.data(), g_delta_len);
+    decoder.SetDataBuffer(out);
+    size_t n = decoder.Decoding();
+    if (n != g_raw_len / sizeof(uint32_t) && out->Used() != g_raw_len) {
+      std::cerr << "Delta decode failed, n: " << n
+                << ", g_raw_len: " << g_raw_len
+                << ", g_delta_len: " << g_delta_len
+                << ", out: Used: " << out->Used() << std::endl;
+      std::abort();
+    }
+
+    if (memcmp(out->GetBuffer(), g_raw_bytes.get(), g_raw_len) != 0) {
+      std::cerr << "Delta decode failed, out: " << out->GetBuffer()
+                << ", g_raw_bytes: " << g_raw_bytes.get() << std::endl;
+      std::abort();
+    }
+
+    benchmark::DoNotOptimize(n);
+    benchmark::ClobberMemory();
+  }
+  state.SetBytesProcessed(static_cast<int64_t>(state.iterations()) *
+                          static_cast<int64_t>(g_raw_len));
+}
+BENCHMARK(BM_Delta_Decode);
+
+// ZSTD compress benchmark
+static void BM_ZSTD_Compress(::benchmark::State &state) {
+  EnsureRawData();
+  if (!g_zstd) {
+    g_zstd =
+        PaxCompressor::CreateBlockCompressor(ColumnEncoding_Kind_COMPRESS_ZSTD);
+    if (!g_zstd) {
+      std::cerr << "Failed to create ZSTD compressor" << std::endl;
+      std::abort();
+    }
+  }
+  size_t bound = g_zstd->GetCompressBound(g_raw_len);
+  std::unique_ptr<char[]> dst(new char[bound]);
+  for (auto _ : state) {
+    size_t n = g_zstd->Compress(dst.get(), bound, g_raw_bytes.get(), g_raw_len,
+                                /*lvl=*/5);
+    g_zstd_len = n;
+    benchmark::DoNotOptimize(n);
+    benchmark::ClobberMemory();
+  }
+  state.SetBytesProcessed(static_cast<int64_t>(state.iterations()) *
+                          static_cast<int64_t>(g_raw_len));
+  state.counters["zstd_kb"] =
+      benchmark::Counter(static_cast<double>(g_zstd_len) / (1024.0));
+}
+BENCHMARK(BM_ZSTD_Compress);
+
+// ZSTD decompress benchmark
+static void BM_ZSTD_Decompress(::benchmark::State &state) {
+  EnsureRawData();
+  EnsureZstdCompressed();
+  std::unique_ptr<char[]> dst(new char[g_raw_len]);
+  for (auto _ : state) {
+    size_t n = g_zstd->Decompress(dst.get(), g_raw_len, g_zstd_compressed.get(),
+                                  g_zstd_len);
+    benchmark::DoNotOptimize(n);
+    benchmark::ClobberMemory();
+  }
+  state.SetBytesProcessed(static_cast<int64_t>(state.iterations()) *
+                          static_cast<int64_t>(g_raw_len));
+}
+BENCHMARK(BM_ZSTD_Decompress);
+
+}  // namespace pax::bench
diff --git a/contrib/pax_storage/src/cpp/storage/columns/pax_decoding.cc b/contrib/pax_storage/src/cpp/storage/columns/pax_decoding.cc
index 7ba0fcd6768..0e15ec52088 100644
--- a/contrib/pax_storage/src/cpp/storage/columns/pax_decoding.cc
+++ b/contrib/pax_storage/src/cpp/storage/columns/pax_decoding.cc
@@ -31,6 +31,7 @@
 #include "comm/pax_memory.h"
 #include "storage/columns/pax_dict_encoding.h"
 #include "storage/columns/pax_rlev2_decoding.h"
+#include "storage/columns/pax_delta_encoding.h"
 
 namespace pax {
 
@@ -47,7 +48,7 @@ std::shared_ptr<PaxDecoder> PaxDecoder::CreateDecoder(const 
DecodingOption &deco
       break;
     }
     case ColumnEncoding_Kind::ColumnEncoding_Kind_DIRECT_DELTA: {
-      /// TODO(jiaqizho) support it
+      decoder = std::make_shared<PaxDeltaDecoder<T>>(decoder_options);
       break;
     }
     case ColumnEncoding_Kind::ColumnEncoding_Kind_DICTIONARY: {
diff --git a/contrib/pax_storage/src/cpp/storage/columns/pax_delta_encoding.cc 
b/contrib/pax_storage/src/cpp/storage/columns/pax_delta_encoding.cc
new file mode 100644
index 00000000000..3f4b5341c4a
--- /dev/null
+++ b/contrib/pax_storage/src/cpp/storage/columns/pax_delta_encoding.cc
@@ -0,0 +1,511 @@
+/*-------------------------------------------------------------------------
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ * pax_delta_encoding.cc
+ *
+ * IDENTIFICATION
+ *       contrib/pax_storage/src/cpp/storage/columns/pax_delta_encoding.cc
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "storage/columns/pax_delta_encoding.h"
+
+#include <algorithm>
+#include <cstring>
+#include <vector>
+
+namespace pax {
+
+// Delta + bit-packing encoder. Construction only forwards the options to the
+// PaxEncoder base class.
+template <typename T>
+PaxDeltaEncoder<T>::PaxDeltaEncoder(const EncodingOption &encoder_options)
+    : PaxEncoder(encoder_options) {
+}
+
+// Encodes the whole input in one shot. `data` is reinterpreted as an array of
+// T and `size` is its length in bytes (any trailing partial element is
+// ignored by the integer division). A second call raises kExTypeAbort.
+template <typename T>
+void PaxDeltaEncoder<T>::Append(char *data, size_t size) {
+  CBDB_CHECK(!has_append_, cbdb::CException::kExTypeAbort,
+             fmt("PaxDeltaEncoder::Append only support Append Once"));
+  has_append_ = true;
+
+  const size_t value_count = size / sizeof(T);
+  Encode(reinterpret_cast<T *>(data), value_count);
+}
+
+// Number of bits needed to represent `value` (0 needs 0 bits), computed with
+// a portable shift loop.
+inline uint8_t NumBitsAllowZero(uint32_t value) {
+  uint8_t width = 0;
+  for (; value != 0; value >>= 1) {
+    ++width;
+  }
+  return width;
+}
+
+// Bit width of `v` (0 -> 0). Uses the count-leading-zeros builtin on
+// GCC/Clang and falls back to a shift loop elsewhere.
+inline static uint8_t FastNumBits(uint32_t v) {
+  if (v == 0) return 0;
+#if defined(__GNUC__) || defined(__clang__)
+  return static_cast<uint8_t>(32 - __builtin_clz(v));
+#else
+  uint8_t width = 0;
+  do {
+    ++width;
+    v >>= 1;
+  } while (v != 0);
+  return width;
+#endif
+}
+
+// 64-bit bit writer based on raw pointer (writes to reserved DataBuffer range).
+//
+// Values are packed LSB-first: the first appended value occupies the lowest
+// bits of the first output byte. The caller must guarantee that `out` has
+// room for every byte produced; the writer performs no bounds checking.
+struct BitWriter64Ptr {
+  uint8_t *out;         // destination bytes
+  size_t index;         // number of bytes written so far
+  uint64_t bit_buffer;  // pending bits, lowest bit is the next one to emit
+  uint32_t bit_count;   // number of valid bits in bit_buffer (< 8 between calls)
+
+  explicit BitWriter64Ptr(uint8_t *p)
+      : out(p), index(0), bit_buffer(0), bit_count(0) {}
+
+  // Appends the low `width` bits of `value`; `width` is expected to be in
+  // [0, 32]. Bits of `value` above `width` are discarded so that an
+  // out-of-range value cannot corrupt the values packed after it.
+  inline void Append(uint32_t value, uint8_t width) {
+    if (width == 0) return;
+    const uint64_t mask = width >= 32 ? 0xFFFFFFFFull : ((1ull << width) - 1);
+    bit_buffer |= ((static_cast<uint64_t>(value) & mask) << bit_count);
+    bit_count += width;
+    // Drain every complete byte so that at most 7 bits stay pending.
+    while (bit_count >= 8) {
+      out[index++] = static_cast<uint8_t>(bit_buffer & 0xFF);
+      bit_buffer >>= 8;
+      bit_count -= 8;
+    }
+  }
+
+  // Emits the pending partial byte (zero-padded in its high bits), if any.
+  inline void FlushToByte() {
+    if (bit_count > 0) {
+      out[index++] = static_cast<uint8_t>(bit_buffer & 0xFF);
+      bit_buffer = 0;
+      bit_count = 0;
+    }
+  }
+};
+
+// 64-bit bit reader based on raw pointer (limited to specified payload bytes).
+//
+// Mirrors BitWriter64Ptr: values are unpacked LSB-first. Bytes are loaded
+// lazily, so `index` is the number of input bytes touched so far. Reads never
+// go beyond `size` bytes.
+struct BitReader64Ptr {
+  const uint8_t *in;    // source bytes
+  size_t size;          // number of readable bytes at `in`
+  size_t index;         // next byte of `in` to load
+  uint64_t bit_buffer;  // loaded but unconsumed bits, lowest bit first
+  uint32_t bit_count;   // number of valid bits in bit_buffer
+
+  BitReader64Ptr(const uint8_t *p, size_t len)
+      : in(p), size(len), index(0), bit_buffer(0), bit_count(0) {}
+
+  // Loads bytes until at least `need_bits` bits are buffered or the input is
+  // exhausted.
+  inline void Ensure(uint32_t need_bits) {
+    while (bit_count < need_bits && index < size) {
+      bit_buffer |= (static_cast<uint64_t>(in[index]) << bit_count);
+      ++index;
+      bit_count += 8;
+    }
+  }
+
+  // Reads the next `width` bits (expected range [0, 32]; larger widths are
+  // clamped to 32 to avoid an undefined shift). If the input runs out, the
+  // bits that are still available are returned zero-extended and the reader
+  // is left empty instead of letting bit_count wrap around.
+  inline uint32_t Read(uint8_t width) {
+    if (width == 0) return 0;
+    if (width > 32) width = 32;
+    Ensure(width);
+    if (bit_count < width) {
+      uint32_t partial = static_cast<uint32_t>(bit_buffer & 0xFFFFFFFFull);
+      bit_buffer = 0;
+      bit_count = 0;
+      return partial;
+    }
+    const uint64_t mask =
+        (width == 32) ? 0xFFFFFFFFull : ((1ull << width) - 1);
+    uint32_t result = static_cast<uint32_t>(bit_buffer & mask);
+    bit_buffer >>= width;
+    bit_count -= width;
+    return result;
+  }
+
+  // Drops the unread bits of the current byte so the next read starts on a
+  // byte boundary.
+  inline void AlignToByte() {
+    uint32_t drop = bit_count % 8;
+    if (drop) {
+      bit_buffer >>= drop;
+      bit_count -= drop;
+    }
+  }
+};
+
+/*
+Overall layout:
+  DeltaBlockHeader (struct, fixed-size)
+    - uint32 value_per_block
+    - uint32 values_per_mini_block
+    - uint32 total_count
+  T first_value
+  [Repeated Block until total_count is exhausted]
+    - uint32 min_delta
+    - uint8  bit_widths[ mini_blocks_per_block ]
+    - uint8  payload[computed from bit_widths]
+            // bit-packed adjusted deltas, mini-block by mini-block
+            // within a block: bits are written LSB-first, end aligned to byte
+*/
+
+// Size estimate for encoding `src_len` bytes of T values: the stream header,
+// the first value, the per-block metadata (min_delta + bit widths) and one
+// payload byte per value. Encode() grows its buffer on demand when a block
+// needs more payload than this estimate allows.
+template <typename T>
+size_t PaxDeltaEncoder<T>::GetBoundSize(size_t src_len) const {
+  const size_t value_count = src_len / sizeof(T);
+  // ceil(value_count / value_per_block_)
+  const size_t block_count =
+      (value_count + value_per_block_ - 1) / value_per_block_;
+  const size_t fixed_part = sizeof(DeltaBlockHeader) + sizeof(T);
+  const size_t per_block_meta = sizeof(uint32) + mini_blocks_per_block_;
+  return fixed_part + block_count * per_block_meta + value_count;
+}
+
+// Encodes `count` values from `data` into result_buffer_: a DeltaBlockHeader,
+// the first value, then one record per block of up to value_per_block_
+// deltas (min_delta, one bit width per mini-block, bit-packed adjusted
+// deltas padded to a byte boundary).
+//
+// Deltas are computed in uint32 modular arithmetic, so a decreasing step is
+// stored as a large unsigned delta and restored by wrap-around on decode.
+// An empty input produces the header (total_count = 0) followed by a zero
+// placeholder for the first value. Raises kExTypeAbort when `count` does not
+// fit the uint32 total_count field.
+template <typename T>
+void PaxDeltaEncoder<T>::Encode(T *data, size_t count) {
+  // total_count is stored as uint32; reject inputs that would be silently
+  // truncated.
+  CBDB_CHECK(count <= UINT32_MAX, cbdb::CException::kExTypeAbort,
+             fmt("PaxDeltaEncoder::Encode value count exceeds uint32 range"));
+
+  // Estimate allocation: by element byte count, sufficient to accommodate
+  // header and bit stream in the common case; grown per block below if not.
+  const size_t estimated_size =
+      count * sizeof(T) + sizeof(DeltaBlockHeader) + sizeof(T);
+  if (result_buffer_->Capacity() < estimated_size) {
+    result_buffer_->ReSize(estimated_size);
+  }
+
+  DeltaBlockHeader header;
+  header.value_per_block = value_per_block_;
+  header.values_per_mini_block = values_per_mini_block_;
+  header.total_count = static_cast<uint32_t>(count);
+  // add delta block header
+  result_buffer_->Write(reinterpret_cast<char *>(&header), sizeof(header));
+  result_buffer_->Brush(sizeof(header));
+
+  // add base value; an empty input has no element to read, so a zero
+  // placeholder is written instead of touching data[0].
+  T first_value = count > 0 ? data[0] : T{};
+  result_buffer_->Write(reinterpret_cast<char *>(&first_value),
+                        sizeof(first_value));
+  result_buffer_->Brush(sizeof(first_value));
+
+  size_t values_emitted = 1;
+  T previous_value = first_value;
+
+  while (values_emitted < count) {
+    uint32_t values_in_block = std::min(
+        value_per_block_, static_cast<uint32_t>(count - values_emitted));
+
+    if (deltas_scratch_.size() < values_in_block) {
+      deltas_scratch_.resize(values_in_block);
+    }
+    uint32_t *deltas = deltas_scratch_.data();
+    uint32_t min_delta = UINT32_MAX;
+    uint32_t mini_max[mini_blocks_per_block_] = {0};
+
+    // Pass 1: compute the deltas, the block minimum and each mini-block
+    // maximum.
+    for (uint32_t i = 0; i < values_in_block; ++i) {
+      T current = data[values_emitted + i];
+      uint32_t delta = static_cast<uint32_t>(current - previous_value);
+      deltas[i] = delta;
+      previous_value = current;
+      if (delta < min_delta) min_delta = delta;
+      uint32_t mini_index = i / values_per_mini_block_;
+      if (delta > mini_max[mini_index]) mini_max[mini_index] = delta;
+    }
+
+    // Pass 2: derive the bit width of every mini-block from its largest
+    // adjusted delta (delta - min_delta) and the total payload size.
+    uint8_t bit_widths[mini_blocks_per_block_] = {0};
+    uint64_t total_bits = 0;
+    for (uint32_t i = 0; i < mini_blocks_per_block_; ++i) {
+      uint32_t start = i * values_per_mini_block_;
+      if (start >= values_in_block) {
+        // mini-block not used by this (partial) block
+        bit_widths[i] = 0;
+        continue;
+      }
+      uint32_t adjusted_max = mini_max[i] - min_delta;
+      uint8_t w = FastNumBits(adjusted_max);
+      bit_widths[i] = w;
+      uint32_t end = std::min(start + values_per_mini_block_, values_in_block);
+      total_bits += static_cast<uint64_t>(w) * (end - start);
+    }
+    uint32_t payload_bytes = static_cast<uint32_t>((total_bits + 7) / 8);
+
+    size_t need_size =
+        payload_bytes + mini_blocks_per_block_ + sizeof(min_delta);
+
+    // Grows the buffer to be at least need_size bytes. To avoid frequent
+    // resizing, the capacity grows by the larger of need_size and half of
+    // the current capacity.
+    if (result_buffer_->Available() < need_size) {
+      const size_t half_capacity = result_buffer_->Capacity() / 2;
+      const size_t inc_size = std::max(need_size, half_capacity);
+      result_buffer_->ReSize(result_buffer_->Capacity() + inc_size);
+    }
+
+    // write block header: min_delta
+    result_buffer_->Write(reinterpret_cast<char *>(&min_delta),
+                          sizeof(min_delta));
+    result_buffer_->Brush(sizeof(min_delta));
+
+    // write bit_widths
+    result_buffer_->Write(reinterpret_cast<char *>(bit_widths),
+                          mini_blocks_per_block_);
+    result_buffer_->Brush(mini_blocks_per_block_);
+
+    // write payload: adjusted deltas, mini-block by mini-block, directly
+    // into the reserved buffer range
+    uint8_t *payload_ptr =
+        reinterpret_cast<uint8_t *>(result_buffer_->GetAvailableBuffer());
+    BitWriter64Ptr bw(payload_ptr);
+    for (uint32_t i = 0; i < mini_blocks_per_block_; ++i) {
+      uint32_t start = i * values_per_mini_block_;
+      if (start >= values_in_block) break;
+      uint32_t end = std::min(start + values_per_mini_block_, values_in_block);
+      uint8_t w = bit_widths[i];
+      if (w == 0) continue;  // every adjusted delta is 0: nothing to store
+      for (uint32_t j = start; j < end; ++j) {
+        uint32_t adjusted = deltas[j] - min_delta;
+        bw.Append(adjusted, w);
+      }
+    }
+    bw.FlushToByte();
+    result_buffer_->Brush(payload_bytes);
+
+    values_emitted += values_in_block;
+  }
+}
+
+// The delta encoder has no representation for NULL entries.
+template <typename T>
+bool PaxDeltaEncoder<T>::SupportAppendNull() const {
+  const bool supports_null = false;
+  return supports_null;
+}
+
+// Nothing is buffered between Append() and Flush(): Append() already writes
+// the complete encoded stream, so there is nothing left to do here.
+template <typename T>
+void PaxDeltaEncoder<T>::Flush() {}
+
+// Decodes one mini-block of `count_in_mb` values from `br` into `out_values`.
+//
+// Every value is rebuilt as previous + adjusted_delta + min_delta, computed in
+// 64-bit unsigned arithmetic and narrowed back to T; `current_value` carries
+// the running value in and out. Widths 0, 8, 16 and 32 are dispatched with a
+// literal width, every other width goes through a 4-way unrolled loop.
+template <typename T>
+inline void ReadMiniBlockSpecializedPtr(BitReader64Ptr &br, T *out_values,
+                                        T &current_value, uint32_t count_in_mb,
+                                        uint32_t min_delta, uint8_t w) {
+  // Applies one adjusted delta to the running value and stores the result.
+  auto emit = [&](uint32_t slot, uint32_t adjusted) {
+    current_value = static_cast<T>(static_cast<uint64_t>(current_value) +
+                                   adjusted + min_delta);
+    out_values[slot] = current_value;
+  };
+
+  switch (w) {
+    case 0:
+      // No payload bits: every delta equals min_delta.
+      for (uint32_t j = 0; j < count_in_mb; ++j) emit(j, 0);
+      return;
+    case 8:
+      for (uint32_t j = 0; j < count_in_mb; ++j) emit(j, br.Read(8));
+      return;
+    case 16:
+      for (uint32_t j = 0; j < count_in_mb; ++j) emit(j, br.Read(16));
+      return;
+    case 32:
+      for (uint32_t j = 0; j < count_in_mb; ++j) emit(j, br.Read(32));
+      return;
+    default:
+      break;
+  }
+
+  // Generic width: read four values at a time, then handle the tail.
+  const uint32_t unrolled_end = count_in_mb & ~3u;
+  uint32_t j = 0;
+  while (j < unrolled_end) {
+    const uint32_t a0 = br.Read(w);
+    const uint32_t a1 = br.Read(w);
+    const uint32_t a2 = br.Read(w);
+    const uint32_t a3 = br.Read(w);
+    emit(j, a0);
+    emit(j + 1, a1);
+    emit(j + 2, a2);
+    emit(j + 3, a3);
+    j += 4;
+  }
+  while (j < count_in_mb) {
+    emit(j, br.Read(w));
+    ++j;
+  }
+}
+
+// Delta + bit-packing decoder. Only DIRECT_DELTA streams of unsigned data are
+// accepted: any other encoding raises kExTypeAbort and signed data raises
+// kExTypeUnImplements.
+template <typename T>
+PaxDeltaDecoder<T>::PaxDeltaDecoder(
+    const PaxDecoder::DecodingOption &encoder_options)
+    : PaxDecoder(encoder_options),
+      data_buffer_(nullptr),
+      result_buffer_(nullptr) {
+  const bool is_delta_encoding =
+      encoder_options.column_encode_type ==
+      ColumnEncoding_Kind::ColumnEncoding_Kind_DIRECT_DELTA;
+  CBDB_CHECK(is_delta_encoding, cbdb::CException::kExTypeAbort,
+             fmt("PaxDeltaDecoder only support DIRECT_DELTA encoding"));
+  // TODO: if sign is true, should use zigzag encoding, now use delta encoding
+  // for offsets in non-fixed columns
+  CBDB_CHECK(!encoder_options.is_sign, cbdb::CException::kExTypeUnImplements,
+             fmt("PaxDeltaDecoder is not supported for signed data, "
+                 "will support zigzag later"));
+}
+
+// Registers the encoded input. A null `data` leaves the current source
+// untouched. Otherwise the bytes are wrapped in a DataBuffer marked as fully
+// used (the two `false` flags presumably make it a non-owning view -- confirm
+// against DataBuffer). Returns `this` for call chaining.
+template <typename T>
+PaxDecoder *PaxDeltaDecoder<T>::SetSrcBuffer(char *data, size_t data_len) {
+  if (!data) return this;
+
+  data_buffer_ =
+      std::make_shared<DataBuffer<char>>(data, data_len, false, false);
+  data_buffer_->Brush(data_len);
+  return this;
+}
+
+// Registers the buffer that Decoding() writes the decoded values into.
+// Returns `this` for call chaining.
+template <typename T>
+PaxDecoder *PaxDeltaDecoder<T>::SetDataBuffer(
+    std::shared_ptr<DataBuffer<char>> result_buffer) {
+  this->result_buffer_ = result_buffer;
+  return this;
+}
+
+// Start of the decoded output, or nullptr when no output buffer is set.
+template <typename T>
+const char *PaxDeltaDecoder<T>::GetBuffer() const {
+  if (!result_buffer_) return nullptr;
+  return result_buffer_->GetBuffer();
+}
+
+// Number of bytes currently used in the output buffer (0 when none is set).
+template <typename T>
+size_t PaxDeltaDecoder<T>::GetBufferSize() const {
+  if (!result_buffer_) return 0;
+  return result_buffer_->Used();
+}
+
+// Incremental decoding is not implemented for delta streams; always raises
+// kExTypeUnImplements.
+template <typename T>
+size_t PaxDeltaDecoder<T>::Next(const char * /*not_null*/) {
+  CBDB_RAISE(cbdb::CException::kExTypeUnImplements);
+}
+
+// Decodes the whole source buffer into result_buffer_ and returns the number
+// of bytes used in result_buffer_ afterwards.
+//
+// Returns 0 when no source buffer was set. An empty source or a stream whose
+// header announces zero values decodes to nothing. A stream that is too short
+// for its header/first value, or whose header or bit widths are invalid,
+// raises kExTypeAbort. A stream that ends in the middle of the block records
+// stops decoding early (the final Assert catches this in assert-enabled
+// builds).
+template <typename T>
+size_t PaxDeltaDecoder<T>::Decoding() {
+  if (!data_buffer_) return 0;
+  Assert(result_buffer_);
+
+  const uint8_t *p =
+      reinterpret_cast<const uint8_t *>(data_buffer_->GetBuffer());
+  size_t remaining = data_buffer_->Used();
+  if (remaining == 0) return result_buffer_->Used();
+
+  // read header: values_per_block, values_per_mini_block, total_count
+  CBDB_CHECK(remaining >= sizeof(DeltaBlockHeader),
+             cbdb::CException::kExTypeAbort,
+             fmt("PaxDeltaDecoder: source buffer too small for header"));
+  DeltaBlockHeader header;
+  std::memcpy(&header, p, sizeof(header));
+  p += sizeof(header);
+  remaining -= sizeof(header);
+  const uint32_t values_per_block = header.value_per_block;
+  const uint32_t values_per_mini_block = header.values_per_mini_block;
+  const uint32_t total_count = header.total_count;
+
+  // Nothing was encoded: do not emit the placeholder first value.
+  if (total_count == 0) return result_buffer_->Used();
+
+  // The mini-block count is derived by division and sizes the per-block
+  // metadata, so a corrupt header must not reach the loop below.
+  CBDB_CHECK(values_per_mini_block != 0 &&
+                 values_per_block >= values_per_mini_block,
+             cbdb::CException::kExTypeAbort,
+             fmt("PaxDeltaDecoder: invalid block geometry in header"));
+
+  // read first value
+  CBDB_CHECK(remaining >= sizeof(T), cbdb::CException::kExTypeAbort,
+             fmt("PaxDeltaDecoder: source buffer too small for first value"));
+  T first_value;
+  std::memcpy(&first_value, p, sizeof(T));
+  p += sizeof(T);
+  remaining -= sizeof(T);
+
+  // reserve output buffer
+  if (result_buffer_->Capacity() < total_count * sizeof(T)) {
+    result_buffer_->ReSize(total_count * sizeof(T));
+  }
+
+  // write first value
+  T current_value = static_cast<T>(first_value);
+  result_buffer_->Write(reinterpret_cast<char *>(&current_value), sizeof(T));
+  result_buffer_->Brush(sizeof(T));
+  uint32_t decoded = 1;
+
+  const uint32_t mini_blocks_per_block =
+      values_per_block / values_per_mini_block;
+  // Every block starts with min_delta followed by one width byte per
+  // mini-block.
+  const size_t block_meta_size = sizeof(uint32_t) + mini_blocks_per_block;
+  // Heap storage instead of a variable-length array: the count comes from the
+  // stream and is only allocated once it is known to fit the remaining input.
+  std::vector<uint8_t> bit_widths;
+
+  while (decoded < total_count && remaining > 0) {
+    // Truncated block metadata: stop before reading past the input.
+    if (remaining < block_meta_size) break;
+
+    uint32_t min_delta;
+    std::memcpy(&min_delta, p, sizeof(min_delta));
+    p += sizeof(min_delta);
+    remaining -= sizeof(min_delta);
+
+    bit_widths.assign(p, p + mini_blocks_per_block);
+    p += mini_blocks_per_block;
+    remaining -= mini_blocks_per_block;
+
+    const uint32_t values_in_block =
+        std::min(values_per_block, total_count - decoded);
+
+    // read payload: the reader is bounded by the remaining bytes and reports
+    // how many it consumed through br.index
+    BitReader64Ptr br(p, remaining);
+
+    for (uint32_t i = 0; i < mini_blocks_per_block && decoded < total_count;
+         ++i) {
+      const uint32_t start = i * values_per_mini_block;
+      if (start >= values_in_block) break;
+      const uint32_t cnt =
+          std::min(values_per_mini_block, values_in_block - start);
+      const uint8_t w = bit_widths[i];
+      CBDB_CHECK(w <= 32, cbdb::CException::kExTypeAbort,
+                 fmt("PaxDeltaDecoder: invalid bit width in block header"));
+
+      T *out_base = reinterpret_cast<T *>(result_buffer_->GetAvailableBuffer());
+      ReadMiniBlockSpecializedPtr<T>(br, out_base, current_value, cnt,
+                                     min_delta, w);
+      result_buffer_->Brush(cnt * sizeof(T));
+      decoded += cnt;
+    }
+
+    // The encoder pads every block payload to a byte boundary.
+    br.AlignToByte();
+
+    const size_t consumed = br.index;
+    p += consumed;
+    remaining -= consumed;
+  }
+
+  Assert(result_buffer_->Used() == total_count * sizeof(T));
+
+  return result_buffer_->Used();
+}
+
+// Decoding with a not-null bitmap is not implemented for delta streams;
+// always raises kExTypeUnImplements.
+template <typename T>
+size_t PaxDeltaDecoder<T>::Decoding(const char * /*not_null*/,
+                                    size_t /*not_null_len*/) {
+  CBDB_RAISE(cbdb::CException::kExTypeUnImplements);
+}
+
+// Explicit instantiations: the member templates are defined in this file, so
+// every specialization used elsewhere has to be instantiated here.
+template class PaxDeltaEncoder<uint32_t>;
+template class PaxDeltaDecoder<uint32_t>;
+// Add explicit instantiations for signed integral types used by CreateDecoder
+// NOTE(review): the decoder constructor rejects is_sign == true, so these
+// presumably exist only to satisfy the linker -- confirm.
+template class PaxDeltaDecoder<long>;
+template class PaxDeltaDecoder<int>;
+template class PaxDeltaDecoder<short>;
+template class PaxDeltaDecoder<signed char>;
+
+}  // namespace pax
\ No newline at end of file
diff --git a/contrib/pax_storage/src/cpp/storage/columns/pax_delta_encoding.h 
b/contrib/pax_storage/src/cpp/storage/columns/pax_delta_encoding.h
new file mode 100644
index 00000000000..7f2251201bf
--- /dev/null
+++ b/contrib/pax_storage/src/cpp/storage/columns/pax_delta_encoding.h
@@ -0,0 +1,135 @@
+/*-------------------------------------------------------------------------
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ * pax_delta_encoding.h
+ *
+ * IDENTIFICATION
+ *       contrib/pax_storage/src/cpp/storage/columns/pax_delta_encoding.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#pragma once
+
+#include "storage/columns/pax_encoding.h"
+#include "storage/columns/pax_decoding.h"
+#include <vector>
+
+namespace pax {
+  
+// LSB-first bit reader that advances the caller's cursor: `p` and `remaining`
+// are references, so every byte loaded here moves the caller's pointer forward
+// and decrements the caller's byte count.
+struct BitReader64 {
+  const uint8_t *&p;        // caller's read cursor
+  uint32_t &remaining;      // caller's count of unread bytes
+  uint64_t bit_buffer = 0;  // loaded but unconsumed bits, lowest bit first
+  uint32_t bit_count = 0;   // number of valid bits in bit_buffer
+
+  BitReader64(const uint8_t *&ptr, uint32_t &size) : p(ptr), remaining(size) {}
+
+  // Loads bytes until at least `need_bits` bits are buffered or the input is
+  // exhausted.
+  inline void Ensure(uint32_t need_bits) {
+    while (bit_count < need_bits && remaining > 0) {
+      bit_buffer |= (static_cast<uint64_t>(*p) << bit_count);
+      ++p;
+      --remaining;
+      bit_count += 8;
+    }
+  }
+
+  // Reads the next `width` bits (expected range [0, 32]; larger widths are
+  // clamped to 32 to avoid an undefined shift). If the input runs out, the
+  // bits that are still available are returned zero-extended and the reader
+  // is left empty instead of letting bit_count wrap around.
+  inline uint32_t Read(uint8_t width) {
+    if (width == 0) return 0;
+    if (width > 32) width = 32;
+    Ensure(width);
+    if (bit_count < width) {
+      uint32_t partial = static_cast<uint32_t>(bit_buffer & 0xFFFFFFFFull);
+      bit_buffer = 0;
+      bit_count = 0;
+      return partial;
+    }
+    uint32_t result;
+    if (width == 32) {
+      result = static_cast<uint32_t>(bit_buffer & 0xFFFFFFFFull);
+    } else {
+      result = static_cast<uint32_t>(bit_buffer & ((1ull << width) - 1));
+    }
+    bit_buffer >>= width;
+    bit_count -= width;
+    return result;
+  }
+
+  // Drops the unread bits of the current byte so the next read starts on a
+  // byte boundary.
+  inline void AlignToByte() {
+    uint32_t drop = bit_count % 8;
+    if (drop) {
+      bit_buffer >>= drop;
+      bit_count -= drop;
+    }
+  }
+};
+
+// Fixed-size header at the start of every delta-encoded stream. It is copied
+// to and from the byte stream with memcpy, so its layout is part of the
+// stored format.
+struct DeltaBlockHeader {
+  uint32_t value_per_block;        // number of deltas per full block
+  uint32_t values_per_mini_block;  // number of deltas per mini-block
+  uint32_t total_count;            // encoded values, including the first value
+};
+static_assert(sizeof(DeltaBlockHeader) == 3 * sizeof(uint32_t),
+              "DeltaBlockHeader is serialized with memcpy and must not "
+              "contain padding");
+
+// One-shot delta + bit-packing encoder for arrays of T.
+//
+// The input is split into blocks of value_per_block_ deltas, each divided
+// into mini_blocks_per_block_ mini-blocks that get their own bit width.
+// Append() may be called only once per encoder instance.
+template <typename T>
+class PaxDeltaEncoder : public PaxEncoder {
+ public:
+  explicit PaxDeltaEncoder(const EncodingOption &encoder_options);
+
+  // Encodes `size` bytes of T values starting at `data`.
+  void Append(char *data, size_t size) override;
+
+  bool SupportAppendNull() const override;
+
+  void Flush() override;
+
+  // Size estimate for the encoded form of `src_len` input bytes.
+  size_t GetBoundSize(size_t src_len) const override;
+
+ private:
+  void Encode(T *data, size_t size);
+
+  // Block geometry written into DeltaBlockHeader.
+  static constexpr uint32_t value_per_block_ = 128;
+  static constexpr uint32_t mini_blocks_per_block_ = 4;
+  static constexpr uint32_t values_per_mini_block_ =
+      value_per_block_ / mini_blocks_per_block_;
+
+  // Set by the first Append(); a second call is rejected.
+  bool has_append_ = false;
+  // Reusable working buffer to avoid per-block allocations during encoding
+  std::vector<uint32_t> deltas_scratch_;
+};
+
+// Decoder for streams produced by PaxDeltaEncoder. Only whole-buffer decoding
+// through Decoding() is implemented; Next() and the not-null overload of
+// Decoding() raise kExTypeUnImplements.
+template <typename T>
+class PaxDeltaDecoder : public PaxDecoder {
+ public:
+  explicit PaxDeltaDecoder(const PaxDecoder::DecodingOption &encoder_options);
+
+  // Sets the encoded input bytes.
+  PaxDecoder *SetSrcBuffer(char *data, size_t data_len) override;
+
+  // Sets the buffer that receives the decoded values.
+  PaxDecoder *SetDataBuffer(
+      std::shared_ptr<DataBuffer<char>> result_buffer) override;
+
+  size_t Next(const char *not_null) override;
+
+  // Decodes the whole input; returns the bytes used in the output buffer.
+  size_t Decoding() override;
+
+  size_t Decoding(const char *not_null, size_t not_null_len) override;
+
+  const char *GetBuffer() const override;
+
+  size_t GetBufferSize() const override;
+
+ private:
+  std::shared_ptr<DataBuffer<char>> data_buffer_;    // encoded input
+  std::shared_ptr<DataBuffer<char>> result_buffer_;  // decoded output
+};
+
+}  // namespace pax
\ No newline at end of file
diff --git 
a/contrib/pax_storage/src/cpp/storage/columns/pax_delta_encoding_test.cc 
b/contrib/pax_storage/src/cpp/storage/columns/pax_delta_encoding_test.cc
new file mode 100644
index 00000000000..031563381ee
--- /dev/null
+++ b/contrib/pax_storage/src/cpp/storage/columns/pax_delta_encoding_test.cc
@@ -0,0 +1,339 @@
+/*-------------------------------------------------------------------------
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ * pax_delta_encoding_test.cc
+ *
+ * IDENTIFICATION
+ *       contrib/pax_storage/src/cpp/storage/columns/pax_delta_encoding_test.cc
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "storage/columns/pax_delta_encoding.h"
+
+#include <random>
+#include <vector>
+
+#include "comm/gtest_wrappers.h"
+#include "pax_gtest_helper.h"
+
+namespace pax {
+
+// Fixture for the DIRECT_DELTA encoder/decoder pair: prepares unsigned
+// DIRECT_DELTA options for both sides and offers a round-trip helper.
+class PaxDeltaEncodingTest : public ::testing::Test {
+ protected:
+  void SetUp() override {
+    const auto delta_kind =
+        ColumnEncoding_Kind::ColumnEncoding_Kind_DIRECT_DELTA;
+
+    // Create encoding options
+    encoding_options_.column_encode_type = delta_kind;
+    encoding_options_.is_sign = false;
+
+    // Create decoding options
+    decoding_options_.column_encode_type = delta_kind;
+    decoding_options_.is_sign = false;
+  }
+
+  void TearDown() override {}
+
+  // Local copy of the encoder's bit width helper (0 -> 0).
+  inline uint8_t FastNumBits(uint32_t v) {
+    if (v == 0) return 0;
+#if defined(__GNUC__) || defined(__clang__)
+    return static_cast<uint8_t>(32 - __builtin_clz(v));
+#else
+    uint8_t width = 0;
+    do {
+      ++width;
+      v >>= 1;
+    } while (v != 0);
+    return width;
+#endif
+  }
+
+  // Encodes `input` with a fresh encoder, decodes the result with a fresh
+  // decoder and returns the decoded values.
+  template <typename T>
+  std::vector<T> EncodeAndDecode(const std::vector<T> &input) {
+    const size_t input_bytes = input.size() * sizeof(T);
+
+    // Encode into a buffer sized by the encoder's own estimate.
+    PaxDeltaEncoder<T> encoder(encoding_options_);
+    encoder.SetDataBuffer(std::make_shared<DataBuffer<char>>(
+        encoder.GetBoundSize(input_bytes)));
+    encoder.Append(reinterpret_cast<char *>(const_cast<T *>(input.data())),
+                   input_bytes);
+
+    const char *encoded_bytes = encoder.GetBuffer();
+    const size_t encoded_len = encoder.GetBufferSize();
+
+    // Decode into a buffer with room for exactly the original values.
+    PaxDeltaDecoder<T> decoder(decoding_options_);
+    decoder.SetSrcBuffer(const_cast<char *>(encoded_bytes), encoded_len);
+    auto decoded_buffer = std::make_shared<DataBuffer<char>>(input_bytes);
+    decoder.SetDataBuffer(decoded_buffer);
+
+    const size_t decoded_bytes = decoder.Decoding();
+
+    // Convert result back to vector
+    const T *first = reinterpret_cast<const T *>(decoder.GetBuffer());
+    const T *last = first + decoded_bytes / sizeof(T);
+    return std::vector<T>(first, last);
+  }
+
+  PaxEncoder::EncodingOption encoding_options_;
+  PaxDecoder::DecodingOption decoding_options_;
+};
+
+// Smoke test: a short increasing sequence survives a round trip.
+TEST_F(PaxDeltaEncodingTest, BasicEncodeDecode) {
+  const std::vector<uint32_t> input{1, 2, 3, 4, 5};
+  const std::vector<uint32_t> output = EncodeAndDecode(input);
+  EXPECT_EQ(input, output);
+}
+
+// Consecutive values: every delta is 1, so min_delta = 1, all adjusted deltas
+// are 0 and the mini-block needs a bit width of 0.
+TEST_F(PaxDeltaEncodingTest, ConsecutiveSequence) {
+  const std::vector<uint32_t> input{1, 2, 3, 4, 5};
+  const std::vector<uint32_t> output = EncodeAndDecode(input);
+  EXPECT_EQ(input, output);
+}
+
+// Sequence that first decreases and then increases.
+//
+// The encoder computes deltas in uint32 arithmetic, so each step of -2 wraps
+// to 0xFFFFFFFE while each step of +1 stays 1. The unsigned minimum is
+// therefore min_delta = 1, the adjusted deltas are 0xFFFFFFFD and 0, and the
+// mini-block is stored with a bit width of 32. Decoding restores the values
+// through wrap-around when narrowing back to uint32.
+TEST_F(PaxDeltaEncodingTest, SequenceWithVariation) {
+  const std::vector<uint32_t> input{7, 5, 3, 1, 2, 3, 4, 5};
+  const std::vector<uint32_t> output = EncodeAndDecode(input);
+  EXPECT_EQ(input, output);
+}
+
+// A single value is stored as the first value only; no block follows.
+TEST_F(PaxDeltaEncodingTest, SingleValue) {
+  const std::vector<uint32_t> input{42};
+  const std::vector<uint32_t> output = EncodeAndDecode(input);
+  EXPECT_EQ(input, output);
+}
+
+// Two values: the first value plus a block holding exactly one delta.
+TEST_F(PaxDeltaEncodingTest, TwoValues) {
+  const std::vector<uint32_t> input{10, 15};
+  const std::vector<uint32_t> output = EncodeAndDecode(input);
+  EXPECT_EQ(input, output);
+}
+
+// Large absolute values with small deltas.
+TEST_F(PaxDeltaEncodingTest, LargeValues) {
+  const std::vector<uint32_t> input{1000000, 1000001, 1000002, 1000003};
+  const std::vector<uint32_t> output = EncodeAndDecode(input);
+  EXPECT_EQ(input, output);
+}
+
+// Small values separated by large deltas.
+TEST_F(PaxDeltaEncodingTest, LargeDeltas) {
+  const std::vector<uint32_t> input{1, 1000, 2000, 3000};
+  const std::vector<uint32_t> output = EncodeAndDecode(input);
+  EXPECT_EQ(input, output);
+}
+
+// 128 consecutive values (0..127): the first value plus 127 deltas.
+TEST_F(PaxDeltaEncodingTest, FullBlock) {
+  std::vector<uint32_t> input(128);
+  for (uint32_t i = 0; i < input.size(); ++i) {
+    input[i] = i;
+  }
+  const std::vector<uint32_t> output = EncodeAndDecode(input);
+  EXPECT_EQ(input, output);
+}
+
+// 250 consecutive values: more deltas than fit into a single block.
+TEST_F(PaxDeltaEncodingTest, MultipleBlocks) {
+  std::vector<uint32_t> input(250);
+  for (uint32_t i = 0; i < input.size(); ++i) {
+    input[i] = i;
+  }
+  const std::vector<uint32_t> output = EncodeAndDecode(input);
+  EXPECT_EQ(input, output);
+}
+
+// 100 pseudo-random values in [0, 1000000] from a fixed seed, so the input
+// (and therefore the test) is reproducible.
+TEST_F(PaxDeltaEncodingTest, RandomData) {
+  std::mt19937 gen(12345);
+  std::uniform_int_distribution<uint32_t> dis(0, 1000000);
+
+  std::vector<uint32_t> input;
+  int remaining = 100;
+  while (remaining-- > 0) {
+    input.push_back(dis(gen));
+  }
+
+  const std::vector<uint32_t> output = EncodeAndDecode(input);
+  EXPECT_EQ(input, output);
+}
+
+// Verifies the byte layout produced for a small, hand-computed input: stream
+// header, first value, min_delta, bit widths and the bit-packed payload.
+TEST_F(PaxDeltaEncodingTest, PayloadSizeCalculation) {
+  std::vector<uint32_t> input = {
+      1,  2,  3,  4,  5,  6,  7,  8,  9,  10, 11, 12, 13, 14, 15, 16, 17, 18,
+      19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 56, 63, 89};
+  // Deltas are 1 (x32), 23, 7, 26, so min_delta = 1 and the adjusted deltas
+  // are 0 (x32), 22, 6, 25: the bit widths are 0,5,0,0.
+
+  PaxDeltaEncoder<uint32_t> encoder(encoding_options_);
+  size_t bound_size = encoder.GetBoundSize(input.size() * sizeof(uint32_t));
+  encoder.SetDataBuffer(std::make_shared<DataBuffer<char>>(bound_size));
+  encoder.Append(reinterpret_cast<char *>(input.data()),
+                 input.size() * sizeof(uint32_t));
+
+  // Verify the encoded data structure manually
+  const char *encoded_data = encoder.GetBuffer();
+  size_t encoded_size = encoder.GetBufferSize();
+
+  // header + first value + min_delta + 4 bit widths must be present before
+  // anything is parsed
+  const size_t prefix_size =
+      sizeof(DeltaBlockHeader) + sizeof(uint32_t) + sizeof(uint32_t) + 4;
+  ASSERT_GE(encoded_size, prefix_size);
+
+  // Parse the encoded data
+  const uint8_t *p = reinterpret_cast<const uint8_t *>(encoded_data);
+
+  // Read header
+  DeltaBlockHeader header;
+  std::memcpy(&header, p, sizeof(header));
+  p += sizeof(header);
+
+  EXPECT_EQ(header.value_per_block, 128u);
+  EXPECT_EQ(header.values_per_mini_block, 32u);
+  EXPECT_EQ(header.total_count, input.size());
+
+  // Read first value
+  uint32_t first_value;
+  std::memcpy(&first_value, p, sizeof(first_value));
+  p += sizeof(first_value);
+  EXPECT_EQ(first_value, 1u);
+
+  // Read block data
+  uint32_t min_delta;
+  std::memcpy(&min_delta, p, sizeof(min_delta));
+  p += sizeof(min_delta);
+  EXPECT_EQ(min_delta, 1u);
+
+  // Read all bit widths
+  uint8_t bit_widths[4];
+  for (int i = 0; i < 4; ++i) {
+    bit_widths[i] = *p++;
+  }
+
+  // bit_widths should be [0, 5, 0, 0]
+  ASSERT_EQ(bit_widths[0], 0);
+  ASSERT_EQ(bit_widths[1], 5);
+  ASSERT_EQ(bit_widths[2], 0);
+  ASSERT_EQ(bit_widths[3], 0);
+
+  // Compute payload size from bit_widths and counts
+  uint32_t values_in_block =
+      input.size() - 1;  // we constructed input with 35 deltas in first block
+  uint64_t total_bits = 0;
+  for (uint32_t i = 0; i < 4; ++i) {
+    uint32_t start = i * 32;
+    if (start >= values_in_block) break;
+    uint32_t end = std::min(start + 32u, values_in_block);
+    uint8_t w = bit_widths[i];
+    total_bits += static_cast<uint64_t>(w) * (end - start);
+  }
+  uint32_t payload_size = static_cast<uint32_t>((total_bits + 7) / 8);
+
+  // For this example, we expect payload_size = 2 bytes (3 values * 5 bits)
+  ASSERT_EQ(payload_size, 2u);
+  ASSERT_GE(encoded_size, prefix_size + payload_size);
+
+  // Copy only the bytes that belong to the payload
+  uint8_t payload[2];
+  std::memcpy(payload, p, payload_size);
+  p += payload_size;
+
+  // payload is packed LSB-first, values are (22, 6, 25)
+  // [0b10110, 0b00110, 0b11001] -> 22 | 6 << 5 | 25 << 10 = 0x64D6
+  EXPECT_EQ(payload[0], 0b11010110);
+  EXPECT_EQ(payload[1], 0b01100100);
+}
+
+// Checks the bit width helper around every power-of-two boundary up to 256.
+TEST_F(PaxDeltaEncodingTest, BitWidthCalculation) {
+  struct WidthCase {
+    uint32_t value;
+    int expected_bits;
+  };
+  const WidthCase cases[] = {{0, 0},  {1, 1},  {2, 2},   {3, 2},
+                             {4, 3},  {7, 3},  {8, 4},   {15, 4},
+                             {16, 5}, {255, 8}, {256, 9}};
+  for (const WidthCase &c : cases) {
+    EXPECT_EQ(FastNumBits(c.value), c.expected_bits);
+  }
+}
+
+// Identical values: every delta is 0, so no payload bits are needed.
+TEST_F(PaxDeltaEncodingTest, ZeroDeltas) {
+  const std::vector<uint32_t> input{42, 42, 42, 42, 42};
+  const std::vector<uint32_t> output = EncodeAndDecode(input);
+  EXPECT_EQ(input, output);
+}
+
+// Strictly decreasing values: each step of -10 is stored as the wrapped
+// uint32 delta and restored through wrap-around on decode.
+TEST_F(PaxDeltaEncodingTest, DecreasingSequence) {
+  const std::vector<uint32_t> input{100, 90, 80, 70, 60};
+  const std::vector<uint32_t> output = EncodeAndDecode(input);
+  EXPECT_EQ(input, output);
+}
+
+// Alternating increases and decreases of varying size.
+TEST_F(PaxDeltaEncodingTest, MixedPattern) {
+  const std::vector<uint32_t> input{10, 20, 15, 25, 5, 30, 1, 35};
+  const std::vector<uint32_t> output = EncodeAndDecode(input);
+  EXPECT_EQ(input, output);
+}
+
+// Test empty input (edge case)
+// Placeholder: this test currently performs no encoding and makes no
+// assertions, so it always passes.
+TEST_F(PaxDeltaEncodingTest, EmptyInput) {
+  std::vector<uint32_t> input = {};
+  // This should handle gracefully or throw expected exception
+  // For now, let's skip this test until we clarify expected behavior
+  // TODO: once the behavior for zero values is defined, round-trip `input`
+  // here and assert on the result.
+}
+
+// Round trip for the element types covered so far. Only uint32_t is
+// exercised here.
+TEST_F(PaxDeltaEncodingTest, DifferentTypes) {
+  // uint32_t
+  const std::vector<uint32_t> input32{1, 2, 3, 4, 5};
+  const std::vector<uint32_t> output32 = EncodeAndDecode(input32);
+  EXPECT_EQ(input32, output32);
+}
+
+}  // namespace pax
+
+// Test binary entry point: lets GoogleTest consume its command-line flags and
+// returns the result of running every registered test.
+int main(int argc, char **argv) {
+  ::testing::InitGoogleTest(&argc, argv);
+  return RUN_ALL_TESTS();
+}
diff --git a/contrib/pax_storage/src/cpp/storage/columns/pax_dict_encoding.h 
b/contrib/pax_storage/src/cpp/storage/columns/pax_dict_encoding.h
index e552fa7a55a..38f3ba217db 100644
--- a/contrib/pax_storage/src/cpp/storage/columns/pax_dict_encoding.h
+++ b/contrib/pax_storage/src/cpp/storage/columns/pax_dict_encoding.h
@@ -53,6 +53,10 @@ class PaxDictEncoder final : public PaxEncoder {
 
   void Flush() override;
 
+  size_t GetBoundSize(size_t src_len) const override {  // no bound provided
+    CBDB_RAISE(cbdb::CException::kExTypeUnImplements);  // src_len is unused
+  }
+
  private:
   size_t AppendInternal(char *data, size_t len);
 
@@ -89,7 +93,8 @@ class PaxDictDecoder final : public PaxDecoder {
 
   PaxDecoder *SetSrcBuffer(char *data, size_t data_len) override;
 
-  PaxDecoder *SetDataBuffer(std::shared_ptr<DataBuffer<char>> result_buffer) 
override;
+  PaxDecoder *SetDataBuffer(
+      std::shared_ptr<DataBuffer<char>> result_buffer) override;
 
   const char *GetBuffer() const override;
 
@@ -121,8 +126,8 @@ class PaxDictDecoder final : public PaxDecoder {
 
     buffer = src_buff->GetBuffer();
 
-    index_buffer =
-        std::make_shared<DataBuffer<int32>>((int32 *)buffer, head.indexsz, 
false, false);
+    index_buffer = std::make_shared<DataBuffer<int32>>(
+        (int32 *)buffer, head.indexsz, false, false);
     index_buffer->BrushAll();
 
     desc_buffer = std::make_shared<DataBuffer<int32>>(
@@ -130,8 +135,8 @@ class PaxDictDecoder final : public PaxDecoder {
         false);
     desc_buffer->BrushAll();
 
-    entry_buffer = std::make_shared<DataBuffer<char>>(buffer + head.indexsz, 
head.dictsz,
-                                             false, false);
+    entry_buffer = std::make_shared<DataBuffer<char>>(
+        buffer + head.indexsz, head.dictsz, false, false);
     entry_buffer->BrushAll();
 
     return std::make_tuple(index_buffer, entry_buffer, desc_buffer);
diff --git a/contrib/pax_storage/src/cpp/storage/columns/pax_encoding.cc 
b/contrib/pax_storage/src/cpp/storage/columns/pax_encoding.cc
index 3a354ceec8d..b11b2b7b6bd 100644
--- a/contrib/pax_storage/src/cpp/storage/columns/pax_encoding.cc
+++ b/contrib/pax_storage/src/cpp/storage/columns/pax_encoding.cc
@@ -33,6 +33,7 @@
 #include "comm/pax_memory.h"
 #include "storage/columns/pax_dict_encoding.h"
 #include "storage/columns/pax_rlev2_encoding.h"
+#include "storage/columns/pax_delta_encoding.h"
 
 namespace pax {
 
@@ -56,8 +57,7 @@ std::shared_ptr<PaxEncoder> 
PaxEncoder::CreateStreamingEncoder(
       break;
     }
     case ColumnEncoding_Kind::ColumnEncoding_Kind_DIRECT_DELTA: {
-      // TODO(jiaqizho): support direct delta encoding
-      // not support yet, then direct return a nullptr(means no encoding)
+      encoder = std::make_shared<PaxDeltaEncoder<uint32_t>>(encoder_options);
       break;
     }
     case ColumnEncoding_Kind::ColumnEncoding_Kind_DEF_ENCODED: {
diff --git a/contrib/pax_storage/src/cpp/storage/columns/pax_encoding.h 
b/contrib/pax_storage/src/cpp/storage/columns/pax_encoding.h
index 362e68caa13..465c7bf0600 100644
--- a/contrib/pax_storage/src/cpp/storage/columns/pax_encoding.h
+++ b/contrib/pax_storage/src/cpp/storage/columns/pax_encoding.h
@@ -75,6 +75,8 @@ class PaxEncoder {
 
   virtual size_t GetBufferSize() const;
 
+  virtual size_t GetBoundSize(size_t src_len) const = 0;
+
   /**
    * steaming encoder
    *
diff --git 
a/contrib/pax_storage/src/cpp/storage/columns/pax_encoding_non_fixed_column.cc 
b/contrib/pax_storage/src/cpp/storage/columns/pax_encoding_non_fixed_column.cc
index 25b6d2f1d6d..90060050236 100644
--- 
a/contrib/pax_storage/src/cpp/storage/columns/pax_encoding_non_fixed_column.cc
+++ 
b/contrib/pax_storage/src/cpp/storage/columns/pax_encoding_non_fixed_column.cc
@@ -59,21 +59,37 @@ void PaxNonFixedEncodingColumn::InitEncoder() {
 }
 
 void PaxNonFixedEncodingColumn::InitOffsetStreamCompressor() {
-  Assert(encoder_options_.offsets_encode_type !=
-         ColumnEncoding_Kind::ColumnEncoding_Kind_DEF_ENCODED);
-  offsets_compressor_ = PaxCompressor::CreateBlockCompressor(
-      encoder_options_.offsets_encode_type);
+  Assert(encoder_options_.offsets_encode_type ==
+         ColumnEncoding_Kind::ColumnEncoding_Kind_DIRECT_DELTA);
+
   SetOffsetsEncodeType(encoder_options_.offsets_encode_type);
   SetOffsetsCompressLevel(encoder_options_.offsets_compress_level);
+
+  PaxEncoder::EncodingOption opt = encoder_options_;
+  opt.column_encode_type =
+      ColumnEncoding_Kind::ColumnEncoding_Kind_DIRECT_DELTA;
+  opt.is_sign = false;
+  // offsets are fixed-width, do not enable non_fixed streaming restriction
+  offsets_encoder_ = PaxEncoder::CreateStreamingEncoder(opt, false);
 }
 
 void PaxNonFixedEncodingColumn::InitOffsetStreamDecompressor() {
   Assert(decoder_options_.offsets_encode_type !=
          ColumnEncoding_Kind::ColumnEncoding_Kind_DEF_ENCODED);
-  offsets_compressor_ = PaxCompressor::CreateBlockCompressor(
-      decoder_options_.offsets_encode_type);
   SetOffsetsEncodeType(decoder_options_.offsets_encode_type);
   SetOffsetsCompressLevel(decoder_options_.offsets_compress_level);
+
+  if (decoder_options_.offsets_encode_type ==
+      ColumnEncoding_Kind::ColumnEncoding_Kind_DIRECT_DELTA) {
+    PaxDecoder::DecodingOption temp_opt = decoder_options_;
+    temp_opt.column_encode_type =
+        ColumnEncoding_Kind::ColumnEncoding_Kind_DIRECT_DELTA;
+    temp_opt.is_sign = false;
+    offsets_decoder_ = PaxDecoder::CreateDecoder<int32>(temp_opt);
+  } else {
+    offsets_compressor_ = PaxCompressor::CreateBlockCompressor(
+        decoder_options_.offsets_encode_type);
+  }
 }
 
 void PaxNonFixedEncodingColumn::InitDecoder() {
@@ -169,9 +185,13 @@ void 
PaxNonFixedEncodingColumn::Set(std::shared_ptr<DataBuffer<char>> data,
 
   auto offsets_decompress = [&]() {
     Assert(!compress_route_);
-    Assert(offsets_compressor_);
+    Assert(offsets_compressor_ || offsets_decoder_);
+
+    if (offsets->Used() == 0) {
+      return;
+    }
 
-    if (offsets->Used() != 0) {
+    if (offsets_compressor_) {
       auto d_size = offsets_compressor_->Decompress(
           PaxNonFixedColumn::offsets_->Start(),
           PaxNonFixedColumn::offsets_->Capacity(), offsets->Start(),
@@ -182,22 +202,36 @@ void 
PaxNonFixedEncodingColumn::Set(std::shared_ptr<DataBuffer<char>> data,
             fmt("Decompress failed, %s", compressor_->ErrorName(d_size)));
       }
       PaxNonFixedColumn::offsets_->Brush(d_size);
+      return;
+    }
+
+    if (offsets_decoder_) {
+      // Decode offsets with the int32 stream decoder
+      shared_offsets_data_ = std::make_shared<DataBuffer<char>>(
+          PaxNonFixedColumn::offsets_->Start(),
+          PaxNonFixedColumn::offsets_->Capacity(), false, false);
+      offsets_decoder_->SetDataBuffer(shared_offsets_data_);
+      offsets_decoder_->SetSrcBuffer(offsets->Start(), offsets->Used());
+      offsets_decoder_->Decoding();
+      PaxNonFixedColumn::offsets_->Brush(shared_offsets_data_->Used());
+      return;
     }
   };
 
   exist_decoder = compressor_ || decoder_;
+  bool has_offsets_processor = offsets_compressor_ || offsets_decoder_;
 
-  if (exist_decoder && offsets_compressor_) {
+  if (exist_decoder && has_offsets_processor) {
     data_decompress();
     offsets_decompress();
     PaxNonFixedColumn::estimated_size_ = total_size;
     PaxNonFixedColumn::next_offsets_ = -1;
-  } else if (exist_decoder && !offsets_compressor_) {
+  } else if (exist_decoder && !has_offsets_processor) {
     data_decompress();
     PaxNonFixedColumn::offsets_ = offsets;
     PaxNonFixedColumn::estimated_size_ = total_size;
     PaxNonFixedColumn::next_offsets_ = -1;
-  } else if (!exist_decoder && offsets_compressor_) {
+  } else if (!exist_decoder && has_offsets_processor) {
     PaxNonFixedColumn::data_ = data;
     offsets_decompress();
     PaxNonFixedColumn::estimated_size_ = total_size;
@@ -278,17 +312,17 @@ std::pair<char *, size_t> 
PaxNonFixedEncodingColumn::GetOffsetBuffer(
     AppendLastOffset();
   }
 
-  if (offsets_compressor_ && compress_route_) {
-    if (shared_offsets_data_) {
-      return std::make_pair(shared_offsets_data_->Start(),
-                            shared_offsets_data_->Used());
-    }
+  if (shared_offsets_data_) {
+    return std::make_pair(shared_offsets_data_->Start(),
+                          shared_offsets_data_->Used());
+  }
 
-    if (PaxNonFixedColumn::offsets_->Used() == 0) {
-      // should never append last offset again
-      return PaxNonFixedColumn::GetOffsetBuffer(false);
-    }
+  if (PaxNonFixedColumn::offsets_->Used() == 0) {
+    // should never append last offset again
+    return PaxNonFixedColumn::GetOffsetBuffer(false);
+  }
 
+  if (offsets_compressor_ && compress_route_) {
     size_t bound_size = offsets_compressor_->GetCompressBound(
         PaxNonFixedColumn::offsets_->Used());
     shared_offsets_data_ = std::make_shared<DataBuffer<char>>(bound_size);
@@ -308,6 +342,20 @@ std::pair<char *, size_t> 
PaxNonFixedEncodingColumn::GetOffsetBuffer(
                           shared_offsets_data_->Used());
   }
 
+  if (offsets_encoder_ && compress_route_) {
+    // For delta encoder, size the output buffer by the encoder's bound size
+    size_t bound_size = offsets_encoder_->GetBoundSize(offsets_->Used());
+    shared_offsets_data_ = std::make_shared<DataBuffer<char>>(bound_size);
+    offsets_encoder_->SetDataBuffer(shared_offsets_data_);
+
+    // Encode entire offsets buffer as a single stream
+    offsets_encoder_->Append(offsets_->Start(), offsets_->Used());
+    offsets_encoder_->Flush();
+
+    return std::make_pair(shared_offsets_data_->Start(),
+                          shared_offsets_data_->Used());
+  }
+
   // no compress or uncompressed
   // should never append last offset again
   return PaxNonFixedColumn::GetOffsetBuffer(false);
diff --git 
a/contrib/pax_storage/src/cpp/storage/columns/pax_encoding_non_fixed_column.h 
b/contrib/pax_storage/src/cpp/storage/columns/pax_encoding_non_fixed_column.h
index b4e956cfe4a..06b60d02ac2 100644
--- 
a/contrib/pax_storage/src/cpp/storage/columns/pax_encoding_non_fixed_column.h
+++ 
b/contrib/pax_storage/src/cpp/storage/columns/pax_encoding_non_fixed_column.h
@@ -83,6 +83,9 @@ class PaxNonFixedEncodingColumn : public PaxNonFixedColumn {
   std::shared_ptr<DataBuffer<char>> shared_data_;
 
   std::shared_ptr<PaxCompressor> offsets_compressor_;
+  // Optional encoder/decoder for offsets stream (alternative to compression)
+  std::shared_ptr<PaxEncoder> offsets_encoder_;
+  std::shared_ptr<PaxDecoder> offsets_decoder_;
   std::shared_ptr<DataBuffer<char>> shared_offsets_data_;
 };
 
diff --git a/contrib/pax_storage/src/cpp/storage/columns/pax_encoding_test.cc 
b/contrib/pax_storage/src/cpp/storage/columns/pax_encoding_test.cc
index 5fa7fb7153c..b3a7ec59458 100644
--- a/contrib/pax_storage/src/cpp/storage/columns/pax_encoding_test.cc
+++ b/contrib/pax_storage/src/cpp/storage/columns/pax_encoding_test.cc
@@ -1361,4 +1361,96 @@ TEST_F(PaxEncodingTest, TestEncodingWithAllNULL) {
   ASSERT_EQ(n_read, shared_dst_data->Used());
 }
 
+TEST_F(PaxEncodingTest, TestPaxDeltaEncodingBasic) {  // encode, then decode
+  std::vector<uint32_t> data_vec{100, 101, 102, 105, 106, 110, 120, 121};
+  auto shared_data = std::make_shared<DataBuffer<char>>(1024);  // encoded
+  auto shared_dst_data = std::make_shared<DataBuffer<char>>(1024);  // decoded
+
+  PaxEncoder::EncodingOption encoder_options;
+  encoder_options.column_encode_type =
+      ColumnEncoding_Kind::ColumnEncoding_Kind_DIRECT_DELTA;
+  encoder_options.is_sign = false;  // input values are unsigned
+  auto encoder = PaxEncoder::CreateStreamingEncoder(encoder_options);
+
+  ASSERT_TRUE(encoder);
+  encoder->SetDataBuffer(shared_data);
+  encoder->Append(reinterpret_cast<char *>(data_vec.data()), data_vec.size() * 
sizeof(uint32_t));
+  encoder->Flush();
+
+  ASSERT_NE(encoder->GetBuffer(), nullptr);  // encoder exposes a buffer
+  ASSERT_GT(encoder->GetBufferSize(), 0UL);  // and it is not empty
+
+  PaxDecoder::DecodingOption decoder_options;
+  decoder_options.column_encode_type =
+      ColumnEncoding_Kind::ColumnEncoding_Kind_DIRECT_DELTA;
+  decoder_options.is_sign = false;  // mirrors the encoder options
+
+  auto decoder = PaxDecoder::CreateDecoder<int32>(decoder_options);
+  ASSERT_TRUE(decoder);
+  decoder->SetSrcBuffer(shared_data->GetBuffer(), shared_data->Used());
+
+  decoder->SetDataBuffer(shared_dst_data);
+  decoder->Decoding();
+
+  ASSERT_EQ(shared_dst_data->Used(), data_vec.size() * sizeof(int32));
+
+  auto result_dst_data = std::make_shared<DataBuffer<int32>>(
+      reinterpret_cast<int32 *>(shared_dst_data->Start()),
+      shared_dst_data->Used(), false, false);
+
+  for (size_t i = 0; i < data_vec.size(); ++i) {  // element-wise comparison
+    ASSERT_EQ((*result_dst_data)[i], static_cast<int32>(data_vec[i]));
+  }
+}
+
+TEST_F(PaxEncodingTest, TestPaxDeltaEncodingRoundTripRandom) {
+  const size_t n = 1000;  // number of generated values
+  std::vector<uint32_t> data_vec(n);
+  std::mt19937 rng(12345);  // fixed seed
+  std::uniform_int_distribution<uint32_t> base_dist(0, 100);  // first value
+  std::uniform_int_distribution<uint32_t> step_dist(0, 5);  // per-step growth
+
+  data_vec[0] = base_dist(rng);
+  for (size_t i = 1; i < n; ++i) {  // build a non-decreasing sequence
+    data_vec[i] = data_vec[i - 1] + step_dist(rng);
+  }
+
+  auto shared_data = std::make_shared<DataBuffer<char>>(n * sizeof(uint32_t));
+  auto shared_dst_data = std::make_shared<DataBuffer<char>>(n * 
sizeof(uint32_t));
+
+  PaxEncoder::EncodingOption encoder_options;
+  encoder_options.column_encode_type =
+      ColumnEncoding_Kind::ColumnEncoding_Kind_DIRECT_DELTA;
+  encoder_options.is_sign = false;  // input values are unsigned
+  auto encoder = PaxEncoder::CreateStreamingEncoder(encoder_options);
+
+  ASSERT_TRUE(encoder);
+  encoder->SetDataBuffer(shared_data);
+
+  encoder->Append(reinterpret_cast<char *>(data_vec.data()), data_vec.size() * 
sizeof(uint32_t));
+  encoder->Flush();
+
+  PaxDecoder::DecodingOption decoder_options;
+  decoder_options.column_encode_type =
+      ColumnEncoding_Kind::ColumnEncoding_Kind_DIRECT_DELTA;
+  decoder_options.is_sign = false;  // mirrors the encoder options
+
+  auto decoder = PaxDecoder::CreateDecoder<int32>(decoder_options);
+  ASSERT_TRUE(decoder);
+  decoder->SetSrcBuffer(shared_data->GetBuffer(), shared_data->Used());
+
+  decoder->SetDataBuffer(shared_dst_data);
+  decoder->Decoding();
+
+  ASSERT_EQ(shared_dst_data->Used(), data_vec.size() * sizeof(int32));
+
+  auto result_dst_data = std::make_shared<DataBuffer<int32>>(
+      reinterpret_cast<int32 *>(shared_dst_data->Start()),
+      shared_dst_data->Used(), false, false);
+
+  for (size_t i = 0; i < data_vec.size(); ++i) {  // element-wise comparison
+    ASSERT_EQ((*result_dst_data)[i], static_cast<int32>(data_vec[i]));
+  }
+}
+
 }  // namespace pax::tests
diff --git a/contrib/pax_storage/src/cpp/storage/columns/pax_rlev2_encoding.h 
b/contrib/pax_storage/src/cpp/storage/columns/pax_rlev2_encoding.h
index 7d021a1f1cf..f2197258b69 100644
--- a/contrib/pax_storage/src/cpp/storage/columns/pax_rlev2_encoding.h
+++ b/contrib/pax_storage/src/cpp/storage/columns/pax_rlev2_encoding.h
@@ -49,6 +49,10 @@ class PaxOrcEncoder final : public PaxEncoder {
 
   void Flush() override;
 
+  size_t GetBoundSize(size_t src_len) const override {  // no bound provided
+    CBDB_RAISE(cbdb::CException::kExTypeUnImplements);  // src_len is unused
+  }
+
  private:
   struct EncoderContext {
     bool is_sign;
diff --git 
a/contrib/pax_storage/src/cpp/storage/columns/pax_vec_encoding_column.cc 
b/contrib/pax_storage/src/cpp/storage/columns/pax_vec_encoding_column.cc
index aaf514f5926..8f3aafae2c4 100644
--- a/contrib/pax_storage/src/cpp/storage/columns/pax_vec_encoding_column.cc
+++ b/contrib/pax_storage/src/cpp/storage/columns/pax_vec_encoding_column.cc
@@ -348,7 +348,7 @@ void PaxVecNonFixedEncodingColumn::Set(
     PaxVecNonFixedColumn::estimated_size_ = total_size;
     PaxVecNonFixedColumn::next_offsets_ = -1;
   } else {  // (!compressor_ && !offsets_compressor_)
-    PaxVecNonFixedColumn::Set(data, offsets_, total_size, non_null_rows);
+    PaxVecNonFixedColumn::Set(data, offsets, total_size, non_null_rows);
   }
 }
 
diff --git 
a/contrib/pax_storage/src/cpp/storage/columns/pax_vec_encoding_column.h 
b/contrib/pax_storage/src/cpp/storage/columns/pax_vec_encoding_column.h
index 4362312a5a9..524ddca261a 100644
--- a/contrib/pax_storage/src/cpp/storage/columns/pax_vec_encoding_column.h
+++ b/contrib/pax_storage/src/cpp/storage/columns/pax_vec_encoding_column.h
@@ -112,6 +112,9 @@ class PaxVecNonFixedEncodingColumn : public 
PaxVecNonFixedColumn {
   std::shared_ptr<DataBuffer<char>> shared_data_;
 
   std::shared_ptr<PaxCompressor> offsets_compressor_;
+  // Optional encoder/decoder for offsets stream (alternative to compression)
+  std::shared_ptr<PaxEncoder> offsets_encoder_;
+  std::shared_ptr<PaxDecoder> offsets_decoder_;
   std::shared_ptr<DataBuffer<char>> shared_offsets_data_;
 };
 
diff --git a/contrib/pax_storage/src/cpp/storage/micro_partition.h 
b/contrib/pax_storage/src/cpp/storage/micro_partition.h
index 77d61462ad4..56d85b46a74 100644
--- a/contrib/pax_storage/src/cpp/storage/micro_partition.h
+++ b/contrib/pax_storage/src/cpp/storage/micro_partition.h
@@ -58,7 +58,6 @@ class MicroPartitionWriter {
     RelFileNode node;
     bool need_wal = false;
     std::vector<std::tuple<ColumnEncoding_Kind, int>> encoding_opts;
-    std::pair<ColumnEncoding_Kind, int> offsets_encoding_opts;
     std::vector<int> enable_min_max_col_idxs;
     std::vector<int> enable_bf_col_idxs;
 
diff --git a/contrib/pax_storage/src/cpp/storage/orc/orc_writer.cc 
b/contrib/pax_storage/src/cpp/storage/orc/orc_writer.cc
index 63413a2239d..6c8d49502e5 100644
--- a/contrib/pax_storage/src/cpp/storage/orc/orc_writer.cc
+++ b/contrib/pax_storage/src/cpp/storage/orc/orc_writer.cc
@@ -104,7 +104,6 @@ static std::unique_ptr<PaxColumns> BuildColumns(
     const std::vector<pax::porc::proto::Type_Kind> &types, const TupleDesc 
desc,
     const std::vector<std::tuple<ColumnEncoding_Kind, int>>
         &column_encoding_types,
-    const std::pair<ColumnEncoding_Kind, int> &offsets_encoding_types,
     const PaxStorageFormat &storage_format) {
   std::unique_ptr<PaxColumns> columns;
   bool is_vec;
@@ -125,14 +124,7 @@ static std::unique_ptr<PaxColumns> BuildColumns(
     encoding_option.is_sign = true;
     encoding_option.compress_level = std::get<1>(column_encoding_types[i]);
 
-    if (offsets_encoding_types.first == ColumnEncoding_Kind_DEF_ENCODED) {
-      // default value of offsets_stream is zstd
-      encoding_option.offsets_encode_type = ColumnEncoding_Kind_COMPRESS_ZSTD;
-      encoding_option.offsets_compress_level = 5;
-    } else {
-      encoding_option.offsets_encode_type = offsets_encoding_types.first;
-      encoding_option.offsets_compress_level = offsets_encoding_types.second;
-    }
+    encoding_option.offsets_encode_type = ColumnEncoding_Kind_DIRECT_DELTA;
 
     switch (type) {
       case (pax::porc::proto::Type_Kind::Type_Kind_STRING): {
@@ -241,10 +233,9 @@ OrcWriter::OrcWriter(
   Assert(writer_options.rel_tuple_desc->natts ==
          static_cast<int>(column_types.size()));
 
-  pax_columns_ = BuildColumns(column_types_, writer_options.rel_tuple_desc,
-                              writer_options.encoding_opts,
-                              writer_options.offsets_encoding_opts,
-                              writer_options.storage_format);
+  pax_columns_ =
+      BuildColumns(column_types_, writer_options.rel_tuple_desc,
+                   writer_options.encoding_opts, 
writer_options.storage_format);
 
   summary_.rel_oid = writer_options.rel_oid;
   summary_.block_id = writer_options.block_id;
@@ -300,7 +291,6 @@ void OrcWriter::Flush() {
 
     new_columns = BuildColumns(column_types_, writer_options_.rel_tuple_desc,
                                writer_options_.encoding_opts,
-                               writer_options_.offsets_encoding_opts,
                                writer_options_.storage_format);
 
     for (size_t i = 0; i < column_types_.size(); ++i) {
diff --git a/contrib/pax_storage/src/cpp/storage/pax.cc 
b/contrib/pax_storage/src/cpp/storage/pax.cc
index ab10387c76c..c8d29bbb6ce 100644
--- a/contrib/pax_storage/src/cpp/storage/pax.cc
+++ b/contrib/pax_storage/src/cpp/storage/pax.cc
@@ -200,8 +200,6 @@ std::unique_ptr<MicroPartitionWriter> 
TableWriter::CreateMicroPartitionWriter(
   options.file_name = std::move(file_path);
   options.encoding_opts = GetRelEncodingOptions();
   options.storage_format = GetStorageFormat();
-  options.offsets_encoding_opts = std::make_pair(
-      PAX_OFFSETS_DEFAULT_COMPRESSTYPE, PAX_OFFSETS_DEFAULT_COMPRESSLEVEL);
   options.enable_min_max_col_idxs = GetMinMaxColumnIndexes();
   options.enable_bf_col_idxs = GetBloomFilterColumnIndexes();
 
@@ -261,8 +259,8 @@ void TableWriter::InitOptionsCaches() {
 }
 
 void TableWriter::Open() {
-  rel_path_ = cbdb::BuildPaxDirectoryPath(
-      relation_->rd_node, relation_->rd_backend);
+  rel_path_ =
+      cbdb::BuildPaxDirectoryPath(relation_->rd_node, relation_->rd_backend);
 
   InitOptionsCaches();
 
@@ -509,8 +507,8 @@ void TableReader::OpenFile() {
 
   if (it.GetExistToast()) {
     // must exist the file in disk
-    toast_file = file_system_->Open(it.GetFileName() + TOAST_FILE_SUFFIX,
-                                    fs::kReadMode);
+    toast_file =
+        file_system_->Open(it.GetFileName() + TOAST_FILE_SUFFIX, 
fs::kReadMode);
   }
 
   reader_ = MicroPartitionFileFactory::CreateMicroPartitionReader(
@@ -588,8 +586,7 @@ void TableDeleter::DeleteWithVisibilityMap(
 
   std::unique_ptr<Bitmap8> visi_bitmap;
   auto catalog_update = pax::PaxCatalogUpdater::Begin(rel_);
-  auto rel_path = cbdb::BuildPaxDirectoryPath(
-      rel_->rd_node, rel_->rd_backend);
+  auto rel_path = cbdb::BuildPaxDirectoryPath(rel_->rd_node, rel_->rd_backend);
 
   min_max_col_idxs = cbdb::GetMinMaxColumnIndexes(rel_);
   stats_updater_projection->SetColumnProjection(min_max_col_idxs,
@@ -662,11 +659,10 @@ void TableDeleter::DeleteWithVisibilityMap(
     // TODO: update stats and visimap all in one catalog update
     // Update the stats in pax aux table
     // Notice that: PAX won't update the stats in group
-    UpdateStatsInAuxTable(catalog_update, micro_partition_metadata,
-                          std::make_shared<Bitmap8>(visi_bitmap->Raw()),
-                          min_max_col_idxs,
-                          cbdb::GetBloomFilterColumnIndexes(rel_),
-                          stats_updater_projection);
+    UpdateStatsInAuxTable(
+        catalog_update, micro_partition_metadata,
+        std::make_shared<Bitmap8>(visi_bitmap->Raw()), min_max_col_idxs,
+        cbdb::GetBloomFilterColumnIndexes(rel_), stats_updater_projection);
 
     // write pg_pax_blocks_oid
     catalog_update.UpdateVisimap(block_id, visimap_file_name);
diff --git a/contrib/pax_storage/src/cpp/storage/pax_defined.h 
b/contrib/pax_storage/src/cpp/storage/pax_defined.h
index b4ce1115af8..5315797ea3a 100644
--- a/contrib/pax_storage/src/cpp/storage/pax_defined.h
+++ b/contrib/pax_storage/src/cpp/storage/pax_defined.h
@@ -39,7 +39,7 @@ namespace pax {
 #define BITS_TO_BYTES(bits) (((bits) + 7) / 8)
 
 #define PAX_OFFSETS_DEFAULT_COMPRESSTYPE \
-  ColumnEncoding_Kind::ColumnEncoding_Kind_COMPRESS_ZSTD
+  ColumnEncoding_Kind::ColumnEncoding_Kind_DIRECT_DELTA
 #define PAX_OFFSETS_DEFAULT_COMPRESSLEVEL 5
 
 #define COLUMN_STORAGE_FORMAT_IS_VEC(column) \
diff --git a/contrib/pax_storage/src/test/regress/expected/gp_toolkit.out 
b/contrib/pax_storage/src/test/regress/expected/gp_toolkit.out
index 336354081af..745db42283a 100644
--- a/contrib/pax_storage/src/test/regress/expected/gp_toolkit.out
+++ b/contrib/pax_storage/src/test/regress/expected/gp_toolkit.out
@@ -304,7 +304,7 @@ update pg_statistic set stawidth=2034567890 where starelid 
= 'wide_width_test'::
 select btdrelpages, btdexppages from gp_toolkit.gp_bloat_expected_pages where 
btdrelid='wide_width_test'::regclass;
  btdrelpages | btdexppages 
 -------------+-------------
-           4 |  3104504228
+           1 |  3104504228
 (1 row)
 
 select * from gp_toolkit.gp_bloat_diag WHERE bdinspname <> 'pg_catalog';


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to