This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 1a51d04cb89b2bae645c3bd3dea0b443205594cf
Author: Kaijie Chen <[email protected]>
AuthorDate: Tue Jan 23 19:02:32 2024 +0800

    [fix](move-memtable) fix schema use-after-free in delta writer v2 (#30254)
---
 be/src/olap/delta_writer_context.h                  |  4 ++--
 be/src/olap/delta_writer_v2.cpp                     |  2 +-
 be/src/olap/rowset_builder.cpp                      |  3 ++-
 be/src/runtime/load_stream.cpp                      |  5 +++--
 be/src/runtime/load_stream.h                        |  3 ++-
 be/src/runtime/tablets_channel.cpp                  |  6 +++---
 be/src/runtime/tablets_channel.h                    |  2 +-
 be/src/vec/sink/writer/vtablet_writer_v2.cpp        |  2 +-
 be/test/olap/delta_writer_test.cpp                  | 16 ++++++++--------
 be/test/olap/engine_storage_migration_task_test.cpp |  4 ++--
 be/test/olap/memtable_memory_limiter_test.cpp       |  4 ++--
 be/test/olap/remote_rowset_gc_test.cpp              |  4 ++--
 be/test/olap/tablet_cooldown_test.cpp               |  4 ++--
 be/test/runtime/load_stream_test.cpp                |  6 +++---
 14 files changed, 34 insertions(+), 31 deletions(-)

diff --git a/be/src/olap/delta_writer_context.h b/be/src/olap/delta_writer_context.h
index aa8ee81f871..02262197527 100644
--- a/be/src/olap/delta_writer_context.h
+++ b/be/src/olap/delta_writer_context.h
@@ -39,9 +39,9 @@ struct WriteRequest {
     TupleDescriptor* tuple_desc = nullptr;
     // slots are in order of tablet's schema
     const std::vector<SlotDescriptor*>* slots = nullptr;
-    OlapTableSchemaParam* table_schema_param = nullptr;
+    std::shared_ptr<OlapTableSchemaParam> table_schema_param = nullptr;
     bool is_high_priority = false;
     bool write_file_cache = false;
 };
 
-} // namespace doris
\ No newline at end of file
+} // namespace doris
diff --git a/be/src/olap/delta_writer_v2.cpp b/be/src/olap/delta_writer_v2.cpp
index 5987c63e658..db2c7ef80ce 100644
--- a/be/src/olap/delta_writer_v2.cpp
+++ b/be/src/olap/delta_writer_v2.cpp
@@ -109,7 +109,7 @@ Status DeltaWriterV2::init() {
     if (_streams.size() == 0 || _streams[0]->tablet_schema(_req.index_id) == nullptr) {
         return Status::InternalError("failed to find tablet schema for {}", _req.index_id);
     }
-    _build_current_tablet_schema(_req.index_id, _req.table_schema_param,
+    _build_current_tablet_schema(_req.index_id, _req.table_schema_param.get(),
                                  *_streams[0]->tablet_schema(_req.index_id));
     RowsetWriterContext context;
     context.txn_id = _req.txn_id;
diff --git a/be/src/olap/rowset_builder.cpp b/be/src/olap/rowset_builder.cpp
index e7fb520c0bf..ecfba5cebb9 100644
--- a/be/src/olap/rowset_builder.cpp
+++ b/be/src/olap/rowset_builder.cpp
@@ -185,7 +185,8 @@ Status RowsetBuilder::init() {
     RETURN_IF_ERROR(prepare_txn());
 
     // build tablet schema in request level
-    _build_current_tablet_schema(_req.index_id, _req.table_schema_param, *_tablet->tablet_schema());
+    _build_current_tablet_schema(_req.index_id, _req.table_schema_param.get(),
+                                 *_tablet->tablet_schema());
     RowsetWriterContext context;
     context.txn_id = _req.txn_id;
     context.load_id = _req.load_id;
diff --git a/be/src/runtime/load_stream.cpp b/be/src/runtime/load_stream.cpp
index 7d494eba88a..c62f5e9da50 100644
--- a/be/src/runtime/load_stream.cpp
+++ b/be/src/runtime/load_stream.cpp
@@ -70,7 +70,8 @@ inline std::ostream& operator<<(std::ostream& ostr, const TabletStream& tablet_s
     return ostr;
 }
 
-Status TabletStream::init(OlapTableSchemaParam* schema, int64_t index_id, int64_t partition_id) {
+Status TabletStream::init(std::shared_ptr<OlapTableSchemaParam> schema, int64_t index_id,
+                          int64_t partition_id) {
     WriteRequest req {
             .tablet_id = _id,
             .txn_id = _txn_id,
@@ -291,7 +292,7 @@ Status IndexStream::_init_tablet_stream(TabletStreamSharedPtr& tablet_stream, in
                                         int64_t partition_id) {
     tablet_stream = std::make_shared<TabletStream>(_load_id, tablet_id, _txn_id, _load_stream_mgr,
                                                    _profile);
-    RETURN_IF_ERROR(tablet_stream->init(_schema.get(), _id, partition_id));
+    RETURN_IF_ERROR(tablet_stream->init(_schema, _id, partition_id));
     _tablet_streams_map[tablet_id] = tablet_stream;
     return Status::OK();
 }
diff --git a/be/src/runtime/load_stream.h b/be/src/runtime/load_stream.h
index 6df9198504e..c3fad94693d 100644
--- a/be/src/runtime/load_stream.h
+++ b/be/src/runtime/load_stream.h
@@ -47,7 +47,8 @@ public:
     TabletStream(PUniqueId load_id, int64_t id, int64_t txn_id, LoadStreamMgr* load_stream_mgr,
                  RuntimeProfile* profile);
 
-    Status init(OlapTableSchemaParam* schema, int64_t index_id, int64_t partition_id);
+    Status init(std::shared_ptr<OlapTableSchemaParam> schema, int64_t index_id,
+                int64_t partition_id);
 
     Status append_data(const PStreamHeader& header, butil::IOBuf* data);
     Status add_segment(const PStreamHeader& header, butil::IOBuf* data);
diff --git a/be/src/runtime/tablets_channel.cpp b/be/src/runtime/tablets_channel.cpp
index d248ad09d7a..f3f2648ba70 100644
--- a/be/src/runtime/tablets_channel.cpp
+++ b/be/src/runtime/tablets_channel.cpp
@@ -135,7 +135,7 @@ Status BaseTabletsChannel::open(const PTabletWriterOpenRequest& request) {
               << ", timeout(s): " << request.load_channel_timeout_s();
     _txn_id = request.txn_id();
     _index_id = request.index_id();
-    _schema = std::make_unique<OlapTableSchemaParam>();
+    _schema = std::make_shared<OlapTableSchemaParam>();
     RETURN_IF_ERROR(_schema->init(request.schema()));
     _tuple_desc = _schema->tuple_desc();
 
@@ -189,7 +189,7 @@ Status BaseTabletsChannel::incremental_open(const PTabletWriterOpenRequest& para
         wrequest.tuple_desc = _tuple_desc;
         wrequest.slots = index_slots;
         wrequest.is_high_priority = _is_high_priority;
-        wrequest.table_schema_param = _schema.get();
+        wrequest.table_schema_param = _schema;
 
         // TODO(plat1ko): CloudDeltaWriter
         auto delta_writer = std::make_unique<DeltaWriter>(*StorageEngine::instance(), &wrequest,
@@ -451,7 +451,7 @@ Status BaseTabletsChannel::_open_all_writers(const PTabletWriterOpenRequest& req
                 .load_id = request.id(),
                 .tuple_desc = _tuple_desc,
                 .slots = index_slots,
-                .table_schema_param = _schema.get(),
+                .table_schema_param = _schema,
                 .is_high_priority = _is_high_priority,
                 .write_file_cache = request.write_file_cache(),
         };
diff --git a/be/src/runtime/tablets_channel.h b/be/src/runtime/tablets_channel.h
index 15f68ba8e38..27db9387602 100644
--- a/be/src/runtime/tablets_channel.h
+++ b/be/src/runtime/tablets_channel.h
@@ -150,7 +150,7 @@ protected:
     // initialized in open function
     int64_t _txn_id = -1;
     int64_t _index_id = -1;
-    std::unique_ptr<OlapTableSchemaParam> _schema;
+    std::shared_ptr<OlapTableSchemaParam> _schema;
 
     TupleDescriptor* _tuple_desc = nullptr;
 
diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.cpp b/be/src/vec/sink/writer/vtablet_writer_v2.cpp
index cfeeb782bed..c42f0955a2a 100644
--- a/be/src/vec/sink/writer/vtablet_writer_v2.cpp
+++ b/be/src/vec/sink/writer/vtablet_writer_v2.cpp
@@ -427,7 +427,7 @@ Status VTabletWriterV2::_write_memtable(std::shared_ptr<vectorized::Block> block
                 .partition_id = rows.partition_id,
                 .load_id = _load_id,
                 .tuple_desc = _output_tuple_desc,
-                .table_schema_param = _schema.get(),
+                .table_schema_param = _schema,
                 .is_high_priority = _is_high_priority,
                 .write_file_cache = _write_file_cache,
         };
diff --git a/be/test/olap/delta_writer_test.cpp b/be/test/olap/delta_writer_test.cpp
index c48ebf98a48..d38200a335e 100644
--- a/be/test/olap/delta_writer_test.cpp
+++ b/be/test/olap/delta_writer_test.cpp
@@ -490,7 +490,7 @@ TEST_F(TestDeltaWriter, open) {
     DescriptorTbl* desc_tbl = nullptr;
     static_cast<void>(DescriptorTbl::create(&obj_pool, tdesc_tbl, &desc_tbl));
     TupleDescriptor* tuple_desc = desc_tbl->get_tuple_descriptor(0);
-    OlapTableSchemaParam param;
+    auto param = std::make_shared<OlapTableSchemaParam>();
 
     PUniqueId load_id;
     load_id.set_hi(0);
@@ -504,7 +504,7 @@ TEST_F(TestDeltaWriter, open) {
     write_req.tuple_desc = tuple_desc;
     write_req.slots = &(tuple_desc->slots());
     write_req.is_high_priority = true;
-    write_req.table_schema_param = &param;
+    write_req.table_schema_param = param;
 
     // test vec delta writer
     profile = std::make_unique<RuntimeProfile>("LoadChannels");
@@ -536,7 +536,7 @@ TEST_F(TestDeltaWriter, vec_write) {
     static_cast<void>(DescriptorTbl::create(&obj_pool, tdesc_tbl, &desc_tbl));
     TupleDescriptor* tuple_desc = desc_tbl->get_tuple_descriptor(0);
     //     const std::vector<SlotDescriptor*>& slots = tuple_desc->slots();
-    OlapTableSchemaParam param;
+    auto param = std::make_shared<OlapTableSchemaParam>();
 
     PUniqueId load_id;
     load_id.set_hi(0);
@@ -550,7 +550,7 @@ TEST_F(TestDeltaWriter, vec_write) {
     write_req.tuple_desc = tuple_desc;
     write_req.slots = &(tuple_desc->slots());
     write_req.is_high_priority = false;
-    write_req.table_schema_param = &param;
+    write_req.table_schema_param = param;
     profile = std::make_unique<RuntimeProfile>("LoadChannels");
     auto delta_writer =
             std::make_unique<DeltaWriter>(*k_engine, &write_req, profile.get(), TUniqueId {});
@@ -699,7 +699,7 @@ TEST_F(TestDeltaWriter, vec_sequence_col) {
     DescriptorTbl* desc_tbl = nullptr;
     static_cast<void>(DescriptorTbl::create(&obj_pool, tdesc_tbl, &desc_tbl));
     TupleDescriptor* tuple_desc = desc_tbl->get_tuple_descriptor(0);
-    OlapTableSchemaParam param;
+    auto param = std::make_shared<OlapTableSchemaParam>();
 
     PUniqueId load_id;
     load_id.set_hi(0);
@@ -713,7 +713,7 @@ TEST_F(TestDeltaWriter, vec_sequence_col) {
     write_req.tuple_desc = tuple_desc;
     write_req.slots = &(tuple_desc->slots());
     write_req.is_high_priority = false;
-    write_req.table_schema_param = &param;
+    write_req.table_schema_param = param;
     profile = std::make_unique<RuntimeProfile>("LoadChannels");
     auto delta_writer =
             std::make_unique<DeltaWriter>(*k_engine, &write_req, profile.get(), TUniqueId {});
@@ -814,7 +814,7 @@ TEST_F(TestDeltaWriter, vec_sequence_col_concurrent_write) {
     DescriptorTbl* desc_tbl = nullptr;
     static_cast<void>(DescriptorTbl::create(&obj_pool, tdesc_tbl, &desc_tbl));
     TupleDescriptor* tuple_desc = desc_tbl->get_tuple_descriptor(0);
-    OlapTableSchemaParam param;
+    auto param = std::make_shared<OlapTableSchemaParam>();
 
     PUniqueId load_id;
     load_id.set_hi(0);
@@ -828,7 +828,7 @@ TEST_F(TestDeltaWriter, vec_sequence_col_concurrent_write) {
     write_req.tuple_desc = tuple_desc;
     write_req.slots = &(tuple_desc->slots());
     write_req.is_high_priority = false;
-    write_req.table_schema_param = &param;
+    write_req.table_schema_param = param;
     std::unique_ptr<RuntimeProfile> profile1;
     profile1 = std::make_unique<RuntimeProfile>("LoadChannels1");
     std::unique_ptr<RuntimeProfile> profile2;
diff --git a/be/test/olap/engine_storage_migration_task_test.cpp b/be/test/olap/engine_storage_migration_task_test.cpp
index c65340522c6..47effac2a26 100644
--- a/be/test/olap/engine_storage_migration_task_test.cpp
+++ b/be/test/olap/engine_storage_migration_task_test.cpp
@@ -187,7 +187,7 @@ TEST_F(TestEngineStorageMigrationTask, write_and_migration) {
     DescriptorTbl* desc_tbl = nullptr;
     static_cast<void>(DescriptorTbl::create(&obj_pool, tdesc_tbl, &desc_tbl));
     TupleDescriptor* tuple_desc = desc_tbl->get_tuple_descriptor(0);
-    OlapTableSchemaParam param;
+    auto param = std::make_shared<OlapTableSchemaParam>();
 
     PUniqueId load_id;
     load_id.set_hi(0);
@@ -201,7 +201,7 @@ TEST_F(TestEngineStorageMigrationTask, write_and_migration) {
     write_req.tuple_desc = tuple_desc;
     write_req.slots = &(tuple_desc->slots());
     write_req.is_high_priority = false;
-    write_req.table_schema_param = &param;
+    write_req.table_schema_param = param;
 
     profile = std::make_unique<RuntimeProfile>("LoadChannels");
     auto delta_writer =
diff --git a/be/test/olap/memtable_memory_limiter_test.cpp b/be/test/olap/memtable_memory_limiter_test.cpp
index 34c7361e33e..13fdceebd06 100644
--- a/be/test/olap/memtable_memory_limiter_test.cpp
+++ b/be/test/olap/memtable_memory_limiter_test.cpp
@@ -128,7 +128,7 @@ TEST_F(MemTableMemoryLimiterTest, handle_memtable_flush_test) {
     DescriptorTbl* desc_tbl = nullptr;
     static_cast<void>(DescriptorTbl::create(&obj_pool, tdesc_tbl, &desc_tbl));
     TupleDescriptor* tuple_desc = desc_tbl->get_tuple_descriptor(0);
-    OlapTableSchemaParam param;
+    auto param = std::make_shared<OlapTableSchemaParam>();
 
     PUniqueId load_id;
     load_id.set_hi(0);
@@ -142,7 +142,7 @@ TEST_F(MemTableMemoryLimiterTest, handle_memtable_flush_test) {
     write_req.tuple_desc = tuple_desc;
     write_req.slots = &(tuple_desc->slots());
     write_req.is_high_priority = false;
-    write_req.table_schema_param = &param;
+    write_req.table_schema_param = param;
     profile = std::make_unique<RuntimeProfile>("MemTableMemoryLimiterTest");
     auto delta_writer =
             std::make_unique<DeltaWriter>(*_engine, &write_req, profile.get(), TUniqueId {});
diff --git a/be/test/olap/remote_rowset_gc_test.cpp b/be/test/olap/remote_rowset_gc_test.cpp
index fe8ab3b0f10..6fb9a2eb54a 100644
--- a/be/test/olap/remote_rowset_gc_test.cpp
+++ b/be/test/olap/remote_rowset_gc_test.cpp
@@ -180,7 +180,7 @@ TEST_F(RemoteRowsetGcTest, normal) {
     DescriptorTbl* desc_tbl = nullptr;
     DescriptorTbl::create(&obj_pool, tdesc_tbl, &desc_tbl);
     TupleDescriptor* tuple_desc = desc_tbl->get_tuple_descriptor(0);
-    OlapTableSchemaParam param;
+    auto param = std::make_shared<OlapTableSchemaParam>();
 
     PUniqueId load_id;
     load_id.set_hi(0);
@@ -194,7 +194,7 @@ TEST_F(RemoteRowsetGcTest, normal) {
     write_req.tuple_desc = tuple_desc;
     write_req.slots = &(tuple_desc->slots());
     write_req.is_high_priority = false;
-    write_req.table_schema_param = &param;
+    write_req.table_schema_param = param;
     std::unique_ptr<RuntimeProfile> profile;
     profile = std::make_unique<RuntimeProfile>("LoadChannels");
     DeltaWriter* delta_writer = nullptr;
diff --git a/be/test/olap/tablet_cooldown_test.cpp b/be/test/olap/tablet_cooldown_test.cpp
index d83ea3eb016..3211bb473d4 100644
--- a/be/test/olap/tablet_cooldown_test.cpp
+++ b/be/test/olap/tablet_cooldown_test.cpp
@@ -323,7 +323,7 @@ void createTablet(TabletSharedPtr* tablet, int64_t replica_id, int32_t schema_ha
     DescriptorTbl* desc_tbl = nullptr;
     static_cast<void>(DescriptorTbl::create(&obj_pool, tdesc_tbl, &desc_tbl));
     TupleDescriptor* tuple_desc = desc_tbl->get_tuple_descriptor(0);
-    OlapTableSchemaParam param;
+    auto param = std::make_shared<OlapTableSchemaParam>();
 
     // write data
     PUniqueId load_id;
@@ -339,7 +339,7 @@ void createTablet(TabletSharedPtr* tablet, int64_t replica_id, int32_t schema_ha
     write_req.tuple_desc = tuple_desc;
     write_req.slots = &(tuple_desc->slots());
     write_req.is_high_priority = false;
-    write_req.table_schema_param = &param;
+    write_req.table_schema_param = param;
 
     profile = std::make_unique<RuntimeProfile>("LoadChannels");
     auto delta_writer =
diff --git a/be/test/runtime/load_stream_test.cpp b/be/test/runtime/load_stream_test.cpp
index bc1f66a3f10..ca065f91a68 100644
--- a/be/test/runtime/load_stream_test.cpp
+++ b/be/test/runtime/load_stream_test.cpp
@@ -446,9 +446,9 @@ public:
             id.set_hi(1);
             id.set_lo(1);
 
-            OlapTableSchemaParam param;
-            construct_schema(&param);
-            *request.mutable_schema() = *param.to_protobuf();
+            auto param = std::make_shared<OlapTableSchemaParam>();
+            construct_schema(param.get());
+            *request.mutable_schema() = *param->to_protobuf();
             *request.mutable_load_id() = id;
             request.set_txn_id(NORMAL_TXN_ID);
             request.set_src_id(sender_id);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to