This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch feature/read-uncommitted-phase1
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 473aa6a8a69906fa31d673f0640a09a1cdeb1b16
Author: Yongqiang YANG <[email protected]>
AuthorDate: Sat Feb 7 16:51:57 2026 -0800

    [feature](read-uncommitted) Add READ UNCOMMITTED isolation level for unpublished rowsets (Phase 1)
    
    Enable queries to read rowsets that have been written to storage but are
    not yet published, via SET read_uncommitted = true. Supports DUP_KEYS and
    UNIQUE_KEYS tables in both local and cloud modes. Includes asynchronous
    cross-uncommitted dedup for merge-on-write (MoW) tables, with compaction
    and schema change awareness.
---
 be/src/cloud/cloud_rowset_builder.cpp              |  17 ++
 be/src/cloud/cloud_storage_engine.cpp              |   5 +
 be/src/cloud/cloud_storage_engine.h                |   5 +
 be/src/cloud/cloud_txn_delete_bitmap_cache.cpp     |  10 +
 be/src/olap/base_tablet.cpp                        |   9 +
 be/src/olap/compaction.cpp                         |   8 +
 be/src/olap/rowset_builder.cpp                     |  16 +
 be/src/olap/storage_engine.cpp                     |   5 +
 be/src/olap/storage_engine.h                       |   5 +
 be/src/olap/txn_manager.cpp                        |  13 +
 be/src/olap/uncommitted_rowset_registry.cpp        | 333 +++++++++++++++++++++
 be/src/olap/uncommitted_rowset_registry.h          | 121 ++++++++
 be/src/pipeline/exec/olap_scan_operator.cpp        |  65 ++++
 be/src/runtime/runtime_state.h                     |   4 +
 .../java/org/apache/doris/qe/SessionVariable.java  |  17 +-
 gensrc/thrift/PaloInternalService.thrift           |   2 +
 16 files changed, 633 insertions(+), 2 deletions(-)

diff --git a/be/src/cloud/cloud_rowset_builder.cpp 
b/be/src/cloud/cloud_rowset_builder.cpp
index 8ef15424a3d..b9c528af88e 100644
--- a/be/src/cloud/cloud_rowset_builder.cpp
+++ b/be/src/cloud/cloud_rowset_builder.cpp
@@ -19,6 +19,7 @@
 
 #include "cloud/cloud_meta_mgr.h"
 #include "cloud/cloud_storage_engine.h"
+#include "olap/uncommitted_rowset_registry.h"
 #include "cloud/cloud_tablet.h"
 #include "cloud/cloud_tablet_mgr.h"
 #include "olap/storage_policy.h"
@@ -157,6 +158,22 @@ Status CloudRowsetBuilder::set_txn_related_delete_bitmap() 
{
                 _req.txn_id, _tablet->tablet_id(), _delete_bitmap, 
*_rowset_ids, _rowset,
                 _req.txn_expiration, _partial_update_info);
     }
+
+    // Register uncommitted rowset for READ UNCOMMITTED visibility
+    if (auto* registry = _engine.uncommitted_rowset_registry()) {
+        auto entry = std::make_shared<UncommittedRowsetEntry>();
+        entry->rowset = _rowset;
+        entry->transaction_id = _req.txn_id;
+        entry->partition_id = _req.partition_id;
+        entry->tablet_id = _tablet->tablet_id();
+        entry->unique_key_merge_on_write = 
_tablet->enable_unique_key_merge_on_write();
+        entry->creation_time = _rowset->creation_time();
+        if (entry->unique_key_merge_on_write && _delete_bitmap) {
+            entry->committed_delete_bitmap = 
std::make_shared<DeleteBitmap>(*_delete_bitmap);
+        }
+        registry->register_rowset(std::move(entry));
+    }
+
     return Status::OK();
 }
 #include "common/compile_check_end.h"
diff --git a/be/src/cloud/cloud_storage_engine.cpp 
b/be/src/cloud/cloud_storage_engine.cpp
index 126e9701a79..177c56e2f31 100644
--- a/be/src/cloud/cloud_storage_engine.cpp
+++ b/be/src/cloud/cloud_storage_engine.cpp
@@ -58,6 +58,7 @@
 #include "olap/cumulative_compaction_time_series_policy.h"
 #include "olap/memtable_flush_executor.h"
 #include "olap/storage_policy.h"
+#include "olap/uncommitted_rowset_registry.h"
 #include "runtime/memory/cache_manager.h"
 #include "util/parse_util.h"
 #include "util/time.h"
