This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
commit 1a51d04cb89b2bae645c3bd3dea0b443205594cf Author: Kaijie Chen <[email protected]> AuthorDate: Tue Jan 23 19:02:32 2024 +0800 [fix](move-memtable) fix schema use-after-free in delta writer v2 (#30254) --- be/src/olap/delta_writer_context.h | 4 ++-- be/src/olap/delta_writer_v2.cpp | 2 +- be/src/olap/rowset_builder.cpp | 3 ++- be/src/runtime/load_stream.cpp | 5 +++-- be/src/runtime/load_stream.h | 3 ++- be/src/runtime/tablets_channel.cpp | 6 +++--- be/src/runtime/tablets_channel.h | 2 +- be/src/vec/sink/writer/vtablet_writer_v2.cpp | 2 +- be/test/olap/delta_writer_test.cpp | 16 ++++++++-------- be/test/olap/engine_storage_migration_task_test.cpp | 4 ++-- be/test/olap/memtable_memory_limiter_test.cpp | 4 ++-- be/test/olap/remote_rowset_gc_test.cpp | 4 ++-- be/test/olap/tablet_cooldown_test.cpp | 4 ++-- be/test/runtime/load_stream_test.cpp | 6 +++--- 14 files changed, 34 insertions(+), 31 deletions(-) diff --git a/be/src/olap/delta_writer_context.h b/be/src/olap/delta_writer_context.h index aa8ee81f871..02262197527 100644 --- a/be/src/olap/delta_writer_context.h +++ b/be/src/olap/delta_writer_context.h @@ -39,9 +39,9 @@ struct WriteRequest { TupleDescriptor* tuple_desc = nullptr; // slots are in order of tablet's schema const std::vector<SlotDescriptor*>* slots = nullptr; - OlapTableSchemaParam* table_schema_param = nullptr; + std::shared_ptr<OlapTableSchemaParam> table_schema_param = nullptr; bool is_high_priority = false; bool write_file_cache = false; }; -} // namespace doris \ No newline at end of file +} // namespace doris diff --git a/be/src/olap/delta_writer_v2.cpp b/be/src/olap/delta_writer_v2.cpp index 5987c63e658..db2c7ef80ce 100644 --- a/be/src/olap/delta_writer_v2.cpp +++ b/be/src/olap/delta_writer_v2.cpp @@ -109,7 +109,7 @@ Status DeltaWriterV2::init() { if (_streams.size() == 0 || _streams[0]->tablet_schema(_req.index_id) == nullptr) { return Status::InternalError("failed to find tablet schema for {}", _req.index_id); } - _build_current_tablet_schema(_req.index_id, _req.table_schema_param, + _build_current_tablet_schema(_req.index_id, _req.table_schema_param.get(), *_streams[0]->tablet_schema(_req.index_id)); RowsetWriterContext context; context.txn_id = _req.txn_id; diff --git a/be/src/olap/rowset_builder.cpp b/be/src/olap/rowset_builder.cpp index e7fb520c0bf..ecfba5cebb9 100644 --- a/be/src/olap/rowset_builder.cpp +++ b/be/src/olap/rowset_builder.cpp @@ -185,7 +185,8 @@ Status RowsetBuilder::init() { RETURN_IF_ERROR(prepare_txn()); // build tablet schema in request level - _build_current_tablet_schema(_req.index_id, _req.table_schema_param, *_tablet->tablet_schema()); + _build_current_tablet_schema(_req.index_id, _req.table_schema_param.get(), + *_tablet->tablet_schema()); RowsetWriterContext context; context.txn_id = _req.txn_id; context.load_id = _req.load_id; diff --git a/be/src/runtime/load_stream.cpp b/be/src/runtime/load_stream.cpp index 7d494eba88a..c62f5e9da50 100644 --- a/be/src/runtime/load_stream.cpp +++ b/be/src/runtime/load_stream.cpp @@ -70,7 +70,8 @@ inline std::ostream& operator<<(std::ostream& ostr, const TabletStream& tablet_s return ostr; } -Status TabletStream::init(OlapTableSchemaParam* schema, int64_t index_id, int64_t partition_id) { +Status TabletStream::init(std::shared_ptr<OlapTableSchemaParam> schema, int64_t index_id, + int64_t partition_id) { WriteRequest req { .tablet_id = _id, .txn_id = _txn_id, @@ -291,7 +292,7 @@ Status IndexStream::_init_tablet_stream(TabletStreamSharedPtr& tablet_stream, in int64_t partition_id) { tablet_stream = std::make_shared<TabletStream>(_load_id, tablet_id, _txn_id, _load_stream_mgr, _profile); - RETURN_IF_ERROR(tablet_stream->init(_schema.get(), _id, partition_id)); + RETURN_IF_ERROR(tablet_stream->init(_schema, _id, partition_id)); _tablet_streams_map[tablet_id] = tablet_stream; return Status::OK(); } diff --git a/be/src/runtime/load_stream.h b/be/src/runtime/load_stream.h index 6df9198504e..c3fad94693d 100644 --- a/be/src/runtime/load_stream.h +++ b/be/src/runtime/load_stream.h @@ -47,7 +47,8 @@ public: TabletStream(PUniqueId load_id, int64_t id, int64_t txn_id, LoadStreamMgr* load_stream_mgr, RuntimeProfile* profile); - Status init(OlapTableSchemaParam* schema, int64_t index_id, int64_t partition_id); + Status init(std::shared_ptr<OlapTableSchemaParam> schema, int64_t index_id, + int64_t partition_id); Status append_data(const PStreamHeader& header, butil::IOBuf* data); Status add_segment(const PStreamHeader& header, butil::IOBuf* data); diff --git a/be/src/runtime/tablets_channel.cpp b/be/src/runtime/tablets_channel.cpp index d248ad09d7a..f3f2648ba70 100644 --- a/be/src/runtime/tablets_channel.cpp +++ b/be/src/runtime/tablets_channel.cpp @@ -135,7 +135,7 @@ Status BaseTabletsChannel::open(const PTabletWriterOpenRequest& request) { << ", timeout(s): " << request.load_channel_timeout_s(); _txn_id = request.txn_id(); _index_id = request.index_id(); - _schema = std::make_unique<OlapTableSchemaParam>(); + _schema = std::make_shared<OlapTableSchemaParam>(); RETURN_IF_ERROR(_schema->init(request.schema())); _tuple_desc = _schema->tuple_desc(); @@ -189,7 +189,7 @@ Status BaseTabletsChannel::incremental_open(const PTabletWriterOpenRequest& para wrequest.tuple_desc = _tuple_desc; wrequest.slots = index_slots; wrequest.is_high_priority = _is_high_priority; - wrequest.table_schema_param = _schema.get(); + wrequest.table_schema_param = _schema; // TODO(plat1ko): CloudDeltaWriter auto delta_writer = std::make_unique<DeltaWriter>(*StorageEngine::instance(), &wrequest, @@ -451,7 +451,7 @@ Status BaseTabletsChannel::_open_all_writers(const PTabletWriterOpenRequest& req .load_id = request.id(), .tuple_desc = _tuple_desc, .slots = index_slots, - .table_schema_param = _schema.get(), + .table_schema_param = _schema, .is_high_priority = _is_high_priority, .write_file_cache = request.write_file_cache(), }; diff --git a/be/src/runtime/tablets_channel.h b/be/src/runtime/tablets_channel.h index 15f68ba8e38..27db9387602 100644 --- a/be/src/runtime/tablets_channel.h +++ b/be/src/runtime/tablets_channel.h @@ -150,7 +150,7 @@ protected: // initialized in open function int64_t _txn_id = -1; int64_t _index_id = -1; - std::unique_ptr<OlapTableSchemaParam> _schema; + std::shared_ptr<OlapTableSchemaParam> _schema; TupleDescriptor* _tuple_desc = nullptr; diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.cpp b/be/src/vec/sink/writer/vtablet_writer_v2.cpp index cfeeb782bed..c42f0955a2a 100644 --- a/be/src/vec/sink/writer/vtablet_writer_v2.cpp +++ b/be/src/vec/sink/writer/vtablet_writer_v2.cpp @@ -427,7 +427,7 @@ Status VTabletWriterV2::_write_memtable(std::shared_ptr<vectorized::Block> block .partition_id = rows.partition_id, .load_id = _load_id, .tuple_desc = _output_tuple_desc, - .table_schema_param = _schema.get(), + .table_schema_param = _schema, .is_high_priority = _is_high_priority, .write_file_cache = _write_file_cache, }; diff --git a/be/test/olap/delta_writer_test.cpp b/be/test/olap/delta_writer_test.cpp index c48ebf98a48..d38200a335e 100644 --- a/be/test/olap/delta_writer_test.cpp +++ b/be/test/olap/delta_writer_test.cpp @@ -490,7 +490,7 @@ TEST_F(TestDeltaWriter, open) { DescriptorTbl* desc_tbl = nullptr; static_cast<void>(DescriptorTbl::create(&obj_pool, tdesc_tbl, &desc_tbl)); TupleDescriptor* tuple_desc = desc_tbl->get_tuple_descriptor(0); - OlapTableSchemaParam param; + auto param = std::make_shared<OlapTableSchemaParam>(); PUniqueId load_id; load_id.set_hi(0); @@ -504,7 +504,7 @@ TEST_F(TestDeltaWriter, open) { write_req.tuple_desc = tuple_desc; write_req.slots = &(tuple_desc->slots()); write_req.is_high_priority = true; - write_req.table_schema_param = ¶m; + write_req.table_schema_param = param; // test vec delta writer profile = std::make_unique<RuntimeProfile>("LoadChannels"); @@ -536,7 +536,7 @@ TEST_F(TestDeltaWriter, vec_write) { static_cast<void>(DescriptorTbl::create(&obj_pool, tdesc_tbl, &desc_tbl)); TupleDescriptor* tuple_desc = desc_tbl->get_tuple_descriptor(0); // const std::vector<SlotDescriptor*>& slots = tuple_desc->slots(); - OlapTableSchemaParam param; + auto param = std::make_shared<OlapTableSchemaParam>(); PUniqueId load_id; load_id.set_hi(0); @@ -550,7 +550,7 @@ TEST_F(TestDeltaWriter, vec_write) { write_req.tuple_desc = tuple_desc; write_req.slots = &(tuple_desc->slots()); write_req.is_high_priority = false; - write_req.table_schema_param = ¶m; + write_req.table_schema_param = param; profile = std::make_unique<RuntimeProfile>("LoadChannels"); auto delta_writer = std::make_unique<DeltaWriter>(*k_engine, &write_req, profile.get(), TUniqueId {}); @@ -699,7 +699,7 @@ TEST_F(TestDeltaWriter, vec_sequence_col) { DescriptorTbl* desc_tbl = nullptr; static_cast<void>(DescriptorTbl::create(&obj_pool, tdesc_tbl, &desc_tbl)); TupleDescriptor* tuple_desc = desc_tbl->get_tuple_descriptor(0); - OlapTableSchemaParam param; + auto param = std::make_shared<OlapTableSchemaParam>(); PUniqueId load_id; load_id.set_hi(0); @@ -713,7 +713,7 @@ TEST_F(TestDeltaWriter, vec_sequence_col) { write_req.tuple_desc = tuple_desc; write_req.slots = &(tuple_desc->slots()); write_req.is_high_priority = false; - write_req.table_schema_param = ¶m; + write_req.table_schema_param = param; profile = std::make_unique<RuntimeProfile>("LoadChannels"); auto delta_writer = std::make_unique<DeltaWriter>(*k_engine, &write_req, profile.get(), TUniqueId {}); @@ -814,7 +814,7 @@ TEST_F(TestDeltaWriter, vec_sequence_col_concurrent_write) { DescriptorTbl* desc_tbl = nullptr; static_cast<void>(DescriptorTbl::create(&obj_pool, tdesc_tbl, &desc_tbl)); TupleDescriptor* tuple_desc = desc_tbl->get_tuple_descriptor(0); - OlapTableSchemaParam param; + auto param = std::make_shared<OlapTableSchemaParam>(); PUniqueId load_id; load_id.set_hi(0); @@ -828,7 +828,7 @@ TEST_F(TestDeltaWriter, vec_sequence_col_concurrent_write) { write_req.tuple_desc = tuple_desc; write_req.slots = &(tuple_desc->slots()); write_req.is_high_priority = false; - write_req.table_schema_param = ¶m; + write_req.table_schema_param = param; std::unique_ptr<RuntimeProfile> profile1; profile1 = std::make_unique<RuntimeProfile>("LoadChannels1"); std::unique_ptr<RuntimeProfile> profile2; diff --git a/be/test/olap/engine_storage_migration_task_test.cpp b/be/test/olap/engine_storage_migration_task_test.cpp index c65340522c6..47effac2a26 100644 --- a/be/test/olap/engine_storage_migration_task_test.cpp +++ b/be/test/olap/engine_storage_migration_task_test.cpp @@ -187,7 +187,7 @@ TEST_F(TestEngineStorageMigrationTask, write_and_migration) { DescriptorTbl* desc_tbl = nullptr; static_cast<void>(DescriptorTbl::create(&obj_pool, tdesc_tbl, &desc_tbl)); TupleDescriptor* tuple_desc = desc_tbl->get_tuple_descriptor(0); - OlapTableSchemaParam param; + auto param = std::make_shared<OlapTableSchemaParam>(); PUniqueId load_id; load_id.set_hi(0); @@ -201,7 +201,7 @@ TEST_F(TestEngineStorageMigrationTask, write_and_migration) { write_req.tuple_desc = tuple_desc; write_req.slots = &(tuple_desc->slots()); write_req.is_high_priority = false; - write_req.table_schema_param = ¶m; + write_req.table_schema_param = param; profile = std::make_unique<RuntimeProfile>("LoadChannels"); auto delta_writer = diff --git a/be/test/olap/memtable_memory_limiter_test.cpp b/be/test/olap/memtable_memory_limiter_test.cpp index 34c7361e33e..13fdceebd06 100644 --- a/be/test/olap/memtable_memory_limiter_test.cpp +++ b/be/test/olap/memtable_memory_limiter_test.cpp @@ -128,7 +128,7 @@ TEST_F(MemTableMemoryLimiterTest, handle_memtable_flush_test) { DescriptorTbl* desc_tbl = nullptr; static_cast<void>(DescriptorTbl::create(&obj_pool, tdesc_tbl, &desc_tbl)); TupleDescriptor* tuple_desc = desc_tbl->get_tuple_descriptor(0); - OlapTableSchemaParam param; + auto param = std::make_shared<OlapTableSchemaParam>(); PUniqueId load_id; load_id.set_hi(0); @@ -142,7 +142,7 @@ TEST_F(MemTableMemoryLimiterTest, handle_memtable_flush_test) { write_req.tuple_desc = tuple_desc; write_req.slots = &(tuple_desc->slots()); write_req.is_high_priority = false; - write_req.table_schema_param = ¶m; + write_req.table_schema_param = param; profile = std::make_unique<RuntimeProfile>("MemTableMemoryLimiterTest"); auto delta_writer = std::make_unique<DeltaWriter>(*_engine, &write_req, profile.get(), TUniqueId {}); diff --git a/be/test/olap/remote_rowset_gc_test.cpp b/be/test/olap/remote_rowset_gc_test.cpp index fe8ab3b0f10..6fb9a2eb54a 100644 --- a/be/test/olap/remote_rowset_gc_test.cpp +++ b/be/test/olap/remote_rowset_gc_test.cpp @@ -180,7 +180,7 @@ TEST_F(RemoteRowsetGcTest, normal) { DescriptorTbl* desc_tbl = nullptr; DescriptorTbl::create(&obj_pool, tdesc_tbl, &desc_tbl); TupleDescriptor* tuple_desc = desc_tbl->get_tuple_descriptor(0); - OlapTableSchemaParam param; + auto param = std::make_shared<OlapTableSchemaParam>(); PUniqueId load_id; load_id.set_hi(0); @@ -194,7 +194,7 @@ TEST_F(RemoteRowsetGcTest, normal) { write_req.tuple_desc = tuple_desc; write_req.slots = &(tuple_desc->slots()); write_req.is_high_priority = false; - write_req.table_schema_param = ¶m; + write_req.table_schema_param = param; std::unique_ptr<RuntimeProfile> profile; profile = std::make_unique<RuntimeProfile>("LoadChannels"); DeltaWriter* delta_writer = nullptr; diff --git a/be/test/olap/tablet_cooldown_test.cpp b/be/test/olap/tablet_cooldown_test.cpp index d83ea3eb016..3211bb473d4 100644 --- a/be/test/olap/tablet_cooldown_test.cpp +++ b/be/test/olap/tablet_cooldown_test.cpp @@ -323,7 +323,7 @@ void createTablet(TabletSharedPtr* tablet, int64_t replica_id, int32_t schema_ha DescriptorTbl* desc_tbl = nullptr; static_cast<void>(DescriptorTbl::create(&obj_pool, tdesc_tbl, &desc_tbl)); TupleDescriptor* tuple_desc = desc_tbl->get_tuple_descriptor(0); - OlapTableSchemaParam param; + auto param = std::make_shared<OlapTableSchemaParam>(); // write data PUniqueId load_id; @@ -339,7 +339,7 @@ void createTablet(TabletSharedPtr* tablet, int64_t replica_id, int32_t schema_ha write_req.tuple_desc = tuple_desc; write_req.slots = &(tuple_desc->slots()); write_req.is_high_priority = false; - write_req.table_schema_param = ¶m; + write_req.table_schema_param = param; profile = std::make_unique<RuntimeProfile>("LoadChannels"); auto delta_writer = diff --git a/be/test/runtime/load_stream_test.cpp b/be/test/runtime/load_stream_test.cpp index bc1f66a3f10..ca065f91a68 100644 --- a/be/test/runtime/load_stream_test.cpp +++ b/be/test/runtime/load_stream_test.cpp @@ -446,9 +446,9 @@ public: id.set_hi(1); id.set_lo(1); - OlapTableSchemaParam param; - construct_schema(¶m); - *request.mutable_schema() = *param.to_protobuf(); + auto param = std::make_shared<OlapTableSchemaParam>(); + construct_schema(param.get()); + *request.mutable_schema() = *param->to_protobuf(); *request.mutable_load_id() = id; request.set_txn_id(NORMAL_TXN_ID); request.set_src_id(sender_id); --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
