This is an automated email from the ASF dual-hosted git repository.

sollhui pushed a commit to branch 
fix-apache-master-configurable-block-size-lz4-log
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 1ef6f11f0ee51338970c5e282504ab2cbb501ac5
Author: laihui <[email protected]>
AuthorDate: Thu Jun 25 11:16:16 2026 +0800

    [improvement](be) Add block size config for load debugging
    
    ### What problem does this PR solve?
    
    Issue Number: None
    
    Related PR: None
    
    Problem Summary: The preferred block byte budget was capped by a hard-coded 
512MB value in BE, which made it inconvenient to reproduce and diagnose 
oversized-block behavior through configuration alone. This change adds a BE 
config for the maximum preferred block byte budget and uses it in 
RuntimeState::preferred_block_size_bytes(). It also logs oversized string 
serialization and LZ4 compression/decompression failures in DataTypeString so 
large-block reproductions can quickly identify whether t [...]
    
    ### Release note
    
    Add BE-side diagnostics and a configurable preferred block byte cap for 
large-block debugging.
    
    ### Check List (For Author)
    
    - Test: Manual test
        - git diff --check
        - python3 build-support/run_clang_format.py --clang-format-executable 
/opt/homebrew/bin/clang-format --style file --inplace false 
be/src/common/config.cpp be/src/common/config.h be/src/runtime/runtime_state.h 
be/src/core/data_type/data_type_string.cpp
    - Behavior changed: Yes. The maximum preferred block byte budget can now be 
adjusted by BE config; the default remains 512MB.
    - Does this need documentation: No
---
 be/src/common/config.cpp                   |  3 +++
 be/src/common/config.h                     |  3 +++
 be/src/core/data_type/data_type_string.cpp | 39 ++++++++++++++++++++++++++----
 be/src/runtime/runtime_state.h             | 22 +++++++++--------
 4 files changed, 52 insertions(+), 15 deletions(-)

diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 86b7d12e0aa..add465f3b90 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -436,6 +436,9 @@ DEFINE_mBool(enable_low_cardinality_optimize, "true");
 DEFINE_Bool(enable_low_cardinality_cache_code, "true");
 
 DEFINE_mBool(enable_adaptive_batch_size, "true");
+// Maximum byte budget returned by RuntimeState::preferred_block_size_bytes.
+// Default is 512MB. Increase only for debugging large-block behavior.
+DEFINE_mInt64(max_preferred_block_size_bytes, "536870912");
 
 // be policy
 // whether check compaction checksum
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 0b415ed5d2c..1e3de57c762 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -505,6 +505,9 @@ DECLARE_Bool(enable_low_cardinality_cache_code);
 // so that each output block stays close to preferred_block_size_bytes.
 // When false, the fixed batch_size row behaviour is preserved.
 DECLARE_mBool(enable_adaptive_batch_size);
+// Maximum byte budget returned by RuntimeState::preferred_block_size_bytes.
+// Default is 512MB. Increase only for debugging large-block behavior.
+DECLARE_mInt64(max_preferred_block_size_bytes);
 
 // be policy
 // whether check compaction checksum
diff --git a/be/src/core/data_type/data_type_string.cpp 
b/be/src/core/data_type/data_type_string.cpp
index 2fa6d3e99d8..a6756d91f49 100644
--- a/be/src/core/data_type/data_type_string.cpp
+++ b/be/src/core/data_type/data_type_string.cpp
@@ -20,6 +20,7 @@
 
 #include "core/data_type/data_type_string.h"
 
+#include <glog/logging.h>
 #include <lz4/lz4.h>
 #include <streamvbyte.h>
 
@@ -29,6 +30,7 @@
 
 #include "agent/be_exec_version_manager.h"
 #include "common/cast_set.h"
+#include "common/compiler_util.h" // IWYU pragma: keep
 #include "common/exception.h"
 #include "common/status.h"
 #include "core/assert_cast.h"