@@ -231,6 +232,10 @@ Status CloudStorageEngine::open() {
 
     _cloud_snapshot_mgr = std::make_unique<CloudSnapshotMgr>(*this);
 
+    _uncommitted_rowset_registry = 
std::make_unique<UncommittedRowsetRegistry>();
+    RETURN_IF_ERROR(_uncommitted_rowset_registry->init(
+            std::max(1, config::calc_delete_bitmap_max_thread / 2)));
+
     RETURN_NOT_OK_STATUS_WITH_WARN(
             
init_stream_load_recorder(ExecEnv::GetInstance()->store_paths()[0].path),
             "init StreamLoadRecorder failed");
diff --git a/be/src/cloud/cloud_storage_engine.h 
b/be/src/cloud/cloud_storage_engine.h
index 9c27e164bba..3a435e10d90 100644
--- a/be/src/cloud/cloud_storage_engine.h
+++ b/be/src/cloud/cloud_storage_engine.h
@@ -51,6 +51,7 @@ class CloudWarmUpManager;
 class CloudCompactionStopToken;
 class CloudSnapshotMgr;
 class CloudIndexChangeCompaction;
+class UncommittedRowsetRegistry;
 
 class CloudStorageEngine final : public BaseStorageEngine {
 public:
@@ -91,6 +92,9 @@ public:
     CloudTabletMgr& tablet_mgr() const { return *_tablet_mgr; }
 
     CloudSnapshotMgr& cloud_snapshot_mgr() { return *_cloud_snapshot_mgr; }
+    UncommittedRowsetRegistry* uncommitted_rowset_registry() {
+        return _uncommitted_rowset_registry.get();
+    }
 
     CloudTxnDeleteBitmapCache& txn_delete_bitmap_cache() const { return 
*_txn_delete_bitmap_cache; }
 
@@ -225,6 +229,7 @@ private:
     std::unique_ptr<ThreadPool> _sync_load_for_tablets_thread_pool;
     std::unique_ptr<ThreadPool> _warmup_cache_async_thread_pool;
     std::unique_ptr<CloudSnapshotMgr> _cloud_snapshot_mgr;
+    std::unique_ptr<UncommittedRowsetRegistry> _uncommitted_rowset_registry;
 
     // FileSystem with latest shared storage info, new data will be written to 
this fs.
     mutable std::mutex _latest_fs_mtx;
diff --git a/be/src/cloud/cloud_txn_delete_bitmap_cache.cpp 
b/be/src/cloud/cloud_txn_delete_bitmap_cache.cpp
index ab04f4eff16..de3ecc56b10 100644
--- a/be/src/cloud/cloud_txn_delete_bitmap_cache.cpp
+++ b/be/src/cloud/cloud_txn_delete_bitmap_cache.cpp
@@ -29,6 +29,7 @@
 #include "olap/olap_common.h"
 #include "olap/tablet_meta.h"
 #include "olap/txn_manager.h"
+#include "olap/uncommitted_rowset_registry.h"
 
 namespace doris {
 
@@ -264,6 +265,10 @@ void 
CloudTxnDeleteBitmapCache::remove_expired_tablet_txn_info() {
                     .tag("tablet_id", iter->second.tablet_id);
             _empty_rowset_markers.erase(marker_iter);
         }
+        // Unregister from UncommittedRowsetRegistry on expiration
+        if (auto* registry = get_uncommitted_rowset_registry()) {
+            registry->unregister_rowset(iter->second.tablet_id, 
iter->second.txn_id);
+        }
         _expiration_txn.erase(iter);
     }
 }
@@ -283,6 +288,11 @@ void 
CloudTxnDeleteBitmapCache::remove_unused_tablet_txn_info(TTransactionId tra
         erase(cache_key);
         _txn_map.erase(txn_key);
     }
+
+    // Unregister from UncommittedRowsetRegistry
+    if (auto* registry = get_uncommitted_rowset_registry()) {
+        registry->unregister_rowset(tablet_id, transaction_id);
+    }
 }
 
 void CloudTxnDeleteBitmapCache::mark_empty_rowset(TTransactionId txn_id, 
int64_t tablet_id,
diff --git a/be/src/olap/base_tablet.cpp b/be/src/olap/base_tablet.cpp
index 241e531462d..0a6e43506c4 100644
--- a/be/src/olap/base_tablet.cpp
+++ b/be/src/olap/base_tablet.cpp
@@ -48,6 +48,7 @@
 #include "olap/rowset/segment_v2/column_reader.h"
 #include "olap/tablet_fwd.h"
 #include "olap/txn_manager.h"
+#include "olap/uncommitted_rowset_registry.h"
 #include "service/point_query_executor.h"
 #include "util/bvar_helper.h"
 #include "util/debug_points.h"
@@ -169,6 +170,14 @@ Status BaseTablet::set_tablet_state(TabletState state) {
                 "could not change tablet state from shutdown to {}", state);
     }
     _tablet_meta->set_tablet_state(state);
+
+    // Notify UncommittedRowsetRegistry when tablet leaves RUNNING state
+    if (state != TABLET_RUNNING) {
+        if (auto* registry = get_uncommitted_rowset_registry()) {
+            registry->on_tablet_state_change(tablet_id(), state);
+        }
+    }
+
     return Status::OK();
 }
 
diff --git a/be/src/olap/compaction.cpp b/be/src/olap/compaction.cpp
index 4f3ed34939d..30e7ca86df2 100644
--- a/be/src/olap/compaction.cpp
+++ b/be/src/olap/compaction.cpp
@@ -69,6 +69,7 @@
 #include "olap/rowset/segment_v2/inverted_index_desc.h"
 #include "olap/rowset/segment_v2/inverted_index_fs_directory.h"
 #include "olap/storage_engine.h"
+#include "olap/uncommitted_rowset_registry.h"
 #include "olap/storage_policy.h"
 #include "olap/tablet.h"
 #include "olap/tablet_meta.h"
@@ -1359,6 +1360,13 @@ Status CompactionMixin::modify_rowsets() {
             tablet()->merge_delete_bitmap(output_rowset_delete_bitmap);
             RETURN_IF_ERROR(tablet()->modify_rowsets(output_rowsets, 
_input_rowsets, true));
         }
+
+        // Notify UncommittedRowsetRegistry that published rowsets changed.
+        // Cross-uncommitted delete bitmaps may reference rows that moved due 
to
+        // compaction, so they must be re-computed.
+        if (auto* registry = _engine.uncommitted_rowset_registry()) {
+            registry->on_compaction_completed(_tablet->tablet_id());
+        }
     } else {
         std::lock_guard<std::shared_mutex> wrlock(_tablet->get_header_lock());
         SCOPED_SIMPLE_TRACE_IF_TIMEOUT(TRACE_TABLET_LOCK_THRESHOLD);
diff --git a/be/src/olap/rowset_builder.cpp b/be/src/olap/rowset_builder.cpp
index f250f3bef10..fffa8c49f7a 100644
--- a/be/src/olap/rowset_builder.cpp
+++ b/be/src/olap/rowset_builder.cpp
@@ -34,6 +34,7 @@
 #include "io/fs/file_system.h"
 #include "io/fs/file_writer.h" // IWYU pragma: keep
 #include "olap/calc_delete_bitmap_executor.h"
+#include "olap/uncommitted_rowset_registry.h"
 #include "olap/olap_define.h"
 #include "olap/partial_update_info.h"
 #include "olap/rowset/beta_rowset.h"
@@ -347,6 +348,21 @@ Status RowsetBuilder::commit_txn() {
                 _delete_bitmap, *_rowset_ids, _partial_update_info);
     }
 
