This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new 52c6c8250b6 branch-4.0: [fix](be fe) pick #62263 #61647 (#63816)
52c6c8250b6 is described below

commit 52c6c8250b6212af918dbb7d60901e7812dd18d1
Author: Jimmy <[email protected]>
AuthorDate: Tue Jun 9 16:31:48 2026 +0800

    branch-4.0: [fix](be fe) pick #62263 #61647 (#63816)
    
    pick https://github.com/apache/doris/pull/62263
    pick https://github.com/apache/doris/pull/61647
    
    ---------
    
    Co-authored-by: Siyang Tang <[email protected]>
---
 be/src/olap/merger.cpp                             | 142 ++++++++++-
 be/src/vec/olap/vertical_merge_iterator.h          |   2 +-
 be/test/vec/olap/vertical_compaction_test.cpp      | 259 +++++++++++++++++++++
 .../main/java/org/apache/doris/load/DeleteJob.java |  18 +-
 .../main/java/org/apache/doris/task/PushTask.java  |   6 +
 .../java/org/apache/doris/load/DeleteJobTest.java  | 152 ++++++++++++
 .../doris/master/MasterImplDeleteTaskTest.java     | 152 ++++++++++++
 7 files changed, 713 insertions(+), 18 deletions(-)

diff --git a/be/src/olap/merger.cpp b/be/src/olap/merger.cpp
index 8316ae94646..279b1fe51a5 100644
--- a/be/src/olap/merger.cpp
+++ b/be/src/olap/merger.cpp
@@ -30,6 +30,7 @@
 #include <ostream>
 #include <shared_mutex>
 #include <string>
+#include <unordered_map>
 #include <utility>
 #include <vector>
 
@@ -42,15 +43,18 @@
 #include "olap/olap_common.h"
 #include "olap/olap_define.h"
 #include "olap/rowid_conversion.h"
+#include "olap/rowset/beta_rowset.h"
 #include "olap/rowset/rowset.h"
 #include "olap/rowset/rowset_meta.h"
 #include "olap/rowset/rowset_writer.h"
+#include "olap/rowset/segment_v2/segment.h"
 #include "olap/rowset/segment_v2/segment_writer.h"
 #include "olap/storage_engine.h"
 #include "olap/tablet.h"
 #include "olap/tablet_fwd.h"
 #include "olap/tablet_meta.h"
 #include "olap/tablet_reader.h"
+#include "olap/types.h"
 #include "olap/utils.h"
 #include "util/slice.h"
 #include "vec/core/block.h"