@@ -87,6 +89,10 @@ int64_t 
DataTypeString::get_uncompressed_serialized_bytes(const IColumn& column,
         size += bytes;
     } else {
         if (bytes > LZ4_MAX_INPUT_SIZE) {
+            LOG(WARNING) << "DataTypeString serialized byte size exceeds LZ4 
max input size, "
+                         << "bytes=" << bytes << ", LZ4_MAX_INPUT_SIZE=" << 
LZ4_MAX_INPUT_SIZE
+                         << ", rows=" << data_column.size()
+                         << ", real_need_copy_num=" << real_need_copy_num;
             throw Exception(ErrorCode::BUFFER_OVERFLOW,
                             "LZ4_compressBound meet invalid input size, 
input_size={}, "
                             "LZ4_MAX_INPUT_SIZE={}",
@@ -127,9 +133,25 @@ char* DataTypeString::serialize(const IColumn& column, 
char* buf, int be_exec_ve
         memcpy(buf, string_column.get_chars().data(), value_len);
         buf += value_len;
     } else {
-        auto encode_size = 
LZ4_compress_fast(string_column.get_chars().raw_data(),
-                                             (buf + sizeof(size_t)), 
cast_set<Int32>(value_len),
-                                             
LZ4_compressBound(cast_set<Int32>(value_len)), 1);
+        if (UNLIKELY(value_len > LZ4_MAX_INPUT_SIZE)) {
+            LOG(WARNING) << "DataTypeString serialize value length exceeds LZ4 
max input size, "
+                         << "value_len=" << value_len
+                         << ", LZ4_MAX_INPUT_SIZE=" << LZ4_MAX_INPUT_SIZE
+                         << ", rows=" << string_column.size()
+                         << ", chars_size=" << 
string_column.get_chars().size();
+        }
+        const auto lz4_value_len = cast_set<Int32>(value_len);
+        const auto lz4_compress_bound = LZ4_compressBound(lz4_value_len);
+        auto encode_size =
+                LZ4_compress_fast(string_column.get_chars().raw_data(), (buf + 
sizeof(size_t)),
+                                  lz4_value_len, lz4_compress_bound, 1);
+        if (UNLIKELY(encode_size <= 0)) {
+            LOG(WARNING) << "DataTypeString LZ4_compress_fast failed, 
value_len=" << value_len
+                         << ", lz4_value_len=" << lz4_value_len
+                         << ", lz4_compress_bound=" << lz4_compress_bound
+                         << ", rows=" << string_column.size()
+                         << ", chars_size=" << 
string_column.get_chars().size();
+        }
         unaligned_store<size_t>(buf, encode_size);
         buf += (sizeof(size_t) + encode_size);
     }
@@ -172,8 +194,15 @@ const char* DataTypeString::deserialize(const char* buf, 
MutableColumnPtr* colum
     } else {
         size_t encode_size = unaligned_load<size_t>(buf);
         buf += sizeof(size_t);
-        LZ4_decompress_safe(buf, reinterpret_cast<char*>(data.data()), 
cast_set<Int32>(encode_size),
-                            cast_set<Int32>(value_len));
+        const auto lz4_encode_size = cast_set<Int32>(encode_size);
+        const auto lz4_value_len = cast_set<Int32>(value_len);
+        const auto decoded_size = LZ4_decompress_safe(buf, 
reinterpret_cast<char*>(data.data()),
+                                                      lz4_encode_size, 
lz4_value_len);
+        if (UNLIKELY(decoded_size < 0 || decoded_size != lz4_value_len)) {
+            LOG(WARNING) << "DataTypeString LZ4_decompress_safe failed, 
encode_size=" << encode_size
+                         << ", value_len=" << value_len << ", decoded_size=" 
<< decoded_size
+                         << ", rows=" << real_have_saved_num;
+        }
         buf += encode_size;
     }
     return buf;
diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h
index a01a57d3f7c..07bc8fbc4e7 100644
--- a/be/src/runtime/runtime_state.h
+++ b/be/src/runtime/runtime_state.h
@@ -148,23 +148,25 @@ public:
     }
 
     // Target byte budget per output block (default 8MB when adaptive is 
enabled).
-    // The public FE/session contract is [1MB, 512MB]; this accessor still 
clamps any direct
-    // thrift or mixed-version out-of-range value into that range. Returns 
`kMax` when adaptive
-    // is disabled by BE config so the value is always a legal byte budget; 
callers that need
-    // to know whether adaptive batch size is active should test
-    // `config::enable_adaptive_batch_size` explicitly.
+    // The public FE/session contract is [1MB, 
max_preferred_block_size_bytes]; this accessor still
+    // clamps any direct thrift or mixed-version out-of-range value into that 
range. Returns
+    // max_preferred_block_size_bytes when adaptive is disabled by BE config 
so the value is always
+    // a legal byte budget; callers that need to know whether adaptive batch 
size is active should
+    // test `config::enable_adaptive_batch_size` explicitly.
     MOCK_FUNCTION size_t preferred_block_size_bytes() const {
         static constexpr int64_t kDefault = 8388608L; // 8MB
-        static constexpr int64_t kMax = 536870912L;   // 512MB
         static constexpr int64_t kMin = 1048576L;     // 1MB
+        const int64_t max_preferred_block_size_bytes =
+                std::max<int64_t>(kMin, 
config::max_preferred_block_size_bytes);
         if (!config::enable_adaptive_batch_size) [[unlikely]] {
-            return kMax;
+            return max_preferred_block_size_bytes;
         }
         if (_query_options.__isset.preferred_block_size_bytes) [[likely]] {
-            return std::max<int64_t>(
-                    kMin, 
std::min<int64_t>(_query_options.preferred_block_size_bytes, kMax));
+            return std::max<int64_t>(kMin,
+                                     
std::min<int64_t>(_query_options.preferred_block_size_bytes,
+                                                       
max_preferred_block_size_bytes));
         }
-        return kDefault;
+        return std::min<int64_t>(kDefault, max_preferred_block_size_bytes);
     }
 
     int query_parallel_instance_num() const { return 
_query_options.parallel_instance; }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to