+    // Register uncommitted rowset for READ UNCOMMITTED visibility
+    if (auto* registry = _engine.uncommitted_rowset_registry()) {
+        auto entry = std::make_shared<UncommittedRowsetEntry>();
+        entry->rowset = _rowset;
+        entry->transaction_id = _req.txn_id;
+        entry->partition_id = _req.partition_id;
+        entry->tablet_id = tablet()->tablet_id();
+        entry->unique_key_merge_on_write = 
_tablet->enable_unique_key_merge_on_write();
+        entry->creation_time = _rowset->creation_time();
+        if (entry->unique_key_merge_on_write && _delete_bitmap) {
+            entry->committed_delete_bitmap = 
std::make_shared<DeleteBitmap>(*_delete_bitmap);
+        }
+        registry->register_rowset(std::move(entry));
+    }
+
     _is_committed = true;
     return Status::OK();
 }
diff --git a/be/src/olap/storage_engine.cpp b/be/src/olap/storage_engine.cpp
index 3a64a82d1ab..21462f44cc5 100644
--- a/be/src/olap/storage_engine.cpp
+++ b/be/src/olap/storage_engine.cpp
@@ -71,6 +71,7 @@
 #include "olap/tablet_meta.h"
 #include "olap/tablet_meta_manager.h"
 #include "olap/txn_manager.h"
+#include "olap/uncommitted_rowset_registry.h"
 #include "runtime/client_cache.h"
 #include "runtime/stream_load/stream_load_recorder.h"
 #include "util/doris_metrics.h"
@@ -313,6 +314,10 @@ Status StorageEngine::_open() {
                     ? config::calc_delete_bitmap_for_load_max_thread
                     : std::max(1, CpuInfo::num_cores() / 2));
 
+    _uncommitted_rowset_registry = 
std::make_unique<UncommittedRowsetRegistry>();
+    RETURN_IF_ERROR(_uncommitted_rowset_registry->init(
+            std::max(1, config::calc_delete_bitmap_max_thread / 2)));
+
     _parse_default_rowset_type();
 
     return Status::OK();
diff --git a/be/src/olap/storage_engine.h b/be/src/olap/storage_engine.h
index 335f4a45f21..05d081d0b69 100644
--- a/be/src/olap/storage_engine.h
+++ b/be/src/olap/storage_engine.h
@@ -73,6 +73,7 @@ class ReportWorker;
 class CreateTabletRRIdxCache;
 struct DirInfo;
 class SnapshotManager;
+class UncommittedRowsetRegistry;
 class WorkloadGroup;
 
 using SegCompactionCandidates = std::vector<segment_v2::SegmentSharedPtr>;
@@ -311,6 +312,9 @@ public:
     TabletManager* tablet_manager() { return _tablet_manager.get(); }
     TxnManager* txn_manager() { return _txn_manager.get(); }
     SnapshotManager* snapshot_mgr() { return _snapshot_mgr.get(); }
+    UncommittedRowsetRegistry* uncommitted_rowset_registry() {
+        return _uncommitted_rowset_registry.get();
+    }
     // Rowset garbage collection helpers
     bool check_rowset_id_in_unused_rowsets(const RowsetId& rowset_id);
     PendingRowsetSet& pending_local_rowsets() { return _pending_local_rowsets; 
}
@@ -533,6 +537,7 @@ private:
 
     std::unique_ptr<TabletManager> _tablet_manager;
     std::unique_ptr<TxnManager> _txn_manager;
+    std::unique_ptr<UncommittedRowsetRegistry> _uncommitted_rowset_registry;
 
     // Used to control the migration from segment_v1 to segment_v2, can be 
deleted in futrue.
     // Type of new loaded data
diff --git a/be/src/olap/txn_manager.cpp b/be/src/olap/txn_manager.cpp
index d760e891527..c0b20167e86 100644
--- a/be/src/olap/txn_manager.cpp
+++ b/be/src/olap/txn_manager.cpp
@@ -46,6 +46,7 @@
 #include "olap/schema_change.h"
 #include "olap/segment_loader.h"
 #include "olap/storage_engine.h"
+#include "olap/uncommitted_rowset_registry.h"
 #include "olap/tablet_manager.h"
 #include "olap/tablet_meta.h"
 #include "olap/tablet_meta_manager.h"
@@ -614,6 +615,12 @@ Status TxnManager::publish_txn(OlapMeta* meta, 
TPartitionId partition_id,
     stats->lock_wait_time_us += MonotonicMicros() - t6;
     _remove_txn_tablet_info_unlocked(partition_id, transaction_id, tablet_id, 
tablet_uid, txn_lock,
                                      wrlock);
+
+    // Unregister from UncommittedRowsetRegistry on publish
+    if (auto* registry = _engine.uncommitted_rowset_registry()) {
+        registry->unregister_rowset(tablet_id, transaction_id);
+    }
+
     VLOG_NOTICE << "publish txn successfully."
                 << " partition_id: " << key.first << ", txn_id: " << key.second
                 << ", tablet_id: " << tablet_info.tablet_id << ", rowsetid: " 
<< rowset->rowset_id()
@@ -736,6 +743,12 @@ Status TxnManager::delete_txn(OlapMeta* meta, TPartitionId 
partition_id,
         g_tablet_txn_info_txn_partitions_count << -1;
         _clear_txn_partition_map_unlocked(transaction_id, partition_id);
     }
+
+    // Unregister from UncommittedRowsetRegistry on delete
+    if (auto* registry = _engine.uncommitted_rowset_registry()) {
+        registry->unregister_rowset(tablet_id, transaction_id);
+    }
+
     return st;
 }
 
