This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
new 65123a9484e branch-4.0: [fix](cloud) Delete local rowsets before
add_rowsets in cloud schema change #62256 (#62310)
65123a9484e is described below
commit 65123a9484e201167af3ee117fe3ef0de5d62f0c
Author: github-actions[bot]
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Fri May 15 21:12:42 2026 -0700
branch-4.0: [fix](cloud) Delete local rowsets before add_rowsets in cloud
schema change #62256 (#62310)
Cherry-picked from #62256
Co-authored-by: bobhan1 <[email protected]>
---
be/src/cloud/cloud_schema_change_job.cpp | 23 +++
be/src/cloud/cloud_tablet.cpp | 28 ++++
be/src/cloud/cloud_tablet.h | 7 +
be/test/cloud/cloud_tablet_test.cpp | 248 +++++++++++++++++++++++++++++++
4 files changed, 306 insertions(+)
diff --git a/be/src/cloud/cloud_schema_change_job.cpp b/be/src/cloud/cloud_schema_change_job.cpp
index a3c9fd951ae..711bc158405 100644
--- a/be/src/cloud/cloud_schema_change_job.cpp
+++ b/be/src/cloud/cloud_schema_change_job.cpp
@@ -24,6 +24,7 @@
#include <memory>
#include <mutex>
#include <random>
+#include <ranges>
#include <thread>
#include "cloud/cloud_meta_mgr.h"
@@ -540,6 +541,28 @@ Status CloudSchemaChangeJob::_convert_historical_rowsets(const SchemaChangeParam
// during double write phase by `CloudMetaMgr::sync_tablet_rowsets` in another thread
std::unique_lock lock {_new_tablet->get_sync_meta_lock()};
std::unique_lock wlock(_new_tablet->get_header_lock());
+ // Mirror MS behavior: delete rowsets in [2, alter_version] before adding
+ // SC output rowsets to avoid stale compaction rowsets remaining visible.
+ {
+ int64_t alter_ver = sc_job->alter_version();
+ std::vector<RowsetSharedPtr> to_delete;
+ for (auto& [v, rs] : _new_tablet->rowset_map()) {
+ if (v.first >= 2 && v.second <= alter_ver) {
+ to_delete.push_back(rs);
+ }
+ }
+ if (!to_delete.empty()) {
+ LOG_INFO(
+ "schema change: delete {} local rowsets in [2, {}]
before adding SC "
+ "output, tablet_id={}, versions=[{}]",
+ to_delete.size(), alter_ver, _new_tablet->tablet_id(),
+ fmt::join(to_delete | std::views::transform([](const
auto& rs) {
+ return rs->version().to_string();
+ }),
+ ", "));
+ _new_tablet->delete_rowsets_for_schema_change(to_delete,
wlock);
+ }
+ }
_new_tablet->add_rowsets(std::move(_output_rowsets), true, wlock,
false);
_new_tablet->set_cumulative_layer_point(_output_cumulative_point);
_new_tablet->reset_approximate_stats(stats.num_rowsets(),
stats.num_segments(),
diff --git a/be/src/cloud/cloud_tablet.cpp b/be/src/cloud/cloud_tablet.cpp
index 19bc5b92791..db620468fcb 100644
--- a/be/src/cloud/cloud_tablet.cpp
+++ b/be/src/cloud/cloud_tablet.cpp
@@ -682,6 +682,34 @@ void CloudTablet::delete_rowsets(const std::vector<RowsetSharedPtr>& to_delete,
_tablet_meta->modify_rs_metas({}, rs_metas, false);
}
+void CloudTablet::delete_rowsets_for_schema_change(const
std::vector<RowsetSharedPtr>& to_delete,
+
std::unique_lock<std::shared_mutex>&) {
+ if (to_delete.empty()) {
+ return;
+ }
+ std::vector<RowsetMetaSharedPtr> rs_metas;
+ rs_metas.reserve(to_delete.size());
+ for (auto&& rs : to_delete) {
+ rs_metas.push_back(rs->rowset_meta());
+ _rs_version_map.erase(rs->version());
+ // Remove edge from version graph so that the greedy capture algorithm
+ // won't prefer the wider stale compaction rowset over individual SC
+ // output rowsets (e.g. [818-822] vs [818],[819],...,[822]).
+ _timestamped_version_tracker.delete_version(rs->version());
+ }
+
+ // Use same_version=true to skip adding to _stale_rs_metas. Do NOT use the
+ // stale tracking mechanism (_stale_rs_version_map / _stale_version_path_map)
+ // because SC output will create new rowsets with identical version ranges;
+ // a later compaction could put those into stale as well, causing two stale
+ // paths to reference the same version key -- when one path is cleaned first,
+ // the other hits a DCHECK(false) in delete_expired_stale_rowsets().
+ _tablet_meta->modify_rs_metas({}, rs_metas, true);
+
+ // Schedule for direct cache cleanup. MS has already recycled these rowsets.
+ add_unused_rowsets(to_delete);
+}
+
uint64_t CloudTablet::delete_expired_stale_rowsets() {
if (config::enable_mow_verbose_log) {
LOG_INFO("begin delete_expired_stale_rowset for tablet={}",
tablet_id());
diff --git a/be/src/cloud/cloud_tablet.h b/be/src/cloud/cloud_tablet.h
index b27bd7c5b55..5def9eabea2 100644
--- a/be/src/cloud/cloud_tablet.h
+++ b/be/src/cloud/cloud_tablet.h
@@ -159,6 +159,13 @@ public:
void delete_rowsets(const std::vector<RowsetSharedPtr>& to_delete,
std::unique_lock<std::shared_mutex>& meta_lock);
+ // Like delete_rowsets, but also removes edges from the version graph.
+ // Used by schema change to prevent the greedy capture algorithm from
+ // preferring stale compaction rowsets over individual SC output rowsets.
+ // MUST hold EXCLUSIVE `_meta_lock`.
+ void delete_rowsets_for_schema_change(const std::vector<RowsetSharedPtr>&
to_delete,
+ std::unique_lock<std::shared_mutex>&
meta_lock);
+
// When the tablet is dropped, we need to recycle cached data:
// 1. The data in file cache
// 2. The memory in tablet cache
diff --git a/be/test/cloud/cloud_tablet_test.cpp b/be/test/cloud/cloud_tablet_test.cpp
index 904dc2e3fdf..356bbc2e040 100644
--- a/be/test/cloud/cloud_tablet_test.cpp
+++ b/be/test/cloud/cloud_tablet_test.cpp
@@ -997,4 +997,252 @@ TEST_F(CloudTabletWarmUpStateTest, TestWarmedUpOverridesNotWarmedUp) {
EXPECT_TRUE(_tablet->is_rowset_warmed_up(rowset->rowset_id()));
}
+class CloudTabletDeleteRowsetsForSchemaChangeTest : public testing::Test {
+public:
+ CloudTabletDeleteRowsetsForSchemaChangeTest() :
_engine(CloudStorageEngine(EngineOptions {})) {}
+
+ void SetUp() override {
+ _tablet_meta.reset(new TabletMeta(1, 2, 15673, 15674, 4, 5,
TTabletSchema(), 6, {{7, 8}},
+ UniqueId(9, 10),
TTabletType::TABLET_TYPE_DISK,
+ TCompressionType::LZ4F));
+ _tablet =
+ std::make_shared<CloudTablet>(_engine,
std::make_shared<TabletMeta>(*_tablet_meta));
+ }
+ void TearDown() override {}
+
+ RowsetSharedPtr create_rowset(Version version) {
+ auto rs_meta = std::make_shared<RowsetMeta>();
+ rs_meta->set_rowset_type(BETA_ROWSET);
+ rs_meta->set_version(version);
+ rs_meta->set_rowset_id(_engine.next_rowset_id());
+ RowsetSharedPtr rowset;
+ Status st = RowsetFactory::create_rowset(nullptr, "", rs_meta,
&rowset);
+ if (!st.ok()) {
+ return nullptr;
+ }
+ return rowset;
+ }
+
+protected:
+ TabletMetaSharedPtr _tablet_meta;
+ std::shared_ptr<CloudTablet> _tablet;
+ CloudStorageEngine _engine;
+};
+
+// Simulate the DORIS-25014 scenario:
+// - New tablet has compacted rowset [2-6] from compaction during SC
+// - SC produces individual output rowsets [2],[3],[4],[5],[6]
+// - Without the fix, add_rowsets fails to remove [2-6] because
+// [2].contains([2-6]) = false
+// - With delete_rowsets_for_schema_change, the stale compaction rowset is
+// removed from both _rs_version_map and version graph before add_rowsets
+TEST_F(CloudTabletDeleteRowsetsForSchemaChangeTest,
TestSchemaChangeDeletesCompactionRowset) {
+ // Setup: add placeholder [0-1] and compacted rowset [2-6]
+ auto rs_placeholder = create_rowset(Version(0, 1));
+ auto rs_compacted = create_rowset(Version(2, 6));
+ ASSERT_NE(rs_placeholder, nullptr);
+ ASSERT_NE(rs_compacted, nullptr);
+
+ {
+ std::unique_lock wlock(_tablet->get_header_lock());
+ _tablet->add_rowsets({rs_placeholder, rs_compacted}, false, wlock,
false);
+ }
+ // Verify initial state
+ ASSERT_EQ(_tablet->rowset_map().size(), 2);
+ ASSERT_TRUE(_tablet->rowset_map().count(Version(2, 6)));
+
+ // SC produces individual rowsets
+ std::vector<RowsetSharedPtr> sc_output;
+ for (int v = 2; v <= 6; v++) {
+ auto rs = create_rowset(Version(v, v));
+ ASSERT_NE(rs, nullptr);
+ sc_output.push_back(rs);
+ }
+
+ // Simulate delete_rowsets_for_schema_change + add_rowsets
+ int64_t alter_version = 6;
+ {
+ std::unique_lock wlock(_tablet->get_header_lock());
+ // Collect rowsets in [2, alter_version]
+ std::vector<RowsetSharedPtr> to_delete;
+ for (auto& [v, rs] : _tablet->rowset_map()) {
+ if (v.first >= 2 && v.second <= alter_version) {
+ to_delete.push_back(rs);
+ }
+ }
+ ASSERT_EQ(to_delete.size(), 1); // only [2-6]
+ ASSERT_EQ(to_delete[0]->version(), Version(2, 6));
+
+ _tablet->delete_rowsets_for_schema_change(to_delete, wlock);
+
+ // [2-6] should be removed from rs_version_map
+ ASSERT_FALSE(_tablet->rowset_map().count(Version(2, 6)));
+ // Should NOT go to stale (to avoid stale path conflicts), but to unused
+ ASSERT_FALSE(_tablet->has_stale_rowsets());
+ ASSERT_TRUE(_tablet->need_remove_unused_rowsets());
+
+ _tablet->add_rowsets(std::move(sc_output), false, wlock, false);
+ }
+
+ // Verify: individual SC rowsets are now in rs_version_map
+ ASSERT_EQ(_tablet->rowset_map().size(), 6); // [0-1] + 5 individual
+ for (int v = 2; v <= 6; v++) {
+ ASSERT_TRUE(_tablet->rowset_map().count(Version(v, v)))
+ << "Missing version " << v << "-" << v;
+ }
+ ASSERT_FALSE(_tablet->rowset_map().count(Version(2, 6)));
+
+ // Verify: capture_consistent_versions works correctly (no stale edges)
+ auto versions_result =
_tablet->capture_consistent_versions_unlocked(Version(0, 6), {});
+ ASSERT_TRUE(versions_result.has_value()) << versions_result.error();
+ auto& versions = versions_result.value();
+ ASSERT_EQ(versions.size(), 6); // [0-1] + [2],[3],[4],[5],[6]
+ ASSERT_EQ(versions[0], Version(0, 1));
+ for (int i = 0; i < 5; i++) {
+ ASSERT_EQ(versions[i + 1], Version(2 + i, 2 + i));
+ }
+}
+
+// Test that delete_rowsets_for_schema_change with empty input is a no-op
+TEST_F(CloudTabletDeleteRowsetsForSchemaChangeTest, TestEmptyDeleteIsNoop) {
+ auto rs = create_rowset(Version(0, 1));
+ ASSERT_NE(rs, nullptr);
+ {
+ std::unique_lock wlock(_tablet->get_header_lock());
+ _tablet->add_rowsets({rs}, false, wlock, false);
+ }
+ ASSERT_EQ(_tablet->rowset_map().size(), 1);
+
+ {
+ std::unique_lock wlock(_tablet->get_header_lock());
+ _tablet->delete_rowsets_for_schema_change({}, wlock);
+ }
+ ASSERT_EQ(_tablet->rowset_map().size(), 1);
+ ASSERT_FALSE(_tablet->has_stale_rowsets());
+}
+
+// Test with multiple compaction rowsets spanning different version ranges
+TEST_F(CloudTabletDeleteRowsetsForSchemaChangeTest,
TestMultipleCompactionRowsets) {
+ auto rs_placeholder = create_rowset(Version(0, 1));
+ auto rs_comp1 = create_rowset(Version(2, 5));
+ auto rs_comp2 = create_rowset(Version(6, 10));
+ auto rs_post = create_rowset(Version(11, 11)); // after alter_version,
should NOT be deleted
+ ASSERT_NE(rs_placeholder, nullptr);
+ ASSERT_NE(rs_comp1, nullptr);
+ ASSERT_NE(rs_comp2, nullptr);
+ ASSERT_NE(rs_post, nullptr);
+
+ {
+ std::unique_lock wlock(_tablet->get_header_lock());
+ _tablet->add_rowsets({rs_placeholder, rs_comp1, rs_comp2, rs_post},
false, wlock, false);
+ }
+ ASSERT_EQ(_tablet->rowset_map().size(), 4);
+
+ // SC output: individual rowsets for versions 2-10
+ std::vector<RowsetSharedPtr> sc_output;
+ for (int v = 2; v <= 10; v++) {
+ auto rs = create_rowset(Version(v, v));
+ ASSERT_NE(rs, nullptr);
+ sc_output.push_back(rs);
+ }
+
+ int64_t alter_version = 10;
+ {
+ std::unique_lock wlock(_tablet->get_header_lock());
+ std::vector<RowsetSharedPtr> to_delete;
+ for (auto& [v, rs] : _tablet->rowset_map()) {
+ if (v.first >= 2 && v.second <= alter_version) {
+ to_delete.push_back(rs);
+ }
+ }
+ ASSERT_EQ(to_delete.size(), 2); // [2-5] and [6-10]
+
+ _tablet->delete_rowsets_for_schema_change(to_delete, wlock);
+
+ // Post-alter rowset should survive
+ ASSERT_TRUE(_tablet->rowset_map().count(Version(11, 11)));
+ ASSERT_FALSE(_tablet->rowset_map().count(Version(2, 5)));
+ ASSERT_FALSE(_tablet->rowset_map().count(Version(6, 10)));
+
+ _tablet->add_rowsets(std::move(sc_output), false, wlock, false);
+ }
+
+ // Verify: [0-1], [2],[3],...,[10], [11-11]
+ ASSERT_EQ(_tablet->rowset_map().size(), 11);
+
+ // Verify capture
+ auto versions_result =
_tablet->capture_consistent_versions_unlocked(Version(0, 11), {});
+ ASSERT_TRUE(versions_result.has_value()) << versions_result.error();
+ auto& versions = versions_result.value();
+ ASSERT_EQ(versions.size(), 11);
+ ASSERT_EQ(versions[0], Version(0, 1));
+ for (int i = 0; i < 9; i++) {
+ ASSERT_EQ(versions[i + 1], Version(2 + i, 2 + i));
+ }
+ ASSERT_EQ(versions[10], Version(11, 11));
+}
+
+// Reproduce the CI crash scenario: SC delete puts rowsets to stale, then
+// compaction creates a new stale path with overlapping version keys. When
+// one stale path is cleaned, the other hits DCHECK(false) because the
+// version is already removed from _stale_rs_version_map.
+// With the fix (bypassing stale tracking), this should not happen.
+TEST_F(CloudTabletDeleteRowsetsForSchemaChangeTest,
TestNoStalePathConflictWithCompaction) {
+ // Setup: [0-1] placeholder, [2-6] compaction product during SC
+ auto rs_placeholder = create_rowset(Version(0, 1));
+ auto rs_compacted = create_rowset(Version(2, 6));
+ ASSERT_NE(rs_placeholder, nullptr);
+ ASSERT_NE(rs_compacted, nullptr);
+
+ {
+ std::unique_lock wlock(_tablet->get_header_lock());
+ _tablet->add_rowsets({rs_placeholder, rs_compacted}, false, wlock,
false);
+ }
+
+ // SC output: individual rowsets [2],[3],[4],[5],[6]
+ std::vector<RowsetSharedPtr> sc_output;
+ for (int v = 2; v <= 6; v++) {
+ sc_output.push_back(create_rowset(Version(v, v)));
+ }
+
+ // Step 1: delete_rowsets_for_schema_change + add SC output
+ {
+ std::unique_lock wlock(_tablet->get_header_lock());
+ _tablet->delete_rowsets_for_schema_change({rs_compacted}, wlock);
+ _tablet->add_rowsets(std::move(sc_output), false, wlock, false);
+ }
+ // Stale should be empty — SC delete bypasses stale tracking
+ ASSERT_FALSE(_tablet->has_stale_rowsets());
+
+ // Step 2: compaction merges SC output [2],[3],[4],[5],[6] -> [2-6]
+ auto rs_new_compacted = create_rowset(Version(2, 6));
+ std::vector<RowsetSharedPtr> compaction_input;
+ {
+ std::unique_lock wlock(_tablet->get_header_lock());
+ for (auto& [v, rs] : _tablet->rowset_map()) {
+ if (v.first >= 2 && v.second <= 6) {
+ compaction_input.push_back(rs);
+ }
+ }
+ ASSERT_EQ(compaction_input.size(), 5);
+ // Normal compaction delete_rowsets — this WILL use stale tracking
+ _tablet->delete_rowsets(compaction_input, wlock);
+ _tablet->add_rowsets({rs_new_compacted}, false, wlock, false);
+ }
+ // Now stale has the compaction inputs
+ ASSERT_TRUE(_tablet->has_stale_rowsets());
+
+ // Step 3: delete_expired_stale_rowsets — this is where CI crashed
+ // With old code: stale path from SC and compaction both reference [2-6] key,
+ // causing DCHECK(false). With fix: only compaction stale path exists, no conflict.
+ config::tablet_rowset_stale_sweep_time_sec = 0; // expire immediately
+ ASSERT_NO_FATAL_FAILURE(_tablet->delete_expired_stale_rowsets());
+
+ // Verify final state: [0-1] and [2-6] active, no stale left
+ ASSERT_EQ(_tablet->rowset_map().size(), 2);
+ ASSERT_TRUE(_tablet->rowset_map().count(Version(0, 1)));
+ ASSERT_TRUE(_tablet->rowset_map().count(Version(2, 6)));
+ ASSERT_FALSE(_tablet->has_stale_rowsets());
+}
+
} // namespace doris
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]