This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 00723e36cf [enhancement](merge-on-write) add delete bitmap correctness 
check for single load (#17147)
00723e36cf is described below

commit 00723e36cf6af9ee9385cc28d2126492574c78ae
Author: zhannngchen <[email protected]>
AuthorDate: Tue Feb 28 10:06:36 2023 +0800

    [enhancement](merge-on-write) add delete bitmap correctness check for 
single load (#17147)
    
    For a Unique Key MoW table, if there are duplicate keys in a single load 
job and there are multiple segments, we need to calculate the delete bitmap to 
mark these duplicate keys as deleted.
    Add a check here to detect any bugs that might cause duplicate keys.
---
 be/src/olap/delta_writer.cpp                     |  4 +++-
 be/src/olap/rowset/beta_rowset_writer.cpp        |  7 +++++++
 be/src/olap/rowset/beta_rowset_writer.h          |  8 +++++++-
 be/src/olap/rowset/segment_v2/segment_writer.cpp |  3 +++
 be/src/olap/rowset/segment_v2/segment_writer.h   |  6 ++++++
 be/src/olap/tablet.cpp                           | 21 +++++++++++++++++++--
 be/src/olap/tablet.h                             |  5 +++--
 be/src/olap/tablet_meta.cpp                      |  8 ++++++++
 be/src/olap/tablet_meta.h                        |  2 ++
 be/src/olap/txn_manager.cpp                      | 14 ++++++--------
 be/src/olap/txn_manager.h                        |  3 ++-
 11 files changed, 66 insertions(+), 15 deletions(-)

diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp
index 91d1efa17f..d5f2446b52 100644
--- a/be/src/olap/delta_writer.cpp
+++ b/be/src/olap/delta_writer.cpp
@@ -21,6 +21,7 @@
 #include "olap/data_dir.h"
 #include "olap/memtable.h"
 #include "olap/memtable_flush_executor.h"
+#include "olap/rowset/beta_rowset_writer.h"
 #include "olap/rowset/rowset_writer_context.h"
 #include "olap/schema.h"
 #include "olap/storage_engine.h"
@@ -346,7 +347,8 @@ Status DeltaWriter::close_wait(const PSlaveTabletNodes& 
slave_tablet_nodes,
     if (_tablet->enable_unique_key_merge_on_write()) {
         _storage_engine->txn_manager()->set_txn_related_delete_bitmap(
                 _req.partition_id, _req.txn_id, _tablet->tablet_id(), 
_tablet->schema_hash(),
-                _tablet->tablet_uid(), true, _delete_bitmap, _rowset_ids);
+                _tablet->tablet_uid(), true, _delete_bitmap, _rowset_ids,
+                
dynamic_cast<BetaRowsetWriter*>(_rowset_writer.get())->get_num_mow_keys());
     }
 
     _delta_written_success = true;
diff --git a/be/src/olap/rowset/beta_rowset_writer.cpp 
b/be/src/olap/rowset/beta_rowset_writer.cpp
index cd8f06aa73..e655cf0f34 100644
--- a/be/src/olap/rowset/beta_rowset_writer.cpp
+++ b/be/src/olap/rowset/beta_rowset_writer.cpp
@@ -759,6 +759,7 @@ void 
BetaRowsetWriter::_build_rowset_meta(std::shared_ptr<RowsetMeta> rowset_met
     int64_t total_data_size = 0;
     int64_t total_index_size = 0;
     std::vector<KeyBoundsPB> segments_encoded_key_bounds;
+    std::unordered_set<std::string> key_set;
     {
         std::lock_guard<std::mutex> lock(_segid_statistics_map_mutex);
         for (const auto& itr : _segid_statistics_map) {
@@ -766,8 +767,13 @@ void 
BetaRowsetWriter::_build_rowset_meta(std::shared_ptr<RowsetMeta> rowset_met
             total_data_size += itr.second.data_size;
             total_index_size += itr.second.index_size;
             segments_encoded_key_bounds.push_back(itr.second.key_bounds);
+            if (_context.enable_unique_key_merge_on_write) {
+                DCHECK(itr.second.key_set.get() != nullptr);
+                key_set.insert(itr.second.key_set->begin(), 
itr.second.key_set->end());
+            }
         }
     }
+    _num_mow_keys = key_set.size();
     for (auto itr = _segments_encoded_key_bounds.begin(); itr != 
_segments_encoded_key_bounds.end();
          ++itr) {
         segments_encoded_key_bounds.push_back(*itr);
@@ -914,6 +920,7 @@ Status 
BetaRowsetWriter::_flush_segment_writer(std::unique_ptr<segment_v2::Segme
     segstat.data_size = segment_size;
     segstat.index_size = index_size;
     segstat.key_bounds = key_bounds;
+    segstat.key_set = (*writer)->get_key_set();
     {
         std::lock_guard<std::mutex> lock(_segid_statistics_map_mutex);
         CHECK_EQ(_segid_statistics_map.find(segid) == 
_segid_statistics_map.end(), true);
diff --git a/be/src/olap/rowset/beta_rowset_writer.h 
b/be/src/olap/rowset/beta_rowset_writer.h
index f0d29fba30..309e190ad9 100644
--- a/be/src/olap/rowset/beta_rowset_writer.h
+++ b/be/src/olap/rowset/beta_rowset_writer.h
@@ -88,6 +88,8 @@ public:
         return _context.schema_change_recorder.get();
     }
 
+    uint64_t get_num_mow_keys() { return _num_mow_keys; } // set in _build_rowset_meta()
+
 private:
     Status _add_block(const vectorized::Block* block,
                       std::unique_ptr<segment_v2::SegmentWriter>* writer);
@@ -170,9 +172,13 @@ protected:
         int64_t data_size;
         int64_t index_size;
         KeyBoundsPB key_bounds;
+        std::shared_ptr<std::unordered_set<std::string>> key_set;
     };
-    std::map<uint32_t, Statistics> _segid_statistics_map;
     std::mutex _segid_statistics_map_mutex;
+    std::map<uint32_t, Statistics> _segid_statistics_map;
+
+    // Number of distinct unique keys in this rowset; used to check MoW delete bitmap correctness.
+    std::atomic<uint64_t> _num_mow_keys {0};
 
     bool _is_pending = false;
     bool _already_built = false;
diff --git a/be/src/olap/rowset/segment_v2/segment_writer.cpp 
b/be/src/olap/rowset/segment_v2/segment_writer.cpp
index 4ee0cefec9..a0f4c169a0 100644
--- a/be/src/olap/rowset/segment_v2/segment_writer.cpp
+++ b/be/src/olap/rowset/segment_v2/segment_writer.cpp
@@ -219,6 +219,7 @@ Status SegmentWriter::init(const std::vector<uint32_t>& 
col_ids, bool has_key,
             _primary_key_index_builder.reset(
                     new PrimaryKeyIndexBuilder(_file_writer, seq_col_length));
             RETURN_IF_ERROR(_primary_key_index_builder->init());
+            _key_set.reset(new std::unordered_set<std::string>());
         } else {
             _short_key_index_builder.reset(
                     new ShortKeyIndexBuilder(_segment_id, 
_opts.num_rows_per_block));
@@ -326,6 +327,8 @@ Status SegmentWriter::append_block(const vectorized::Block* 
block, size_t row_po
             // create primary indexes
             for (size_t pos = 0; pos < num_rows; pos++) {
                 std::string key = _full_encode_keys(key_columns, pos);
+                DCHECK(_key_set.get() != nullptr);
+                _key_set->insert(key);
                 if (_tablet_schema->has_sequence_col()) {
                     _encode_seq_column(seq_column, pos, &key);
                 }
diff --git a/be/src/olap/rowset/segment_v2/segment_writer.h 
b/be/src/olap/rowset/segment_v2/segment_writer.h
index fa15893c62..41e53965d2 100644
--- a/be/src/olap/rowset/segment_v2/segment_writer.h
+++ b/be/src/olap/rowset/segment_v2/segment_writer.h
@@ -56,6 +56,8 @@ class ColumnWriter;
 extern const char* k_segment_magic;
 extern const uint32_t k_segment_magic_length;
 
+using KeySetPtr = std::shared_ptr<std::unordered_set<std::string>>;
+
 struct SegmentWriterOptions {
     uint32_t num_rows_per_block = 1024;
     bool enable_unique_key_merge_on_write = false;
@@ -105,6 +107,8 @@ public:
     Slice min_encoded_key();
     Slice max_encoded_key();
 
+    KeySetPtr get_key_set() { return _key_set; } // null unless allocated in init()
+
     DataDir* get_data_dir() { return _data_dir; }
     bool is_unique_key() { return _tablet_schema->keys_type() == UNIQUE_KEYS; }
 
@@ -165,6 +169,8 @@ private:
     const KeyCoder* _seq_coder = nullptr;
     std::vector<uint16_t> _key_index_size;
     size_t _short_key_row_pos = 0;
+    // encoded keys (without sequence column) of this segment, for the MoW duplicate-key check
+    KeySetPtr _key_set;
 
     std::vector<uint32_t> _column_ids;
     bool _has_key = true;
diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp
index 219f0f7e11..e50b188e5a 100644
--- a/be/src/olap/tablet.cpp
+++ b/be/src/olap/tablet.cpp
@@ -65,6 +65,7 @@
 #include "olap/tablet_meta.h"
 #include "olap/tablet_meta_manager.h"
 #include "olap/tablet_schema.h"
+#include "olap/txn_manager.h"
 #include "segment_loader.h"
 #include "service/point_query_executor.h"
 #include "util/defer_op.h"
@@ -2523,8 +2524,9 @@ Status Tablet::update_delete_bitmap_without_lock(const 
RowsetSharedPtr& rowset)
     return Status::OK();
 }
 
-Status Tablet::update_delete_bitmap(const RowsetSharedPtr& rowset, 
DeleteBitmapPtr delete_bitmap,
-                                    const RowsetIdUnorderedSet& 
pre_rowset_ids) {
+Status Tablet::update_delete_bitmap(const RowsetSharedPtr& rowset, const 
TabletTxnInfo* load_info) {
+    DeleteBitmapPtr delete_bitmap = load_info->delete_bitmap;
+    const RowsetIdUnorderedSet& pre_rowset_ids = load_info->rowset_ids;
     RowsetIdUnorderedSet cur_rowset_ids;
     RowsetIdUnorderedSet rowset_ids_to_add;
     RowsetIdUnorderedSet rowset_ids_to_del;
@@ -2555,6 +2557,21 @@ Status Tablet::update_delete_bitmap(const 
RowsetSharedPtr& rowset, DeleteBitmapP
     RETURN_IF_ERROR(calc_delete_bitmap(rowset->rowset_id(), segments, 
&rowset_ids_to_add,
                                        delete_bitmap, cur_version - 1, true));
 
+    // Check the delete_bitmap correctness: rows not marked deleted must equal the unique keys.
+    DeleteBitmap rs_bm(tablet_id());
+    delete_bitmap->subset({rowset->rowset_id(), 0, 0}, {rowset->rowset_id(), UINT32_MAX, INT64_MAX},
+                          &rs_bm);
+    auto num_rows = rowset->num_rows();
+    auto bitmap_cardinality = rs_bm.cardinality();
+    if (load_info->num_keys != num_rows - bitmap_cardinality) {
+        std::string err_msg = fmt::format(
+                "The delete bitmap of unique key table may not be correct, expect num unique "
+                "keys: {}, now the num_rows: {}, delete bitmap cardinality: {}, num segments: {}",
+                load_info->num_keys, num_rows, bitmap_cardinality, rowset->num_segments());
+        DCHECK(false) << err_msg;
+        return Status::InternalError(err_msg);
+    }
+
     // update version without write lock, compaction and publish_txn
     // will update delete bitmap, handle compaction with _rowset_update_lock
     // and publish_txn runs sequential so no need to lock here
diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h
index 3b69a3fd0e..e59e5cae30 100644
--- a/be/src/olap/tablet.h
+++ b/be/src/olap/tablet.h
@@ -55,6 +55,8 @@ class CumulativeCompactionPolicy;
 class CumulativeCompaction;
 class BaseCompaction;
 class RowsetWriter;
+
+struct TabletTxnInfo;
 struct RowsetWriterContext;
 
 using TabletSharedPtr = std::shared_ptr<Tablet>;
@@ -366,8 +368,7 @@ public:
                               bool check_pre_segments = false);
 
     Status update_delete_bitmap_without_lock(const RowsetSharedPtr& rowset);
-    Status update_delete_bitmap(const RowsetSharedPtr& rowset, DeleteBitmapPtr 
delete_bitmap,
-                                const RowsetIdUnorderedSet& pre_rowset_ids);
+    Status update_delete_bitmap(const RowsetSharedPtr& rowset, const 
TabletTxnInfo* load_info);
     uint64_t calc_compaction_output_rowset_delete_bitmap(
             const std::vector<RowsetSharedPtr>& input_rowsets,
             const RowIdConversion& rowid_conversion, uint64_t start_version, 
uint64_t end_version,
diff --git a/be/src/olap/tablet_meta.cpp b/be/src/olap/tablet_meta.cpp
index cffa7fc10a..d2116b40bd 100644
--- a/be/src/olap/tablet_meta.cpp
+++ b/be/src/olap/tablet_meta.cpp
@@ -987,6 +987,14 @@ void DeleteBitmap::merge(const DeleteBitmap& other) {
     }
 }
 
+uint64_t DeleteBitmap::cardinality() {
+    uint64_t cardinality = 0; // sum of the cardinalities of every entry's bitmap
+    for (const auto& entry : delete_bitmap) { // by reference: copying an entry copies its bitmap
+        cardinality += entry.second.cardinality();
+    }
+    return cardinality;
+}
+
 // We cannot just copy the underlying memory to construct a string
 // due to equivalent objects may have different padding bytes.
 // Reading padding bytes is undefined behavior, neither copy nor
diff --git a/be/src/olap/tablet_meta.h b/be/src/olap/tablet_meta.h
index 0fa3cffe26..c0a00d6fa5 100644
--- a/be/src/olap/tablet_meta.h
+++ b/be/src/olap/tablet_meta.h
@@ -369,6 +369,8 @@ public:
      */
     void merge(const DeleteBitmap& other);
 
+    uint64_t cardinality();
+
     /**
      * Checks if the given row is marked deleted in bitmap with the condition:
      * all the bitmaps that
diff --git a/be/src/olap/txn_manager.cpp b/be/src/olap/txn_manager.cpp
index 9e80855bd3..5d3daef7d5 100644
--- a/be/src/olap/txn_manager.cpp
+++ b/be/src/olap/txn_manager.cpp
@@ -169,12 +169,10 @@ Status TxnManager::prepare_txn(TPartitionId partition_id, 
TTransactionId transac
     return Status::OK();
 }
 
-void TxnManager::set_txn_related_delete_bitmap(TPartitionId partition_id,
-                                               TTransactionId transaction_id, 
TTabletId tablet_id,
-                                               SchemaHash schema_hash, 
TabletUid tablet_uid,
-                                               bool unique_key_merge_on_write,
-                                               DeleteBitmapPtr delete_bitmap,
-                                               const RowsetIdUnorderedSet& 
rowset_ids) {
+void TxnManager::set_txn_related_delete_bitmap(
+        TPartitionId partition_id, TTransactionId transaction_id, TTabletId 
tablet_id,
+        SchemaHash schema_hash, TabletUid tablet_uid, bool 
unique_key_merge_on_write,
+        DeleteBitmapPtr delete_bitmap, const RowsetIdUnorderedSet& rowset_ids, 
uint64_t num_keys) {
     pair<int64_t, int64_t> key(partition_id, transaction_id);
     TabletInfo tablet_info(tablet_id, schema_hash, tablet_uid);
 
@@ -191,6 +189,7 @@ void TxnManager::set_txn_related_delete_bitmap(TPartitionId 
partition_id,
         load_info.unique_key_merge_on_write = unique_key_merge_on_write;
         load_info.delete_bitmap = delete_bitmap;
         load_info.rowset_ids = rowset_ids;
+        load_info.num_keys = num_keys;
     }
 }
 
@@ -321,8 +320,7 @@ Status TxnManager::publish_txn(OlapMeta* meta, TPartitionId 
partition_id,
                     if (tablet == nullptr) {
                         return Status::OK();
                     }
-                    RETURN_IF_ERROR(tablet->update_delete_bitmap(
-                            rowset_ptr, load_info->delete_bitmap, 
load_info->rowset_ids));
+                    RETURN_IF_ERROR(tablet->update_delete_bitmap(rowset_ptr, 
load_info));
                     std::shared_lock rlock(tablet->get_header_lock());
                     tablet->save_meta();
                 }
diff --git a/be/src/olap/txn_manager.h b/be/src/olap/txn_manager.h
index 197307f589..7319234b27 100644
--- a/be/src/olap/txn_manager.h
+++ b/be/src/olap/txn_manager.h
@@ -58,6 +58,7 @@ struct TabletTxnInfo {
     // records rowsets calc in commit txn
     RowsetIdUnorderedSet rowset_ids;
     int64_t creation_time;
+    uint64_t num_keys = 0; // distinct keys written by the load (MoW correctness check)
 
     TabletTxnInfo(PUniqueId load_id, RowsetSharedPtr rowset)
             : load_id(load_id),
@@ -167,7 +168,7 @@ public:
                                        TTabletId tablet_id, SchemaHash 
schema_hash,
                                        TabletUid tablet_uid, bool 
unique_key_merge_on_write,
                                        DeleteBitmapPtr delete_bitmap,
-                                       const RowsetIdUnorderedSet& rowset_ids);
+                                       const RowsetIdUnorderedSet& rowset_ids, 
uint64_t num_keys);
 
 private:
     using TxnKey = std::pair<int64_t, int64_t>; // partition_id, 
transaction_id;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to