diff --git a/be/src/olap/uncommitted_rowset_registry.cpp 
b/be/src/olap/uncommitted_rowset_registry.cpp
new file mode 100644
index 00000000000..b96aedd2af2
--- /dev/null
+++ b/be/src/olap/uncommitted_rowset_registry.cpp
@@ -0,0 +1,333 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "olap/uncommitted_rowset_registry.h"
+
+#include <algorithm>
+
+#include "cloud/cloud_storage_engine.h"
+#include "cloud/config.h"
+#include "common/logging.h"
+#include "olap/base_tablet.h"
+#include "olap/calc_delete_bitmap_executor.h"
+#include "olap/rowset/beta_rowset.h"
+#include "olap/rowset/segment_v2/segment.h"
+#include "olap/storage_engine.h"
+#include "runtime/exec_env.h"
+
+namespace doris {
+
+UncommittedRowsetRegistry::UncommittedRowsetRegistry() = default; // the dedup pool is built later, by init()
+
+UncommittedRowsetRegistry::~UncommittedRowsetRegistry() {
+    if (_dedup_thread_pool) { // null if init() never built the pool
+        _dedup_thread_pool->shutdown(); // tasks queued by _submit_dedup_task capture `this`; shut the pool down explicitly
+    }
+}
+
+Status UncommittedRowsetRegistry::init(int dedup_threads) { // Builds the dedup thread pool and returns build()'s Status.
+    return ThreadPoolBuilder("UncommittedDedupPool")
+            .set_min_threads(1)
+            .set_max_threads(dedup_threads) // both engines in this patch pass max(1, calc_delete_bitmap_max_thread / 2)
+            .build(&_dedup_thread_pool);
+}
+
+std::shared_ptr<std::mutex> UncommittedRowsetRegistry::_get_tablet_dedup_mutex(
+        Shard& shard, int64_t tablet_id) { // Get-or-create; may insert into the map, so the caller must hold shard.lock exclusively.
+    auto it = shard.tablet_dedup_mutex.find(tablet_id);
+    if (it == shard.tablet_dedup_mutex.end()) {
+        auto mtx = std::make_shared<std::mutex>(); // first request for this tablet: create its mutex
+        shard.tablet_dedup_mutex[tablet_id] = mtx;
+        return mtx;
+    }
+    return it->second; // returned by shared_ptr, so a holder keeps the mutex alive even if the map entry is erased
+}
+
+void 
UncommittedRowsetRegistry::register_rowset(std::shared_ptr<UncommittedRowsetEntry>
 entry) {
+    int64_t tablet_id = entry->tablet_id;
+    auto& shard = _get_shard(tablet_id);
+
+    {
+        std::lock_guard wlock(shard.lock); // exclusive: the tablet's entry list is modified below
+        auto& entries = shard.entries[tablet_id]; // operator[] creates the list on first registration
+        entries.push_back(entry);
+        // Keep entries sorted by creation_time for dedup ordering. std::sort is not stable, so entries with equal creation_time end up in unspecified relative order.
+        std::sort(entries.begin(), entries.end(),
+                  [](const auto& a, const auto& b) {
+                      return a->creation_time < b->creation_time;
+                  });
+    }
+
+    // Entries with unique_key_merge_on_write == false (not only DUP_KEYS) need no cross-rowset dedup — mark ready immediately
+    if (!entry->unique_key_merge_on_write) {
+        entry->dedup_ready.store(true, std::memory_order_release);
+        return;
+    }
+
+    _submit_dedup_task(tablet_id, entry); // MoW entry: dedup_ready is set by _submit_dedup_task or by the task it queues
+}
+
+void UncommittedRowsetRegistry::unregister_rowset(int64_t tablet_id, int64_t 
transaction_id) {
+    auto& shard = _get_shard(tablet_id);
+    std::lock_guard wlock(shard.lock); // exclusive for the whole function
+
+    auto it = shard.entries.find(tablet_id);
+    if (it == shard.entries.end()) {
+        return; // nothing registered for this tablet: no-op
+    }
+
+    auto& entries = it->second;
+    entries.erase(std::remove_if(entries.begin(), entries.end(), // erase-remove: drops every entry of this transaction
+                                  [transaction_id](const auto& e) {
+                                      return e->transaction_id == 
transaction_id;
+                                  }),
+                  entries.end());
+
+    if (entries.empty()) { // last entry gone: drop the tablet's bookkeeping as well
+        shard.entries.erase(it);
+        shard.tablet_dedup_mutex.erase(tablet_id); // NOTE(review): a running task keeps the old mutex via its captured shared_ptr; a later registration gets a new mutex and would not serialize with it — confirm this is acceptable
+    }
+}
+
+void UncommittedRowsetRegistry::get_ready_rowsets(
+        int64_t tablet_id, 
std::vector<std::shared_ptr<UncommittedRowsetEntry>>* result) {
+    auto& shard = _get_shard(tablet_id);
+    std::shared_lock rlock(shard.lock); // shared: this function only reads the map and the entry list
+
+    auto it = shard.entries.find(tablet_id);
+    if (it == shard.entries.end()) {
+        return; // *result is left untouched
+    }
+
+    for (const auto& entry : it->second) {
+        if (entry->dedup_ready.load(std::memory_order_acquire)) { // pairs with the release stores of dedup_ready
+            result->push_back(entry); // appended; *result is not cleared first
+        }
+    }
+}
+
+void UncommittedRowsetRegistry::on_compaction_completed(int64_t tablet_id) {
+    auto& shard = _get_shard(tablet_id);
+    bool has_mow_entries = false;
+
+    {
+        std::shared_lock rlock(shard.lock); // NOTE(review): only a shared lock, yet entries are modified below — see the note on the reset
+        auto it = shard.entries.find(tablet_id);
+        if (it == shard.entries.end()) {
+            return; // no uncommitted rowsets tracked for this tablet
+        }
+
+        // Invalidate all cross-delete bitmaps and mark not ready (entries that are not merge-on-write are left untouched)
+        for (auto& entry : it->second) {
+            if (entry->unique_key_merge_on_write) {
+                entry->dedup_ready.store(false, std::memory_order_release);
+                entry->cross_delete_bitmap.reset(); // NOTE(review): plain shared_ptr write; the dedup task in _submit_dedup_task assigns the same field without shard.lock — confirm this cannot race
+                has_mow_entries = true;
+            }
+        }
+    }
+
+    // Re-compute dedup for all MoW entries (after the shard lock is released; _recompute_all_dedup re-acquires it)
+    if (has_mow_entries) {
+        _recompute_all_dedup(tablet_id);
+    }
+}
+
+void UncommittedRowsetRegistry::on_tablet_state_change(int64_t tablet_id,
+                                                        TabletState new_state) 
{ // Drops everything tracked for the tablet when it enters a non-running state.
+    if (new_state == TABLET_RUNNING) {
+        return; // Only clear on non-running states (BaseTablet::set_tablet_state in this patch applies the same filter before calling)
+    }
+
+    auto& shard = _get_shard(tablet_id);
+    std::lock_guard wlock(shard.lock); // exclusive: both maps are modified
+    shard.entries.erase(tablet_id); // erase by key is a no-op when the tablet has no entries
+    shard.tablet_dedup_mutex.erase(tablet_id);
+}
+
+void UncommittedRowsetRegistry::_submit_dedup_task(
+        int64_t tablet_id, std::shared_ptr<UncommittedRowsetEntry> entry) {
+    if (!_dedup_thread_pool) {
+        // No thread pool available, mark ready without cross-bitmap
+        entry->dedup_ready.store(true, std::memory_order_release);
+        return;
+    }
+
+    // Get the per-tablet dedup mutex (exclusive shard lock, because the lookup may insert a new mutex)
+    std::shared_ptr<std::mutex> dedup_mutex;
+    {
+        auto& shard = _get_shard(tablet_id);
+        std::lock_guard wlock(shard.lock);
+        dedup_mutex = _get_tablet_dedup_mutex(shard, tablet_id);
+    }
+
+    // Collect earlier uncommitted rowsets for this tablet (snapshot taken at submit time, not when the task runs)
+    std::vector<RowsetSharedPtr> earlier_rowsets;
+    {
+        auto& shard = _get_shard(tablet_id);
+        std::shared_lock rlock(shard.lock);
+        auto it = shard.entries.find(tablet_id);
+        if (it != shard.entries.end()) {
+            for (const auto& other : it->second) {
+                if (other->creation_time < entry->creation_time && // NOTE(review): strict <, so MoW entries with equal creation_time are never deduped against each other — confirm
+                    other->unique_key_merge_on_write) {
+                    earlier_rowsets.push_back(other->rowset);
+                }
+            }
+        }
+    }
+
+    // If no earlier uncommitted MoW rowsets, no cross-dedup needed
+    if (earlier_rowsets.empty()) {
+        entry->dedup_ready.store(true, std::memory_order_release);
+        return;
+    }
+
+    // Submit async task to compute cross-uncommitted delete bitmap
+    auto st = _dedup_thread_pool->submit_func(
+            [this, tablet_id, entry, earlier_rowsets = 
std::move(earlier_rowsets),
+             dedup_mutex]() mutable {
+                // Serialize dedup per tablet
+                std::lock_guard tablet_lock(*dedup_mutex);
+
+                // Check if entry was unregistered while waiting (matched by transaction_id; if so, return without touching dedup_ready)
+                {
+                    auto& shard = _get_shard(tablet_id);
+                    std::shared_lock rlock(shard.lock);
+                    auto it = shard.entries.find(tablet_id);
+                    if (it == shard.entries.end()) {
+                        return;
+                    }
+                    bool found = false;
+                    for (const auto& e : it->second) {
+                        if (e->transaction_id == entry->transaction_id) {
+                            found = true;
+                            break;
+                        }
+                    }
+                    if (!found) {
+                        return;
+                    }
+                }
+
+                // Get tablet for bitmap computation (from here on, every failure path still marks the entry ready, just without a cross bitmap)
+                auto tablet_result =
+                        
ExecEnv::GetInstance()->storage_engine().get_tablet(tablet_id);
+                if (!tablet_result.has_value()) {
+                    LOG(WARNING) << "Failed to get tablet " << tablet_id
+                                 << " for cross-uncommitted dedup, marking 
ready without "
+                                    "cross-bitmap";
+                    entry->dedup_ready.store(true, std::memory_order_release);
+                    return;
+                }
+                auto tablet = tablet_result.value();
+
+                // Load segments from the rowset
+                auto* beta_rowset = 
dynamic_cast<BetaRowset*>(entry->rowset.get());
+                if (!beta_rowset) { // not a BetaRowset: nothing to load, serve without a cross bitmap
+                    entry->dedup_ready.store(true, std::memory_order_release);
+                    return;
+                }
+                std::vector<segment_v2::SegmentSharedPtr> segments;
+                auto load_st = beta_rowset->load_segments(&segments);
+                if (!load_st.ok()) {
+                    LOG(WARNING) << "Failed to load segments for 
cross-uncommitted dedup, "
+                                    "tablet_id="
+                                 << tablet_id << " txn_id=" << 
entry->transaction_id
+                                 << ": " << load_st;
+                    entry->dedup_ready.store(true, std::memory_order_release);
+                    return;
+                }
+
+                if (segments.empty()) { // no segments loaded: nothing to dedup
+                    entry->dedup_ready.store(true, std::memory_order_release);
+                    return;
+                }
+
+                // Use CalcDeleteBitmapExecutor for the actual computation
+                auto* calc_executor =
+                        
ExecEnv::GetInstance()->storage_engine().calc_delete_bitmap_executor();
+                if (!calc_executor) {
+                    entry->dedup_ready.store(true, std::memory_order_release);
+                    return;
+                }
+                auto token = calc_executor->create_token();
+
+                // Compute cross-uncommitted delete bitmap
+                auto cross_bitmap = std::make_shared<DeleteBitmap>(tablet_id);
+                auto calc_st = BaseTablet::calc_delete_bitmap(
+                        tablet, entry->rowset, segments, earlier_rowsets, 
cross_bitmap,
+                        DeleteBitmap::TEMP_VERSION_COMMON, token.get());
+                if (calc_st.ok()) {
+                    calc_st = token->wait(); // presumably blocks until the work handed to the token finishes — confirm
+                }
+
+                if (calc_st.ok()) {
+                    entry->cross_delete_bitmap = cross_bitmap; // NOTE(review): plain shared_ptr write without shard.lock; on_compaction_completed resets the same field — confirm synchronization
+                } else {
+                    LOG(WARNING) << "Failed to compute cross-uncommitted 
delete bitmap, "
+                                    "tablet_id="
+                                 << tablet_id << " txn_id=" << 
entry->transaction_id
+                                 << ": " << calc_st;
+                }
+
+                entry->dedup_ready.store(true, std::memory_order_release); // set on success and on failure alike
+            });
+
+    if (!st.ok()) { // the task was not queued: fall back to serving the entry without a cross bitmap
+        LOG(WARNING) << "Failed to submit cross-uncommitted dedup task, 
tablet_id=" << tablet_id
+                     << " txn_id=" << entry->transaction_id << ": " << st;
+        entry->dedup_ready.store(true, std::memory_order_release);
+    }
+}
+
+void UncommittedRowsetRegistry::_recompute_all_dedup(int64_t tablet_id) {
+    std::vector<std::shared_ptr<UncommittedRowsetEntry>> mow_entries; // snapshot, so tasks are submitted after the shard lock is released
+
+    {
+        auto& shard = _get_shard(tablet_id);
+        std::shared_lock rlock(shard.lock);
+        auto it = shard.entries.find(tablet_id);
+        if (it == shard.entries.end()) {
+            return; // the tablet's entries were removed in the meantime
+        }
+        for (auto& entry : it->second) { // list order is the creation_time order established by register_rowset
+            if (entry->unique_key_merge_on_write) {
+                mow_entries.push_back(entry);
+            }
+        }
+    }
+
+    // Re-submit dedup tasks in order (they will serialize via tablet dedup 
mutex)
+    for (auto& entry : mow_entries) {
+        _submit_dedup_task(tablet_id, entry); // _submit_dedup_task takes shard.lock itself, hence the call outside the block above
+    }
+}
+
+UncommittedRowsetRegistry* get_uncommitted_rowset_registry() { // Returns the registry of the process-wide storage engine, or nullptr.
+    auto* env = ExecEnv::GetInstance();
+    if (!env) {
+        return nullptr;
+    }
+    if (config::is_cloud_mode()) { // the concrete engine type is selected by the cloud-mode config
+        return env->storage_engine().to_cloud().uncommitted_rowset_registry();
+    } else {
+        return env->storage_engine().to_local().uncommitted_rowset_registry(); // the accessor returns unique_ptr::get(), so nullptr until the engine's open path creates the registry
+    }
+}
+
+} // namespace doris
diff --git a/be/src/olap/uncommitted_rowset_registry.h 
b/be/src/olap/uncommitted_rowset_registry.h
new file mode 100644
index 00000000000..e2a07634cc3
--- /dev/null
+++ b/be/src/olap/uncommitted_rowset_registry.h
@@ -0,0 +1,121 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <atomic>
+#include <memory>
+#include <mutex>
+#include <shared_mutex>
+#include <unordered_map>
+#include <vector>
+
+#include "olap/rowset/rowset.h"
+#include "olap/tablet_fwd.h"
+#include "olap/tablet_meta.h"
+#include "util/threadpool.h"
+
+namespace doris {
+
+// Represents a single uncommitted rowset tracked by the registry.
+// The committed_delete_bitmap is computed at commit time (vs published 
rowsets).
+// The cross_delete_bitmap is computed asynchronously (vs earlier uncommitted 
rowsets).
+struct UncommittedRowsetEntry {
+    RowsetSharedPtr rowset; // the tracked rowset
+    int64_t transaction_id; // no default initializer: the registering code must set it (also the key used by unregister_rowset)
+    int64_t partition_id; // no default initializer
+    int64_t tablet_id; // no default initializer; selects the registry shard
+    // Delete bitmap computed at commit phase against published rowsets (layer 
2)
+    DeleteBitmapPtr committed_delete_bitmap; // the builders store a copy, and only for MoW entries that have a bitmap
+    // Delete bitmap computed async against earlier uncommitted rowsets (layer 
3)
+    DeleteBitmapPtr cross_delete_bitmap; // null until a dedup task succeeds; reset by on_compaction_completed
+    bool unique_key_merge_on_write; // no default initializer; false means the entry is marked ready without dedup
+    int64_t creation_time; // for ordering: later wins in dedup (the builders copy it from rowset->creation_time())
+    std::atomic<bool> dedup_ready {false}; // only serve to queries when true (written with release, read with acquire)
+};
+
+// UncommittedRowsetRegistry tracks uncommitted rowsets across all tablets on 
this BE.
+// It supports the READ UNCOMMITTED isolation level by making uncommitted data 
visible
+// to queries that opt in via `SET read_uncommitted = true`.
+//
+// Thread safety: Uses sharded locks for the main map and per-tablet mutexes 
for
+// serializing async dedup computation.
+class UncommittedRowsetRegistry {
+public:
+    UncommittedRowsetRegistry();
+    ~UncommittedRowsetRegistry();
+
+    Status init(int dedup_threads); // builds the dedup thread pool with max_threads = dedup_threads
+
+    // === Write Path ===
+
+    // Register an uncommitted rowset. For MoW unique key tables, this triggers
+    // async dedup computation against earlier uncommitted rowsets.
+    void register_rowset(std::shared_ptr<UncommittedRowsetEntry> entry);
+
+    // Unregister on publish or rollback. Removes the entry and any associated 
state.
+    void unregister_rowset(int64_t tablet_id, int64_t transaction_id);
+
+    // === Read Path ===
+
+    // Get all dedup-ready uncommitted rowsets for a tablet.
+    void get_ready_rowsets(int64_t tablet_id,
+                           
std::vector<std::shared_ptr<UncommittedRowsetEntry>>* result); // entries are appended; *result is not cleared
+
+    // === Compaction Path ===
+
+    // Called after compaction modifies published rowsets. Invalidates 
cross-delete
+    // bitmaps for all uncommitted rowsets on this tablet and re-computes them.
+    void on_compaction_completed(int64_t tablet_id);
+
+    // === Schema Change Path ===
+
+    // Called when a tablet transitions to a non-running state. Clears all 
entries.
+    void on_tablet_state_change(int64_t tablet_id, TabletState new_state);
+
+private:
+    static constexpr int SHARD_COUNT = 16; // a tablet's shard is tablet_id % SHARD_COUNT
+
+    struct Shard {
+        mutable std::shared_mutex lock; // guards both maps below
+        // tablet_id -> list of uncommitted entries (ordered by creation_time)
+        std::unordered_map<int64_t, 
std::vector<std::shared_ptr<UncommittedRowsetEntry>>> entries;
+        // Per-tablet mutex for serializing dedup computation
+        std::unordered_map<int64_t, std::shared_ptr<std::mutex>> 
tablet_dedup_mutex;
+    };
+
+    Shard& _get_shard(int64_t tablet_id) { return _shards[tablet_id % 
SHARD_COUNT]; } // NOTE(review): % is negative for a negative tablet_id, which would index out of bounds — presumably ids are never negative; confirm
+    const Shard& _get_shard(int64_t tablet_id) const { return 
_shards[tablet_id % SHARD_COUNT]; }
+
+    // Get or create the per-tablet dedup mutex (must hold shard write lock)
+    std::shared_ptr<std::mutex> _get_tablet_dedup_mutex(Shard& shard, int64_t 
tablet_id);
+
+    // Submit async dedup task for one entry against earlier uncommitted 
rowsets
+    void _submit_dedup_task(int64_t tablet_id, 
std::shared_ptr<UncommittedRowsetEntry> entry);
+
+    // Recompute all cross-delete bitmaps for a tablet (after compaction)
+    void _recompute_all_dedup(int64_t tablet_id);
+
+    Shard _shards[SHARD_COUNT];
+    std::unique_ptr<ThreadPool> _dedup_thread_pool; // null until init() builds it
+};
+
+// Helper function to get the UncommittedRowsetRegistry from the current 
storage engine.
+// Works for both local and cloud mode. Returns nullptr if not available.
+UncommittedRowsetRegistry* get_uncommitted_rowset_registry();
+
+} // namespace doris
diff --git a/be/src/pipeline/exec/olap_scan_operator.cpp 
b/be/src/pipeline/exec/olap_scan_operator.cpp
index 44fe060066a..cb8aecaae6d 100644
--- a/be/src/pipeline/exec/olap_scan_operator.cpp
+++ b/be/src/pipeline/exec/olap_scan_operator.cpp
@@ -33,6 +33,7 @@
 #include "olap/rowset/segment_v2/ann_index/ann_topn_runtime.h"
 #include "olap/storage_engine.h"
 #include "olap/tablet_manager.h"