@@ -413,7 +417,8 @@ Status Merger::vertical_compact_one_group(
 }
 
 int64_t estimate_batch_size(int group_index, BaseTabletSPtr tablet, int64_t 
way_cnt,
-                            ReaderType reader_type) {
+                            ReaderType reader_type, int64_t 
group_per_row_from_footer,
+                            bool footer_fallback) {
     auto& sample_info_lock = tablet->get_sample_info_lock(reader_type);
     auto& sample_infos = tablet->get_sample_infos(reader_type);
     std::unique_lock<std::mutex> lock(sample_info_lock);
@@ -441,9 +446,21 @@ int64_t estimate_batch_size(int group_index, BaseTabletSPtr tablet, int64_t way_
         group_data_size = info.bytes / info.rows;
         sample_infos[group_index].group_data_size = group_data_size;
     } else {
+        // No historical sampling data available.
+        // Try to use raw_data_bytes from segment footer for a better estimate.
+        if (!footer_fallback && group_per_row_from_footer > 0) {
+            int64_t batch_size = block_mem_limit / group_per_row_from_footer;
+            int64_t res = std::max(std::min(batch_size, int64_t(4096 - 32)), 
int64_t(32L));
+            LOG(INFO) << "estimate batch size from footer for vertical 
compaction, tablet id: "
+                      << tablet->tablet_id()
+                      << " group_per_row_from_footer: " << 
group_per_row_from_footer
+                      << " way cnt: " << way_cnt << " batch size: " << res;
+            return res;
+        }
         LOG(INFO) << "estimate batch size for vertical compaction, tablet id: "
                   << tablet->tablet_id() << " group data size: " << 
info.group_data_size
-                  << " row num: " << info.rows << " consume bytes: " << 
info.bytes;
+                  << " row num: " << info.rows << " consume bytes: " << 
info.bytes
+                  << " footer_fallback: " << footer_fallback;
         return 1024 - 32;
     }
 
@@ -523,13 +540,132 @@ Status Merger::vertical_merge_rowsets(BaseTabletSPtr tablet, ReaderType reader_t
         std::unique_lock<std::mutex> lock(sample_info_lock);
         sample_infos.resize(column_groups.size());
     }
+    // Collect per-column raw_data_bytes from segment footer for first-time 
batch size estimation.
+    // raw_data_bytes is the original data size before encoding, close to 
runtime Block::bytes().
+    // Only collect when needed: skip if manual batch_size override is set, or 
if ALL groups
+    // already have historical sampling data. Use per-group granularity so 
that schema evolution
+    // (new groups without history) still gets footer-based estimation.
+    struct ColumnRawSizeInfo {
+        int64_t total_raw_bytes = 0;
+        int64_t rows_with_data = 0;
+    };
+    std::unordered_map<int32_t, ColumnRawSizeInfo> column_raw_sizes;
+    bool need_footer_collection = false;
+    if (config::compaction_batch_size == -1) {
+        std::unique_lock<std::mutex> lock(sample_info_lock);
+        for (const auto& info : sample_infos) {
+            if (info.group_data_size <= 0 && info.bytes <= 0 && info.rows <= 
0) {
+                need_footer_collection = true;
+                break;
+            }
+        }
+    }
+    if (need_footer_collection) {
+        for (const auto& rs_reader : src_rowset_readers) {
+            auto beta_rowset = 
std::dynamic_pointer_cast<BetaRowset>(rs_reader->rowset());
+            if (!beta_rowset) {
+                continue;
+            }
+            std::vector<segment_v2::SegmentSharedPtr> segments;
+            auto st = beta_rowset->load_segments(&segments);
+            if (!st.ok()) {
+                LOG(WARNING) << "Failed to load segments for footer 
raw_data_bytes collection"
+                             << ", tablet_id: " << tablet->tablet_id()
+                             << ", rowset_id: " << beta_rowset->rowset_id() << 
", status: " << st;
+                continue;
+            }
+            for (const auto& segment : segments) {
+                int64_t row_count = segment->num_rows();
+                auto collect_st = segment->traverse_column_meta_pbs(
+                        [&](const segment_v2::ColumnMetaPB& meta) {
+                            int32_t uid = meta.unique_id();
+                            if (uid >= 0 && meta.has_raw_data_bytes()) {
+                                auto& info = column_raw_sizes[uid];
+                                info.total_raw_bytes += meta.raw_data_bytes();
+                                info.rows_with_data += row_count;
+                            }
+                        });
+                if (!collect_st.ok()) {
+                    LOG(WARNING) << "Failed to traverse column meta for footer 
collection"
+                                 << ", tablet_id: " << tablet->tablet_id()
+                                 << ", status: " << collect_st;
+                }
+            }
+        }
+    }
+
+    // Pre-compute per-row estimate for each column group from footer data.
+    std::vector<int64_t> group_per_row_from_footer(column_groups.size(), 0);
+    std::vector<bool> group_footer_fallback(column_groups.size(), false);
+    for (size_t i = 0; i < column_groups.size(); ++i) {
+        int64_t group_per_row = 0;
+        bool need_fallback = false;
+        for (uint32_t col_ordinal : column_groups[i]) {
+            const auto& col = tablet_schema.column(col_ordinal);
+            int32_t uid = col.unique_id();
+
+            // Variant columns (root or subcolumn): raw_data_bytes is 0 (TODO 
in writer),
+            // cannot estimate from footer, fallback to default for the entire 
group.
+            if (uid < 0 || col.is_variant_type()) {
+                need_fallback = true;
+                break;
+            }
+
+            // Any column without footer data (e.g. legacy segments written 
before
+            // raw_data_bytes existed) makes the group sample partial and 
unreliable.
+            // Fall back to the default for the whole group instead of summing 
only
+            // the columns we measured.
+            auto it = column_raw_sizes.find(uid);
+            if (it == column_raw_sizes.end() || it->second.rows_with_data <= 
0) {
+                need_fallback = true;
+                break;
+            }
+
+            int64_t raw_per_row = it->second.total_raw_bytes / 
it->second.rows_with_data;
+            int64_t col_per_row = 0;
+
+            if (col.type() == FieldType::OLAP_FIELD_TYPE_ARRAY ||
+                col.type() == FieldType::OLAP_FIELD_TYPE_MAP ||
+                col.type() == FieldType::OLAP_FIELD_TYPE_STRUCT) {
+                // Complex types: raw_data_bytes recursively aggregates 
sub-writers.
+                col_per_row = raw_per_row;
+            } else if (col.is_length_variable_type()) {
+                // Variable-length scalar (VARCHAR/STRING/HLL/BITMAP/...): 
raw_per_row
+                // is the average char payload across all rows; reader still 
pays an
+                // 8-byte offset entry per row regardless of null-ness.
+                col_per_row = raw_per_row + 8;
+                if (col.is_nullable()) {
+                    col_per_row += 1; // null map
+                }
+            } else {
+                // Fixed-width scalar (INT/BIGINT/DOUBLE/DATE/...).
+                // raw_data_bytes only counts non-null payload (append_nulls() 
does
+                // not advance the page builder), but 
FileColumnIterator::next_batch
+                // still calls ColumnNullable::insert_many_defaults() for null 
runs,
+                // which grows the nested PODArray by N * type_size. So the 
runtime
+                // per-row footprint is at least type_size, no matter how 
sparse.
+                int64_t type_size = get_type_info(&col)->size();
+                col_per_row = std::max(raw_per_row, type_size);
+                if (col.is_nullable()) {
+                    col_per_row += 1; // null map
+                }
+            }
+
+            group_per_row += col_per_row;
+        }
+        group_per_row_from_footer[i] = group_per_row;
+        group_footer_fallback[i] = need_fallback;
+    }
+
     // compact group one by one
     for (auto i = 0; i < column_groups.size(); ++i) {
         VLOG_NOTICE << "row source size: " << row_sources_buf.total_size();
         bool is_key = (i == 0);
         int64_t batch_size = config::compaction_batch_size != -1
                                      ? config::compaction_batch_size
-                                     : estimate_batch_size(i, tablet, 
merge_way_num, reader_type);
+                                     : estimate_batch_size(i, tablet, 
merge_way_num, reader_type,
+                                                           
group_per_row_from_footer[i],
+                                                           
group_footer_fallback[i]);
         CompactionSampleInfo sample_info;
         Merger::Statistics group_stats;
         group_stats.rowid_conversion = total_stats.rowid_conversion;
diff --git a/be/src/vec/olap/vertical_merge_iterator.h b/be/src/vec/olap/vertical_merge_iterator.h
index 45865d8c7c1..d883af930d8 100644
--- a/be/src/vec/olap/vertical_merge_iterator.h
+++ b/be/src/vec/olap/vertical_merge_iterator.h
@@ -210,7 +210,7 @@ public:
 
     size_t bytes() {
         if (_block) {
-            return _block->bytes();
+            return _block->allocated_bytes();
         } else {
             return 0;
         }
diff --git a/be/test/vec/olap/vertical_compaction_test.cpp b/be/test/vec/olap/vertical_compaction_test.cpp
index 1876bc26371..0562eb247d3 100644
--- a/be/test/vec/olap/vertical_compaction_test.cpp
+++ b/be/test/vec/olap/vertical_compaction_test.cpp
@@ -56,6 +56,7 @@
 #include "olap/rowset/rowset_reader_context.h"
 #include "olap/rowset/rowset_writer.h"
 #include "olap/rowset/rowset_writer_context.h"
+#include "olap/rowset/segment_v2/segment.h"
 #include "olap/schema.h"
 #include "olap/storage_engine.h"
 #include "olap/tablet.h"
@@ -1194,5 +1195,263 @@ TEST_F(VerticalCompactionTest, TestUniqueKeyVerticalMergeWithNullableSparseColum
     config::sparse_column_compaction_threshold_percent = original_threshold;
 }
 
+// Test that first-time compaction (no historical sampling) uses footer 
raw_data_bytes
+// to estimate batch_size instead of hardcoded 992.
+// This test verifies the footer-based estimation path is triggered and 
compaction succeeds.
+TEST_F(VerticalCompactionTest, TestFirstCompactionUsesFooterEstimation) {
+    // Use small data to ensure compaction completes quickly
+    auto num_input_rowset = 2;
+    auto num_segments = 1;
+    auto rows_per_segment = 1024;
+    SegmentsOverlapPB overlap = NONOVERLAPPING;
+    std::vector<std::vector<std::vector<std::tuple<int64_t, int64_t>>>> 
input_data;
+    generate_input_data(num_input_rowset, num_segments, rows_per_segment, 
overlap, input_data);
+
+    TabletSchemaSPtr tablet_schema = create_schema();
+
+    // Create input rowsets
+    std::vector<RowsetSharedPtr> input_rowsets;
+    for (auto i = 0; i < num_input_rowset; i++) {
+        RowsetSharedPtr rowset = create_rowset(tablet_schema, overlap, 
input_data[i], i);
+        input_rowsets.push_back(rowset);
+    }
+
+    // Create input rowset readers
+    std::vector<RowsetReaderSharedPtr> input_rs_readers;
+    for (auto& rowset : input_rowsets) {
+        RowsetReaderSharedPtr rs_reader;
+        ASSERT_TRUE(rowset->create_reader(&rs_reader).ok());
+        input_rs_readers.push_back(std::move(rs_reader));
+    }
+
+    // Create output rowset writer
+    auto writer_context = create_rowset_writer_context(tablet_schema, 
NONOVERLAPPING, 3456,
+                                                       {0, 
input_rowsets.back()->end_version()});
+    auto res = RowsetFactory::create_rowset_writer(*engine_ref, 
writer_context, true);
+    ASSERT_TRUE(res.has_value()) << res.error();
+    auto output_rs_writer = std::move(res).value();
+
+    // Create tablet - fresh tablet has no historical sampling data,
+    // so estimate_batch_size will hit the else branch and use footer 
raw_data_bytes.
+    TabletSharedPtr tablet = create_tablet(*tablet_schema, false);
+    Merger::Statistics stats;
+    RowIdConversion rowid_conversion;
+    stats.rowid_conversion = &rowid_conversion;
+
+    // Verify sample_infos are empty (no historical data)
+    auto& sample_infos = 
tablet->get_sample_infos(ReaderType::READER_BASE_COMPACTION);
+    EXPECT_TRUE(sample_infos.empty());
+
+    // Run vertical merge - this should use footer raw_data_bytes for batch 
size estimation
+    // since there is no historical sampling data.
+    // The log should contain "estimate batch size from footer" instead of the 
old hardcoded path.
+    auto s = Merger::vertical_merge_rowsets(tablet, 
ReaderType::READER_BASE_COMPACTION,
+                                            *tablet_schema, input_rs_readers,
+                                            output_rs_writer.get(), 100000, 
num_segments, &stats);
+    ASSERT_TRUE(s.ok()) << s;
+
+    RowsetSharedPtr out_rowset;
+    ASSERT_EQ(Status::OK(), output_rs_writer->build(out_rowset));
+    EXPECT_EQ(out_rowset->rowset_meta()->num_rows(),
+              num_input_rowset * num_segments * rows_per_segment);
+
+    // After first compaction, sample_infos should be populated with 
historical data
+    // for subsequent compactions to use.
+    auto& updated_infos = 
tablet->get_sample_infos(ReaderType::READER_BASE_COMPACTION);
+    EXPECT_FALSE(updated_infos.empty());
+}
+
+// Test that raw_data_bytes in segment footer accurately reflects the original 
data size
+// for different column types, which is the foundation of footer-based batch 
size estimation.
+TEST_F(VerticalCompactionTest, TestFooterRawDataBytesAccuracy) {
+    // Create a schema with INT key + VARCHAR value to test both fixed and 
variable-length types
+    TabletSchemaSPtr tablet_schema = std::make_shared<TabletSchema>();
+    TabletSchemaPB tablet_schema_pb;
+    tablet_schema_pb.set_keys_type(DUP_KEYS);
+    tablet_schema_pb.set_num_short_key_columns(1);
+    tablet_schema_pb.set_num_rows_per_row_block(1024);
+    tablet_schema_pb.set_compress_kind(COMPRESS_NONE);
+    tablet_schema_pb.set_next_column_unique_id(3);
+
+    ColumnPB* col_int = tablet_schema_pb.add_column();
+    col_int->set_unique_id(1);
+    col_int->set_name("c_int");
+    col_int->set_type("INT");
+    col_int->set_is_key(true);
+    col_int->set_length(4);
+    col_int->set_index_length(4);
+    col_int->set_is_nullable(false);
+    col_int->set_is_bf_column(false);
+
+    ColumnPB* col_varchar = tablet_schema_pb.add_column();
+    col_varchar->set_unique_id(2);
+    col_varchar->set_name("c_varchar");
+    col_varchar->set_type("VARCHAR");
+    col_varchar->set_is_key(false);
+    col_varchar->set_length(128);
+    col_varchar->set_index_length(20);
+    col_varchar->set_is_nullable(false);
+    col_varchar->set_is_bf_column(false);
+
+    tablet_schema->init_from_pb(tablet_schema_pb);
+
+    // Write 1000 rows: INT values + VARCHAR strings of exactly 20 bytes each
+    constexpr int kNumRows = 1000;
+    constexpr int kStringLen = 20;
+    std::string fixed_string(kStringLen, 'x');
+
+    auto writer_context =
+            create_rowset_writer_context(tablet_schema, NONOVERLAPPING, 
UINT32_MAX, {0, 0});
+    auto res = RowsetFactory::create_rowset_writer(*engine_ref, 
writer_context, true);
+    ASSERT_TRUE(res.has_value()) << res.error();
+    auto rowset_writer = std::move(res).value();
+
+    Block block = tablet_schema->create_block();
+    auto columns = block.mutate_columns();
+    for (int i = 0; i < kNumRows; i++) {
+        int32_t int_val = i;
+        columns[0]->insert_data(reinterpret_cast<const char*>(&int_val), 
sizeof(int_val));
+        columns[1]->insert_data(fixed_string.data(), fixed_string.size());
+    }
+    ASSERT_TRUE(rowset_writer->add_block(&block).ok());
+    ASSERT_TRUE(rowset_writer->flush().ok());
+
+    RowsetSharedPtr rowset;
+    ASSERT_EQ(Status::OK(), rowset_writer->build(rowset));
+    ASSERT_EQ(1, rowset->rowset_meta()->num_segments());
+    ASSERT_EQ(kNumRows, rowset->rowset_meta()->num_rows());
+
+    // Load segments and read footer's raw_data_bytes
+    auto beta_rowset = std::dynamic_pointer_cast<BetaRowset>(rowset);
+    ASSERT_NE(beta_rowset, nullptr);
+    std::vector<segment_v2::SegmentSharedPtr> segments;
+    ASSERT_TRUE(beta_rowset->load_segments(&segments).ok());
+    ASSERT_EQ(1, segments.size());
+    ASSERT_EQ(kNumRows, segments[0]->num_rows());
+
+    // Collect raw_data_bytes per column from footer
+    std::unordered_map<int32_t, uint64_t> raw_bytes_by_uid;
+    auto st = segments[0]->traverse_column_meta_pbs([&](const 
segment_v2::ColumnMetaPB& meta) {
+        if (meta.unique_id() >= 0 && meta.has_raw_data_bytes()) {
+            raw_bytes_by_uid[meta.unique_id()] = meta.raw_data_bytes();
+        }
+    });
+    ASSERT_TRUE(st.ok()) << st;
+
+    // Verify INT column (uid=1): raw_data_bytes should be exactly kNumRows * 
sizeof(int32_t).
+    // PageBuilder::get_raw_data_size() accumulates raw data bytes added via 
add(),
+    // for fixed-width types this is exactly N * sizeof(T).
+    ASSERT_TRUE(raw_bytes_by_uid.count(1) > 0) << "INT column raw_data_bytes 
not found in footer";
+    EXPECT_EQ(raw_bytes_by_uid[1], kNumRows * sizeof(int32_t))
+            << "INT column: expected " << kNumRows * sizeof(int32_t)
+            << " total raw_data_bytes, got " << raw_bytes_by_uid[1];
+
+    // Verify VARCHAR column (uid=2): raw_data_bytes should be exactly 
kNumRows * kStringLen.
+    // BinaryPlainPageBuilder/BinaryDictPageBuilder only accumulate src->size 
(the raw string
+    // payload), not offsets, varint length prefixes, or dictionary overhead.
+    ASSERT_TRUE(raw_bytes_by_uid.count(2) > 0)
+            << "VARCHAR column raw_data_bytes not found in footer";
+    EXPECT_EQ(raw_bytes_by_uid[2], kNumRows * kStringLen)
+            << "VARCHAR column: expected " << kNumRows * kStringLen << " total 
raw_data_bytes, got "
+            << raw_bytes_by_uid[2];
+}
+
+// Verify that raw_data_bytes only counts non-null payload for nullable
+// fixed-width columns. This is the premise that motivates the type_size
+// lower bound in merger.cpp's footer-based per-row estimation: without it,
+// a sparse nullable column would produce a per-row estimate far below the
+// reader's actual memory footprint (which still allocates the full nested
+// slot for null rows via ColumnNullable::insert_many_defaults).
+TEST_F(VerticalCompactionTest, TestFooterRawDataBytesNullableSparse) {
+    TabletSchemaSPtr tablet_schema = std::make_shared<TabletSchema>();
+    TabletSchemaPB tablet_schema_pb;
+    tablet_schema_pb.set_keys_type(DUP_KEYS);
+    tablet_schema_pb.set_num_short_key_columns(1);
+    tablet_schema_pb.set_num_rows_per_row_block(1024);
+    tablet_schema_pb.set_compress_kind(COMPRESS_NONE);
+    tablet_schema_pb.set_next_column_unique_id(3);
+
+    ColumnPB* col_key = tablet_schema_pb.add_column();
+    col_key->set_unique_id(1);
+    col_key->set_name("c_key");
+    col_key->set_type("INT");
+    col_key->set_is_key(true);
+    col_key->set_length(4);
+    col_key->set_index_length(4);
+    col_key->set_is_nullable(false);
+    col_key->set_is_bf_column(false);
+
+    ColumnPB* col_val = tablet_schema_pb.add_column();
+    col_val->set_unique_id(2);
+    col_val->set_name("c_val");
+    col_val->set_type("INT");
+    col_val->set_is_key(false);
+    col_val->set_length(4);
+    col_val->set_index_length(4);
+    col_val->set_is_nullable(true);
+    col_val->set_is_bf_column(false);
+
+    tablet_schema->init_from_pb(tablet_schema_pb);
+
+    constexpr int kNumRows = 1000;
+    constexpr int kNonNullCount = 100; // 10% non-null, 90% null
+
+    auto writer_context =
+            create_rowset_writer_context(tablet_schema, NONOVERLAPPING, 
UINT32_MAX, {0, 0});
+    auto res = RowsetFactory::create_rowset_writer(*engine_ref, 
writer_context, true);
+    ASSERT_TRUE(res.has_value()) << res.error();
+    auto rowset_writer = std::move(res).value();
+
+    Block block = tablet_schema->create_block();
+    auto columns = block.mutate_columns();
+    for (int i = 0; i < kNumRows; i++) {
+        int32_t key_val = i;
+        columns[0]->insert_data(reinterpret_cast<const char*>(&key_val), 
sizeof(key_val));
+        if (i < kNonNullCount) {
+            int32_t val = i;
+            columns[1]->insert_data(reinterpret_cast<const char*>(&val), 
sizeof(val));
+        } else {
+            columns[1]->insert_default(); // ColumnNullable default is null
+        }
+    }
+    ASSERT_TRUE(rowset_writer->add_block(&block).ok());
+    ASSERT_TRUE(rowset_writer->flush().ok());
+
+    RowsetSharedPtr rowset;
+    ASSERT_EQ(Status::OK(), rowset_writer->build(rowset));
+    ASSERT_EQ(1, rowset->rowset_meta()->num_segments());
+
+    auto beta_rowset = std::dynamic_pointer_cast<BetaRowset>(rowset);
+    ASSERT_NE(beta_rowset, nullptr);
+    std::vector<segment_v2::SegmentSharedPtr> segments;
+    ASSERT_TRUE(beta_rowset->load_segments(&segments).ok());
+    ASSERT_EQ(1, segments.size());
+
+    std::unordered_map<int32_t, uint64_t> raw_bytes_by_uid;
+    auto st = segments[0]->traverse_column_meta_pbs([&](const 
segment_v2::ColumnMetaPB& meta) {
+        if (meta.unique_id() >= 0 && meta.has_raw_data_bytes()) {
+            raw_bytes_by_uid[meta.unique_id()] = meta.raw_data_bytes();
+        }
+    });
+    ASSERT_TRUE(st.ok()) << st;
+
+    // Key column (non-null INT): full coverage, raw == kNumRows * 4.
+    ASSERT_TRUE(raw_bytes_by_uid.count(1) > 0);
+    EXPECT_EQ(raw_bytes_by_uid[1], kNumRows * sizeof(int32_t));
+
+    // Value column (nullable INT, 90% null): raw_data_bytes only reflects
+    // the non-null payload because ScalarColumnWriter::append_nulls() does
+    // not advance the page builder. So raw == kNonNullCount * 4.
+    //
+    // If merger.cpp used `raw / total_rows` directly the per-row estimate
+    // would be ~0.4 bytes (+1 for null map = 1.4), but the reader actually
+    // allocates 4 bytes for every nested slot (regardless of null-ness)
+    // plus 1 byte of null map = 5 bytes/row. The fixed-width type_size
+    // lower bound is what closes that ~3.5x gap.
+    ASSERT_TRUE(raw_bytes_by_uid.count(2) > 0);
+    EXPECT_EQ(raw_bytes_by_uid[2], kNonNullCount * sizeof(int32_t));
+    EXPECT_LT(raw_bytes_by_uid[2], kNumRows * sizeof(int32_t));
+}
+
 } // namespace vectorized
 } // namespace doris
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/DeleteJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/DeleteJob.java
index 2f619a4cc53..c2e8532a684 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/DeleteJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/DeleteJob.java
@@ -394,13 +394,16 @@ public class DeleteJob extends AbstractTxnStateChangeCallback implements DeleteJ
         long timeoutMs = getTimeoutMs();
         boolean ok = countDownLatch.await(timeoutMs, TimeUnit.MILLISECONDS);
         if (ok) {
+            checkAndUpdateQuorum();
+            if (state == DeleteState.QUORUM_FINISHED || state == 
DeleteState.FINISHED) {
+                return;
+            }
             if (!countDownLatch.getStatus().ok()) {
                 // encounter some errors that don't need to retry, abort 
directly
                 LOG.warn("delete job failed, errmsg={}", 
countDownLatch.getStatus().getErrorMsg());
                 throw new UserException(String.format("delete job failed, 
errmsg:%s",
                         countDownLatch.getStatus().getErrorMsg()));
             }
-            return;
         }
 
         //handle failure
@@ -420,19 +423,6 @@ public class DeleteJob extends AbstractTxnStateChangeCallback implements DeleteJ
                 throw new UserException(String.format("delete job timeout, 
timeout(ms):%s, msg:%s", timeoutMs, errMsg));
             case QUORUM_FINISHED:
             case FINISHED:
-                long nowQuorumTimeMs = System.currentTimeMillis();
-                long endQuorumTimeoutMs = nowQuorumTimeMs + timeoutMs / 2;
-                // if job's state is quorum_finished then wait for a period of 
time and commit it.
-                while (state == DeleteState.QUORUM_FINISHED
-                        && endQuorumTimeoutMs > nowQuorumTimeMs) {
-                    checkAndUpdateQuorum();
-                    Thread.sleep(1000);
-                    nowQuorumTimeMs = System.currentTimeMillis();
-                    if (LOG.isDebugEnabled()) {
-                        LOG.debug("wait for quorum finished delete job: {}, 
txn id: {}",
-                                id, transactionId);
-                    }
-                }
                 break;
             default:
                 throw new IllegalStateException("wrong delete job state: " + 
state.name());
diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/PushTask.java b/fe/fe-core/src/main/java/org/apache/doris/task/PushTask.java
index cb06f4d27f6..1862df155c0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/task/PushTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/task/PushTask.java
@@ -224,6 +224,12 @@ public class PushTask extends AgentTask {
         }
     }
 
+    @Override
+    public void failedWithMsg(String errMsg) {
+        super.failedWithMsg(errMsg);
+        countDownLatchWithStatus(getBackendId(), getTabletId(), new 
Status(TStatusCode.CANCELLED, errMsg));
+    }
+
     // call this always means one of tasks is failed. count down to zero to 
finish entire task
     public void countDownToZero(TStatusCode code, String errMsg) {
         if (this.latch != null) {
diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/DeleteJobTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/DeleteJobTest.java
new file mode 100644
index 00000000000..d1cf2a2f616
--- /dev/null
+++ b/fe/fe-core/src/test/java/org/apache/doris/load/DeleteJobTest.java
@@ -0,0 +1,152 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.load;
+
+import org.apache.doris.catalog.Database;
+import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.Replica;
+import org.apache.doris.catalog.TabletInvertedIndex;
+import org.apache.doris.catalog.TabletMeta;
+import org.apache.doris.common.MarkedCountDownLatch;
+import org.apache.doris.common.Status;
+import org.apache.doris.common.UserException;
+import org.apache.doris.common.jmockit.Deencapsulation;
+import org.apache.doris.datasource.InternalCatalog;
+import org.apache.doris.thrift.TStatusCode;
+import org.apache.doris.thrift.TStorageMedium;
+
+import com.google.common.collect.Lists;
+import mockit.Mock;
+import mockit.MockUp;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class DeleteJobTest {
+    private static final long DB_ID = 1L;
+    private static final long TABLE_ID = 2L;
+    private static final long PARTITION_ID = 3L;
+    private static final long TABLET_ID = 4L;
+
+    private final Database database = new Database(DB_ID, "db");
+    private final InternalCatalog catalog = 
Deencapsulation.newInstance(InternalCatalog.class);
+    private final TabletInvertedIndex invertedIndex = new 
TabletInvertedIndex() {
+        @Override
+        public List<Replica> getReplicas(Long tabletId) {
+            return Lists.newArrayList();
+        }
+
+        @Override
+        public void deleteTablet(long tabletId) {
+        }
+
+        @Override
+        public void addReplica(long tabletId, Replica replica) {
+        }
+
+        @Override
+        public void deleteReplica(long tabletId, long backendId) {
+        }
+
+        @Override
+        public Replica getReplica(long tabletId, long backendId) {
+            return null;
+        }
+
+        @Override
+        public List<Replica> getReplicasByTabletId(long tabletId) {
+            return Lists.newArrayList();
+        }
+    };
+
+    @Before
+    public void setUp() {
+        invertedIndex.addTablet(TABLET_ID, new TabletMeta(DB_ID, TABLE_ID, 
PARTITION_ID, 5L, 6, TStorageMedium.HDD));
+
+        new MockUp<Env>() {
+            @Mock
+            public InternalCatalog getCurrentInternalCatalog() {
+                return catalog;
+            }
+
+            @Mock
+            public TabletInvertedIndex getCurrentInvertedIndex() {
+                return invertedIndex;
+            }
+        };
+
+        new MockUp<InternalCatalog>() {
+            @Mock
+            public Database getDbOrMetaException(long dbId) {
+                return database;
+            }
+        };
+    }
+
+    @Test
+    public void testAwaitIgnoresFailureStatusAfterQuorumReached() throws 
Exception {
+        MarkedCountDownLatch<Long, Long> latch = new 
MarkedCountDownLatch<Long, Long>(1);
+        latch.addMark(1L, TABLET_ID);
+        latch.markedCountDownWithStatus(1L, TABLET_ID, new 
Status(TStatusCode.INTERNAL_ERROR, "too many versions"));
+
+        DeleteJob deleteJob = newDeleteJob((short) 3, latch);
+        deleteJob.addFinishedReplica(PARTITION_ID, TABLET_ID,
+                new Replica(11L, 21L, Replica.ReplicaState.NORMAL, 1L, 6));
+        deleteJob.addFinishedReplica(PARTITION_ID, TABLET_ID,
+                new Replica(12L, 22L, Replica.ReplicaState.NORMAL, 1L, 6));
+
+        deleteJob.await();
+
+        Assert.assertEquals(DeleteJob.DeleteState.QUORUM_FINISHED, 
deleteJob.getState());
+    }
+
+    @Test
+    public void testAwaitReturnsFailureStatusWhenQuorumNotReached() {
+        MarkedCountDownLatch<Long, Long> latch = new 
MarkedCountDownLatch<Long, Long>(1);
+        latch.addMark(1L, TABLET_ID);
+        latch.markedCountDownWithStatus(1L, TABLET_ID, new 
Status(TStatusCode.INTERNAL_ERROR, "too many versions"));
+
+        DeleteJob deleteJob = newDeleteJob((short) 1, latch);
+
+        try {
+            deleteJob.await();
+            Assert.fail("delete job should fail when quorum is not reached");
+        } catch (UserException e) {
+            Assert.assertTrue(e.getMessage().contains("too many versions"));
+            Assert.assertEquals(DeleteJob.DeleteState.UN_QUORUM, 
deleteJob.getState());
+        } catch (Exception e) {
+            Assert.fail("unexpected exception: " + e.getMessage());
+        }
+    }
+
+    private DeleteJob newDeleteJob(short replicaNum, 
MarkedCountDownLatch<Long, Long> latch) {
+        DeleteInfo deleteInfo = new DeleteInfo(DB_ID, TABLE_ID, "tbl", 
Collections.emptyList(),
+                false, Lists.newArrayList(PARTITION_ID), 
Lists.newArrayList("p1"));
+        Map<Long, Short> partitionReplicaNum = 
Collections.singletonMap(PARTITION_ID, replicaNum);
+        DeleteJob deleteJob = new DeleteJob(100L, 200L, "label", 
partitionReplicaNum, deleteInfo);
+        deleteJob.setCountDownLatch(latch);
+        Set<Long> totalTablets = Deencapsulation.getField(deleteJob, 
"totalTablets");
+        totalTablets.add(TABLET_ID);
+        return deleteJob;
+    }
+}
diff --git a/fe/fe-core/src/test/java/org/apache/doris/master/MasterImplDeleteTaskTest.java b/fe/fe-core/src/test/java/org/apache/doris/master/MasterImplDeleteTaskTest.java
new file mode 100644
index 00000000000..3bf846b05f9
--- /dev/null
+++ b/fe/fe-core/src/test/java/org/apache/doris/master/MasterImplDeleteTaskTest.java
@@ -0,0 +1,152 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.master;
+
+import org.apache.doris.catalog.Env;
+import org.apache.doris.common.MarkedCountDownLatch;
+import org.apache.doris.system.Backend;
+import org.apache.doris.system.SystemInfoService;
+import org.apache.doris.task.AgentTaskQueue;
+import org.apache.doris.task.PushTask;
+import org.apache.doris.thrift.TBackend;
+import org.apache.doris.thrift.TFinishTaskRequest;
+import org.apache.doris.thrift.TPriority;
+import org.apache.doris.thrift.TPushType;
+import org.apache.doris.thrift.TStatus;
+import org.apache.doris.thrift.TStatusCode;
+import org.apache.doris.thrift.TTaskType;
+
+import com.google.common.collect.Lists;
+import mockit.Mock;
+import mockit.MockUp;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class MasterImplDeleteTaskTest {
+    private static final long BACKEND_ID = 10000L;
+    private static final long DB_ID = 20000L;
+    private static final long TABLE_ID = 30000L;
+    private static final long PARTITION_ID = 40000L;
+    private static final long INDEX_ID = 50000L;
+    private static final long TABLET_ID = 60000L;
+    private static final long REPLICA_ID = 70000L;
+    private static final long TRANSACTION_ID = 80000L;
+    private static final long SIGNATURE = 90000L;
+    private static final String HOST = "127.0.0.1";
+    private static final int HEARTBEAT_PORT = 9050;
+    private static final int BE_PORT = 9060;
+
+    private MasterImpl masterImpl;
+
+    @Before
+    public void setUp() {
+        AgentTaskQueue.clearAllTasks();
+
+        SystemInfoService systemInfoService = new SystemInfoService();
+        Backend backend = new Backend(BACKEND_ID, HOST, HEARTBEAT_PORT);
+        backend.setBePort(BE_PORT);
+        systemInfoService.addBackend(backend);
+
+        new MockUp<Env>() {
+            @Mock
+            public SystemInfoService getCurrentSystemInfo() {
+                return systemInfoService;
+            }
+        };
+
+        new MockUp<ReportHandler>() {
+            @Mock
+            public void start() {
+            }
+        };
+
+        masterImpl = new MasterImpl();
+    }
+
+    @After
+    public void tearDown() {
+        AgentTaskQueue.clearAllTasks();
+    }
+
+    @Test
+    public void testDeletePushGenericFailureCountsDownSingleMark() {
+        MarkedCountDownLatch<Long, Long> latch = new MarkedCountDownLatch<Long, Long>(2);
+        latch.addMark(BACKEND_ID, TABLET_ID);
+        latch.addMark(BACKEND_ID + 1, TABLET_ID + 1);
+
+        PushTask pushTask = newDeletePushTask(latch);
+        AgentTaskQueue.addTask(pushTask);
+
+        masterImpl.finishTask(newFinishTaskRequest(TStatusCode.INTERNAL_ERROR));
+
+        Assert.assertEquals(1, latch.getCount());
+        Assert.assertEquals(TStatusCode.INTERNAL_ERROR, latch.getStatus().getErrorCode());
+        Assert.assertEquals(1, pushTask.getFailedTimes());
+        Assert.assertNull(AgentTaskQueue.getTask(BACKEND_ID, TTaskType.REALTIME_PUSH, SIGNATURE));
+    }
+
+    @Test
+    public void testDeletePushInvalidArgumentCountsDownToZero() {
+        MarkedCountDownLatch<Long, Long> latch = new MarkedCountDownLatch<Long, Long>(2);
+        latch.addMark(BACKEND_ID, TABLET_ID);
+        latch.addMark(BACKEND_ID + 1, TABLET_ID + 1);
+
+        PushTask pushTask = newDeletePushTask(latch);
+        AgentTaskQueue.addTask(pushTask);
+
+        masterImpl.finishTask(newFinishTaskRequest(TStatusCode.INVALID_ARGUMENT));
+
+        Assert.assertEquals(0, latch.getCount());
+        Assert.assertEquals(TStatusCode.INVALID_ARGUMENT, latch.getStatus().getErrorCode());
+        Assert.assertEquals(1, pushTask.getFailedTimes());
+        Assert.assertNull(AgentTaskQueue.getTask(BACKEND_ID, TTaskType.REALTIME_PUSH, SIGNATURE));
+    }
+
+    @Test
+    public void testDeletePushFailedWithMsgKeepsFailureStatus() {
+        MarkedCountDownLatch<Long, Long> latch = new MarkedCountDownLatch<Long, Long>(1);
+        latch.addMark(BACKEND_ID, TABLET_ID);
+
+        PushTask pushTask = newDeletePushTask(latch);
+        pushTask.failedWithMsg("submit failed");
+
+        Assert.assertEquals(0, latch.getCount());
+        Assert.assertEquals(TStatusCode.CANCELLED, latch.getStatus().getErrorCode());
+        Assert.assertEquals("submit failed", latch.getStatus().getErrorMsg());
+    }
+
+    private PushTask newDeletePushTask(MarkedCountDownLatch<Long, Long> latch) {
+        PushTask pushTask = new PushTask(null, BACKEND_ID, DB_ID, TABLE_ID, PARTITION_ID, INDEX_ID,
+                TABLET_ID, REPLICA_ID, 1, 1L, null, 0, 60, -1, TPushType.DELETE, null,
+                false, TPriority.NORMAL, TTaskType.REALTIME_PUSH, TRANSACTION_ID, SIGNATURE,
+                null, null, null, 0, null);
+        pushTask.setCountDownLatch(latch);
+        return pushTask;
+    }
+
+    private TFinishTaskRequest newFinishTaskRequest(TStatusCode statusCode) {
+        TStatus taskStatus = new TStatus(statusCode);
+        taskStatus.setErrorMsgs(Lists.newArrayList("delete failed"));
+        TBackend tBackend = new TBackend(HOST, BE_PORT, 0);
+        TFinishTaskRequest request = new TFinishTaskRequest(tBackend, TTaskType.REALTIME_PUSH, SIGNATURE, taskStatus);
+        request.setReportVersion(1L);
+        return request;
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to