This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
new 52c6c8250b6 branch-4.0: [fix](be fe) pick #62263 #61647 (#63816)
52c6c8250b6 is described below
commit 52c6c8250b6212af918dbb7d60901e7812dd18d1
Author: Jimmy <[email protected]>
AuthorDate: Tue Jun 9 16:31:48 2026 +0800
branch-4.0: [fix](be fe) pick #62263 #61647 (#63816)
pick https://github.com/apache/doris/pull/62263
pick https://github.com/apache/doris/pull/61647
---------
Co-authored-by: Siyang Tang <[email protected]>
---
be/src/olap/merger.cpp | 142 ++++++++++-
be/src/vec/olap/vertical_merge_iterator.h | 2 +-
be/test/vec/olap/vertical_compaction_test.cpp | 259 +++++++++++++++++++++
.../main/java/org/apache/doris/load/DeleteJob.java | 18 +-
.../main/java/org/apache/doris/task/PushTask.java | 6 +
.../java/org/apache/doris/load/DeleteJobTest.java | 152 ++++++++++++
.../doris/master/MasterImplDeleteTaskTest.java | 152 ++++++++++++
7 files changed, 713 insertions(+), 18 deletions(-)
diff --git a/be/src/olap/merger.cpp b/be/src/olap/merger.cpp
index 8316ae94646..279b1fe51a5 100644
--- a/be/src/olap/merger.cpp
+++ b/be/src/olap/merger.cpp
@@ -30,6 +30,7 @@
#include <ostream>
#include <shared_mutex>
#include <string>
+#include <unordered_map>
#include <utility>
#include <vector>
@@ -42,15 +43,18 @@
#include "olap/olap_common.h"
#include "olap/olap_define.h"
#include "olap/rowid_conversion.h"
+#include "olap/rowset/beta_rowset.h"
#include "olap/rowset/rowset.h"
#include "olap/rowset/rowset_meta.h"
#include "olap/rowset/rowset_writer.h"
+#include "olap/rowset/segment_v2/segment.h"
#include "olap/rowset/segment_v2/segment_writer.h"
#include "olap/storage_engine.h"
#include "olap/tablet.h"
#include "olap/tablet_fwd.h"
#include "olap/tablet_meta.h"
#include "olap/tablet_reader.h"
+#include "olap/types.h"
#include "olap/utils.h"
#include "util/slice.h"
#include "vec/core/block.h"
@@ -413,7 +417,8 @@ Status Merger::vertical_compact_one_group(
}
int64_t estimate_batch_size(int group_index, BaseTabletSPtr tablet, int64_t
way_cnt,
- ReaderType reader_type) {
+ ReaderType reader_type, int64_t
group_per_row_from_footer,
+ bool footer_fallback) {
auto& sample_info_lock = tablet->get_sample_info_lock(reader_type);
auto& sample_infos = tablet->get_sample_infos(reader_type);
std::unique_lock<std::mutex> lock(sample_info_lock);
@@ -441,9 +446,21 @@ int64_t estimate_batch_size(int group_index,
BaseTabletSPtr tablet, int64_t way_
group_data_size = info.bytes / info.rows;
sample_infos[group_index].group_data_size = group_data_size;
} else {
+ // No historical sampling data available.
+ // Try to use raw_data_bytes from segment footer for a better estimate.
+ if (!footer_fallback && group_per_row_from_footer > 0) {
+ int64_t batch_size = block_mem_limit / group_per_row_from_footer;
+ int64_t res = std::max(std::min(batch_size, int64_t(4096 - 32)),
int64_t(32L));
+ LOG(INFO) << "estimate batch size from footer for vertical
compaction, tablet id: "
+ << tablet->tablet_id()
+ << " group_per_row_from_footer: " <<
group_per_row_from_footer
+ << " way cnt: " << way_cnt << " batch size: " << res;
+ return res;
+ }
LOG(INFO) << "estimate batch size for vertical compaction, tablet id: "
<< tablet->tablet_id() << " group data size: " <<
info.group_data_size
- << " row num: " << info.rows << " consume bytes: " <<
info.bytes;
+ << " row num: " << info.rows << " consume bytes: " <<
info.bytes
+ << " footer_fallback: " << footer_fallback;
return 1024 - 32;
}
@@ -523,13 +540,132 @@ Status Merger::vertical_merge_rowsets(BaseTabletSPtr
tablet, ReaderType reader_t
std::unique_lock<std::mutex> lock(sample_info_lock);
sample_infos.resize(column_groups.size());
}
+ // Collect per-column raw_data_bytes from segment footer for first-time batch size estimation.
+ // raw_data_bytes is the original data size before encoding, close to runtime Block::bytes().
+ // Only collect when needed: skip if manual batch_size override is set, or if ALL groups
+ // already have historical sampling data. Use per-group granularity so that schema evolution
+ // (new groups without history) still gets footer-based estimation.
+    // Per-column accumulator for the footer scan, keyed by column unique id.
+    struct ColumnRawSizeInfo {
+        // Sum of raw_data_bytes over every footer entry seen for the column.
+        int64_t total_raw_bytes {0};
+        // Sum of the row counts of the segments that contributed an entry.
+        int64_t rows_with_data {0};
+    };
+ std::unordered_map<int32_t, ColumnRawSizeInfo> column_raw_sizes;
+ bool need_footer_collection = false;
+ if (config::compaction_batch_size == -1) {
+ std::unique_lock<std::mutex> lock(sample_info_lock);
+ for (const auto& info : sample_infos) {
+ if (info.group_data_size <= 0 && info.bytes <= 0 && info.rows <=
0) {
+ need_footer_collection = true;
+ break;
+ }
+ }
+ }
+ if (need_footer_collection) {
+ for (const auto& rs_reader : src_rowset_readers) {
+ auto beta_rowset =
std::dynamic_pointer_cast<BetaRowset>(rs_reader->rowset());
+ if (!beta_rowset) {
+ continue;
+ }
+ std::vector<segment_v2::SegmentSharedPtr> segments;
+ auto st = beta_rowset->load_segments(&segments);
+ if (!st.ok()) {
+ LOG(WARNING) << "Failed to load segments for footer
raw_data_bytes collection"
+ << ", tablet_id: " << tablet->tablet_id()
+ << ", rowset_id: " << beta_rowset->rowset_id() <<
", status: " << st;
+ continue;
+ }
+ for (const auto& segment : segments) {
+ int64_t row_count = segment->num_rows();
+ auto collect_st = segment->traverse_column_meta_pbs(
+ [&](const segment_v2::ColumnMetaPB& meta) {
+ int32_t uid = meta.unique_id();
+ if (uid >= 0 && meta.has_raw_data_bytes()) {
+ auto& info = column_raw_sizes[uid];
+ info.total_raw_bytes += meta.raw_data_bytes();
+ info.rows_with_data += row_count;
+ }
+ });
+ if (!collect_st.ok()) {
+ LOG(WARNING) << "Failed to traverse column meta for footer
collection"
+ << ", tablet_id: " << tablet->tablet_id()
+ << ", status: " << collect_st;
+ }
+ }
+ }
+ }
+
+ // Pre-compute per-row estimate for each column group from footer data.
+ std::vector<int64_t> group_per_row_from_footer(column_groups.size(), 0);
+ std::vector<bool> group_footer_fallback(column_groups.size(), false);
+ for (size_t i = 0; i < column_groups.size(); ++i) {
+ int64_t group_per_row = 0;
+ bool need_fallback = false;
+ for (uint32_t col_ordinal : column_groups[i]) {
+ const auto& col = tablet_schema.column(col_ordinal);
+ int32_t uid = col.unique_id();
+
+ // Variant columns (root or subcolumn): raw_data_bytes is 0 (TODO in writer),
+ // cannot estimate from footer, fallback to default for the entire group.
+ if (uid < 0 || col.is_variant_type()) {
+ need_fallback = true;
+ break;
+ }
+
+ // Any column without footer data (e.g. legacy segments written before
+ // raw_data_bytes existed) makes the group sample partial and unreliable.
+ // Fall back to the default for the whole group instead of summing only
+ // the columns we measured.
+ auto it = column_raw_sizes.find(uid);
+ if (it == column_raw_sizes.end() || it->second.rows_with_data <=
0) {
+ need_fallback = true;
+ break;
+ }
+
+ int64_t raw_per_row = it->second.total_raw_bytes /
it->second.rows_with_data;
+ int64_t col_per_row = 0;
+
+ if (col.type() == FieldType::OLAP_FIELD_TYPE_ARRAY ||
+ col.type() == FieldType::OLAP_FIELD_TYPE_MAP ||
+ col.type() == FieldType::OLAP_FIELD_TYPE_STRUCT) {
+ // Complex types: raw_data_bytes recursively aggregates sub-writers.
+ col_per_row = raw_per_row;
+ } else if (col.is_length_variable_type()) {
+ // Variable-length scalar (VARCHAR/STRING/HLL/BITMAP/...): raw_per_row
+ // is the average char payload across all rows; reader still pays an
+ // 8-byte offset entry per row regardless of null-ness.
+ col_per_row = raw_per_row + 8;
+ if (col.is_nullable()) {
+ col_per_row += 1; // null map
+ }
+ } else {
+ // Fixed-width scalar (INT/BIGINT/DOUBLE/DATE/...).
+ // raw_data_bytes only counts non-null payload (append_nulls() does
+ // not advance the page builder), but FileColumnIterator::next_batch
+ // still calls ColumnNullable::insert_many_defaults() for null runs,
+ // which grows the nested PODArray by N * type_size. So the runtime
+ // per-row footprint is at least type_size, no matter how sparse.
+ int64_t type_size = get_type_info(&col)->size();
+ col_per_row = std::max(raw_per_row, type_size);
+ if (col.is_nullable()) {
+ col_per_row += 1; // null map
+ }
+ }
+
+ group_per_row += col_per_row;
+ }
+ group_per_row_from_footer[i] = group_per_row;
+ group_footer_fallback[i] = need_fallback;
+ }
+
// compact group one by one
for (auto i = 0; i < column_groups.size(); ++i) {
VLOG_NOTICE << "row source size: " << row_sources_buf.total_size();
bool is_key = (i == 0);
int64_t batch_size = config::compaction_batch_size != -1
? config::compaction_batch_size
- : estimate_batch_size(i, tablet,
merge_way_num, reader_type);
+ : estimate_batch_size(i, tablet,
merge_way_num, reader_type,
+
group_per_row_from_footer[i],
+
group_footer_fallback[i]);
CompactionSampleInfo sample_info;
Merger::Statistics group_stats;
group_stats.rowid_conversion = total_stats.rowid_conversion;
diff --git a/be/src/vec/olap/vertical_merge_iterator.h
b/be/src/vec/olap/vertical_merge_iterator.h
index 45865d8c7c1..d883af930d8 100644
--- a/be/src/vec/olap/vertical_merge_iterator.h
+++ b/be/src/vec/olap/vertical_merge_iterator.h
@@ -210,7 +210,7 @@ public:
size_t bytes() {
if (_block) {
- return _block->bytes();
+ return _block->allocated_bytes();
} else {
return 0;
}
diff --git a/be/test/vec/olap/vertical_compaction_test.cpp
b/be/test/vec/olap/vertical_compaction_test.cpp
index 1876bc26371..0562eb247d3 100644
--- a/be/test/vec/olap/vertical_compaction_test.cpp
+++ b/be/test/vec/olap/vertical_compaction_test.cpp
@@ -56,6 +56,7 @@
#include "olap/rowset/rowset_reader_context.h"
#include "olap/rowset/rowset_writer.h"
#include "olap/rowset/rowset_writer_context.h"
+#include "olap/rowset/segment_v2/segment.h"
#include "olap/schema.h"
#include "olap/storage_engine.h"
#include "olap/tablet.h"
@@ -1194,5 +1195,263 @@ TEST_F(VerticalCompactionTest,
TestUniqueKeyVerticalMergeWithNullableSparseColum
config::sparse_column_compaction_threshold_percent = original_threshold;
}
+// Exercises the first compaction of a tablet that has no historical sampling
+// data. In that situation the batch size is meant to come from footer
+// raw_data_bytes instead of the hardcoded 992; this test drives that path and
+// checks that the merge completes and keeps every input row.
+TEST_F(VerticalCompactionTest, TestFirstCompactionUsesFooterEstimation) {
+    // A small input keeps the compaction fast.
+    auto rowset_cnt = 2;
+    auto segment_cnt = 1;
+    auto segment_rows = 1024;
+    SegmentsOverlapPB overlap_kind = NONOVERLAPPING;
+    std::vector<std::vector<std::vector<std::tuple<int64_t, int64_t>>>> rows_by_rowset;
+    generate_input_data(rowset_cnt, segment_cnt, segment_rows, overlap_kind, rows_by_rowset);
+
+    TabletSchemaSPtr schema = create_schema();
+
+    // Source rowsets, one per generated data set.
+    std::vector<RowsetSharedPtr> src_rowsets;
+    for (auto idx = 0; idx < rowset_cnt; ++idx) {
+        src_rowsets.push_back(create_rowset(schema, overlap_kind, rows_by_rowset[idx], idx));
+    }
+
+    // One reader per source rowset.
+    std::vector<RowsetReaderSharedPtr> src_readers;
+    for (auto& src : src_rowsets) {
+        RowsetReaderSharedPtr reader;
+        ASSERT_TRUE(src->create_reader(&reader).ok());
+        src_readers.push_back(std::move(reader));
+    }
+
+    // Writer that receives the merged output.
+    auto ctx = create_rowset_writer_context(schema, NONOVERLAPPING, 3456,
+                                            {0, src_rowsets.back()->end_version()});
+    auto writer_or = RowsetFactory::create_rowset_writer(*engine_ref, ctx, true);
+    ASSERT_TRUE(writer_or.has_value()) << writer_or.error();
+    auto dst_writer = std::move(writer_or).value();
+
+    // A freshly created tablet carries no sampling history, which is the
+    // precondition for the footer-based estimate.
+    TabletSharedPtr tablet = create_tablet(*schema, false);
+    Merger::Statistics stats;
+    RowIdConversion rowid_conversion;
+    stats.rowid_conversion = &rowid_conversion;
+
+    // Precondition check: nothing has been sampled yet.
+    EXPECT_TRUE(tablet->get_sample_infos(ReaderType::READER_BASE_COMPACTION).empty());
+
+    // Run the vertical merge. With no history available, the log is expected to
+    // show "estimate batch size from footer" rather than the old default path.
+    auto merge_st = Merger::vertical_merge_rowsets(tablet, ReaderType::READER_BASE_COMPACTION,
+                                                   *schema, src_readers, dst_writer.get(),
+                                                   100000, segment_cnt, &stats);
+    ASSERT_TRUE(merge_st.ok()) << merge_st;
+
+    RowsetSharedPtr merged;
+    ASSERT_EQ(Status::OK(), dst_writer->build(merged));
+    EXPECT_EQ(merged->rowset_meta()->num_rows(), rowset_cnt * segment_cnt * segment_rows);
+
+    // Once the first compaction has run, the tablet's sample info list is
+    // expected to be non-empty so later compactions can use it.
+    EXPECT_FALSE(tablet->get_sample_infos(ReaderType::READER_BASE_COMPACTION).empty());
+}
+
+
+// Checks that the raw_data_bytes recorded in a segment footer equals the
+// original payload size for a fixed-width and a variable-length column. The
+// footer-based batch size estimate relies on exactly this property.
+TEST_F(VerticalCompactionTest, TestFooterRawDataBytesAccuracy) {
+    // Schema: INT key + VARCHAR value, covering both fixed and variable width.
+    TabletSchemaSPtr schema = std::make_shared<TabletSchema>();
+    TabletSchemaPB schema_pb;
+    schema_pb.set_keys_type(DUP_KEYS);
+    schema_pb.set_num_short_key_columns(1);
+    schema_pb.set_num_rows_per_row_block(1024);
+    schema_pb.set_compress_kind(COMPRESS_NONE);
+    schema_pb.set_next_column_unique_id(3);
+
+    // Appends one non-nullable, non-bloom-filter column to schema_pb.
+    auto add_column = [&schema_pb](int32_t uid, const char* name, const char* type, bool is_key,
+                                   int32_t length, int32_t index_length) {
+        ColumnPB* col = schema_pb.add_column();
+        col->set_unique_id(uid);
+        col->set_name(name);
+        col->set_type(type);
+        col->set_is_key(is_key);
+        col->set_length(length);
+        col->set_index_length(index_length);
+        col->set_is_nullable(false);
+        col->set_is_bf_column(false);
+    };
+    add_column(1, "c_int", "INT", true, 4, 4);
+    add_column(2, "c_varchar", "VARCHAR", false, 128, 20);
+
+    schema->init_from_pb(schema_pb);
+
+    // 1000 rows: an INT counter plus a VARCHAR of exactly 20 bytes.
+    constexpr int kNumRows = 1000;
+    constexpr int kStringLen = 20;
+    std::string payload(kStringLen, 'x');
+
+    auto ctx = create_rowset_writer_context(schema, NONOVERLAPPING, UINT32_MAX, {0, 0});
+    auto writer_or = RowsetFactory::create_rowset_writer(*engine_ref, ctx, true);
+    ASSERT_TRUE(writer_or.has_value()) << writer_or.error();
+    auto writer = std::move(writer_or).value();
+
+    Block block = schema->create_block();
+    auto cols = block.mutate_columns();
+    for (int row = 0; row < kNumRows; ++row) {
+        int32_t key = row;
+        cols[0]->insert_data(reinterpret_cast<const char*>(&key), sizeof(key));
+        cols[1]->insert_data(payload.data(), payload.size());
+    }
+    ASSERT_TRUE(writer->add_block(&block).ok());
+    ASSERT_TRUE(writer->flush().ok());
+
+    RowsetSharedPtr rowset;
+    ASSERT_EQ(Status::OK(), writer->build(rowset));
+    ASSERT_EQ(1, rowset->rowset_meta()->num_segments());
+    ASSERT_EQ(kNumRows, rowset->rowset_meta()->num_rows());
+
+    // Open the single segment so its footer can be inspected.
+    auto beta_rowset = std::dynamic_pointer_cast<BetaRowset>(rowset);
+    ASSERT_NE(beta_rowset, nullptr);
+    std::vector<segment_v2::SegmentSharedPtr> segments;
+    ASSERT_TRUE(beta_rowset->load_segments(&segments).ok());
+    ASSERT_EQ(1, segments.size());
+    ASSERT_EQ(kNumRows, segments[0]->num_rows());
+
+    // raw_data_bytes per column unique id, as stored in the footer.
+    std::unordered_map<int32_t, uint64_t> raw_bytes_by_uid;
+    auto st = segments[0]->traverse_column_meta_pbs([&](const segment_v2::ColumnMetaPB& meta) {
+        if (meta.unique_id() < 0 || !meta.has_raw_data_bytes()) {
+            return;
+        }
+        raw_bytes_by_uid[meta.unique_id()] = meta.raw_data_bytes();
+    });
+    ASSERT_TRUE(st.ok()) << st;
+
+    // INT column (uid=1): expected to be exactly kNumRows * sizeof(int32_t).
+    // PageBuilder::get_raw_data_size() accumulates the raw bytes added via
+    // add(); for fixed-width types that is N * sizeof(T).
+    ASSERT_TRUE(raw_bytes_by_uid.count(1) > 0) << "INT column raw_data_bytes not found in footer";
+    EXPECT_EQ(raw_bytes_by_uid[1], kNumRows * sizeof(int32_t))
+            << "INT column: expected " << kNumRows * sizeof(int32_t)
+            << " total raw_data_bytes, got " << raw_bytes_by_uid[1];
+
+    // VARCHAR column (uid=2): expected to be exactly kNumRows * kStringLen.
+    // BinaryPlainPageBuilder/BinaryDictPageBuilder only accumulate src->size
+    // (the raw string payload), not offsets, varint length prefixes, or
+    // dictionary overhead.
+    ASSERT_TRUE(raw_bytes_by_uid.count(2) > 0)
+            << "VARCHAR column raw_data_bytes not found in footer";
+    EXPECT_EQ(raw_bytes_by_uid[2], kNumRows * kStringLen)
+            << "VARCHAR column: expected " << kNumRows * kStringLen << " total raw_data_bytes, got "
+            << raw_bytes_by_uid[2];
+}
+
+
+// For a nullable fixed-width column, raw_data_bytes is expected to cover only
+// the non-null payload. That premise is why merger.cpp's footer-based per-row
+// estimate applies a type_size lower bound: without it a sparse nullable
+// column would be estimated far below the reader's real footprint, which
+// still allocates the full nested slot for null rows via
+// ColumnNullable::insert_many_defaults.
+TEST_F(VerticalCompactionTest, TestFooterRawDataBytesNullableSparse) {
+    TabletSchemaSPtr schema = std::make_shared<TabletSchema>();
+    TabletSchemaPB schema_pb;
+    schema_pb.set_keys_type(DUP_KEYS);
+    schema_pb.set_num_short_key_columns(1);
+    schema_pb.set_num_rows_per_row_block(1024);
+    schema_pb.set_compress_kind(COMPRESS_NONE);
+    schema_pb.set_next_column_unique_id(3);
+
+    // Appends one 4-byte INT column without bloom filter to schema_pb.
+    auto add_int_column = [&schema_pb](int32_t uid, const char* name, bool is_key, bool nullable) {
+        ColumnPB* col = schema_pb.add_column();
+        col->set_unique_id(uid);
+        col->set_name(name);
+        col->set_type("INT");
+        col->set_is_key(is_key);
+        col->set_length(4);
+        col->set_index_length(4);
+        col->set_is_nullable(nullable);
+        col->set_is_bf_column(false);
+    };
+    add_int_column(1, "c_key", true, false);
+    add_int_column(2, "c_val", false, true);
+
+    schema->init_from_pb(schema_pb);
+
+    constexpr int kNumRows = 1000;
+    constexpr int kNonNullCount = 100; // 10% non-null, 90% null
+
+    auto ctx = create_rowset_writer_context(schema, NONOVERLAPPING, UINT32_MAX, {0, 0});
+    auto writer_or = RowsetFactory::create_rowset_writer(*engine_ref, ctx, true);
+    ASSERT_TRUE(writer_or.has_value()) << writer_or.error();
+    auto writer = std::move(writer_or).value();
+
+    Block block = schema->create_block();
+    auto cols = block.mutate_columns();
+    for (int row = 0; row < kNumRows; ++row) {
+        int32_t key = row;
+        cols[0]->insert_data(reinterpret_cast<const char*>(&key), sizeof(key));
+        if (row >= kNonNullCount) {
+            cols[1]->insert_default(); // ColumnNullable default is null
+            continue;
+        }
+        int32_t value = row;
+        cols[1]->insert_data(reinterpret_cast<const char*>(&value), sizeof(value));
+    }
+    ASSERT_TRUE(writer->add_block(&block).ok());
+    ASSERT_TRUE(writer->flush().ok());
+
+    RowsetSharedPtr rowset;
+    ASSERT_EQ(Status::OK(), writer->build(rowset));
+    ASSERT_EQ(1, rowset->rowset_meta()->num_segments());
+
+    auto beta_rowset = std::dynamic_pointer_cast<BetaRowset>(rowset);
+    ASSERT_NE(beta_rowset, nullptr);
+    std::vector<segment_v2::SegmentSharedPtr> segments;
+    ASSERT_TRUE(beta_rowset->load_segments(&segments).ok());
+    ASSERT_EQ(1, segments.size());
+
+    // raw_data_bytes per column unique id, as stored in the footer.
+    std::unordered_map<int32_t, uint64_t> raw_bytes_by_uid;
+    auto st = segments[0]->traverse_column_meta_pbs([&](const segment_v2::ColumnMetaPB& meta) {
+        if (meta.unique_id() < 0 || !meta.has_raw_data_bytes()) {
+            return;
+        }
+        raw_bytes_by_uid[meta.unique_id()] = meta.raw_data_bytes();
+    });
+    ASSERT_TRUE(st.ok()) << st;
+
+    // Key column (non-null INT): full coverage, raw == kNumRows * 4.
+    ASSERT_TRUE(raw_bytes_by_uid.count(1) > 0);
+    EXPECT_EQ(raw_bytes_by_uid[1], kNumRows * sizeof(int32_t));
+
+    // Value column (nullable INT, 90% null): raw_data_bytes only reflects the
+    // non-null payload because ScalarColumnWriter::append_nulls() does not
+    // advance the page builder. So raw == kNonNullCount * 4.
+    //
+    // If merger.cpp used `raw / total_rows` directly the per-row estimate
+    // would be ~0.4 bytes (+1 for null map = 1.4), but the reader actually
+    // allocates 4 bytes for every nested slot (regardless of null-ness) plus
+    // 1 byte of null map = 5 bytes/row. The fixed-width type_size lower bound
+    // is what closes that ~3.5x gap.
+    ASSERT_TRUE(raw_bytes_by_uid.count(2) > 0);
+    EXPECT_EQ(raw_bytes_by_uid[2], kNonNullCount * sizeof(int32_t));
+    EXPECT_LT(raw_bytes_by_uid[2], kNumRows * sizeof(int32_t));
+}
+
+
} // namespace vectorized
} // namespace doris
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/DeleteJob.java
b/fe/fe-core/src/main/java/org/apache/doris/load/DeleteJob.java
index 2f619a4cc53..c2e8532a684 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/DeleteJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/DeleteJob.java
@@ -394,13 +394,16 @@ public class DeleteJob extends
AbstractTxnStateChangeCallback implements DeleteJ
long timeoutMs = getTimeoutMs();
boolean ok = countDownLatch.await(timeoutMs, TimeUnit.MILLISECONDS);
if (ok) {
+ checkAndUpdateQuorum();
+ if (state == DeleteState.QUORUM_FINISHED || state ==
DeleteState.FINISHED) {
+ return;
+ }
if (!countDownLatch.getStatus().ok()) {
// encounter some errors that don't need to retry, abort
directly
LOG.warn("delete job failed, errmsg={}",
countDownLatch.getStatus().getErrorMsg());
throw new UserException(String.format("delete job failed,
errmsg:%s",
countDownLatch.getStatus().getErrorMsg()));
}
- return;
}
//handle failure
@@ -420,19 +423,6 @@ public class DeleteJob extends
AbstractTxnStateChangeCallback implements DeleteJ
throw new UserException(String.format("delete job timeout,
timeout(ms):%s, msg:%s", timeoutMs, errMsg));
case QUORUM_FINISHED:
case FINISHED:
- long nowQuorumTimeMs = System.currentTimeMillis();
- long endQuorumTimeoutMs = nowQuorumTimeMs + timeoutMs / 2;
- // if job's state is quorum_finished then wait for a period of
time and commit it.
- while (state == DeleteState.QUORUM_FINISHED
- && endQuorumTimeoutMs > nowQuorumTimeMs) {
- checkAndUpdateQuorum();
- Thread.sleep(1000);
- nowQuorumTimeMs = System.currentTimeMillis();
- if (LOG.isDebugEnabled()) {
- LOG.debug("wait for quorum finished delete job: {},
txn id: {}",
- id, transactionId);
- }
- }
break;
default:
throw new IllegalStateException("wrong delete job state: " +
state.name());
diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/PushTask.java
b/fe/fe-core/src/main/java/org/apache/doris/task/PushTask.java
index cb06f4d27f6..1862df155c0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/task/PushTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/task/PushTask.java
@@ -224,6 +224,12 @@ public class PushTask extends AgentTask {
}
}
+ @Override
+ public void failedWithMsg(String errMsg) {
+ super.failedWithMsg(errMsg);
+ countDownLatchWithStatus(getBackendId(), getTabletId(), new
Status(TStatusCode.CANCELLED, errMsg));
+ }
+
// call this always means one of tasks is failed. count down to zero to
finish entire task
public void countDownToZero(TStatusCode code, String errMsg) {
if (this.latch != null) {
diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/DeleteJobTest.java
b/fe/fe-core/src/test/java/org/apache/doris/load/DeleteJobTest.java
new file mode 100644
index 00000000000..d1cf2a2f616
--- /dev/null
+++ b/fe/fe-core/src/test/java/org/apache/doris/load/DeleteJobTest.java
@@ -0,0 +1,152 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.load;
+
+import org.apache.doris.catalog.Database;
+import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.Replica;
+import org.apache.doris.catalog.TabletInvertedIndex;
+import org.apache.doris.catalog.TabletMeta;
+import org.apache.doris.common.MarkedCountDownLatch;
+import org.apache.doris.common.Status;
+import org.apache.doris.common.UserException;
+import org.apache.doris.common.jmockit.Deencapsulation;
+import org.apache.doris.datasource.InternalCatalog;
+import org.apache.doris.thrift.TStatusCode;
+import org.apache.doris.thrift.TStorageMedium;
+
+import com.google.common.collect.Lists;
+import mockit.Mock;
+import mockit.MockUp;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Tests for {@code DeleteJob.await()} when the latch has already been counted down
+ * with a failure status: the outcome is expected to depend on whether enough
+ * replicas finished to reach quorum.
+ */
+public class DeleteJobTest {
+    private static final long DB_ID = 1L;
+    private static final long TABLE_ID = 2L;
+    private static final long PARTITION_ID = 3L;
+    private static final long TABLET_ID = 4L;
+    private static final String FAILURE_MSG = "too many versions";
+
+    private final Database database = new Database(DB_ID, "db");
+    private final InternalCatalog catalog = Deencapsulation.newInstance(InternalCatalog.class);
+    // Inverted index whose replica-related operations are stubbed out.
+    private final TabletInvertedIndex invertedIndex = new TabletInvertedIndex() {
+        @Override
+        public List<Replica> getReplicas(Long tabletId) {
+            return Lists.newArrayList();
+        }
+
+        @Override
+        public void deleteTablet(long tabletId) {
+        }
+
+        @Override
+        public void addReplica(long tabletId, Replica replica) {
+        }
+
+        @Override
+        public void deleteReplica(long tabletId, long backendId) {
+        }
+
+        @Override
+        public Replica getReplica(long tabletId, long backendId) {
+            return null;
+        }
+
+        @Override
+        public List<Replica> getReplicasByTabletId(long tabletId) {
+            return Lists.newArrayList();
+        }
+    };
+
+    @Before
+    public void setUp() {
+        TabletMeta tabletMeta = new TabletMeta(DB_ID, TABLE_ID, PARTITION_ID, 5L, 6, TStorageMedium.HDD);
+        invertedIndex.addTablet(TABLET_ID, tabletMeta);
+
+        // Route the static Env accessors to the fixtures of this test.
+        new MockUp<Env>() {
+            @Mock
+            public InternalCatalog getCurrentInternalCatalog() {
+                return catalog;
+            }
+
+            @Mock
+            public TabletInvertedIndex getCurrentInvertedIndex() {
+                return invertedIndex;
+            }
+        };
+
+        new MockUp<InternalCatalog>() {
+            @Mock
+            public Database getDbOrMetaException(long dbId) {
+                return database;
+            }
+        };
+    }
+
+    @Test
+    public void testAwaitIgnoresFailureStatusAfterQuorumReached() throws Exception {
+        // Three replicas required, two reported finished.
+        DeleteJob job = newDeleteJob((short) 3, newFailedLatch());
+        job.addFinishedReplica(PARTITION_ID, TABLET_ID, newNormalReplica(11L, 21L));
+        job.addFinishedReplica(PARTITION_ID, TABLET_ID, newNormalReplica(12L, 22L));
+
+        job.await();
+
+        Assert.assertEquals(DeleteJob.DeleteState.QUORUM_FINISHED, job.getState());
+    }
+
+    @Test
+    public void testAwaitReturnsFailureStatusWhenQuorumNotReached() {
+        // No finished replica is registered for this job.
+        DeleteJob job = newDeleteJob((short) 1, newFailedLatch());
+
+        try {
+            job.await();
+            Assert.fail("delete job should fail when quorum is not reached");
+        } catch (UserException e) {
+            Assert.assertTrue(e.getMessage().contains(FAILURE_MSG));
+            Assert.assertEquals(DeleteJob.DeleteState.UN_QUORUM, job.getState());
+        } catch (Exception e) {
+            Assert.fail("unexpected exception: " + e.getMessage());
+        }
+    }
+
+    /** Builds a one-mark latch that is already counted down with an INTERNAL_ERROR status. */
+    private static MarkedCountDownLatch<Long, Long> newFailedLatch() {
+        MarkedCountDownLatch<Long, Long> latch = new MarkedCountDownLatch<Long, Long>(1);
+        latch.addMark(1L, TABLET_ID);
+        Status failure = new Status(TStatusCode.INTERNAL_ERROR, FAILURE_MSG);
+        latch.markedCountDownWithStatus(1L, TABLET_ID, failure);
+        return latch;
+    }
+
+    /** Builds a NORMAL-state replica with the given ids, version 1 and schema hash 6. */
+    private static Replica newNormalReplica(long replicaId, long backendId) {
+        return new Replica(replicaId, backendId, Replica.ReplicaState.NORMAL, 1L, 6);
+    }
+
+    /** Builds a delete job over the single test tablet, wired to the given latch. */
+    private DeleteJob newDeleteJob(short replicaNum, MarkedCountDownLatch<Long, Long> latch) {
+        DeleteInfo info = new DeleteInfo(DB_ID, TABLE_ID, "tbl", Collections.emptyList(),
+                false, Lists.newArrayList(PARTITION_ID), Lists.newArrayList("p1"));
+        Map<Long, Short> replicaNumByPartition = Collections.singletonMap(PARTITION_ID, replicaNum);
+        DeleteJob job = new DeleteJob(100L, 200L, "label", replicaNumByPartition, info);
+        job.setCountDownLatch(latch);
+        Set<Long> tablets = Deencapsulation.getField(job, "totalTablets");
+        tablets.add(TABLET_ID);
+        return job;
+    }
+}
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/master/MasterImplDeleteTaskTest.java
b/fe/fe-core/src/test/java/org/apache/doris/master/MasterImplDeleteTaskTest.java
new file mode 100644
index 00000000000..3bf846b05f9
--- /dev/null
+++
b/fe/fe-core/src/test/java/org/apache/doris/master/MasterImplDeleteTaskTest.java
@@ -0,0 +1,152 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.master;
+
+import org.apache.doris.catalog.Env;
+import org.apache.doris.common.MarkedCountDownLatch;
+import org.apache.doris.system.Backend;
+import org.apache.doris.system.SystemInfoService;
+import org.apache.doris.task.AgentTaskQueue;
+import org.apache.doris.task.PushTask;
+import org.apache.doris.thrift.TBackend;
+import org.apache.doris.thrift.TFinishTaskRequest;
+import org.apache.doris.thrift.TPriority;
+import org.apache.doris.thrift.TPushType;
+import org.apache.doris.thrift.TStatus;
+import org.apache.doris.thrift.TStatusCode;
+import org.apache.doris.thrift.TTaskType;
+
+import com.google.common.collect.Lists;
+import mockit.Mock;
+import mockit.MockUp;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Tests how a failed DELETE push task affects its marked count-down latch, both when
+ * the failure is reported through {@code MasterImpl.finishTask} and when
+ * {@code PushTask.failedWithMsg} is called directly.
+ */
+public class MasterImplDeleteTaskTest {
+    private static final long BACKEND_ID = 10000L;
+    private static final long DB_ID = 20000L;
+    private static final long TABLE_ID = 30000L;
+    private static final long PARTITION_ID = 40000L;
+    private static final long INDEX_ID = 50000L;
+    private static final long TABLET_ID = 60000L;
+    private static final long REPLICA_ID = 70000L;
+    private static final long TRANSACTION_ID = 80000L;
+    private static final long SIGNATURE = 90000L;
+    private static final String HOST = "127.0.0.1";
+    private static final int HEARTBEAT_PORT = 9050;
+    private static final int BE_PORT = 9060;
+
+    private MasterImpl masterImpl;
+
+    @Before
+    public void setUp() {
+        AgentTaskQueue.clearAllTasks();
+
+        // Register the single backend that the finish-task requests refer to.
+        Backend backend = new Backend(BACKEND_ID, HOST, HEARTBEAT_PORT);
+        backend.setBePort(BE_PORT);
+        SystemInfoService systemInfoService = new SystemInfoService();
+        systemInfoService.addBackend(backend);
+
+        new MockUp<Env>() {
+            @Mock
+            public SystemInfoService getCurrentSystemInfo() {
+                return systemInfoService;
+            }
+        };
+
+        // Replace ReportHandler.start() with a no-op.
+        new MockUp<ReportHandler>() {
+            @Mock
+            public void start() {
+            }
+        };
+
+        masterImpl = new MasterImpl();
+    }
+
+    @After
+    public void tearDown() {
+        AgentTaskQueue.clearAllTasks();
+    }
+
+    @Test
+    public void testDeletePushGenericFailureCountsDownSingleMark() {
+        MarkedCountDownLatch<Long, Long> latch = newTwoMarkLatch();
+        PushTask task = newDeletePushTask(latch);
+        AgentTaskQueue.addTask(task);
+
+        masterImpl.finishTask(newFinishTaskRequest(TStatusCode.INTERNAL_ERROR));
+
+        Assert.assertEquals(1, latch.getCount());
+        Assert.assertEquals(TStatusCode.INTERNAL_ERROR, latch.getStatus().getErrorCode());
+        Assert.assertEquals(1, task.getFailedTimes());
+        Assert.assertNull(AgentTaskQueue.getTask(BACKEND_ID, TTaskType.REALTIME_PUSH, SIGNATURE));
+    }
+
+    @Test
+    public void testDeletePushInvalidArgumentCountsDownToZero() {
+        MarkedCountDownLatch<Long, Long> latch = newTwoMarkLatch();
+        PushTask task = newDeletePushTask(latch);
+        AgentTaskQueue.addTask(task);
+
+        masterImpl.finishTask(newFinishTaskRequest(TStatusCode.INVALID_ARGUMENT));
+
+        Assert.assertEquals(0, latch.getCount());
+        Assert.assertEquals(TStatusCode.INVALID_ARGUMENT, latch.getStatus().getErrorCode());
+        Assert.assertEquals(1, task.getFailedTimes());
+        Assert.assertNull(AgentTaskQueue.getTask(BACKEND_ID, TTaskType.REALTIME_PUSH, SIGNATURE));
+    }
+
+    @Test
+    public void testDeletePushFailedWithMsgKeepsFailureStatus() {
+        MarkedCountDownLatch<Long, Long> latch = new MarkedCountDownLatch<Long, Long>(1);
+        latch.addMark(BACKEND_ID, TABLET_ID);
+
+        PushTask task = newDeletePushTask(latch);
+        task.failedWithMsg("submit failed");
+
+        Assert.assertEquals(0, latch.getCount());
+        Assert.assertEquals(TStatusCode.CANCELLED, latch.getStatus().getErrorCode());
+        Assert.assertEquals("submit failed", latch.getStatus().getErrorMsg());
+    }
+
+    /** Builds a latch with two marks: the task under test plus one unrelated backend/tablet. */
+    private static MarkedCountDownLatch<Long, Long> newTwoMarkLatch() {
+        MarkedCountDownLatch<Long, Long> latch = new MarkedCountDownLatch<Long, Long>(2);
+        latch.addMark(BACKEND_ID, TABLET_ID);
+        latch.addMark(BACKEND_ID + 1, TABLET_ID + 1);
+        return latch;
+    }
+
+    /** Builds a DELETE push task for the test backend/tablet and attaches the given latch. */
+    private PushTask newDeletePushTask(MarkedCountDownLatch<Long, Long> latch) {
+        PushTask task = new PushTask(null, BACKEND_ID, DB_ID, TABLE_ID, PARTITION_ID, INDEX_ID,
+                TABLET_ID, REPLICA_ID, 1, 1L, null, 0, 60, -1, TPushType.DELETE, null,
+                false, TPriority.NORMAL, TTaskType.REALTIME_PUSH, TRANSACTION_ID, SIGNATURE,
+                null, null, null, 0, null);
+        task.setCountDownLatch(latch);
+        return task;
+    }
+
+    /** Builds a REALTIME_PUSH finish-task request from the test backend with the given status code. */
+    private TFinishTaskRequest newFinishTaskRequest(TStatusCode statusCode) {
+        TStatus status = new TStatus(statusCode);
+        status.setErrorMsgs(Lists.newArrayList("delete failed"));
+        TBackend reporter = new TBackend(HOST, BE_PORT, 0);
+        TFinishTaskRequest request =
+                new TFinishTaskRequest(reporter, TTaskType.REALTIME_PUSH, SIGNATURE, status);
+        request.setReportVersion(1L);
+        return request;
+    }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]