+#include "olap/uncommitted_rowset_registry.h"
 #include "pipeline/exec/scan_operator.h"
 #include "pipeline/query_cache/query_cache.h"
 #include "runtime/runtime_state.h"
@@ -744,6 +745,70 @@ Status OlapScanLocalState::prepare(RuntimeState* state) {
                      print_id(PipelineXLocalState<>::_state->query_id()));
         }
     }
+    // Inject uncommitted rowsets for READ UNCOMMITTED isolation level
+    if (_state->read_uncommitted()) {
+        auto* registry = get_uncommitted_rowset_registry();
+        if (registry) {
+            for (size_t i = 0; i < _scan_ranges.size(); i++) {
+                auto& tablet = _tablets[i].tablet;
+                // Only DUP_KEYS and UNIQUE_KEYS supported
+                if (tablet->keys_type() != DUP_KEYS &&
+                    tablet->keys_type() != UNIQUE_KEYS) {
+                    continue;
+                }
+                // Skip tablets not in RUNNING state
+                if (tablet->tablet_state() != TABLET_RUNNING) {
+                    continue;
+                }
+
+                std::vector<std::shared_ptr<UncommittedRowsetEntry>> 
ready_entries;
+                registry->get_ready_rowsets(tablet->tablet_id(), 
&ready_entries);
+                if (ready_entries.empty()) {
+                    continue;
+                }
+
+                bool bitmap_copied = false;
+                auto ensure_bitmap_copy = [&]() {
+                    if (!bitmap_copied) {
+                        if (_read_sources[i].delete_bitmap) {
+                            _read_sources[i].delete_bitmap =
+                                    
std::make_shared<DeleteBitmap>(*_read_sources[i].delete_bitmap);
+                        } else {
+                            _read_sources[i].delete_bitmap =
+                                    
std::make_shared<DeleteBitmap>(tablet->tablet_id());
+                        }
+                        bitmap_copied = true;
+                    }
+                };
+
+                for (auto& entry : ready_entries) {
+                    RowsetReaderSharedPtr rs_reader;
+                    auto st = entry->rowset->create_reader(&rs_reader);
+                    if (!st.ok()) {
+                        LOG(WARNING) << "Failed to create reader for 
uncommitted rowset, "
+                                        "tablet_id="
+                                     << tablet->tablet_id()
+                                     << " txn_id=" << entry->transaction_id << 
": " << st;
+                        continue;
+                    }
+                    
_read_sources[i].rs_splits.emplace_back(std::move(rs_reader));
+
+                    bool is_mow = entry->unique_key_merge_on_write;
+                    // Merge committed-vs-published bitmap (layer 2)
+                    if (is_mow && entry->committed_delete_bitmap) {
+                        ensure_bitmap_copy();
+                        
_read_sources[i].delete_bitmap->merge(*entry->committed_delete_bitmap);
+                    }
+                    // Merge cross-uncommitted bitmap (layer 3)
+                    if (is_mow && entry->cross_delete_bitmap) {
+                        ensure_bitmap_copy();
+                        
_read_sources[i].delete_bitmap->merge(*entry->cross_delete_bitmap);
+                    }
+                }
+            }
+        }
+    }
+
     timer.stop();
     double cost_secs = static_cast<double>(timer.elapsed_time()) / 
