This is an automated email from the ASF dual-hosted git repository.
gongxun pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/cloudberry.git
The following commit(s) were added to refs/heads/main by this push:
new 29d2a2a824d feat: use ColumnEncoding_Kind_DIRECT_DELTA as default in
offset stream (#1337)
29d2a2a824d is described below
commit 29d2a2a824d85b9580f10622cd7770d82ab0d341
Author: Xun Gong <[email protected]>
AuthorDate: Wed Sep 3 11:29:41 2025 +0800
feat: use ColumnEncoding_Kind_DIRECT_DELTA as default in offset stream
(#1337)
* feat: use ColumnEncoding_Kind_DIRECT_DELTA as default in offset stream
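Optimize storage of variable-length column offsets by switching their
encoding from Zstd to delta encoding. Delta encoding compresses monotonically
increasing integer sequences better, substantially reducing disk space (by
more than half for several tables, e.g. customer_demographics 28 MB -> 9292 kB;
see the table below) while maintaining performance.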
The following is a comparison of file sizes for different encoding methods
on TPC-DS 20G:
Name PAX(ZSTD) AOCS_SIZE PAX(Delta) PAX(Delta) / AOCS * 100%
call_center 12 kB 231 kB 10185 bytes 4.31%
catalog_page 499 kB 653 kB 393 kB 60.18%
catalog_returns 240 MB 171 MB 178 MB 104.09%
catalog_sales 3033 MB 1837 MB 1977 MB 107.63%
customer 16 MB 12 MB 12 MB 100.00%
customer_address 7008 kB 3161 kB 3115 kB 98.54%
customer_demographics 28 MB 8164 kB 9292 kB 113.82%
date_dim 3193 kB 1406 kB 1249 kB 88.85%
household_demographics 42 kB 248 kB 28 kB 11.29%
income_band 1239 bytes 225 kB 1239 bytes 0.54%
inventory 36 MB 71 MB 36 MB 50.70%
item 3084 kB 2479 kB 2227 kB 89.84%
promotion 27 kB 239 kB 18 kB 7.53%
reason 2730 bytes 226 kB 2280 bytes 0.99%
ship_mode 3894 bytes 227 kB 3315 bytes 1.43%
store 23 kB 239 kB 18 kB 7.53%
store_returns 400 MB 265 MB 277 MB 104.53%
store_sales 4173 MB 2384 MB 2554 MB 107.12%
time_dim 1702 kB 819 kB 627 kB 76.56%
warehouse 5394 bytes 227 kB 4698 bytes 2.02%
web_page 21 kB 236 kB 14 kB 5.93%
web_returns 116 MB 83 MB 85 MB 102.41%
web_sales 1513 MB 908 MB 982 MB 108.15%
---
contrib/pax_storage/.gitignore | 1 +
contrib/pax_storage/src/cpp/cmake/pax.cmake | 1 +
contrib/pax_storage/src/cpp/cmake/pax_format.cmake | 1 +
contrib/pax_storage/src/cpp/pax_gbench.cc | 302 +++++++++++-
contrib/pax_storage/src/cpp/pax_gbench.h | 72 +++
.../src/cpp/storage/columns/pax_column_test.cc | 15 +-
.../src/cpp/storage/columns/pax_compress_bench.cc | 421 +++++++++++++++++
.../src/cpp/storage/columns/pax_decoding.cc | 3 +-
.../src/cpp/storage/columns/pax_delta_encoding.cc | 511 +++++++++++++++++++++
.../src/cpp/storage/columns/pax_delta_encoding.h | 135 ++++++
.../cpp/storage/columns/pax_delta_encoding_test.cc | 339 ++++++++++++++
.../src/cpp/storage/columns/pax_dict_encoding.h | 15 +-
.../src/cpp/storage/columns/pax_encoding.cc | 4 +-
.../src/cpp/storage/columns/pax_encoding.h | 2 +
.../columns/pax_encoding_non_fixed_column.cc | 88 +++-
.../columns/pax_encoding_non_fixed_column.h | 3 +
.../src/cpp/storage/columns/pax_encoding_test.cc | 92 ++++
.../src/cpp/storage/columns/pax_rlev2_encoding.h | 4 +
.../cpp/storage/columns/pax_vec_encoding_column.cc | 2 +-
.../cpp/storage/columns/pax_vec_encoding_column.h | 3 +
.../pax_storage/src/cpp/storage/micro_partition.h | 1 -
.../pax_storage/src/cpp/storage/orc/orc_writer.cc | 18 +-
contrib/pax_storage/src/cpp/storage/pax.cc | 22 +-
contrib/pax_storage/src/cpp/storage/pax_defined.h | 2 +-
.../src/test/regress/expected/gp_toolkit.out | 2 +-
25 files changed, 1989 insertions(+), 70 deletions(-)
diff --git a/contrib/pax_storage/.gitignore b/contrib/pax_storage/.gitignore
index 51a328f84e0..87aa2a4a742 100644
--- a/contrib/pax_storage/.gitignore
+++ b/contrib/pax_storage/.gitignore
@@ -12,6 +12,7 @@
Thumbs.db
# Temp files dir
+bench_data
.tmp/**
build*/**
results/**
diff --git a/contrib/pax_storage/src/cpp/cmake/pax.cmake
b/contrib/pax_storage/src/cpp/cmake/pax.cmake
index 71775bac2dd..099a66f30d8 100644
--- a/contrib/pax_storage/src/cpp/cmake/pax.cmake
+++ b/contrib/pax_storage/src/cpp/cmake/pax.cmake
@@ -51,6 +51,7 @@ set(pax_storage_src
storage/columns/pax_dict_encoding.cc
storage/columns/pax_decoding.cc
storage/columns/pax_encoding.cc
+ storage/columns/pax_delta_encoding.cc
storage/columns/pax_rlev2_decoding.cc
storage/columns/pax_rlev2_encoding.cc
storage/columns/pax_vec_bitpacked_column.cc
diff --git a/contrib/pax_storage/src/cpp/cmake/pax_format.cmake
b/contrib/pax_storage/src/cpp/cmake/pax_format.cmake
index 4bdc25671f9..5a12185a0e6 100644
--- a/contrib/pax_storage/src/cpp/cmake/pax_format.cmake
+++ b/contrib/pax_storage/src/cpp/cmake/pax_format.cmake
@@ -41,6 +41,7 @@ set(pax_storage_src
storage/columns/pax_dict_encoding.cc
storage/columns/pax_decoding.cc
storage/columns/pax_encoding.cc
+ storage/columns/pax_delta_encoding.cc
storage/columns/pax_rlev2_decoding.cc
storage/columns/pax_rlev2_encoding.cc
storage/columns/pax_vec_column.cc
diff --git a/contrib/pax_storage/src/cpp/pax_gbench.cc
b/contrib/pax_storage/src/cpp/pax_gbench.cc
index 82dbaaa7bb2..b6a0ecb0c76 100644
--- a/contrib/pax_storage/src/cpp/pax_gbench.cc
+++ b/contrib/pax_storage/src/cpp/pax_gbench.cc
@@ -25,12 +25,310 @@
*-------------------------------------------------------------------------
*/
+#include "pax_gbench.h"
+
+#include "comm/cbdb_api.h"
+
#include <benchmark/benchmark.h>
-static void example_benchmark(benchmark::State &state) {
+#include <functional>
+#include <memory>
+#include <vector>
+
+#include "access/paxc_rel_options.h"
+#include "comm/cbdb_wrappers.h"
+#include "cpp-stub/src/stub.h"
+#include "storage/micro_partition_iterator.h"
+#include "storage/pax.h"
+#include "storage/strategy.h"
+
+namespace pax::bench {
+
+// Create a dedicated memory context for the benchmark and make it the current
+// context. The context is created with no parent, and nothing here deletes
+// it. Its three AllocSet size arguments (minimum context size, initial block
+// size, maximum block size) are all 80 MiB.
+void CreateMemoryContext() {
+  constexpr Size kBlockSize = 80 * 1024 * 1024;
+  MemoryContext test_memory_context = AllocSetContextCreate(
+      nullptr, "TestMemoryContext", kBlockSize, kBlockSize, kBlockSize);
+  MemoryContextSwitchTo(test_memory_context);
+}
+
+// Global registry of per-module benchmark setup/teardown hooks.
+//
+// Init hooks run in registration order; cleanup hooks run in reverse
+// registration order. The `initialized_` flag makes each pass a no-op unless
+// the other pass ran in between.
+class BenchmarkRegistry {
+ public:
+  void RegisterInitFunction(InitFunction func) {
+    init_functions_.push_back(func);
+  }
+
+  void RegisterCleanupFunction(CleanupFunction func) {
+    cleanup_functions_.push_back(func);
+  }
+
+  void RunAllInitFunctions() {
+    if (initialized_) {
+      return;
+    }
+    printf("Running PAX Benchmark Suite...\n");
+    printf("Initializing all benchmark modules...\n\n");
+    for (size_t i = 0; i < init_functions_.size(); ++i) {
+      init_functions_[i]();
+    }
+    initialized_ = true;
+  }
+
+  void RunAllCleanupFunctions() {
+    if (!initialized_) {
+      return;
+    }
+    printf("\nCleaning up all benchmark modules...\n");
+    // Walk backwards: the most recently registered hook runs first.
+    for (size_t i = cleanup_functions_.size(); i > 0; --i) {
+      cleanup_functions_[i - 1]();
+    }
+    initialized_ = false;
+  }
+
+ private:
+  std::vector<InitFunction> init_functions_;
+  std::vector<CleanupFunction> cleanup_functions_;
+  bool initialized_ = false;
+};
+
+// Returns the single process-wide registry. Being a function-local static, it
+// is constructed on first use, so registrations performed from other
+// translation units' static initializers always see a constructed object.
+BenchmarkRegistry &GetBenchmarkRegistry() {
+  static BenchmarkRegistry registry;
+  return registry;
+}
+
+// Registration functions: thin forwarders to the global registry (used by the
+// REGISTER_BENCHMARK_* macros).
+void RegisterBenchmarkInit(InitFunction func) {
+  BenchmarkRegistry &registry = GetBenchmarkRegistry();
+  registry.RegisterInitFunction(func);
+}
+
+// Appends `func` to the global registry's list of cleanup hooks.
+void RegisterBenchmarkCleanup(CleanupFunction func) {
+  BenchmarkRegistry &registry = GetBenchmarkRegistry();
+  registry.RegisterCleanupFunction(func);
+}
+
+// Global Mock functions for benchmark framework.
+
+// Replacement for MinMaxGetPgStrategyProcinfo: unconditionally returns false
+// (presumably "no strategy procedure available"); all arguments are ignored.
+bool MockMinMaxGetStrategyProcinfo(Oid, Oid, Oid *, FmgrInfo *,
+                                   StrategyNumber) {
+  constexpr bool kProcinfoFound = false;
+  return kProcinfoFound;
+}
+
+// Replacement for CPaxGetFastSequences: hands out 0, 1, 2, ... from a single
+// process-wide counter, ignoring the OID argument.
+int32 MockGetFastSequences(Oid) {
+  static int32 next_id = 0;
+  const int32 id = next_id;
+  ++next_id;
+  return id;
+}
+
+// No-op replacements for catalog entry points the benchmarks must not reach.
+void MockInsertMicroPartitionPlaceHolder(Oid, int) {}
+void MockDeleteMicroPartitionEntry(Oid, Snapshot, int) {}
+
+// Replacement for ExecStoreVirtualTuple(). The real function returns the slot
+// it is given, and callers may use that return value. cpp-stub patches the
+// function address without checking signatures, so a `void` stub would hand
+// such callers an undefined value. Return `slot` instead.
+TupleTableSlot *MockExecStoreVirtualTuple(TupleTableSlot *slot) {
+  return slot;
+}
+
+// Replacement for cbdb::BuildPaxDirectoryPath: every relation maps to the same
+// fixed directory, ./bench_data (relative to the working directory). The
+// parameters are intentionally unused.
+std::string MockBuildPaxDirectoryPath(RelFileNode /*rnode*/,
+                                      BackendId /*backend_id*/) {
+  return std::string("./bench_data");
+}
+
+// Replacements for the per-relation column lookups for min/max statistics and
+// bloom filters: both report an empty column list.
+std::vector<int> MockGetMinMaxColumnIndexes(Relation) { return {}; }
+
+std::vector<int> MockBloomFilterColumnIndexes(Relation) { return {}; }
+
+// Replacement for cbdb::GetRelEncodingOptions: reports (NO_ENCODED, 0) for
+// every column of `relation`. When the relation or its tuple descriptor is
+// missing, ten columns are assumed.
+std::vector<std::tuple<ColumnEncoding_Kind, int>> MockGetRelEncodingOptions(
+    Relation relation) {
+  int num_columns = 10;  // default for benchmark
+  if (relation != nullptr && relation->rd_att != nullptr) {
+    num_columns = relation->rd_att->natts;
+  }
+  // A non-positive column count yields an empty list.
+  const size_t entries =
+      num_columns > 0 ? static_cast<size_t>(num_columns) : 0;
+  return std::vector<std::tuple<ColumnEncoding_Kind, int>>(
+      entries, std::make_tuple(ColumnEncoding_Kind_NO_ENCODED, 0));
+}
+
+// Mock TupleDescInitEntry that doesn't rely on SYSCACHE.
+//
+// Initializes attribute `attributeNumber` (1-based) of `desc`:
+// - The type-independent fields are set the way the real
+//   TupleDescInitEntry() sets them.
+// - The type-dependent fields (attlen, attalign, attstorage, attbyval,
+//   attcollation for text) come from the hard-coded table below instead of a
+//   pg_type lookup.
+// Attribute numbers outside [1, desc->natts] are silently ignored.
+//
+// NOTE(review): every type, including varlena types such as text and numeric,
+// gets TYPSTORAGE_PLAIN storage. Presumably this is so benchmark values are
+// never toasted; confirm that is intended.
+void MockTupleDescInitEntry(TupleDesc desc, AttrNumber attributeNumber,
+                            const char *attributeName, Oid oidtypeid,
+                            int32 typmod, int attdim) {
+  // Basic validation
+  if (attributeNumber < 1 || attributeNumber > desc->natts) {
+    return;
+  }
+
+  Form_pg_attribute att = TupleDescAttr(desc, attributeNumber - 1);
+
+  // Type-independent fields, mirroring the real TupleDescInitEntry().
+  // Upstream CreateTemplateTupleDesc() pallocs the attribute array without
+  // zeroing it, so every field that later code reads must be set here. This
+  // matters most for attcacheoff: PostgreSQL's attribute-access code trusts
+  // it as a cached offset whenever it is >= 0.
+  att->attrelid = InvalidOid;  // dummy value, as in the real function
+  if (attributeName == nullptr) {
+    memset(NameStr(att->attname), 0, NAMEDATALEN);
+  } else if (attributeName != NameStr(att->attname)) {
+    namestrcpy(&(att->attname), attributeName);
+  }
+  att->attstattarget = -1;
+  att->attcacheoff = -1;
+  att->atttypid = oidtypeid;
+  att->atttypmod = typmod;
+  att->attndims = attdim;
+  att->attnum = attributeNumber;
+  att->attnotnull = false;
+  att->atthasdef = false;
+  att->atthasmissing = false;
+  att->attidentity = '\0';
+  att->attgenerated = '\0';
+  att->attisdropped = false;
+  att->attislocal = true;
+  att->attinhcount = 0;
+  att->attcollation = InvalidOid;
+
+  // Type-specific properties, hard-coded for common types. The named
+  // TYPALIGN_* / TYPSTORAGE_* constants have the same values as the
+  // single-character codes ('s', 'i', 'd', 'c', 'p').
+  switch (oidtypeid) {
+    case INT2OID:  // smallint
+      att->attlen = 2;
+      att->attalign = TYPALIGN_SHORT;
+      att->attstorage = TYPSTORAGE_PLAIN;
+      att->attbyval = true;
+      break;
+    case INT4OID:  // integer
+      att->attlen = 4;
+      att->attalign = TYPALIGN_INT;
+      att->attstorage = TYPSTORAGE_PLAIN;
+      att->attbyval = true;
+      break;
+    case INT8OID:  // bigint
+      att->attlen = 8;
+      att->attalign = TYPALIGN_DOUBLE;
+      att->attstorage = TYPSTORAGE_PLAIN;
+      att->attbyval = FLOAT8PASSBYVAL;
+      break;
+    case FLOAT8OID:  // double precision
+      att->attlen = 8;
+      att->attalign = TYPALIGN_DOUBLE;
+      att->attstorage = TYPSTORAGE_PLAIN;
+      att->attbyval = FLOAT8PASSBYVAL;
+      break;
+    case BOOLOID:  // boolean
+      att->attlen = 1;
+      att->attalign = TYPALIGN_CHAR;
+      att->attstorage = TYPSTORAGE_PLAIN;
+      att->attbyval = true;
+      break;
+    case TEXTOID:  // text
+      att->attlen = -1;
+      att->attalign = TYPALIGN_INT;
+      att->attstorage = TYPSTORAGE_PLAIN;
+      att->attbyval = false;
+      att->attcollation = DEFAULT_COLLATION_OID;
+      break;
+    case NUMERICOID:  // numeric
+      att->attlen = -1;
+      att->attalign = TYPALIGN_INT;
+      att->attstorage = TYPSTORAGE_PLAIN;
+      att->attbyval = false;
+      break;
+    case TIMESTAMPOID:  // timestamp
+      att->attlen = 8;
+      att->attalign = TYPALIGN_DOUBLE;
+      att->attstorage = TYPSTORAGE_PLAIN;
+      att->attbyval = FLOAT8PASSBYVAL;
+      break;
+    default:
+      // Unknown types are treated as int-aligned varlena.
+      att->attlen = -1;
+      att->attalign = TYPALIGN_INT;
+      att->attstorage = TYPSTORAGE_PLAIN;
+      att->attbyval = false;
+      break;
+  }
+}
+
+// Global initialization function for general benchmark framework.
+//
+// Runs once per process; later calls return immediately. It:
+// - initializes the memory-context subsystem,
+// - installs the Mock* replacements defined in this file via cpp-stub,
+// - creates the ./bench_data directory.
+void GlobalBenchmarkInit() {
+  static bool global_initialized = false;
+  if (global_initialized) return;
+
+  printf("Initializing PAX benchmark framework...\n");
+
+  // Initialize memory context
+  MemoryContextInit();
+
+  // Setup global Mock functions. The Stub is held in a function-local static
+  // so it is not destroyed when this function returns.
+  static std::unique_ptr<Stub> stub_global = std::make_unique<Stub>();
+
+  stub_global->set(MinMaxGetPgStrategyProcinfo, MockMinMaxGetStrategyProcinfo);
+  stub_global->set(CPaxGetFastSequences, MockGetFastSequences);
+  stub_global->set(cbdb::BuildPaxDirectoryPath, MockBuildPaxDirectoryPath);
+  stub_global->set(cbdb::InsertMicroPartitionPlaceHolder,
+                   MockInsertMicroPartitionPlaceHolder);
+  stub_global->set(cbdb::DeleteMicroPartitionEntry,
+                   MockDeleteMicroPartitionEntry);
+  stub_global->set(cbdb::GetMinMaxColumnIndexes, MockGetMinMaxColumnIndexes);
+  stub_global->set(cbdb::GetBloomFilterColumnIndexes,
+                   MockBloomFilterColumnIndexes);
+  stub_global->set(cbdb::GetRelEncodingOptions, MockGetRelEncodingOptions);
+  stub_global->set(ExecStoreVirtualTuple, MockExecStoreVirtualTuple);
+  stub_global->set(TupleDescInitEntry, MockTupleDescInitEntry);
+
+  // Create basic test directory. Report a failing exit status instead of
+  // silently discarding system()'s return value.
+  if (system("mkdir -p ./bench_data") != 0) {
+    fprintf(stderr, "Failed to create ./bench_data directory\n");
+  }
+
+  global_initialized = true;
+  printf("PAX benchmark framework initialized.\n");
+}
+
+// Global cleanup function for general benchmark framework: resets
+// TopMemoryContext when it exists, releasing the memory held in it and its
+// child contexts. The ./bench_data directory is deliberately left in place.
+void GlobalBenchmarkCleanup() {
+  printf("Cleaning up PAX benchmark framework...\n");
+
+  // Deliberately not removing the test directory:
+  // system("rm -rf ./bench_data");
+
+  if (TopMemoryContext != nullptr) {
+    MemoryContextReset(TopMemoryContext);
+  }
+
+  printf("PAX benchmark framework cleaned up.\n");
+}
+
+// Example benchmark test: a placeholder whose loop body is empty, so it
+// measures only the benchmark loop itself.
+static void example_benchmark(::benchmark::State &state) {
   for (auto _ : state) {
+    // Intentionally empty.
   }
 }
 BENCHMARK(example_benchmark);
-BENCHMARK_MAIN();
\ No newline at end of file
+}  // namespace pax::bench
+
+// atexit() handler (plain function, as atexit requires): runs the registered
+// module cleanup hooks first, then the framework-wide cleanup.
+static void cleanup_all() {
+  using pax::bench::GetBenchmarkRegistry;
+  GetBenchmarkRegistry().RunAllCleanupFunctions();
+  pax::bench::GlobalBenchmarkCleanup();
+}
+
+// Main entry function. It parses Google Benchmark flags, initializes the PAX
+// benchmark framework and every registered module, then runs the selected
+// benchmarks. Teardown happens in cleanup_all(), registered with atexit().
+int main(int argc, char **argv) {
+  // Parse and validate flags before touching any global state. Unrecognized
+  // arguments are then rejected without initializing the framework, running
+  // module init hooks, or creating bench data on disk.
+  ::benchmark::Initialize(&argc, argv);
+  if (::benchmark::ReportUnrecognizedArguments(argc, argv)) return 1;
+
+  // Register global cleanup function
+  std::atexit(cleanup_all);
+
+  // Global initialization
+  pax::bench::GlobalBenchmarkInit();
+
+  // Run all registered initialization functions
+  pax::bench::GetBenchmarkRegistry().RunAllInitFunctions();
+
+  printf("\n=== Starting PAX Benchmark Suite ===\n");
+  printf("Use --benchmark_filter=<pattern> to run specific tests\n");
+  printf("Use --benchmark_list_tests to see all available tests\n\n");
+
+  // Run benchmark
+  ::benchmark::RunSpecifiedBenchmarks();
+
+  return 0;
+}
diff --git a/contrib/pax_storage/src/cpp/pax_gbench.h
b/contrib/pax_storage/src/cpp/pax_gbench.h
new file mode 100644
index 00000000000..44376022693
--- /dev/null
+++ b/contrib/pax_storage/src/cpp/pax_gbench.h
@@ -0,0 +1,72 @@
+/*-------------------------------------------------------------------------
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ * pax_gbench.h
+ *
+ * IDENTIFICATION
+ * contrib/pax_storage/src/cpp/pax_gbench.h
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#pragma once
+
+#include <functional>
+#include <benchmark/benchmark.h>
+
+namespace pax::bench {
+
+// Type of a module initialization hook (see REGISTER_BENCHMARK_INIT).
+using InitFunction = std::function<void()>;
+// Type of a module cleanup hook (see REGISTER_BENCHMARK_CLEANUP).
+using CleanupFunction = std::function<void()>;
+
+// Creates a dedicated memory context and makes it the current context.
+void CreateMemoryContext();
+
+// Registry of module init/cleanup hooks (defined in pax_gbench.cc).
+class BenchmarkRegistry;
+
+// Returns the process-wide registry instance.
+BenchmarkRegistry &GetBenchmarkRegistry();
+
+// Framework-wide setup (memory contexts, global mocks, ./bench_data) and
+// teardown, driven by main() in pax_gbench.cc.
+void GlobalBenchmarkInit();
+void GlobalBenchmarkCleanup();
+
+// Add a hook to the global registry (implemented in pax_gbench.cc).
+// Init hooks run in registration order; cleanup hooks run in reverse
+// registration order.
+void RegisterBenchmarkInit(InitFunction func);
+void RegisterBenchmarkCleanup(CleanupFunction func);
+
+}  // namespace pax::bench
+
+// Token-pasting helpers. The extra level of indirection makes the
+// preprocessor expand __COUNTER__ before `##` is applied. Pasting it directly
+// (`NAME_##__COUNTER__`) yields the literal identifier `NAME___COUNTER__`, so a
+// second use of the same macro in one translation unit would redefine the
+// same variable.
+#define PAX_BENCH_CONCAT_IMPL(a, b) a##b
+#define PAX_BENCH_CONCAT(a, b) PAX_BENCH_CONCAT_IMPL(a, b)
+
+// Convenient registration macros. Each use defines a uniquely named static
+// (internal-linkage) bool whose initializer registers `func` with the global
+// registry during static initialization.
+#define REGISTER_BENCHMARK_INIT(func)                                  \
+  [[maybe_unused]] static bool PAX_BENCH_CONCAT(pax_bench_init_,       \
+                                                __COUNTER__) = []() {  \
+    pax::bench::RegisterBenchmarkInit(func);                           \
+    return true;                                                       \
+  }()
+
+#define REGISTER_BENCHMARK_CLEANUP(func)                               \
+  [[maybe_unused]] static bool PAX_BENCH_CONCAT(pax_bench_cleanup_,    \
+                                                __COUNTER__) = []() {  \
+    pax::bench::RegisterBenchmarkCleanup(func);                        \
+    return true;                                                       \
+  }()
diff --git a/contrib/pax_storage/src/cpp/storage/columns/pax_column_test.cc
b/contrib/pax_storage/src/cpp/storage/columns/pax_column_test.cc
index dfd346ef615..b26fdff65bf 100644
--- a/contrib/pax_storage/src/cpp/storage/columns/pax_column_test.cc
+++ b/contrib/pax_storage/src/cpp/storage/columns/pax_column_test.cc
@@ -697,7 +697,6 @@ TEST_P(PaxNonFixedColumnCompressTest,
auto number = ::testing::get<0>(GetParam());
auto kind = ::testing::get<1>(GetParam());
auto verify_range = ::testing::get<2>(GetParam());
- auto enable_offsets_encoding = ::testing::get<2>(GetParam());
const size_t number_of_rows = 1024;
PaxEncoder::EncodingOption encoding_option;
@@ -705,10 +704,9 @@ TEST_P(PaxNonFixedColumnCompressTest,
encoding_option.compress_level = 5;
encoding_option.is_sign = true;
- if (enable_offsets_encoding) {
- encoding_option.offsets_encode_type = kind;
- encoding_option.offsets_compress_level = 5;
- }
+ encoding_option.offsets_encode_type =
+ ColumnEncoding_Kind::ColumnEncoding_Kind_DIRECT_DELTA;
+ encoding_option.offsets_compress_level = 5;
non_fixed_column = new PaxNonFixedEncodingColumn(
number_of_rows, number_of_rows, std::move(encoding_option));
@@ -744,10 +742,9 @@ TEST_P(PaxNonFixedColumnCompressTest,
decoding_option.is_sign = true;
decoding_option.compress_level = 5;
- if (enable_offsets_encoding) {
- decoding_option.offsets_encode_type = kind;
- decoding_option.offsets_compress_level = 5;
- }
+ decoding_option.offsets_encode_type =
+ ColumnEncoding_Kind::ColumnEncoding_Kind_DIRECT_DELTA;
+ decoding_option.offsets_compress_level = 5;
auto non_fixed_column_for_read = new PaxNonFixedEncodingColumn(
number_of_rows * number, sizeof(int32) * number_of_rows,
diff --git a/contrib/pax_storage/src/cpp/storage/columns/pax_compress_bench.cc
b/contrib/pax_storage/src/cpp/storage/columns/pax_compress_bench.cc
new file mode 100644
index 00000000000..0a792601e99
--- /dev/null
+++ b/contrib/pax_storage/src/cpp/storage/columns/pax_compress_bench.cc
@@ -0,0 +1,421 @@
+/*-------------------------------------------------------------------------
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ * pax_compress_bench.cc
+ *
+ * IDENTIFICATION
+ * contrib/pax_storage/src/cpp/storage/columns/pax_compress_bench.cc
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include <sys/stat.h>
+#include <sys/types.h>
+
+#include <algorithm>
+#include <cerrno>
+#include <chrono>
+#include <cstdint>
+#include <cstring>
+#include <fstream>
+#include <iostream>
+#include <memory>
+#include <random>
+#include <vector>
+
+#include <cstdio>
+#include <unistd.h>
+
+#include "comm/cbdb_wrappers.h"
+#include "comm/pax_memory.h"
+#include "pax_gbench.h"
+#include "storage/columns/pax_compress.h"
+#include "storage/columns/pax_decoding.h"
+#include "storage/columns/pax_delta_encoding.h"
+#include "storage/columns/pax_rlev2_encoding.h"
+#include "storage/pax_buffer.h"
+
+namespace pax::bench {
+
+namespace {
+
+// Test data and prebuilt buffers for decode/decompress benchmarks.
+// All of these are populated lazily by the Ensure*() helpers below and are
+// shared by every benchmark in this file.
+
+// Number of uint32 offsets generated when no persisted raw data is found.
+static const size_t kCount = 1024 * 1024;
+// Generated offsets; stays empty when the raw bytes were loaded from disk.
+static std::vector<uint32_t> g_offsets;
+// Raw (unencoded) offsets as bytes, in host byte order, and their length.
+static std::unique_ptr<char[]> g_raw_bytes;
+static size_t g_raw_len = 0;
+
+// RLEv2-encoded raw offsets; g_rle_len is the encoded length in bytes.
+static std::vector<char> g_rle_encoded;
+static size_t g_rle_len = 0;
+
+// Delta-encoded raw offsets; g_delta_len is the encoded length in bytes.
+static std::vector<char> g_delta_encoded;
+static size_t g_delta_len = 0;
+
+// ZSTD-compressed raw bytes; g_zstd_len is the compressed length in bytes.
+static std::unique_ptr<char[]> g_zstd_compressed;
+static size_t g_zstd_len = 0;
+
+// Shared ZSTD block compressor, created on first use.
+static std::shared_ptr<pax::PaxCompressor> g_zstd;
+
+// Simple helpers for bench data persistence.
+
+// Creates `dir_path` with mode 0755. An already-existing entry (EEXIST) is
+// accepted; any other mkdir() failure aborts the process.
+static void EnsureDirExists(const char *dir_path) {
+  const bool created = mkdir(dir_path, 0755) == 0;
+  if (created || errno == EEXIST) {
+    return;
+  }
+  std::cerr << "Failed to create directory: " << dir_path << std::endl;
+  std::abort();
+}
+
+// Reads the entire file at `path` into `out`. Returns false if the file cannot
+// be opened, its size is not positive, or the read does not complete.
+static bool ReadWholeFile(const char *path, std::vector<char> &out) {
+  std::ifstream file(path, std::ios::binary);
+  if (!file.is_open()) {
+    return false;
+  }
+  const std::streampos file_size = file.seekg(0, std::ios::end).tellg();
+  if (file_size <= 0) {
+    return false;
+  }
+  out.resize(static_cast<size_t>(file_size));
+  file.seekg(0, std::ios::beg).read(out.data(), file_size);
+  return static_cast<bool>(file);
+}
+
+// Reads the entire file at `path` into a freshly allocated buffer and stores
+// its size in `out_len`. Returns false if the file cannot be opened, its size
+// is not positive, or the read does not complete. In the last case `out` and
+// `out_len` have already been overwritten.
+static bool ReadWholeFile(const char *path, std::unique_ptr<char[]> &out,
+                          size_t &out_len) {
+  std::ifstream file(path, std::ios::binary);
+  if (!file.is_open()) {
+    return false;
+  }
+  const std::streampos file_size = file.seekg(0, std::ios::end).tellg();
+  if (file_size <= 0) {
+    return false;
+  }
+  out_len = static_cast<size_t>(file_size);
+  out = std::make_unique<char[]>(out_len);
+  file.seekg(0, std::ios::beg).read(out.get(), file_size);
+  return static_cast<bool>(file);
+}
+
+// Writes `len` bytes from `data` to `path`, truncating any existing file.
+// Aborts the process if the file cannot be opened or the write fails.
+static void WriteWholeFile(const char *path, const char *data, size_t len) {
+  std::ofstream file(path, std::ios::binary | std::ios::trunc);
+  if (!file.is_open()) {
+    std::cerr << "Failed to open file for write: " << path << std::endl;
+    std::abort();
+  }
+  if (!file.write(data, static_cast<std::streamsize>(len))) {
+    std::cerr << "Failed to write file: " << path << std::endl;
+    std::abort();
+  }
+}
+
+// Locations of the persisted benchmark inputs, relative to the current
+// working directory. Declared as constexpr arrays, so neither the characters
+// nor the names themselves can be reassigned. (A `static const char *` leaves
+// the pointer mutable.)
+static constexpr char kBenchDataDir[] = "bench_data";
+static constexpr char kRLEV2Path[] = "bench_data/rle_v2_u32.bin";
+static constexpr char kDeltaPath[] = "bench_data/delta_u32.bin";
+static constexpr char kZSTDPath[] = "bench_data/zstd_u32.bin";
+static constexpr char kRawPath[] = "bench_data/raw_u32.bin";
+
+// Generates `n` offsets that start at 0 and grow by a step drawn uniformly
+// from [1, 256], using std::mt19937 seeded with `seed`. The output is
+// deterministic for a given (n, seed). The values use uint32 arithmetic and
+// would wrap for very large n. Returns an empty vector when n == 0.
+static std::vector<uint32_t> GenerateMonotonicOffsets(size_t n,
+                                                      uint32_t seed) {
+  std::vector<uint32_t> offsets(n);
+  if (n == 0) {
+    return offsets;  // offsets[0] below would be out of bounds
+  }
+  offsets[0] = 0;
+  std::mt19937 rng(seed);
+  std::uniform_int_distribution<int> step_dist(1, 256);
+  for (size_t i = 1; i < n; ++i) {
+    offsets[i] = offsets[i - 1] + static_cast<uint32_t>(step_dist(rng));
+  }
+  return offsets;
+}
+
+// Lazily ensures g_raw_bytes/g_raw_len hold the raw uint32 offsets.
+// - Prefers loading from kRawPath.
+// - Otherwise generates kCount offsets with a fixed seed, stores them in
+//   g_offsets, and persists the bytes to kRawPath.
+static void EnsureRawData() {
+  if (g_raw_len != 0 && g_raw_bytes) return;
+  EnsureDirExists(kBenchDataDir);
+  // Load straight into the global buffer, with no intermediate vector and no
+  // second full copy. A file whose size is not a whole number of uint32 values
+  // cannot be valid offset data and is regenerated.
+  if (ReadWholeFile(kRawPath, g_raw_bytes, g_raw_len) &&
+      g_raw_len % sizeof(uint32_t) == 0) {
+    return;
+  }
+  // Fallback: generate and persist
+  g_offsets = GenerateMonotonicOffsets(kCount, /*seed=*/12345);
+  g_raw_len = g_offsets.size() * sizeof(uint32_t);
+  g_raw_bytes = std::make_unique<char[]>(g_raw_len);
+  std::memcpy(g_raw_bytes.get(), g_offsets.data(), g_raw_len);
+  WriteWholeFile(kRawPath, g_raw_bytes.get(), g_raw_len);
+}
+
+// Lazily ensures g_rle_encoded/g_rle_len hold the RLEv2 encoding of the raw
+// offsets. The encoding is loaded from kRLEV2Path when that file is readable;
+// otherwise it is built from the raw bytes and persisted there.
+static void EnsureRleEncoded() {
+  const bool cached = g_rle_len != 0 && !g_rle_encoded.empty();
+  if (cached) return;
+  EnsureDirExists(kBenchDataDir);
+  if (ReadWholeFile(kRLEV2Path, g_rle_encoded)) {
+    g_rle_len = g_rle_encoded.size();
+    return;
+  }
+
+  EnsureRawData();
+  PaxEncoder::EncodingOption options;
+  options.column_encode_type = ColumnEncoding_Kind_RLE_V2;
+  options.is_sign = false;
+
+  PaxOrcEncoder encoder(options);
+  auto sink = std::make_shared<DataBuffer<char>>(g_raw_len);
+  encoder.SetDataBuffer(sink);
+  // Values are read from the raw bytes, not g_offsets, which is empty when
+  // the raw data came from disk. They are appended one at a time.
+  const auto *values = reinterpret_cast<const uint32_t *>(g_raw_bytes.get());
+  const size_t num_values = g_raw_len / sizeof(uint32_t);
+  for (size_t idx = 0; idx < num_values; ++idx) {
+    uint32_t value = values[idx];
+    encoder.Append(reinterpret_cast<char *>(&value), sizeof(uint32_t));
+  }
+  encoder.Flush();
+
+  g_rle_len = encoder.GetBufferSize();
+  const auto *encoded = encoder.GetBuffer();
+  g_rle_encoded.assign(encoded, encoded + g_rle_len);
+  WriteWholeFile(kRLEV2Path, g_rle_encoded.data(), g_rle_len);
+}
+
+// Lazily ensures g_delta_encoded/g_delta_len hold the delta encoding of the
+// raw offsets. The encoding is loaded from kDeltaPath when that file is
+// readable. Otherwise the whole raw buffer is encoded with a single Append()
+// call and persisted there.
+static void EnsureDeltaEncoded() {
+  const bool cached = g_delta_len != 0 && !g_delta_encoded.empty();
+  if (cached) return;
+  EnsureDirExists(kBenchDataDir);
+  if (ReadWholeFile(kDeltaPath, g_delta_encoded)) {
+    g_delta_len = g_delta_encoded.size();
+    return;
+  }
+
+  EnsureRawData();
+  PaxEncoder::EncodingOption options;
+  options.is_sign = false;
+  // column_encode_type is left unset: it is not used by PaxDeltaEncoder.
+  PaxDeltaEncoder<uint32_t> encoder(options);
+  auto sink = std::make_shared<DataBuffer<char>>(g_raw_len);
+  encoder.SetDataBuffer(sink);
+  encoder.Append(g_raw_bytes.get(), g_raw_len);
+  encoder.Flush();
+
+  g_delta_len = encoder.GetBufferSize();
+  const auto *encoded = encoder.GetBuffer();
+  g_delta_encoded.assign(encoded, encoded + g_delta_len);
+  WriteWholeFile(kDeltaPath, g_delta_encoded.data(), g_delta_len);
+}
+
+// Lazily creates the shared ZSTD compressor (g_zstd). It then ensures
+// g_zstd_compressed/g_zstd_len hold the raw bytes compressed at level 5:
+// loaded from kZSTDPath when readable, otherwise compressed and persisted.
+// Aborts if the compressor cannot be created or compression fails.
+static void EnsureZstdCompressed() {
+  EnsureDirExists(kBenchDataDir);
+  if (!g_zstd) {
+    g_zstd = PaxCompressor::CreateBlockCompressor(
+        ColumnEncoding_Kind_COMPRESS_ZSTD);
+    if (!g_zstd) {
+      std::cerr << "Failed to create ZSTD compressor" << std::endl;
+      std::abort();
+    }
+  }
+  const bool cached = g_zstd_len != 0 && g_zstd_compressed;
+  if (cached || ReadWholeFile(kZSTDPath, g_zstd_compressed, g_zstd_len)) {
+    return;
+  }
+
+  EnsureRawData();
+  const size_t capacity = g_zstd->GetCompressBound(g_raw_len);
+  g_zstd_compressed = std::make_unique<char[]>(capacity);
+  g_zstd_len = g_zstd->Compress(g_zstd_compressed.get(), capacity,
+                                g_raw_bytes.get(), g_raw_len, /*lvl=*/5);
+  const bool failed = g_zstd->IsError(g_zstd_len) || g_zstd_len == 0;
+  if (failed) {
+    std::cerr << "ZSTD one-time compress failed" << std::endl;
+    std::abort();
+  }
+  WriteWholeFile(kZSTDPath, g_zstd_compressed.get(), g_zstd_len);
+}
+
+// Module init hook: switches to a dedicated memory context and makes sure the
+// bench data directory exists.
+static void PrepareOnce() {
+  CreateMemoryContext();
+  EnsureDirExists(kBenchDataDir);
+}
+
+// Module cleanup hook: deletes each persisted bench file, then tries to remove
+// the bench data directory. All failures are ignored. For example, rmdir()
+// fails if anything else is still inside the directory.
+static void CleanupBenchData() {
+  const char *const bench_files[] = {kRLEV2Path, kDeltaPath, kZSTDPath,
+                                     kRawPath};
+  const size_t num_files = sizeof(bench_files) / sizeof(bench_files[0]);
+  for (size_t i = 0; i < num_files; ++i) {
+    std::remove(bench_files[i]);
+  }
+  rmdir(kBenchDataDir);
+}
+
+} // namespace
+
+// Register module init with gbench framework: PrepareOnce runs once before the
+// benchmarks, and CleanupBenchData runs during the framework's cleanup pass.
+// NOTE(review): CleanupBenchData deletes every persisted file at exit, so the
+// "load from disk" paths in the Ensure*() helpers can only hit if a previous
+// run did not clean up. Confirm whether cross-run caching is intended.
+REGISTER_BENCHMARK_INIT(PrepareOnce);
+REGISTER_BENCHMARK_CLEANUP(CleanupBenchData);
+
+// RLEv2 encode benchmark: appends every raw uint32 offset one value at a time
+// and flushes. Reports the raw and encoded sizes (in KiB) as counters.
+static void BM_RLEV2_Encode(::benchmark::State &state) {
+  // Prepare raw data only; no encoded buffers are created here
+  EnsureRawData();
+  // Loop-invariant view of the raw input.
+  const size_t count = g_raw_len / sizeof(uint32_t);
+  const uint32_t *vals = reinterpret_cast<const uint32_t *>(g_raw_bytes.get());
+  // Encoded size of the last iteration. It is kept local: writing g_rle_len
+  // here could desynchronize it from g_rle_encoded (e.g. with a stale
+  // persisted file), and BM_RLEV2_Decode uses g_rle_len as its source length.
+  size_t encoded_len = 0;
+  for (auto _ : state) {
+    PaxEncoder::EncodingOption enc_opt;
+    enc_opt.column_encode_type = ColumnEncoding_Kind_RLE_V2;
+    enc_opt.is_sign = false;
+
+    PaxOrcEncoder encoder(enc_opt);
+    auto out = std::make_shared<DataBuffer<char>>(g_raw_len);
+    encoder.SetDataBuffer(out);
+
+    for (size_t i = 0; i < count; ++i) {
+      uint32_t v = vals[i];
+      encoder.Append(reinterpret_cast<char *>(&v), sizeof(uint32_t));
+    }
+    encoder.Flush();
+    encoded_len = encoder.GetBufferSize();
+    benchmark::DoNotOptimize(encoder.GetBuffer());
+    benchmark::ClobberMemory();
+  }
+  state.SetBytesProcessed(static_cast<int64_t>(state.iterations()) *
+                          static_cast<int64_t>(g_raw_len));
+  state.counters["raw_kb"] =
+      benchmark::Counter(static_cast<double>(g_raw_len) / (1024.0));
+  state.counters["rle_kb"] =
+      benchmark::Counter(static_cast<double>(encoded_len) / (1024.0));
+}
+BENCHMARK(BM_RLEV2_Encode);
+
+// RLEv2 decode benchmark: each iteration decodes the prebuilt RLEv2 buffer
+// into a fresh output buffer. The decoded values are not verified.
+static void BM_RLEV2_Decode(::benchmark::State &state) {
+  // The raw size and encoded input are prepared before the timed loop
+  // (loaded from disk when available).
+  EnsureRawData();
+  EnsureRleEncoded();
+  for (auto _ : state) {
+    PaxDecoder::DecodingOption options;
+    options.column_encode_type = ColumnEncoding_Kind_RLE_V2;
+    options.is_sign = false;
+
+    auto rle_decoder = PaxDecoder::CreateDecoder<int32>(options);
+    auto decoded = std::make_shared<DataBuffer<char>>(g_raw_len);
+    rle_decoder->SetSrcBuffer(g_rle_encoded.data(), g_rle_len);
+    rle_decoder->SetDataBuffer(decoded);
+    size_t decode_result = rle_decoder->Decoding();
+    benchmark::DoNotOptimize(decode_result);
+    benchmark::ClobberMemory();
+  }
+  const int64_t total_bytes = static_cast<int64_t>(state.iterations()) *
+                              static_cast<int64_t>(g_raw_len);
+  state.SetBytesProcessed(total_bytes);
+}
+BENCHMARK(BM_RLEV2_Decode);
+
+// Delta encode benchmark: each iteration encodes the whole raw buffer with a
+// single Append() and flushes. Reports the encoded size (KiB) as "delta_kb".
+static void BM_Delta_Encode(::benchmark::State &state) {
+  EnsureRawData();
+  // Encoded size of the last iteration. It is kept local so this benchmark
+  // never overwrites g_delta_len, which must stay in sync with g_delta_encoded
+  // for BM_Delta_Decode.
+  size_t encoded_len = 0;
+  for (auto _ : state) {
+    PaxEncoder::EncodingOption enc_opt;
+    enc_opt.is_sign = false;
+    PaxDeltaEncoder<uint32_t> encoder(enc_opt);
+    auto out = std::make_shared<DataBuffer<char>>(g_raw_len);
+    encoder.SetDataBuffer(out);
+    encoder.Append(g_raw_bytes.get(), g_raw_len);
+    encoder.Flush();
+    encoded_len = encoder.GetBufferSize();
+    benchmark::DoNotOptimize(encoder.GetBuffer());
+    benchmark::ClobberMemory();
+  }
+  state.SetBytesProcessed(static_cast<int64_t>(state.iterations()) *
+                          static_cast<int64_t>(g_raw_len));
+  state.counters["delta_kb"] =
+      benchmark::Counter(static_cast<double>(encoded_len) / (1024.0));
+}
+BENCHMARK(BM_Delta_Encode);
+
+// Delta decode benchmark: each iteration decodes the prebuilt delta buffer
+// into a fresh output buffer. The first iteration's output is checked against
+// the raw input with timing paused; later iterations only decode. Aborts on a
+// size or content mismatch.
+static void BM_Delta_Decode(::benchmark::State &state) {
+  EnsureRawData();
+  EnsureDeltaEncoded();
+  bool verified = false;
+  for (auto _ : state) {
+    PaxDecoder::DecodingOption dec_opt;
+    dec_opt.is_sign = false;
+    dec_opt.column_encode_type = ColumnEncoding_Kind_DIRECT_DELTA;
+    PaxDeltaDecoder<int32> decoder(dec_opt);
+    auto out = std::make_shared<DataBuffer<char>>(g_raw_len);
+    decoder.SetSrcBuffer(g_delta_encoded.data(), g_delta_len);
+    decoder.SetDataBuffer(out);
+    size_t n = decoder.Decoding();
+
+    if (!verified) {
+      // Keep the full-buffer correctness check out of the measured time.
+      state.PauseTiming();
+      // The decoder must have produced exactly g_raw_len bytes. Otherwise the
+      // memcmp below would compare bytes the decoder never wrote. A matching
+      // Decoding() return value alone must not be able to mask a short
+      // output.
+      if (out->Used() != g_raw_len) {
+        std::cerr << "Delta decode failed, n: " << n
+                  << ", g_raw_len: " << g_raw_len
+                  << ", g_delta_len: " << g_delta_len
+                  << ", out: Used: " << out->Used() << std::endl;
+        std::abort();
+      }
+      if (std::memcmp(out->GetBuffer(), g_raw_bytes.get(), g_raw_len) != 0) {
+        // Report the first mismatching value. The buffers are binary and not
+        // NUL-terminated, so they must never be streamed as C strings.
+        const size_t num_values = g_raw_len / sizeof(uint32_t);
+        for (size_t i = 0; i < num_values; ++i) {
+          uint32_t got = 0;
+          uint32_t want = 0;
+          std::memcpy(&got, out->GetBuffer() + i * sizeof(uint32_t),
+                      sizeof(got));
+          std::memcpy(&want, g_raw_bytes.get() + i * sizeof(uint32_t),
+                      sizeof(want));
+          if (got != want) {
+            std::cerr << "Delta decode mismatch at value " << i << ": got "
+                      << got << ", expected " << want << std::endl;
+            break;
+          }
+        }
+        std::cerr << "Delta decode failed: output differs from raw input"
+                  << std::endl;
+        std::abort();
+      }
+      verified = true;
+      state.ResumeTiming();
+    }
+
+    benchmark::DoNotOptimize(n);
+    benchmark::ClobberMemory();
+  }
+  state.SetBytesProcessed(static_cast<int64_t>(state.iterations()) *
+                          static_cast<int64_t>(g_raw_len));
+}
+BENCHMARK(BM_Delta_Decode);
+
+// ZSTD compress benchmark: each iteration compresses the raw bytes at level 5
+// into a reusable destination buffer. Reports the compressed size (KiB) as
+// "zstd_kb". The benchmark is marked as errored if compression fails.
+static void BM_ZSTD_Compress(::benchmark::State &state) {
+  EnsureRawData();
+  if (!g_zstd) {
+    g_zstd = PaxCompressor::CreateBlockCompressor(
+        ColumnEncoding_Kind_COMPRESS_ZSTD);
+    if (!g_zstd) {
+      std::cerr << "Failed to create ZSTD compressor" << std::endl;
+      std::abort();
+    }
+  }
+  size_t bound = g_zstd->GetCompressBound(g_raw_len);
+  auto dst = std::make_unique<char[]>(bound);
+  // Compressed size of the last iteration. It is kept local so g_zstd_len,
+  // which must describe g_zstd_compressed for BM_ZSTD_Decompress, is never
+  // overwritten here.
+  size_t compressed_len = 0;
+  for (auto _ : state) {
+    size_t n = g_zstd->Compress(dst.get(), bound, g_raw_bytes.get(), g_raw_len,
+                                /*lvl=*/5);
+    if (g_zstd->IsError(n)) {
+      state.SkipWithError("ZSTD compress failed");
+      break;
+    }
+    compressed_len = n;
+    benchmark::DoNotOptimize(n);
+    benchmark::ClobberMemory();
+  }
+  state.SetBytesProcessed(static_cast<int64_t>(state.iterations()) *
+                          static_cast<int64_t>(g_raw_len));
+  state.counters["zstd_kb"] =
+      benchmark::Counter(static_cast<double>(compressed_len) / (1024.0));
+}
+BENCHMARK(BM_ZSTD_Compress);
+
+// ZSTD decompress benchmark: each iteration decompresses the prebuilt ZSTD
+// buffer into a reusable g_raw_len-byte destination. The output is not
+// verified.
+static void BM_ZSTD_Decompress(::benchmark::State &state) {
+  EnsureRawData();
+  EnsureZstdCompressed();
+  std::unique_ptr<char[]> out_buf(new char[g_raw_len]);
+  char *const dst = out_buf.get();
+  for (auto _ : state) {
+    size_t result = g_zstd->Decompress(dst, g_raw_len, g_zstd_compressed.get(),
+                                       g_zstd_len);
+    benchmark::DoNotOptimize(result);
+    benchmark::ClobberMemory();
+  }
+  const int64_t total_bytes = static_cast<int64_t>(state.iterations()) *
+                              static_cast<int64_t>(g_raw_len);
+  state.SetBytesProcessed(total_bytes);
+}
+BENCHMARK(BM_ZSTD_Decompress);
+
+} // namespace pax::bench
diff --git a/contrib/pax_storage/src/cpp/storage/columns/pax_decoding.cc
b/contrib/pax_storage/src/cpp/storage/columns/pax_decoding.cc
index 7ba0fcd6768..0e15ec52088 100644
--- a/contrib/pax_storage/src/cpp/storage/columns/pax_decoding.cc
+++ b/contrib/pax_storage/src/cpp/storage/columns/pax_decoding.cc
@@ -31,6 +31,7 @@
#include "comm/pax_memory.h"
#include "storage/columns/pax_dict_encoding.h"
#include "storage/columns/pax_rlev2_decoding.h"
+#include "storage/columns/pax_delta_encoding.h"
namespace pax {
@@ -47,7 +48,7 @@ std::shared_ptr<PaxDecoder> PaxDecoder::CreateDecoder(const
DecodingOption &deco
break;
}
case ColumnEncoding_Kind::ColumnEncoding_Kind_DIRECT_DELTA: {
- /// TODO(jiaqizho) support it
+ decoder = std::make_shared<PaxDeltaDecoder<T>>(decoder_options);
break;
}
case ColumnEncoding_Kind::ColumnEncoding_Kind_DICTIONARY: {
diff --git a/contrib/pax_storage/src/cpp/storage/columns/pax_delta_encoding.cc
b/contrib/pax_storage/src/cpp/storage/columns/pax_delta_encoding.cc
new file mode 100644
index 00000000000..3f4b5341c4a
--- /dev/null
+++ b/contrib/pax_storage/src/cpp/storage/columns/pax_delta_encoding.cc
@@ -0,0 +1,511 @@
+/*-------------------------------------------------------------------------
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ * pax_delta_encoding.cc
+ *
+ * IDENTIFICATION
+ * contrib/pax_storage/src/cpp/storage/columns/pax_delta_encoding.cc
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "storage/columns/pax_delta_encoding.h"
+
+#include <algorithm>
+#include <cstring>
+#include <vector>
+
+namespace pax {
+
+// delta bitpack encoder
+template <typename T>
+PaxDeltaEncoder<T>::PaxDeltaEncoder(const EncodingOption &encoder_options)
+ : PaxEncoder(encoder_options) {}
+
+template <typename T>
+void PaxDeltaEncoder<T>::Append(char *data, size_t size) {
+ CBDB_CHECK(!has_append_, cbdb::CException::kExTypeAbort,
+ fmt("PaxDeltaEncoder::Append only support Append Once"));
+ has_append_ = true;
+
+ auto T_data = reinterpret_cast<T *>(data);
+ auto T_data_len = size / sizeof(T);
+ Encode(T_data, T_data_len);
+}
+
// Number of significant bits in `value` (1-based index of the highest set
// bit); an input of 0 yields 0.
inline uint8_t NumBitsAllowZero(uint32_t value) {
  uint8_t width = 0;
  for (uint32_t rest = value; rest != 0; rest >>= 1) {
    ++width;
  }
  return width;
}
+
// Bit width of `v`: 0 for 0, otherwise the 1-based index of the highest set
// bit. On GCC/Clang this uses count-leading-zeros; __builtin_clz is undefined
// for 0, which is why zero is handled before it.
inline static uint8_t FastNumBits(uint32_t v) {
  if (v == 0) return 0;
#if defined(__GNUC__) || defined(__clang__)
  return static_cast<uint8_t>(32 - __builtin_clz(v));
#else
  uint8_t bits = 0;
  do {
    ++bits;
    v >>= 1;
  } while (v != 0);
  return bits;
#endif
}
+
// Bit packer that writes into caller-owned memory starting at `out`.
// Values are appended least-significant bit first, and each byte is stored
// as soon as it is complete. The caller must reserve enough room at `out`.
struct BitWriter64Ptr {
  uint8_t *out;         // destination bytes
  size_t index;         // bytes written to `out` so far
  uint64_t bit_buffer;  // bits not yet written; bit 0 is the oldest
  uint32_t bit_count;   // number of pending bits in bit_buffer

  BitWriter64Ptr(uint8_t *p) : out(p), index(0), bit_buffer(0), bit_count(0) {}

  // Appends `value` using `width` bits. Bits of `value` at or above `width`
  // are not masked off, so callers must pass value < 2^width.
  inline void Append(uint32_t value, uint8_t width) {
    if (width == 0) return;
    bit_buffer |= static_cast<uint64_t>(value) << bit_count;
    bit_count += width;
    for (; bit_count >= 8; bit_count -= 8) {
      out[index] = static_cast<uint8_t>(bit_buffer);
      ++index;
      bit_buffer >>= 8;
    }
  }

  // Stores a partially filled final byte, if any, and resets the pending
  // state. Its unused high bits are zero when callers honour the
  // value < 2^width contract of Append.
  inline void FlushToByte() {
    if (bit_count == 0) return;
    out[index++] = static_cast<uint8_t>(bit_buffer);
    bit_buffer = 0;
    bit_count = 0;
  }
};
+
// Counterpart of BitWriter64Ptr: extracts LSB-first packed values from at
// most `size` bytes at `in`. `index` counts how many input bytes have been
// pulled into the bit buffer so far.
struct BitReader64Ptr {
  const uint8_t *in;
  size_t size;
  size_t index;
  uint64_t bit_buffer;
  uint32_t bit_count;

  BitReader64Ptr(const uint8_t *p, size_t len)
      : in(p), size(len), index(0), bit_buffer(0), bit_count(0) {}

  // Loads whole bytes until at least `need_bits` bits are buffered or the
  // input is exhausted.
  inline void Ensure(uint32_t need_bits) {
    for (; bit_count < need_bits && index < size; bit_count += 8) {
      bit_buffer |= static_cast<uint64_t>(in[index++]) << bit_count;
    }
  }

  // Returns the next `width`-bit value (width is expected to be 0..32).
  inline uint32_t Read(uint8_t width) {
    if (width == 0) return 0;
    Ensure(width);
    const uint64_t mask =
        (width == 32) ? 0xFFFFFFFFull : ((1ull << width) - 1);
    const uint32_t value = static_cast<uint32_t>(bit_buffer & mask);
    bit_buffer >>= width;
    bit_count -= width;
    return value;
  }

  // Drops the buffered bits that remain from a partially consumed byte.
  inline void AlignToByte() {
    const uint32_t extra = bit_count & 7u;
    if (extra != 0) {
      bit_buffer >>= extra;
      bit_count -= extra;
    }
  }
};
+
+/*
+Overall layout:
+ DeltaBlockHeader (struct, fixed-size)
+ - uint32 value_per_block
+ - uint32 values_per_mini_block
+ - uint32 total_count
+ T first_value
+ [Repeated Block until total_count is exhausted]
+ - uint32 min_delta
+ - uint8 bit_widths[ mini_blocks_per_block ]
+ - uint8 payload[computed from bit_widths]
+ // bit-packed adjusted deltas, mini-block by mini-block
+ // within a block: bits are written MSB-first, end aligned to byte
+*/
+
+template <typename T>
+size_t PaxDeltaEncoder<T>::GetBoundSize(size_t src_len) const {
+ size_t value_count = src_len / sizeof(T);
+ size_t block_count = (value_count + value_per_block_ - 1) / value_per_block_;
+ /* header + first_value + block_count * (min_delta + bit_widths )
+ * + payload was eliminated to value_count*/
+ return sizeof(DeltaBlockHeader) + sizeof(T) +
+ block_count * (sizeof(uint32) + mini_blocks_per_block_) + value_count;
+}
+
+template <typename T>
+void PaxDeltaEncoder<T>::Encode(T *data, size_t count) {
+ // Estimate allocation: by element byte count, sufficient to accommodate
+ // header and bit stream
+ if (result_buffer_->Capacity() <
+ count * sizeof(T) + sizeof(DeltaBlockHeader) + sizeof(T)) {
+ result_buffer_->ReSize(count * sizeof(T) + sizeof(DeltaBlockHeader) +
+ sizeof(T));
+ }
+
+ DeltaBlockHeader header;
+ header.value_per_block = value_per_block_;
+ header.values_per_mini_block = values_per_mini_block_;
+ header.total_count = count;
+ // add delta block header
+ result_buffer_->Write(reinterpret_cast<char *>(&header), sizeof(header));
+ result_buffer_->Brush(sizeof(header));
+ // add base value
+ result_buffer_->Write(reinterpret_cast<char *>(&data[0]), sizeof(data[0]));
+ result_buffer_->Brush(sizeof(data[0]));
+
+ size_t values_emitted = 1;
+ T previous_value = data[0];
+
+ while (values_emitted < count) {
+ uint32_t values_in_block = std::min(
+ value_per_block_, static_cast<uint32_t>(count - values_emitted));
+
+ if (deltas_scratch_.size() < values_in_block) {
+ deltas_scratch_.resize(values_in_block);
+ }
+ uint32_t *deltas = deltas_scratch_.data();
+ uint32_t min_delta = UINT32_MAX;
+ uint32_t mini_max[mini_blocks_per_block_] = {0};
+
+ for (uint32_t i = 0; i < values_in_block; ++i) {
+ T current = data[values_emitted + i];
+ uint32_t delta = static_cast<uint32_t>(current - previous_value);
+ deltas[i] = delta;
+ previous_value = current;
+ if (delta < min_delta) min_delta = delta;
+ uint32_t mini_index = i / values_per_mini_block_;
+ if (delta > mini_max[mini_index]) mini_max[mini_index] = delta;
+ }
+
+ // write block header: min_delta later
+ uint8_t bit_widths[mini_blocks_per_block_] = {0};
+ uint64_t total_bits = 0;
+ for (uint32_t i = 0; i < mini_blocks_per_block_; ++i) {
+ uint32_t start = i * values_per_mini_block_;
+ if (start >= values_in_block) {
+ bit_widths[i] = 0;
+ continue;
+ }
+ uint32_t adjusted_max = mini_max[i] - min_delta;
+ uint8_t w = FastNumBits(adjusted_max);
+ bit_widths[i] = w;
+ uint32_t end = std::min(start + values_per_mini_block_, values_in_block);
+ total_bits += static_cast<uint64_t>(w) * (end - start);
+ }
+ uint32_t payload_bytes = static_cast<uint32_t>((total_bits + 7) / 8);
+
+ size_t need_size =
+ payload_bytes + mini_blocks_per_block_ + sizeof(min_delta);
+
+ // Grows the buffer to be at least need_size bytes. To avoid frequent
+ // resizing, the new capacity is calculated as the maximum of (current
+ // capacity * 1.5) or (current capacity + need_size).
+ if (result_buffer_->Available() < need_size) {
+ size_t inc_size = need_size > (result_buffer_->Capacity() * 0.5)
+ ? need_size
+ : result_buffer_->Capacity() * 0.5;
+ result_buffer_->ReSize(result_buffer_->Capacity() + inc_size);
+ }
+
+ // write block header: min_delta
+ result_buffer_->Write(reinterpret_cast<char *>(&min_delta),
+ sizeof(min_delta));
+ result_buffer_->Brush(sizeof(min_delta));
+
+ // write bit_widths
+ result_buffer_->Write(reinterpret_cast<char *>(bit_widths),
+ mini_blocks_per_block_);
+ result_buffer_->Brush(mini_blocks_per_block_);
+
+ uint8_t *payload_ptr =
+ reinterpret_cast<uint8_t *>(result_buffer_->GetAvailableBuffer());
+ BitWriter64Ptr bw(payload_ptr);
+ for (uint32_t i = 0; i < mini_blocks_per_block_; ++i) {
+ uint32_t start = i * values_per_mini_block_;
+ if (start >= values_in_block) break;
+ uint32_t end = std::min(start + values_per_mini_block_, values_in_block);
+ uint8_t w = bit_widths[i];
+ if (w == 0) continue;
+ for (uint32_t j = start; j < end; ++j) {
+ uint32_t adjusted = deltas[j] - min_delta;
+ bw.Append(adjusted, w);
+ }
+ }
+ bw.FlushToByte();
+ result_buffer_->Brush(payload_bytes);
+
+ values_emitted += values_in_block;
+ }
+}
+
+template <typename T>
+bool PaxDeltaEncoder<T>::SupportAppendNull() const {
+ return false;
+}
+
+template <typename T>
+void PaxDeltaEncoder<T>::Flush() {
+ // do nothing
+}
+
+// Specialized reading of one mini-block and batch writing results
+// (BitReader64Ptr)
+template <typename T>
+inline void ReadMiniBlockSpecializedPtr(BitReader64Ptr &br, T *out_values,
+ T ¤t_value, uint32_t count_in_mb,
+ uint32_t min_delta, uint8_t w) {
+ switch (w) {
+ case 0: {
+ for (uint32_t j = 0; j < count_in_mb; ++j) {
+ current_value =
+ static_cast<T>(static_cast<uint64_t>(current_value) + min_delta);
+ out_values[j] = current_value;
+ }
+ return;
+ }
+ case 8: {
+ for (uint32_t j = 0; j < count_in_mb; ++j) {
+ uint32_t adjusted = br.Read(8);
+ current_value = static_cast<T>(static_cast<uint64_t>(current_value) +
+ adjusted + min_delta);
+ out_values[j] = current_value;
+ }
+ return;
+ }
+ case 16: {
+ for (uint32_t j = 0; j < count_in_mb; ++j) {
+ uint32_t adjusted = br.Read(16);
+ current_value = static_cast<T>(static_cast<uint64_t>(current_value) +
+ adjusted + min_delta);
+ out_values[j] = current_value;
+ }
+ return;
+ }
+ case 32: {
+ for (uint32_t j = 0; j < count_in_mb; ++j) {
+ uint32_t adjusted = br.Read(32);
+ current_value = static_cast<T>(static_cast<uint64_t>(current_value) +
+ adjusted + min_delta);
+ out_values[j] = current_value;
+ }
+ return;
+ }
+ default: {
+ uint32_t j = 0;
+ const uint32_t n4 = count_in_mb & ~3u;
+ for (; j < n4; j += 4) {
+ uint32_t a0 = br.Read(w);
+ uint32_t a1 = br.Read(w);
+ uint32_t a2 = br.Read(w);
+ uint32_t a3 = br.Read(w);
+ current_value = static_cast<T>(static_cast<uint64_t>(current_value) +
+ a0 + min_delta);
+ out_values[j] = current_value;
+ current_value = static_cast<T>(static_cast<uint64_t>(current_value) +
+ a1 + min_delta);
+ out_values[j + 1] = current_value;
+ current_value = static_cast<T>(static_cast<uint64_t>(current_value) +
+ a2 + min_delta);
+ out_values[j + 2] = current_value;
+ current_value = static_cast<T>(static_cast<uint64_t>(current_value) +
+ a3 + min_delta);
+ out_values[j + 3] = current_value;
+ }
+ for (; j < count_in_mb; ++j) {
+ uint32_t a = br.Read(w);
+ current_value = static_cast<T>(static_cast<uint64_t>(current_value) +
+ a + min_delta);
+ out_values[j] = current_value;
+ }
+ return;
+ }
+ }
+}
+
+// Specialized reading of one mini-block and batch writing results
+template <typename T>
+PaxDeltaDecoder<T>::PaxDeltaDecoder(
+ const PaxDecoder::DecodingOption &encoder_options)
+ : PaxDecoder(encoder_options),
+ data_buffer_(nullptr),
+ result_buffer_(nullptr) {
+ CBDB_CHECK(encoder_options.column_encode_type ==
+ ColumnEncoding_Kind::ColumnEncoding_Kind_DIRECT_DELTA,
+ cbdb::CException::kExTypeAbort,
+ fmt("PaxDeltaDecoder only support DIRECT_DELTA encoding"));
+ // TODO: if sign is true, should use zigzag encoding, now use delta encoding
+ // for offsets in non-fixed columns
+ CBDB_CHECK(encoder_options.is_sign == false,
+ cbdb::CException::kExTypeUnImplements,
+ fmt("PaxDeltaDecoder is not supported for signed data, "
+ "will support zigzag later"));
+}
+
+template <typename T>
+PaxDecoder *PaxDeltaDecoder<T>::SetSrcBuffer(char *data, size_t data_len) {
+ if (data) {
+ data_buffer_ =
+ std::make_shared<DataBuffer<char>>(data, data_len, false, false);
+ data_buffer_->Brush(data_len);
+ }
+ return this;
+}
+
+template <typename T>
+PaxDecoder *PaxDeltaDecoder<T>::SetDataBuffer(
+ std::shared_ptr<DataBuffer<char>> result_buffer) {
+ result_buffer_ = result_buffer;
+ return this;
+}
+
+template <typename T>
+const char *PaxDeltaDecoder<T>::GetBuffer() const {
+ return result_buffer_ ? result_buffer_->GetBuffer() : nullptr;
+}
+
+template <typename T>
+size_t PaxDeltaDecoder<T>::GetBufferSize() const {
+ return result_buffer_ ? result_buffer_->Used() : 0;
+}
+
+template <typename T>
+size_t PaxDeltaDecoder<T>::Next(const char * /*not_null*/) {
+ CBDB_RAISE(cbdb::CException::kExTypeUnImplements);
+}
+
+template <typename T>
+size_t PaxDeltaDecoder<T>::Decoding() {
+ if (!data_buffer_) return 0;
+ Assert(result_buffer_);
+
+ const uint8_t *p =
+ reinterpret_cast<const uint8_t *>(data_buffer_->GetBuffer());
+ uint32_t remaining = static_cast<uint32_t>(data_buffer_->Used());
+
+ // read header: values_per_block, values_per_mini_block_, total_count,
+ // first_value
+ DeltaBlockHeader header;
+ std::memcpy(&header, p, sizeof(header));
+ p += sizeof(header);
+ remaining -= sizeof(header);
+ uint32_t values_per_block = header.value_per_block;
+ uint32_t values_per_mini_block_ = header.values_per_mini_block;
+ uint32_t total_count = header.total_count;
+
+ T first_value;
+ std::memcpy(&first_value, p, sizeof(T));
+ p += sizeof(T);
+ remaining -= sizeof(T);
+
+ // reserve output buffer
+ if (result_buffer_->Capacity() < total_count * sizeof(T)) {
+ result_buffer_->ReSize(total_count * sizeof(T));
+ }
+
+ // write first value
+ T current_value = static_cast<T>(first_value);
+ result_buffer_->Write(reinterpret_cast<char *>(¤t_value), sizeof(T));
+ result_buffer_->Brush(sizeof(T));
+ uint32_t decoded = 1;
+
+ const uint32_t mini_blocks_per_block_ =
+ values_per_block / values_per_mini_block_;
+
+ while (decoded < total_count && remaining > 0) {
+ uint32_t min_delta;
+ std::memcpy(&min_delta, p, sizeof(min_delta));
+ p += sizeof(min_delta);
+ remaining -= sizeof(min_delta);
+
+ if (remaining < mini_blocks_per_block_) break;
+
+ uint8_t bit_widths[mini_blocks_per_block_] = {0};
+ for (uint32_t i = 0; i < mini_blocks_per_block_; ++i) {
+ bit_widths[i] = *p++;
+ --remaining;
+ }
+
+ uint32_t values_in_block =
+ std::min(values_per_block, total_count - decoded);
+
+ // read payload: initialize reader with remaining bytes; we'll compute
+ // consumed
+ BitReader64Ptr br(p, remaining);
+
+ for (uint32_t i = 0; i < mini_blocks_per_block_ && decoded < total_count;
+ ++i) {
+ uint32_t start = i * values_per_mini_block_;
+ if (start >= values_in_block) break;
+ uint32_t end = std::min(start + values_per_mini_block_, values_in_block);
+ uint32_t cnt = end - start;
+ uint8_t w = bit_widths[i];
+
+ T *out_base = reinterpret_cast<T
*>(result_buffer_->GetAvailableBuffer());
+ ReadMiniBlockSpecializedPtr<T>(br, out_base, current_value, cnt,
+ min_delta, w);
+ result_buffer_->Brush(cnt * sizeof(T));
+ decoded += cnt;
+ }
+
+ br.AlignToByte();
+
+ size_t consumed = br.index;
+ p += consumed;
+ remaining -= consumed;
+ }
+
+ Assert(result_buffer_->Used() == total_count * sizeof(T));
+
+ return result_buffer_->Used();
+}
+
+template <typename T>
+size_t PaxDeltaDecoder<T>::Decoding(const char * /*not_null*/,
+ size_t /*not_null_len*/) {
+ CBDB_RAISE(cbdb::CException::kExTypeUnImplements);
+}
+
+template class PaxDeltaEncoder<uint32_t>;
+template class PaxDeltaDecoder<uint32_t>;
+// Add explicit instantiations for signed integral types used by CreateDecoder
+template class PaxDeltaDecoder<long>;
+template class PaxDeltaDecoder<int>;
+template class PaxDeltaDecoder<short>;
+template class PaxDeltaDecoder<signed char>;
+
+} // namespace pax
\ No newline at end of file
diff --git a/contrib/pax_storage/src/cpp/storage/columns/pax_delta_encoding.h
b/contrib/pax_storage/src/cpp/storage/columns/pax_delta_encoding.h
new file mode 100644
index 00000000000..7f2251201bf
--- /dev/null
+++ b/contrib/pax_storage/src/cpp/storage/columns/pax_delta_encoding.h
@@ -0,0 +1,135 @@
+/*-------------------------------------------------------------------------
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ * pax_delta_encoding.h
+ *
+ * IDENTIFICATION
+ * contrib/pax_storage/src/cpp/storage/columns/pax_delta_encoding.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#pragma once
+
+#include "storage/columns/pax_encoding.h"
+#include "storage/columns/pax_decoding.h"
+#include <vector>
+
+namespace pax {
+
// LSB-first bit reader that consumes bytes through the caller's cursor.
// `p` and `remaining` are references, so every byte pulled into the bit
// buffer advances the caller's pointer and decrements its byte count.
struct BitReader64 {
  const uint8_t*& p;
  uint32_t& remaining;
  uint64_t bit_buffer = 0;
  uint32_t bit_count = 0;

  BitReader64(const uint8_t*& ptr, uint32_t& size) : p(ptr), remaining(size) {}

  // Loads whole bytes until `need_bits` bits are buffered or input runs out.
  inline void Ensure(uint32_t need_bits) {
    while (remaining != 0 && bit_count < need_bits) {
      bit_buffer |= static_cast<uint64_t>(*p++) << bit_count;
      --remaining;
      bit_count += 8;
    }
  }

  // Returns the next `width`-bit value (width is expected to be 0..32).
  inline uint32_t Read(uint8_t width) {
    if (width == 0) return 0;
    Ensure(width);
    const uint64_t mask =
        (width == 32) ? 0xFFFFFFFFull : ((1ull << width) - 1);
    const uint32_t value = static_cast<uint32_t>(bit_buffer & mask);
    bit_buffer >>= width;
    bit_count -= width;
    return value;
  }

  // Drops the buffered bits left over from a partially consumed byte.
  inline void AlignToByte() {
    const uint32_t extra = bit_count & 7u;
    if (extra != 0) {
      bit_buffer >>= extra;
      bit_count -= extra;
    }
  }
};
+
// Fixed-size prefix of every delta stream. It is copied to and from the
// stream byte-for-byte, so its in-memory layout *is* the stream format.
struct DeltaBlockHeader {
  uint32_t value_per_block;        // deltas per block
  uint32_t values_per_mini_block;  // deltas per mini-block (one width each)
  uint32_t total_count;            // encoded values, including first_value
};
// Changing the size or field layout of this struct changes the stream format.
static_assert(sizeof(DeltaBlockHeader) == 3 * sizeof(uint32_t),
              "DeltaBlockHeader is part of the delta stream format");
+
+template <typename T>
+class PaxDeltaEncoder : public PaxEncoder {
+ public:
+ explicit PaxDeltaEncoder(const EncodingOption &encoder_options);
+
+ virtual void Append(char *data, size_t size) override;
+
+ virtual bool SupportAppendNull() const override;
+
+ virtual void Flush() override;
+
+ virtual size_t GetBoundSize(size_t src_len) const override;
+
+ private:
+
+ void Encode(T *data, size_t size);
+
+ private:
+ static constexpr uint32_t value_per_block_ = 128;
+ static constexpr uint32_t mini_blocks_per_block_ = 4;
+ static constexpr uint32_t values_per_mini_block_ =
+ value_per_block_ / mini_blocks_per_block_;
+
+ private:
+ bool has_append_ = false;
+ // Reusable working buffer to avoid per-block allocations during encoding
+ std::vector<uint32_t> deltas_scratch_;
+};
+
+template <typename T>
+class PaxDeltaDecoder : public PaxDecoder {
+ public:
+ explicit PaxDeltaDecoder(const PaxDecoder::DecodingOption &encoder_options);
+
+ virtual PaxDecoder *SetSrcBuffer(char *data, size_t data_len) override;
+
+ virtual PaxDecoder *SetDataBuffer(
+ std::shared_ptr<DataBuffer<char>> result_buffer) override;
+
+ virtual size_t Next(const char *not_null) override;
+
+ virtual size_t Decoding() override;
+
+ virtual size_t Decoding(const char *not_null, size_t not_null_len) override;
+
+ virtual const char *GetBuffer() const override;
+
+ virtual size_t GetBufferSize() const override;
+
+ private:
+ std::shared_ptr<DataBuffer<char>> data_buffer_;
+ std::shared_ptr<DataBuffer<char>> result_buffer_;
+};
+
+} // namespace pax
\ No newline at end of file
diff --git
a/contrib/pax_storage/src/cpp/storage/columns/pax_delta_encoding_test.cc
b/contrib/pax_storage/src/cpp/storage/columns/pax_delta_encoding_test.cc
new file mode 100644
index 00000000000..031563381ee
--- /dev/null
+++ b/contrib/pax_storage/src/cpp/storage/columns/pax_delta_encoding_test.cc
@@ -0,0 +1,339 @@
+/*-------------------------------------------------------------------------
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ * pax_delta_encoding_test.cc
+ *
+ * IDENTIFICATION
+ * contrib/pax_storage/src/cpp/storage/columns/pax_delta_encoding_test.cc
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "storage/columns/pax_delta_encoding.h"
+
+#include <random>
+#include <vector>
+
+#include "comm/gtest_wrappers.h"
+#include "pax_gtest_helper.h"
+
+namespace pax {
+
+class PaxDeltaEncodingTest : public ::testing::Test {
+ protected:
+ void SetUp() override {
+ // Create encoding options
+ encoding_options_.column_encode_type =
+ ColumnEncoding_Kind::ColumnEncoding_Kind_DIRECT_DELTA;
+ encoding_options_.is_sign = false;
+
+ // Create decoding options
+ decoding_options_.column_encode_type =
+ ColumnEncoding_Kind::ColumnEncoding_Kind_DIRECT_DELTA;
+ decoding_options_.is_sign = false;
+ }
+
+ void TearDown() override {}
+
+ // Fast bit width calculation (0 -> 0)
+ inline uint8_t FastNumBits(uint32_t v) {
+#if defined(__GNUC__) || defined(__clang__)
+ return v == 0 ? 0 : static_cast<uint8_t>(32 - __builtin_clz(v));
+#else
+ uint8_t bits = 0;
+ while (v) {
+ ++bits;
+ v >>= 1;
+ }
+ return bits;
+#endif
+ }
+
+ // Helper function to encode and decode data
+ template <typename T>
+ std::vector<T> EncodeAndDecode(const std::vector<T> &input) {
+ // Create encoder
+ PaxDeltaEncoder<T> encoder(encoding_options_);
+
+ size_t bound_size = encoder.GetBoundSize(input.size() * sizeof(T));
+
+ encoder.SetDataBuffer(std::make_shared<DataBuffer<char>>(bound_size));
+
+ // Encode data
+ encoder.Append(reinterpret_cast<char *>(const_cast<T *>(input.data())),
+ input.size() * sizeof(T));
+
+ // Get encoded buffer
+ const char *encoded_data = encoder.GetBuffer();
+ size_t encoded_size = encoder.GetBufferSize();
+
+ // Create decoder
+ PaxDeltaDecoder<T> decoder(decoding_options_);
+
+ // Set source buffer
+ decoder.SetSrcBuffer(const_cast<char *>(encoded_data), encoded_size);
+
+ // Create result buffer
+ auto result_buffer =
+ std::make_shared<DataBuffer<char>>(input.size() * sizeof(T));
+ decoder.SetDataBuffer(result_buffer);
+
+ // Decode
+ size_t decoded_size = decoder.Decoding();
+
+ // Convert result back to vector
+ const T *decoded_data = reinterpret_cast<const T *>(decoder.GetBuffer());
+ size_t count = decoded_size / sizeof(T);
+
+ return std::vector<T>(decoded_data, decoded_data + count);
+ }
+
+ PaxEncoder::EncodingOption encoding_options_;
+ PaxDecoder::DecodingOption decoding_options_;
+};
+
+// Test basic functionality
+TEST_F(PaxDeltaEncodingTest, BasicEncodeDecode) {
+ std::vector<uint32_t> input = {1, 2, 3, 4, 5};
+ auto output = EncodeAndDecode(input);
+ EXPECT_EQ(input, output);
+}
+
+// Test example from documentation - consecutive sequence
+TEST_F(PaxDeltaEncodingTest, ConsecutiveSequence) {
+ std::vector<uint32_t> input = {1, 2, 3, 4, 5};
+ auto output = EncodeAndDecode(input);
+ EXPECT_EQ(input, output);
+
+ // Verify deltas would be [1, 1, 1, 1] with min_delta = 1
+ // and adjusted deltas [0, 0, 0, 0] with bit_width = 0
+}
+
+// Test example from documentation - sequence with variation
+TEST_F(PaxDeltaEncodingTest, SequenceWithVariation) {
+ std::vector<uint32_t> input = {7, 5, 3, 1, 2, 3, 4, 5};
+ auto output = EncodeAndDecode(input);
+ EXPECT_EQ(input, output);
+
+ // Verify deltas would be [-2, -2, -2, 1, 1, 1, 1] with min_delta = -2
+ // Since we cast to uint32, -2 becomes a large positive number
+ // adjusted deltas would be [0, 0, 0, 3, 3, 3, 3] with bit_width = 2
+}
+
+// Test single value
+TEST_F(PaxDeltaEncodingTest, SingleValue) {
+ std::vector<uint32_t> input = {42};
+ auto output = EncodeAndDecode(input);
+ EXPECT_EQ(input, output);
+}
+
+// Test two values
+TEST_F(PaxDeltaEncodingTest, TwoValues) {
+ std::vector<uint32_t> input = {10, 15};
+ auto output = EncodeAndDecode(input);
+ EXPECT_EQ(input, output);
+}
+
+// Test large values
+TEST_F(PaxDeltaEncodingTest, LargeValues) {
+ std::vector<uint32_t> input = {1000000, 1000001, 1000002, 1000003};
+ auto output = EncodeAndDecode(input);
+ EXPECT_EQ(input, output);
+}
+
+// Test values with large deltas
+TEST_F(PaxDeltaEncodingTest, LargeDeltas) {
+ std::vector<uint32_t> input = {1, 1000, 2000, 3000};
+ auto output = EncodeAndDecode(input);
+ EXPECT_EQ(input, output);
+}
+
+// Test full block (128 values)
+TEST_F(PaxDeltaEncodingTest, FullBlock) {
+ std::vector<uint32_t> input;
+ for (uint32_t i = 0; i < 128; ++i) {
+ input.push_back(i);
+ }
+ auto output = EncodeAndDecode(input);
+ EXPECT_EQ(input, output);
+}
+
+// Test multiple blocks
+TEST_F(PaxDeltaEncodingTest, MultipleBlocks) {
+ std::vector<uint32_t> input;
+ for (uint32_t i = 0; i < 250; ++i) {
+ input.push_back(i);
+ }
+ auto output = EncodeAndDecode(input);
+ EXPECT_EQ(input, output);
+}
+
+// Test random data
+TEST_F(PaxDeltaEncodingTest, RandomData) {
+ std::mt19937 gen(12345);
+ std::uniform_int_distribution<uint32_t> dis(0, 1000000);
+
+ std::vector<uint32_t> input;
+ for (int i = 0; i < 100; ++i) {
+ input.push_back(dis(gen));
+ }
+
+ auto output = EncodeAndDecode(input);
+ EXPECT_EQ(input, output);
+}
+
+// Test payload size calculation
+TEST_F(PaxDeltaEncodingTest, PayloadSizeCalculation) {
+ std::vector<uint32_t> input = {
+ 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18,
+ 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 56, 63, 89};
+ // Test the specific example: deltas [0,0,0,0,0,0,0,0,...,0,22,6,25] with
+ // bit_width 0,5,0,0
+
+ PaxDeltaEncoder<uint32_t> encoder(encoding_options_);
+ size_t bound_size = encoder.GetBoundSize(input.size() * sizeof(uint32_t));
+ encoder.SetDataBuffer(std::make_shared<DataBuffer<char>>(bound_size));
+ encoder.Append(reinterpret_cast<char *>(input.data()),
+ input.size() * sizeof(uint32_t));
+
+ // Verify the encoded data structure manually
+ const char *encoded_data = encoder.GetBuffer();
+ size_t encoded_size = encoder.GetBufferSize();
+
+ EXPECT_GT(encoded_size, 0);
+
+ // Parse the encoded data
+ const uint8_t *p = reinterpret_cast<const uint8_t *>(encoded_data);
+
+ // Read header
+ DeltaBlockHeader header;
+ std::memcpy(&header, p, sizeof(header));
+ p += sizeof(header);
+
+ EXPECT_EQ(header.value_per_block, 128);
+ EXPECT_EQ(header.values_per_mini_block, 32);
+ EXPECT_EQ(header.total_count, input.size());
+
+ // Read first value
+ uint32_t first_value;
+ std::memcpy(&first_value, p, sizeof(first_value));
+ p += sizeof(first_value);
+ EXPECT_EQ(first_value, 1);
+
+ // Read block data
+ uint32_t min_delta;
+ std::memcpy(&min_delta, p, sizeof(min_delta));
+ p += sizeof(min_delta);
+
+ // Read allbit widths
+ uint8_t bit_widths[4];
+ for (int i = 0; i < 4; ++i) {
+ bit_widths[i] = *p++;
+ }
+
+ // bit_widths should be [0, 6, 0, 0]
+ ASSERT_EQ(bit_widths[0], 0);
+ ASSERT_EQ(bit_widths[1], 5);
+ ASSERT_EQ(bit_widths[2], 0);
+ ASSERT_EQ(bit_widths[3], 0);
+
+ // Compute payload size from bit_widths and counts
+ uint32_t values_in_block =
+ input.size() - 1; // we constructed input with 35 deltas in first block
+ uint64_t total_bits = 0;
+ for (uint32_t i = 0; i < 4; ++i) {
+ uint32_t start = i * 32;
+ if (start >= values_in_block) break;
+ uint32_t end = std::min(start + 32u, values_in_block);
+ uint8_t w = bit_widths[i];
+ total_bits += static_cast<uint64_t>(w) * (end - start);
+ }
+ uint32_t payload_size = static_cast<uint32_t>((total_bits + 7) / 8);
+
+ // For this example, we expect payload_size = 2 bytes
+ EXPECT_EQ(payload_size, 2);
+
+ // Assert payload bitmap is correct
+ uint8_t payload[4];
+ std::memcpy(payload, p, 4);
+ p += 4;
+
+ // payload should be LSB-Last, value is(22,6,25)
+ // [0b10110, 0b00110, 0b11001]
+ EXPECT_EQ(payload[0], 0b11010110);
+ EXPECT_EQ(payload[1], 0b01100100);
+}
+
+// Test bit width calculation helper
+TEST_F(PaxDeltaEncodingTest, BitWidthCalculation) {
+ EXPECT_EQ(FastNumBits(0), 0);
+ EXPECT_EQ(FastNumBits(1), 1);
+ EXPECT_EQ(FastNumBits(2), 2);
+ EXPECT_EQ(FastNumBits(3), 2);
+ EXPECT_EQ(FastNumBits(4), 3);
+ EXPECT_EQ(FastNumBits(7), 3);
+ EXPECT_EQ(FastNumBits(8), 4);
+ EXPECT_EQ(FastNumBits(15), 4);
+ EXPECT_EQ(FastNumBits(16), 5);
+ EXPECT_EQ(FastNumBits(255), 8);
+ EXPECT_EQ(FastNumBits(256), 9);
+}
+
+// Test zero deltas (all same values)
+TEST_F(PaxDeltaEncodingTest, ZeroDeltas) {
+ std::vector<uint32_t> input = {42, 42, 42, 42, 42};
+ auto output = EncodeAndDecode(input);
+ EXPECT_EQ(input, output);
+}
+
+// Test decreasing sequence (negative deltas)
+TEST_F(PaxDeltaEncodingTest, DecreasingSequence) {
+ std::vector<uint32_t> input = {100, 90, 80, 70, 60};
+ auto output = EncodeAndDecode(input);
+ EXPECT_EQ(input, output);
+}
+
+// Test mixed pattern
+TEST_F(PaxDeltaEncodingTest, MixedPattern) {
+ std::vector<uint32_t> input = {10, 20, 15, 25, 5, 30, 1, 35};
+ auto output = EncodeAndDecode(input);
+ EXPECT_EQ(input, output);
+}
+
+// Test empty input (edge case)
+TEST_F(PaxDeltaEncodingTest, EmptyInput) {
+ std::vector<uint32_t> input = {};
+ // This should handle gracefully or throw expected exception
+ // For now, let's skip this test until we clarify expected behavior
+}
+
+// Test different data types
+TEST_F(PaxDeltaEncodingTest, DifferentTypes) {
+ // Test int32_t (with non-negative values)
+ std::vector<uint32_t> input32 = {1, 2, 3, 4, 5};
+ auto output32 = EncodeAndDecode(input32);
+ EXPECT_EQ(input32, output32);
+}
+
+} // namespace pax
+
+int main(int argc, char **argv) {
+ ::testing::InitGoogleTest(&argc, argv);
+ return RUN_ALL_TESTS();
+}
diff --git a/contrib/pax_storage/src/cpp/storage/columns/pax_dict_encoding.h
b/contrib/pax_storage/src/cpp/storage/columns/pax_dict_encoding.h
index e552fa7a55a..38f3ba217db 100644
--- a/contrib/pax_storage/src/cpp/storage/columns/pax_dict_encoding.h
+++ b/contrib/pax_storage/src/cpp/storage/columns/pax_dict_encoding.h
@@ -53,6 +53,10 @@ class PaxDictEncoder final : public PaxEncoder {
void Flush() override;
+ size_t GetBoundSize(size_t src_len) const override {
+ CBDB_RAISE(cbdb::CException::kExTypeUnImplements);
+ }
+
private:
size_t AppendInternal(char *data, size_t len);
@@ -89,7 +93,8 @@ class PaxDictDecoder final : public PaxDecoder {
PaxDecoder *SetSrcBuffer(char *data, size_t data_len) override;
- PaxDecoder *SetDataBuffer(std::shared_ptr<DataBuffer<char>> result_buffer)
override;
+ PaxDecoder *SetDataBuffer(
+ std::shared_ptr<DataBuffer<char>> result_buffer) override;
const char *GetBuffer() const override;
@@ -121,8 +126,8 @@ class PaxDictDecoder final : public PaxDecoder {
buffer = src_buff->GetBuffer();
- index_buffer =
- std::make_shared<DataBuffer<int32>>((int32 *)buffer, head.indexsz,
false, false);
+ index_buffer = std::make_shared<DataBuffer<int32>>(
+ (int32 *)buffer, head.indexsz, false, false);
index_buffer->BrushAll();
desc_buffer = std::make_shared<DataBuffer<int32>>(
@@ -130,8 +135,8 @@ class PaxDictDecoder final : public PaxDecoder {
false);
desc_buffer->BrushAll();
- entry_buffer = std::make_shared<DataBuffer<char>>(buffer + head.indexsz,
head.dictsz,
- false, false);
+ entry_buffer = std::make_shared<DataBuffer<char>>(
+ buffer + head.indexsz, head.dictsz, false, false);
entry_buffer->BrushAll();
return std::make_tuple(index_buffer, entry_buffer, desc_buffer);
diff --git a/contrib/pax_storage/src/cpp/storage/columns/pax_encoding.cc
b/contrib/pax_storage/src/cpp/storage/columns/pax_encoding.cc
index 3a354ceec8d..b11b2b7b6bd 100644
--- a/contrib/pax_storage/src/cpp/storage/columns/pax_encoding.cc
+++ b/contrib/pax_storage/src/cpp/storage/columns/pax_encoding.cc
@@ -33,6 +33,7 @@
#include "comm/pax_memory.h"
#include "storage/columns/pax_dict_encoding.h"
#include "storage/columns/pax_rlev2_encoding.h"
+#include "storage/columns/pax_delta_encoding.h"
namespace pax {
@@ -56,8 +57,7 @@ std::shared_ptr<PaxEncoder>
PaxEncoder::CreateStreamingEncoder(
break;
}
case ColumnEncoding_Kind::ColumnEncoding_Kind_DIRECT_DELTA: {
- // TODO(jiaqizho): support direct delta encoding
- // not support yet, then direct return a nullptr(means no encoding)
+ encoder = std::make_shared<PaxDeltaEncoder<uint32_t>>(encoder_options);
break;
}
case ColumnEncoding_Kind::ColumnEncoding_Kind_DEF_ENCODED: {
diff --git a/contrib/pax_storage/src/cpp/storage/columns/pax_encoding.h
b/contrib/pax_storage/src/cpp/storage/columns/pax_encoding.h
index 362e68caa13..465c7bf0600 100644
--- a/contrib/pax_storage/src/cpp/storage/columns/pax_encoding.h
+++ b/contrib/pax_storage/src/cpp/storage/columns/pax_encoding.h
@@ -75,6 +75,8 @@ class PaxEncoder {
virtual size_t GetBufferSize() const;
+ // Upper bound on the encoded size for src_len bytes of raw input, so a
+ // caller can allocate the destination buffer before Append()/Flush() (the
+ // offsets stream of PaxNonFixedEncodingColumn is sized this way). Some
+ // encoders (dictionary, RLEv2) raise kExTypeUnImplements instead.
+ virtual size_t GetBoundSize(size_t src_len) const = 0;
+
/**
* steaming encoder
*
diff --git
a/contrib/pax_storage/src/cpp/storage/columns/pax_encoding_non_fixed_column.cc
b/contrib/pax_storage/src/cpp/storage/columns/pax_encoding_non_fixed_column.cc
index 25b6d2f1d6d..90060050236 100644
---
a/contrib/pax_storage/src/cpp/storage/columns/pax_encoding_non_fixed_column.cc
+++
b/contrib/pax_storage/src/cpp/storage/columns/pax_encoding_non_fixed_column.cc
@@ -59,21 +59,37 @@ void PaxNonFixedEncodingColumn::InitEncoder() {
}
void PaxNonFixedEncodingColumn::InitOffsetStreamCompressor() {
Assert(encoder_options_.offsets_encode_type !=
ColumnEncoding_Kind::ColumnEncoding_Kind_DEF_ENCODED);
- offsets_compressor_ = PaxCompressor::CreateBlockCompressor(
- encoder_options_.offsets_encode_type);
SetOffsetsEncodeType(encoder_options_.offsets_encode_type);
SetOffsetsCompressLevel(encoder_options_.offsets_compress_level);
+
+  // Choose the offsets-stream writer from the configured kind, mirroring
+  // InitOffsetStreamDecompressor(): DIRECT_DELTA uses a streaming delta
+  // encoder, and every other kind (e.g. ZSTD) keeps using a block compressor.
+  // A debug-only Assert(kind == DIRECT_DELTA) is not sufficient. In release
+  // builds the data would be delta-encoded while a different kind is recorded
+  // by SetOffsetsEncodeType() above. That recorded kind is presumably what
+  // the reader uses to pick its decoder.
+  if (encoder_options_.offsets_encode_type !=
+      ColumnEncoding_Kind::ColumnEncoding_Kind_DIRECT_DELTA) {
+    offsets_compressor_ = PaxCompressor::CreateBlockCompressor(
+        encoder_options_.offsets_encode_type);
+    return;
+  }
+
+  PaxEncoder::EncodingOption opt = encoder_options_;
+  opt.column_encode_type =
+      ColumnEncoding_Kind::ColumnEncoding_Kind_DIRECT_DELTA;
+  // The streaming delta encoder is instantiated for uint32_t; encode unsigned.
+  opt.is_sign = false;
+  // offsets are fixed-width, do not enable non_fixed streaming restriction
+  offsets_encoder_ = PaxEncoder::CreateStreamingEncoder(opt, false);
}
void PaxNonFixedEncodingColumn::InitOffsetStreamDecompressor() {
Assert(decoder_options_.offsets_encode_type !=
ColumnEncoding_Kind::ColumnEncoding_Kind_DEF_ENCODED);
- offsets_compressor_ = PaxCompressor::CreateBlockCompressor(
- decoder_options_.offsets_encode_type);
SetOffsetsEncodeType(decoder_options_.offsets_encode_type);
SetOffsetsCompressLevel(decoder_options_.offsets_compress_level);
+
+  // Non-delta kinds (e.g. ZSTD, the default before DIRECT_DELTA) go through a
+  // block compressor. Presumably this keeps offsets streams written with the
+  // old default readable.
+  if (decoder_options_.offsets_encode_type !=
+      ColumnEncoding_Kind::ColumnEncoding_Kind_DIRECT_DELTA) {
+    offsets_compressor_ = PaxCompressor::CreateBlockCompressor(
+        decoder_options_.offsets_encode_type);
+    return;
+  }
+
+  // DIRECT_DELTA: a streaming decoder restores the plain int32 offsets.
+  PaxDecoder::DecodingOption delta_opt = decoder_options_;
+  delta_opt.column_encode_type =
+      ColumnEncoding_Kind::ColumnEncoding_Kind_DIRECT_DELTA;
+  delta_opt.is_sign = false;
+  offsets_decoder_ = PaxDecoder::CreateDecoder<int32>(delta_opt);
}
void PaxNonFixedEncodingColumn::InitDecoder() {
@@ -169,9 +185,13 @@ void
PaxNonFixedEncodingColumn::Set(std::shared_ptr<DataBuffer<char>> data,
auto offsets_decompress = [&]() {
Assert(!compress_route_);
- Assert(offsets_compressor_);
+ Assert(offsets_compressor_ || offsets_decoder_);
+
+ if (offsets->Used() == 0) {
+ return;
+ }
- if (offsets->Used() != 0) {
+ if (offsets_compressor_) {
auto d_size = offsets_compressor_->Decompress(
PaxNonFixedColumn::offsets_->Start(),
PaxNonFixedColumn::offsets_->Capacity(), offsets->Start(),
@@ -182,22 +202,36 @@ void
PaxNonFixedEncodingColumn::Set(std::shared_ptr<DataBuffer<char>> data,
fmt("Decompress failed, %s", compressor_->ErrorName(d_size)));
}
PaxNonFixedColumn::offsets_->Brush(d_size);
+ return;
+ }
+
+ if (offsets_decoder_) {
+ // Decode offsets using encoder for int32 stream
+ shared_offsets_data_ = std::make_shared<DataBuffer<char>>(
+ PaxNonFixedColumn::offsets_->Start(),
+ PaxNonFixedColumn::offsets_->Capacity(), false, false);
+ offsets_decoder_->SetDataBuffer(shared_offsets_data_);
+ offsets_decoder_->SetSrcBuffer(offsets->Start(), offsets->Used());
+ offsets_decoder_->Decoding();
+ PaxNonFixedColumn::offsets_->Brush(shared_offsets_data_->Used());
+ return;
}
};
exist_decoder = compressor_ || decoder_;
+ bool has_offsets_processor = offsets_compressor_ || offsets_decoder_;
- if (exist_decoder && offsets_compressor_) {
+ if (exist_decoder && has_offsets_processor) {
data_decompress();
offsets_decompress();
PaxNonFixedColumn::estimated_size_ = total_size;
PaxNonFixedColumn::next_offsets_ = -1;
- } else if (exist_decoder && !offsets_compressor_) {
+ } else if (exist_decoder && !has_offsets_processor) {
data_decompress();
PaxNonFixedColumn::offsets_ = offsets;
PaxNonFixedColumn::estimated_size_ = total_size;
PaxNonFixedColumn::next_offsets_ = -1;
- } else if (!exist_decoder && offsets_compressor_) {
+ } else if (!exist_decoder && has_offsets_processor) {
PaxNonFixedColumn::data_ = data;
offsets_decompress();
PaxNonFixedColumn::estimated_size_ = total_size;
@@ -278,17 +312,17 @@ std::pair<char *, size_t>
PaxNonFixedEncodingColumn::GetOffsetBuffer(
AppendLastOffset();
}
- if (offsets_compressor_ && compress_route_) {
- if (shared_offsets_data_) {
- return std::make_pair(shared_offsets_data_->Start(),
- shared_offsets_data_->Used());
- }
+ if (shared_offsets_data_) {
+ return std::make_pair(shared_offsets_data_->Start(),
+ shared_offsets_data_->Used());
+ }
- if (PaxNonFixedColumn::offsets_->Used() == 0) {
- // should never append last offset again
- return PaxNonFixedColumn::GetOffsetBuffer(false);
- }
+ if (PaxNonFixedColumn::offsets_->Used() == 0) {
+ // should never append last offset again
+ return PaxNonFixedColumn::GetOffsetBuffer(false);
+ }
+ if (offsets_compressor_ && compress_route_) {
size_t bound_size = offsets_compressor_->GetCompressBound(
PaxNonFixedColumn::offsets_->Used());
shared_offsets_data_ = std::make_shared<DataBuffer<char>>(bound_size);
@@ -308,6 +342,20 @@ std::pair<char *, size_t>
PaxNonFixedEncodingColumn::GetOffsetBuffer(
shared_offsets_data_->Used());
}
+ if (offsets_encoder_ && compress_route_) {
+ // For delta encoder, allocate a buffer sized by raw bytes for safety
+ size_t bound_size = offsets_encoder_->GetBoundSize(offsets_->Used());
+ shared_offsets_data_ = std::make_shared<DataBuffer<char>>(bound_size);
+ offsets_encoder_->SetDataBuffer(shared_offsets_data_);
+
+ // Encode entire offsets buffer as a single stream
+ offsets_encoder_->Append(offsets_->Start(), offsets_->Used());
+ offsets_encoder_->Flush();
+
+ return std::make_pair(shared_offsets_data_->Start(),
+ shared_offsets_data_->Used());
+ }
+
// no compress or uncompressed
// should never append last offset again
return PaxNonFixedColumn::GetOffsetBuffer(false);
diff --git
a/contrib/pax_storage/src/cpp/storage/columns/pax_encoding_non_fixed_column.h
b/contrib/pax_storage/src/cpp/storage/columns/pax_encoding_non_fixed_column.h
index b4e956cfe4a..06b60d02ac2 100644
---
a/contrib/pax_storage/src/cpp/storage/columns/pax_encoding_non_fixed_column.h
+++
b/contrib/pax_storage/src/cpp/storage/columns/pax_encoding_non_fixed_column.h
@@ -83,6 +83,9 @@ class PaxNonFixedEncodingColumn : public PaxNonFixedColumn {
std::shared_ptr<DataBuffer<char>> shared_data_;
std::shared_ptr<PaxCompressor> offsets_compressor_;
+ // Optional encoder/decoder for offsets stream (alternative to compression)
+ std::shared_ptr<PaxEncoder> offsets_encoder_;
+ std::shared_ptr<PaxDecoder> offsets_decoder_;
std::shared_ptr<DataBuffer<char>> shared_offsets_data_;
};
diff --git a/contrib/pax_storage/src/cpp/storage/columns/pax_encoding_test.cc
b/contrib/pax_storage/src/cpp/storage/columns/pax_encoding_test.cc
index 5fa7fb7153c..b3a7ec59458 100644
--- a/contrib/pax_storage/src/cpp/storage/columns/pax_encoding_test.cc
+++ b/contrib/pax_storage/src/cpp/storage/columns/pax_encoding_test.cc
@@ -1361,4 +1361,96 @@ TEST_F(PaxEncodingTest, TestEncodingWithAllNULL) {
ASSERT_EQ(n_read, shared_dst_data->Used());
}
+TEST_F(PaxEncodingTest, TestPaxDeltaEncodingBasic) {
+  // Mostly increasing values with a few larger jumps.
+  std::vector<uint32_t> data_vec{100, 101, 102, 105, 106, 110, 120, 121};
+  const size_t raw_bytes = data_vec.size() * sizeof(uint32_t);
+  auto encoded_buf = std::make_shared<DataBuffer<char>>(1024);
+  auto decoded_buf = std::make_shared<DataBuffer<char>>(1024);
+
+  // Encode as an unsigned DIRECT_DELTA stream.
+  PaxEncoder::EncodingOption enc_opt;
+  enc_opt.column_encode_type =
+      ColumnEncoding_Kind::ColumnEncoding_Kind_DIRECT_DELTA;
+  enc_opt.is_sign = false;
+  auto encoder = PaxEncoder::CreateStreamingEncoder(enc_opt);
+  ASSERT_TRUE(encoder);
+
+  encoder->SetDataBuffer(encoded_buf);
+  encoder->Append(reinterpret_cast<char *>(data_vec.data()), raw_bytes);
+  encoder->Flush();
+  ASSERT_NE(encoder->GetBuffer(), nullptr);
+  ASSERT_GT(encoder->GetBufferSize(), 0UL);
+
+  // Decode the stream back into int32 values.
+  PaxDecoder::DecodingOption dec_opt;
+  dec_opt.column_encode_type =
+      ColumnEncoding_Kind::ColumnEncoding_Kind_DIRECT_DELTA;
+  dec_opt.is_sign = false;
+  auto decoder = PaxDecoder::CreateDecoder<int32>(dec_opt);
+  ASSERT_TRUE(decoder);
+  decoder->SetSrcBuffer(encoded_buf->GetBuffer(), encoded_buf->Used());
+  decoder->SetDataBuffer(decoded_buf);
+  decoder->Decoding();
+
+  ASSERT_EQ(decoded_buf->Used(), data_vec.size() * sizeof(int32));
+
+  // Reinterpret the decoded bytes as int32 and compare element-wise.
+  const auto *decoded = reinterpret_cast<const int32 *>(decoded_buf->Start());
+  for (size_t idx = 0; idx < data_vec.size(); ++idx) {
+    ASSERT_EQ(decoded[idx], static_cast<int32>(data_vec[idx]));
+  }
+}
+
+TEST_F(PaxEncodingTest, TestPaxDeltaEncodingRoundTripRandom) {
+  // Non-decreasing sequence with small random steps (0..5). It resembles an
+  // offsets stream, which is the input delta encoding is meant for.
+  const size_t n = 1000;
+  const size_t raw_bytes = n * sizeof(uint32_t);
+  std::vector<uint32_t> data_vec(n);
+  std::mt19937 rng(12345);  // fixed seed keeps the test deterministic
+  std::uniform_int_distribution<uint32_t> base_dist(0, 100);
+  std::uniform_int_distribution<uint32_t> step_dist(0, 5);
+
+  data_vec[0] = base_dist(rng);
+  for (size_t i = 1; i < n; ++i) {
+    data_vec[i] = data_vec[i - 1] + step_dist(rng);
+  }
+
+  PaxEncoder::EncodingOption encoder_options;
+  encoder_options.column_encode_type =
+      ColumnEncoding_Kind::ColumnEncoding_Kind_DIRECT_DELTA;
+  encoder_options.is_sign = false;
+  auto encoder = PaxEncoder::CreateStreamingEncoder(encoder_options);
+  ASSERT_TRUE(encoder);
+
+  // Size the encode buffer the way the offsets-stream writer does
+  // (GetBoundSize) instead of assuming the encoded stream never exceeds
+  // the raw size.
+  auto shared_data =
+      std::make_shared<DataBuffer<char>>(encoder->GetBoundSize(raw_bytes));
+  auto shared_dst_data = std::make_shared<DataBuffer<char>>(raw_bytes);
+
+  encoder->SetDataBuffer(shared_data);
+  encoder->Append(reinterpret_cast<char *>(data_vec.data()), raw_bytes);
+  encoder->Flush();
+
+  // Delta encoding is used to shrink offsets streams; a slowly growing
+  // sequence like this one must come out smaller than its raw bytes.
+  ASSERT_LT(shared_data->Used(), raw_bytes);
+
+  PaxDecoder::DecodingOption decoder_options;
+  decoder_options.column_encode_type =
+      ColumnEncoding_Kind::ColumnEncoding_Kind_DIRECT_DELTA;
+  decoder_options.is_sign = false;
+
+  auto decoder = PaxDecoder::CreateDecoder<int32>(decoder_options);
+  ASSERT_TRUE(decoder);
+  decoder->SetSrcBuffer(shared_data->GetBuffer(), shared_data->Used());
+
+  decoder->SetDataBuffer(shared_dst_data);
+  decoder->Decoding();
+
+  ASSERT_EQ(shared_dst_data->Used(), data_vec.size() * sizeof(int32));
+
+  auto result_dst_data = std::make_shared<DataBuffer<int32>>(
+      reinterpret_cast<int32 *>(shared_dst_data->Start()),
+      shared_dst_data->Used(), false, false);
+
+  for (size_t i = 0; i < data_vec.size(); ++i) {
+    ASSERT_EQ((*result_dst_data)[i], static_cast<int32>(data_vec[i]));
+  }
+}
+
} // namespace pax::tests
diff --git a/contrib/pax_storage/src/cpp/storage/columns/pax_rlev2_encoding.h
b/contrib/pax_storage/src/cpp/storage/columns/pax_rlev2_encoding.h
index 7d021a1f1cf..f2197258b69 100644
--- a/contrib/pax_storage/src/cpp/storage/columns/pax_rlev2_encoding.h
+++ b/contrib/pax_storage/src/cpp/storage/columns/pax_rlev2_encoding.h
@@ -49,6 +49,10 @@ class PaxOrcEncoder final : public PaxEncoder {
void Flush() override;
+ // The RLEv2 (ORC) encoder provides no output-size bound: this always raises
+ // kExTypeUnImplements, so callers must not size buffers through it.
+ size_t GetBoundSize(size_t src_len) const override {
+ CBDB_RAISE(cbdb::CException::kExTypeUnImplements);
+ }
+
private:
struct EncoderContext {
bool is_sign;
diff --git
a/contrib/pax_storage/src/cpp/storage/columns/pax_vec_encoding_column.cc
b/contrib/pax_storage/src/cpp/storage/columns/pax_vec_encoding_column.cc
index aaf514f5926..8f3aafae2c4 100644
--- a/contrib/pax_storage/src/cpp/storage/columns/pax_vec_encoding_column.cc
+++ b/contrib/pax_storage/src/cpp/storage/columns/pax_vec_encoding_column.cc
@@ -348,7 +348,7 @@ void PaxVecNonFixedEncodingColumn::Set(
PaxVecNonFixedColumn::estimated_size_ = total_size;
PaxVecNonFixedColumn::next_offsets_ = -1;
} else { // (!compressor_ && !offsets_compressor_)
- PaxVecNonFixedColumn::Set(data, offsets_, total_size, non_null_rows);
+ PaxVecNonFixedColumn::Set(data, offsets, total_size, non_null_rows);
}
}
diff --git
a/contrib/pax_storage/src/cpp/storage/columns/pax_vec_encoding_column.h
b/contrib/pax_storage/src/cpp/storage/columns/pax_vec_encoding_column.h
index 4362312a5a9..524ddca261a 100644
--- a/contrib/pax_storage/src/cpp/storage/columns/pax_vec_encoding_column.h
+++ b/contrib/pax_storage/src/cpp/storage/columns/pax_vec_encoding_column.h
@@ -112,6 +112,9 @@ class PaxVecNonFixedEncodingColumn : public
PaxVecNonFixedColumn {
std::shared_ptr<DataBuffer<char>> shared_data_;
std::shared_ptr<PaxCompressor> offsets_compressor_;
+ // Optional encoder/decoder for offsets stream (alternative to compression)
+ std::shared_ptr<PaxEncoder> offsets_encoder_;
+ std::shared_ptr<PaxDecoder> offsets_decoder_;
std::shared_ptr<DataBuffer<char>> shared_offsets_data_;
};
diff --git a/contrib/pax_storage/src/cpp/storage/micro_partition.h
b/contrib/pax_storage/src/cpp/storage/micro_partition.h
index 77d61462ad4..56d85b46a74 100644
--- a/contrib/pax_storage/src/cpp/storage/micro_partition.h
+++ b/contrib/pax_storage/src/cpp/storage/micro_partition.h
@@ -58,7 +58,6 @@ class MicroPartitionWriter {
RelFileNode node;
bool need_wal = false;
std::vector<std::tuple<ColumnEncoding_Kind, int>> encoding_opts;
- std::pair<ColumnEncoding_Kind, int> offsets_encoding_opts;
std::vector<int> enable_min_max_col_idxs;
std::vector<int> enable_bf_col_idxs;
diff --git a/contrib/pax_storage/src/cpp/storage/orc/orc_writer.cc
b/contrib/pax_storage/src/cpp/storage/orc/orc_writer.cc
index 63413a2239d..6c8d49502e5 100644
--- a/contrib/pax_storage/src/cpp/storage/orc/orc_writer.cc
+++ b/contrib/pax_storage/src/cpp/storage/orc/orc_writer.cc
@@ -104,7 +104,6 @@ static std::unique_ptr<PaxColumns> BuildColumns(
const std::vector<pax::porc::proto::Type_Kind> &types, const TupleDesc
desc,
const std::vector<std::tuple<ColumnEncoding_Kind, int>>
&column_encoding_types,
- const std::pair<ColumnEncoding_Kind, int> &offsets_encoding_types,
const PaxStorageFormat &storage_format) {
std::unique_ptr<PaxColumns> columns;
bool is_vec;
@@ -125,14 +124,7 @@ static std::unique_ptr<PaxColumns> BuildColumns(
encoding_option.is_sign = true;
encoding_option.compress_level = std::get<1>(column_encoding_types[i]);
- if (offsets_encoding_types.first == ColumnEncoding_Kind_DEF_ENCODED) {
- // default value of offsets_stream is zstd
- encoding_option.offsets_encode_type = ColumnEncoding_Kind_COMPRESS_ZSTD;
- encoding_option.offsets_compress_level = 5;
- } else {
- encoding_option.offsets_encode_type = offsets_encoding_types.first;
- encoding_option.offsets_compress_level = offsets_encoding_types.second;
- }
+ encoding_option.offsets_encode_type = ColumnEncoding_Kind_DIRECT_DELTA;
switch (type) {
case (pax::porc::proto::Type_Kind::Type_Kind_STRING): {
@@ -241,10 +233,9 @@ OrcWriter::OrcWriter(
Assert(writer_options.rel_tuple_desc->natts ==
static_cast<int>(column_types.size()));
- pax_columns_ = BuildColumns(column_types_, writer_options.rel_tuple_desc,
- writer_options.encoding_opts,
- writer_options.offsets_encoding_opts,
- writer_options.storage_format);
+ pax_columns_ =
+ BuildColumns(column_types_, writer_options.rel_tuple_desc,
+ writer_options.encoding_opts,
writer_options.storage_format);
summary_.rel_oid = writer_options.rel_oid;
summary_.block_id = writer_options.block_id;
@@ -300,7 +291,6 @@ void OrcWriter::Flush() {
new_columns = BuildColumns(column_types_, writer_options_.rel_tuple_desc,
writer_options_.encoding_opts,
- writer_options_.offsets_encoding_opts,
writer_options_.storage_format);
for (size_t i = 0; i < column_types_.size(); ++i) {
diff --git a/contrib/pax_storage/src/cpp/storage/pax.cc
b/contrib/pax_storage/src/cpp/storage/pax.cc
index ab10387c76c..c8d29bbb6ce 100644
--- a/contrib/pax_storage/src/cpp/storage/pax.cc
+++ b/contrib/pax_storage/src/cpp/storage/pax.cc
@@ -200,8 +200,6 @@ std::unique_ptr<MicroPartitionWriter>
TableWriter::CreateMicroPartitionWriter(
options.file_name = std::move(file_path);
options.encoding_opts = GetRelEncodingOptions();
options.storage_format = GetStorageFormat();
- options.offsets_encoding_opts = std::make_pair(
- PAX_OFFSETS_DEFAULT_COMPRESSTYPE, PAX_OFFSETS_DEFAULT_COMPRESSLEVEL);
options.enable_min_max_col_idxs = GetMinMaxColumnIndexes();
options.enable_bf_col_idxs = GetBloomFilterColumnIndexes();
@@ -261,8 +259,8 @@ void TableWriter::InitOptionsCaches() {
}
void TableWriter::Open() {
- rel_path_ = cbdb::BuildPaxDirectoryPath(
- relation_->rd_node, relation_->rd_backend);
+ rel_path_ =
+ cbdb::BuildPaxDirectoryPath(relation_->rd_node, relation_->rd_backend);
InitOptionsCaches();
@@ -509,8 +507,8 @@ void TableReader::OpenFile() {
if (it.GetExistToast()) {
// must exist the file in disk
- toast_file = file_system_->Open(it.GetFileName() + TOAST_FILE_SUFFIX,
- fs::kReadMode);
+ toast_file =
+ file_system_->Open(it.GetFileName() + TOAST_FILE_SUFFIX,
fs::kReadMode);
}
reader_ = MicroPartitionFileFactory::CreateMicroPartitionReader(
@@ -588,8 +586,7 @@ void TableDeleter::DeleteWithVisibilityMap(
std::unique_ptr<Bitmap8> visi_bitmap;
auto catalog_update = pax::PaxCatalogUpdater::Begin(rel_);
- auto rel_path = cbdb::BuildPaxDirectoryPath(
- rel_->rd_node, rel_->rd_backend);
+ auto rel_path = cbdb::BuildPaxDirectoryPath(rel_->rd_node, rel_->rd_backend);
min_max_col_idxs = cbdb::GetMinMaxColumnIndexes(rel_);
stats_updater_projection->SetColumnProjection(min_max_col_idxs,
@@ -662,11 +659,10 @@ void TableDeleter::DeleteWithVisibilityMap(
// TODO: update stats and visimap all in one catalog update
// Update the stats in pax aux table
// Notice that: PAX won't update the stats in group
- UpdateStatsInAuxTable(catalog_update, micro_partition_metadata,
- std::make_shared<Bitmap8>(visi_bitmap->Raw()),
- min_max_col_idxs,
- cbdb::GetBloomFilterColumnIndexes(rel_),
- stats_updater_projection);
+ UpdateStatsInAuxTable(
+ catalog_update, micro_partition_metadata,
+ std::make_shared<Bitmap8>(visi_bitmap->Raw()), min_max_col_idxs,
+ cbdb::GetBloomFilterColumnIndexes(rel_), stats_updater_projection);
// write pg_pax_blocks_oid
catalog_update.UpdateVisimap(block_id, visimap_file_name);
diff --git a/contrib/pax_storage/src/cpp/storage/pax_defined.h
b/contrib/pax_storage/src/cpp/storage/pax_defined.h
index b4ce1115af8..5315797ea3a 100644
--- a/contrib/pax_storage/src/cpp/storage/pax_defined.h
+++ b/contrib/pax_storage/src/cpp/storage/pax_defined.h
@@ -39,7 +39,7 @@ namespace pax {
#define BITS_TO_BYTES(bits) (((bits) + 7) / 8)
+// Default encoding kind for the offsets stream of non-fixed-length columns.
+// Despite the COMPRESSTYPE name, this is now an encoding (DIRECT_DELTA), not
+// a compression codec.
+// NOTE(review): orc_writer.cc now hard-codes ColumnEncoding_Kind_DIRECT_DELTA
+// and pax.cc no longer reads these macros. Verify whether they (and
+// PAX_OFFSETS_DEFAULT_COMPRESSLEVEL) are still referenced anywhere.
#define PAX_OFFSETS_DEFAULT_COMPRESSTYPE \
- ColumnEncoding_Kind::ColumnEncoding_Kind_COMPRESS_ZSTD
+ ColumnEncoding_Kind::ColumnEncoding_Kind_DIRECT_DELTA
#define PAX_OFFSETS_DEFAULT_COMPRESSLEVEL 5
#define COLUMN_STORAGE_FORMAT_IS_VEC(column) \
diff --git a/contrib/pax_storage/src/test/regress/expected/gp_toolkit.out
b/contrib/pax_storage/src/test/regress/expected/gp_toolkit.out
index 336354081af..745db42283a 100644
--- a/contrib/pax_storage/src/test/regress/expected/gp_toolkit.out
+++ b/contrib/pax_storage/src/test/regress/expected/gp_toolkit.out
@@ -304,7 +304,7 @@ update pg_statistic set stawidth=2034567890 where starelid
= 'wide_width_test'::
select btdrelpages, btdexppages from gp_toolkit.gp_bloat_expected_pages where
btdrelid='wide_width_test'::regclass;
btdrelpages | btdexppages
-------------+-------------
- 4 | 3104504228
+ 1 | 3104504228
(1 row)
select * from gp_toolkit.gp_bloat_diag WHERE bdinspname <> 'pg_catalog';
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]