This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new ccde65b9420 [fix](Cooldown) Enhance calculate logic of
_has_data_to_cooldown (#30244) (#30299)
ccde65b9420 is described below
commit ccde65b9420ca92c4cc8da0049678e78d832126a
Author: AlexYue <[email protected]>
AuthorDate: Thu Jan 25 13:25:34 2024 +0800
[fix](Cooldown) Enhance calculate logic of _has_data_to_cooldown (#30244)
(#30299)
---
be/src/olap/tablet.cpp | 15 ++++-
be/test/olap/tablet_cooldown_test.cpp | 112 +++++++++++++++++++++++++---------
2 files changed, 97 insertions(+), 30 deletions(-)
diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp
index f4bd7a5a710..ebf406abf2f 100644
--- a/be/src/olap/tablet.cpp
+++ b/be/src/olap/tablet.cpp
@@ -2390,8 +2390,21 @@ bool Tablet::_has_data_to_cooldown() {
int64_t min_local_version = std::numeric_limits<int64_t>::max();
RowsetSharedPtr rowset;
std::shared_lock meta_rlock(_meta_lock);
+ // Usually, once the tablet has completed a cooldown successfully, the first
+ // rowset will always be a remote rowset
+ bool has_cooldowned = false;
+ for (const auto& [_, rs] : _rs_version_map) {
+ if (!rs->is_local()) {
+ has_cooldowned = true;
+ break;
+ }
+ }
for (auto& [v, rs] : _rs_version_map) {
- if (rs->is_local() && v.first < min_local_version &&
rs->data_disk_size() > 0) {
+ auto predicate = rs->is_local() && v.first < min_local_version;
+ if (!has_cooldowned) {
+ predicate = predicate && (rs->data_disk_size() > 0);
+ }
+ if (predicate) {
// this is a local rowset and has data
min_local_version = v.first;
rowset = rs;
diff --git a/be/test/olap/tablet_cooldown_test.cpp
b/be/test/olap/tablet_cooldown_test.cpp
index 3211bb473d4..984afcc90dc 100644
--- a/be/test/olap/tablet_cooldown_test.cpp
+++ b/be/test/olap/tablet_cooldown_test.cpp
@@ -219,9 +219,6 @@ public:
ASSERT_TRUE(st.ok()) << st;
st =
io::global_local_filesystem()->create_directory(config::storage_root_path);
ASSERT_TRUE(st.ok()) << st;
- EXPECT_TRUE(io::global_local_filesystem()
-
->create_directory(get_remote_path(remote_tablet_path(kTabletId)))
- .ok());
std::vector<StorePath> paths {{config::storage_root_path, -1}};
@@ -307,28 +304,10 @@ static TDescriptorTable
create_descriptor_tablet_with_sequence_col() {
return desc_tbl_builder.desc_tbl();
}
-void createTablet(TabletSharedPtr* tablet, int64_t replica_id, int32_t
schema_hash,
- int64_t tablet_id, int64_t txn_id, int64_t partition_id) {
- // create tablet
- std::unique_ptr<RuntimeProfile> profile;
- profile = std::make_unique<RuntimeProfile>("CreateTablet");
- TCreateTabletReq request;
- create_tablet_request_with_sequence_col(tablet_id, schema_hash, &request);
- request.__set_replica_id(replica_id);
- Status st = k_engine->create_tablet(request, profile.get());
- ASSERT_EQ(Status::OK(), st);
-
- TDescriptorTable tdesc_tbl = create_descriptor_tablet_with_sequence_col();
- ObjectPool obj_pool;
- DescriptorTbl* desc_tbl = nullptr;
- static_cast<void>(DescriptorTbl::create(&obj_pool, tdesc_tbl, &desc_tbl));
- TupleDescriptor* tuple_desc = desc_tbl->get_tuple_descriptor(0);
- auto param = std::make_shared<OlapTableSchemaParam>();
-
- // write data
- PUniqueId load_id;
- load_id.set_hi(0);
- load_id.set_lo(0);
+static void write_rowset(TabletSharedPtr* tablet, PUniqueId load_id, int64_t
replica_id,
+ int32_t schema_hash, int64_t tablet_id, int64_t
txn_id,
+ int64_t partition_id, TupleDescriptor* tuple_desc,
bool with_data = true) {
+ auto profile = std::make_unique<RuntimeProfile>("LoadChannels");
WriteRequest write_req;
write_req.tablet_id = tablet_id;
@@ -339,9 +318,8 @@ void createTablet(TabletSharedPtr* tablet, int64_t
replica_id, int32_t schema_ha
write_req.tuple_desc = tuple_desc;
write_req.slots = &(tuple_desc->slots());
write_req.is_high_priority = false;
- write_req.table_schema_param = param;
+ write_req.table_schema_param = std::make_shared<OlapTableSchemaParam>();
- profile = std::make_unique<RuntimeProfile>("LoadChannels");
auto delta_writer =
std::make_unique<DeltaWriter>(*k_engine, &write_req,
profile.get(), TUniqueId {});
ASSERT_NE(delta_writer, nullptr);
@@ -352,9 +330,10 @@ void createTablet(TabletSharedPtr* tablet, int64_t
replica_id, int32_t schema_ha
slot_desc->get_data_type_ptr(),
slot_desc->col_name()));
}
-
+ Status st;
auto columns = block.mutate_columns();
- {
+
+ if (with_data) {
int8_t c1 = 123;
columns[0]->insert_data((const char*)&c1, sizeof(c1));
@@ -399,6 +378,42 @@ void createTablet(TabletSharedPtr* tablet, int64_t
replica_id, int32_t schema_ha
st = (*tablet)->add_inc_rowset(rowset);
ASSERT_EQ(Status::OK(), st);
}
+}
+
+void createTablet(TabletSharedPtr* tablet, int64_t replica_id, int32_t
schema_hash,
+ int64_t tablet_id, int64_t txn_id, int64_t partition_id,
bool with_data = true) {
+ EXPECT_TRUE(io::global_local_filesystem()
+
->delete_directory(get_remote_path(remote_tablet_path(tablet_id)))
+ .ok());
+ EXPECT_TRUE(io::global_local_filesystem()
+
->create_directory(get_remote_path(remote_tablet_path(tablet_id)))
+ .ok());
+ // create tablet
+ std::unique_ptr<RuntimeProfile> profile;
+ profile = std::make_unique<RuntimeProfile>("CreateTablet");
+ TCreateTabletReq request;
+ create_tablet_request_with_sequence_col(tablet_id, schema_hash, &request);
+ request.__set_replica_id(replica_id);
+ Status st = k_engine->create_tablet(request, profile.get());
+ ASSERT_EQ(Status::OK(), st);
+ if (!with_data) {
+ *tablet = k_engine->tablet_manager()->get_tablet(tablet_id);
+ return;
+ }
+
+ TDescriptorTable tdesc_tbl = create_descriptor_tablet_with_sequence_col();
+ ObjectPool obj_pool;
+ DescriptorTbl* desc_tbl = nullptr;
+ static_cast<void>(DescriptorTbl::create(&obj_pool, tdesc_tbl, &desc_tbl));
+ TupleDescriptor* tuple_desc = desc_tbl->get_tuple_descriptor(0);
+
+ PUniqueId load_id;
+ load_id.set_hi(0);
+ load_id.set_lo(0);
+
+ write_rowset(tablet, std::move(load_id), replica_id, schema_hash,
tablet_id, txn_id,
+ partition_id, tuple_desc);
+
EXPECT_EQ(1, (*tablet)->num_rows());
}
@@ -427,4 +442,43 @@ TEST_F(TabletCooldownTest, normal) {
ASSERT_EQ(segments.size(), 1);
}
+TEST_F(TabletCooldownTest, cooldown_data) {
+ TabletSharedPtr tablet1;
+ createTablet(&tablet1, kReplicaId + 1, kSchemaHash + 1, kTabletId + 1,
kTxnId + 1,
+ kPartitionId + 1, false);
+ // test cooldown
+ tablet1->set_storage_policy_id(kStoragePolicyId);
+ // A tablet with only rowset [0-1] is not a suitable cooldown
candidate
+ ASSERT_FALSE(tablet1->_has_data_to_cooldown());
+
+ TabletSharedPtr tablet2;
+ createTablet(&tablet2, kReplicaId + 2, kSchemaHash + 2, kTabletId + 2,
kTxnId + 2,
+ kPartitionId + 2);
+ // test cooldown
+ tablet2->set_storage_policy_id(kStoragePolicyId);
+ Status st = tablet2->cooldown(); // rowset [0-1]
+ ASSERT_NE(Status::OK(), st);
+ tablet2->update_cooldown_conf(1, kReplicaId + 2);
+ // cooldown for upload node
+ st = tablet2->cooldown(); // rowset [0-1]
+ ASSERT_EQ(Status::OK(), st);
+ st = tablet2->cooldown(); // rowset [2-2]
+ ASSERT_EQ(Status::OK(), st);
+ // Write one empty local rowset into tablet2 to test if this rowset would
be uploaded or not
+ TDescriptorTable tdesc_tbl = create_descriptor_tablet_with_sequence_col();
+ ObjectPool obj_pool;
+ DescriptorTbl* desc_tbl = nullptr;
+ static_cast<void>(DescriptorTbl::create(&obj_pool, tdesc_tbl, &desc_tbl));
+ TupleDescriptor* tuple_desc = desc_tbl->get_tuple_descriptor(0);
+
+ PUniqueId load_id;
+ load_id.set_hi(1);
+ load_id.set_lo(1);
+ write_rowset(&tablet2, std::move(load_id), kReplicaId + 2, kSchemaHash +
2, kTabletId + 2,
+ kTxnId + 3, kPartitionId + 2, tuple_desc, false);
+
+ st = tablet2->cooldown(); // rowset [3-3]
+ ASSERT_EQ(Status::OK(), st);
+}
+
} // namespace doris
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]