NANOS_PER_SEC;
     if (cost_secs > 1) {
diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h
index 70234928c64..3901c6d3eeb 100644
--- a/be/src/runtime/runtime_state.h
+++ b/be/src/runtime/runtime_state.h
@@ -436,6 +436,10 @@ public:
         return _query_options.__isset.skip_missing_version && 
_query_options.skip_missing_version;
     }
 
+    // Whether this query runs with READ UNCOMMITTED: true only when the
+    // read_uncommitted query option is both present and set to true.
+    bool read_uncommitted() const {
+        if (!_query_options.__isset.read_uncommitted) {
+            return false;
+        }
+        return _query_options.read_uncommitted;
+    }
+
     int64_t data_queue_max_blocks() const {
         return _query_options.__isset.data_queue_max_blocks ? 
_query_options.data_queue_max_blocks
                                                             : 1;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index 9468a75ce3b..2c129f680cc 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -447,6 +447,8 @@ public class SessionVariable implements Serializable, 
Writable {
 
     public static final String SKIP_BAD_TABLET = "skip_bad_tablet";
 
+    public static final String READ_UNCOMMITTED = "read_uncommitted";
+
     public static final String ENABLE_PUSH_DOWN_NO_GROUP_AGG = 
"enable_push_down_no_group_agg";
 
     public static final String ENABLE_CBO_STATISTICS = "enable_cbo_statistics";
@@ -770,7 +772,8 @@ public class SessionVariable implements Serializable, 
Writable {
             SKIP_STORAGE_ENGINE_MERGE,
             SKIP_MISSING_VERSION,
             SKIP_BAD_TABLET,
-            SHOW_HIDDEN_COLUMNS
+            SHOW_HIDDEN_COLUMNS,
+            READ_UNCOMMITTED
     );
 
     public static final String ENABLE_STATS = "enable_stats";
@@ -2059,6 +2062,14 @@ public class SessionVariable implements Serializable, 
Writable {
             affectQueryResultInExecution = true)
     public boolean skipBadTablet = false;
 
+    @VariableMgr.VarAttr(name = READ_UNCOMMITTED, needForward = true,
+            varType = VariableAnnotation.EXPERIMENTAL,
+            description = {"启用 READ UNCOMMITTED 隔离级别,允许查询读取未提交的 rowset 
数据(实验特性)",
+                    "Enable READ UNCOMMITTED isolation level to allow queries 
to read "
+                    + "uncommitted rowset data (experimental)"},
+            affectQueryResultInPlan = true, affectQueryResultInExecution = 
true)
+    public boolean readUncommitted = false;
+
     // This variable is used to avoid FE fallback to the original parser. When 
we execute SQL in regression tests
     // for nereids, fallback will cause the Doris return the correct result 
although the syntax is unsupported
     // in nereids for some mistaken modification. You should set it on the
@@ -3589,7 +3600,7 @@ public class SessionVariable implements Serializable, 
Writable {
 
     public boolean isInDebugMode() {
         return showHiddenColumns || skipDeleteBitmap || skipDeletePredicate || 
skipDeleteSign || skipStorageEngineMerge
-                || skipMissingVersion || skipBadTablet;
+                || skipMissingVersion || skipBadTablet || readUncommitted;
     }
 
     public String printDebugModeVariables() {
@@ -5067,6 +5078,8 @@ public class SessionVariable implements Serializable, 
Writable {
 
         tResult.setSkipDeleteBitmap(skipDeleteBitmap);
 
+        tResult.setReadUncommitted(readUncommitted);
+
         tResult.setEnableFileCache(enableFileCache);
 
         tResult.setEnablePageCache(enablePageCache);
diff --git a/gensrc/thrift/PaloInternalService.thrift 
b/gensrc/thrift/PaloInternalService.thrift
index 6709520641b..455e7bf3653 100644
--- a/gensrc/thrift/PaloInternalService.thrift
+++ b/gensrc/thrift/PaloInternalService.thrift
@@ -426,6 +426,8 @@ struct TQueryOptions {
 
   185: optional bool enable_parquet_file_page_cache = true;
 
+  186: optional bool read_uncommitted = false;
+
   195: optional bool enable_left_semi_direct_return_opt;
 
   // For cloud, to control if the content would be written into file cache


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to