This is an automated email from the ASF dual-hosted git repository. alexey pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/kudu.git
commit a9b8a772bfe4f7be827068b0cf571d8f5f1d362e Author: Hongjiang Zhang <hongjizh...@ebay.com> AuthorDate: Tue Apr 6 15:07:53 2021 +0800 KUDU-3223: Management of per-table level limit When the table's size or row count exceeds a threshold, the insert and update privileges will be removed from the authz token; only scan and delete privileges are allowed. Admins are responsible for changing the on_disk_size_limit or row_count_limit per table. -1 means no limit. This feature depends on authz token expiration, which causes a delay before writes are forbidden. Nevertheless, it provides approximate management of writes. Change-Id: I2dbf365ad59f17c0a4e2e7ea6a5afaa7680724b0 Reviewed-on: http://gerrit.cloudera.org:8080/17273 Tested-by: Kudu Jenkins Reviewed-by: Andrew Wong <aw...@cloudera.com> --- src/kudu/client/client.cc | 23 +- src/kudu/client/client.h | 46 +++ src/kudu/client/table_alterer-internal.cc | 11 +- src/kudu/client/table_alterer-internal.h | 4 + src/kudu/client/table_statistics-internal.h | 22 +- src/kudu/integration-tests/CMakeLists.txt | 1 + src/kudu/integration-tests/write_limit-itest.cc | 449 ++++++++++++++++++++++++ src/kudu/master/catalog_manager.cc | 183 +++++++++- src/kudu/master/catalog_manager.h | 13 + src/kudu/master/master.proto | 12 + 10 files changed, 753 insertions(+), 11 deletions(-) diff --git a/src/kudu/client/client.cc b/src/kudu/client/client.cc index 40ee5f8..7246cab 100644 --- a/src/kudu/client/client.cc +++ b/src/kudu/client/client.cc @@ -65,6 +65,7 @@ #include "kudu/common/partition.h" #include "kudu/common/partition_pruner.h" #include "kudu/common/row_operations.h" +#include "kudu/common/row_operations.pb.h" #include "kudu/common/scan_spec.h" #include "kudu/common/schema.h" #include "kudu/common/txn_id.h" @@ -690,7 +691,9 @@ Status KuduClient::GetTableStatistics(const string& table_name, unique_ptr<KuduTableStatistics> table_statistics(new KuduTableStatistics); table_statistics->data_ = new KuduTableStatistics::Data( resp.has_on_disk_size() ? 
boost::optional<int64_t>(resp.on_disk_size()) : boost::none, - resp.has_live_row_count() ? boost::optional<int64_t>(resp.live_row_count()) : boost::none); + resp.has_live_row_count() ? boost::optional<int64_t>(resp.live_row_count()) : boost::none, + resp.has_disk_size_limit() ? boost::optional<int64_t>(resp.disk_size_limit()) : boost::none, + resp.has_row_count_limit() ? boost::optional<int64_t>(resp.row_count_limit()) : boost::none); *statistics = table_statistics.release(); return Status::OK(); @@ -993,6 +996,14 @@ int64_t KuduTableStatistics::live_row_count() const { return data_->live_row_count_ ? *data_->live_row_count_ : -1; } +int64_t KuduTableStatistics::on_disk_size_limit() const { + return data_->on_disk_size_limit_ ? *data_->on_disk_size_limit_ : -1; +} + +int64_t KuduTableStatistics::live_row_count_limit() const { + return data_->live_row_count_limit_ ? *data_->live_row_count_limit_ : -1; +} + std::string KuduTableStatistics::ToString() const { return data_->ToString(); } @@ -1467,6 +1478,16 @@ KuduTableAlterer* KuduTableAlterer::AlterExtraConfig(const map<string, string>& return this; } +KuduTableAlterer* KuduTableAlterer::SetTableDiskSizeLimit(int64_t disk_size_limit) { + data_->disk_size_limit_ = disk_size_limit; + return this; +} + +KuduTableAlterer* KuduTableAlterer::SetTableRowCountLimit(int64_t row_count_limit) { + data_->row_count_limit_ = row_count_limit; + return this; +} + KuduTableAlterer* KuduTableAlterer::timeout(const MonoDelta& timeout) { data_->timeout_ = timeout; return this; diff --git a/src/kudu/client/client.h b/src/kudu/client/client.h index 55d8705..437b0c5 100644 --- a/src/kudu/client/client.h +++ b/src/kudu/client/client.h @@ -55,6 +55,7 @@ namespace kudu { class AuthzTokenTest; class ClientStressTest_TestUniqueClientIds_Test; +class DisableWriteWhenExceedingQuotaTest; class KuduPartialRow; class MonoDelta; class Partition; @@ -921,6 +922,7 @@ class KUDU_EXPORT KuduClient : public sp::enable_shared_from_this<KuduClient> { friend 
class internal::RetrieveAuthzTokenRpc; friend class internal::WriteRpc; friend class kudu::AuthzTokenTest; + friend class kudu::DisableWriteWhenExceedingQuotaTest; friend class kudu::SecurityUnknownTskTest; friend class transactions::CoordinatorRpc; friend class transactions::ParticipantRpc; @@ -1306,6 +1308,22 @@ class KUDU_EXPORT KuduTableStatistics { /// @note This statistic is pre-replication. int64_t live_row_count() const; + /// @return The table's on disk size limit. + /// -1 is returned if there is no disk size limit on this table. + /// + /// @note It is experimental and may change or disappear in future. + /// This feature currently applies size limit on a single table, but + /// it should also support database level size limit. + int64_t on_disk_size_limit() const; + + /// @return The table's live row count limit. + /// -1 is returned if there is no row count limit on this table. + /// + /// @note It is experimental and may change or disappear in future. + /// This feature currently applies row count limit on a single table, + /// but it should also support database level row count limit. + int64_t live_row_count_limit() const; + /// Stringify this Statistics. /// /// @return A string describing this statistics @@ -1785,6 +1803,34 @@ class KUDU_EXPORT KuduTableAlterer { /// @return Raw pointer to this alterer object. KuduTableAlterer* AlterExtraConfig(const std::map<std::string, std::string>& extra_configs); + /// Set the disk size limit of the table by the super user. + /// + /// @note The table limit alterations, including disk_size_limit and row_count_limit, + /// cannot be changed in the same alteration request with other alterations, because the + /// table 'limit' alteration needs the super user permission. + /// + /// @note It is experimental and may change or disappear in future. + /// This feature currently applies size limit on a single table. 
+ /// + /// @param [in] disk_size_limit + /// The max table disk size, -1 is for no limit + /// @return Raw pointer to this alterer object. + KuduTableAlterer* SetTableDiskSizeLimit(int64_t disk_size_limit); + + /// Set the row count limit of the table by the super user. + /// + /// @note The table limit alterations, including disk_size_limit and row_count_limit, + /// cannot be changed in the same alteration request with other alterations, because the + /// table 'limit' alteration needs the super user permission. + /// + /// @note It is experimental and may change or disappear in future. + /// This feature currently applies row count limit on a single table. + /// + /// @param [in] row_count_limit + /// The max row count of the table, -1 is for no limit + /// @return Raw pointer to this alterer object. + KuduTableAlterer* SetTableRowCountLimit(int64_t row_count_limit); + /// Set a timeout for the alteration operation. /// /// This includes any waiting after the alter has been submitted diff --git a/src/kudu/client/table_alterer-internal.cc b/src/kudu/client/table_alterer-internal.cc index 4f4db75..874d02b 100644 --- a/src/kudu/client/table_alterer-internal.cc +++ b/src/kudu/client/table_alterer-internal.cc @@ -27,9 +27,9 @@ #include "kudu/client/schema-internal.h" #include "kudu/client/schema.h" #include "kudu/common/row_operations.h" +#include "kudu/common/row_operations.pb.h" #include "kudu/common/schema.h" #include "kudu/common/wire_protocol.h" -#include "kudu/common/wire_protocol.pb.h" #include "kudu/master/master.pb.h" using std::string; @@ -60,6 +60,8 @@ Status KuduTableAlterer::Data::ToRequest(AlterTableRequestPB* req) { if (!rename_to_.is_initialized() && !new_extra_configs_ && !set_owner_to_.is_initialized() && + !disk_size_limit_ && + !row_count_limit_ && steps_.empty()) { return Status::InvalidArgument("No alter steps provided"); } @@ -85,6 +87,13 @@ Status KuduTableAlterer::Data::ToRequest(AlterTableRequestPB* req) { SCHEMA_PB_WITHOUT_COMMENT)); } + 
if (disk_size_limit_) { + req->set_disk_size_limit(disk_size_limit_.get()); + } + if (row_count_limit_) { + req->set_row_count_limit(row_count_limit_.get()); + } + for (const Step& s : steps_) { AlterTableRequestPB::Step* pb_step = req->add_alter_schema_steps(); pb_step->set_type(s.step_type); diff --git a/src/kudu/client/table_alterer-internal.h b/src/kudu/client/table_alterer-internal.h index 4945a75..641daea 100644 --- a/src/kudu/client/table_alterer-internal.h +++ b/src/kudu/client/table_alterer-internal.h @@ -17,6 +17,7 @@ #ifndef KUDU_CLIENT_TABLE_ALTERER_INTERNAL_H #define KUDU_CLIENT_TABLE_ALTERER_INTERNAL_H +#include <cstdint> #include <map> #include <memory> #include <string> @@ -78,6 +79,9 @@ class KuduTableAlterer::Data { boost::optional<std::map<std::string, std::string>> new_extra_configs_; + boost::optional<int64_t> disk_size_limit_; + boost::optional<int64_t> row_count_limit_; + // Set to true if there are alter partition steps. bool has_alter_partitioning_steps = false; diff --git a/src/kudu/client/table_statistics-internal.h b/src/kudu/client/table_statistics-internal.h index 2f4f33b..2414f29 100644 --- a/src/kudu/client/table_statistics-internal.h +++ b/src/kudu/client/table_statistics-internal.h @@ -33,9 +33,14 @@ using strings::Substitute; class KuduTableStatistics::Data { public: - Data(boost::optional<int64_t> on_disk_size, boost::optional<int64_t> live_row_count) - : on_disk_size_(std::move(on_disk_size)), - live_row_count_(std::move(live_row_count)) { + Data(boost::optional<int64_t> on_disk_size, + boost::optional<int64_t> live_row_count, + boost::optional<int64_t> on_disk_size_limit, + boost::optional<int64_t> live_row_count_limit) + : on_disk_size_(on_disk_size), + live_row_count_(live_row_count), + on_disk_size_limit_(on_disk_size_limit), + live_row_count_limit_(live_row_count_limit) { } ~Data() { @@ -43,13 +48,20 @@ class KuduTableStatistics::Data { string ToString() const { return Substitute("on disk size: $0\n" - "live row count: 
$1\n", + "live row count: $1\n" + "on disk size limit: $2\n" + "live row count limit: $3\n", on_disk_size_ ? std::to_string(*on_disk_size_) : "N/A", - live_row_count_ ? std::to_string(*live_row_count_) : "N/A"); + live_row_count_ ? std::to_string(*live_row_count_) : "N/A", + on_disk_size_limit_ ? std::to_string(*on_disk_size_limit_) : "N/A", + live_row_count_limit_ ? std::to_string(*live_row_count_limit_) : "N/A"); + } const boost::optional<int64_t> on_disk_size_; const boost::optional<int64_t> live_row_count_; + const boost::optional<int64_t> on_disk_size_limit_; + const boost::optional<int64_t> live_row_count_limit_; private: DISALLOW_COPY_AND_ASSIGN(Data); diff --git a/src/kudu/integration-tests/CMakeLists.txt b/src/kudu/integration-tests/CMakeLists.txt index 3b4c184..f7208c3 100644 --- a/src/kudu/integration-tests/CMakeLists.txt +++ b/src/kudu/integration-tests/CMakeLists.txt @@ -145,6 +145,7 @@ ADD_KUDU_TEST(ts_tablet_manager-itest) ADD_KUDU_TEST(update_scan_delta_compact-test RUN_SERIAL true) ADD_KUDU_TEST(webserver-crawl-itest LABELS no_dist_test) ADD_KUDU_TEST(webserver-stress-itest RUN_SERIAL true) +ADD_KUDU_TEST(write_limit-itest) ADD_KUDU_TEST(write_throttling-itest) if (NOT APPLE) diff --git a/src/kudu/integration-tests/write_limit-itest.cc b/src/kudu/integration-tests/write_limit-itest.cc new file mode 100644 index 0000000..b46a868 --- /dev/null +++ b/src/kudu/integration-tests/write_limit-itest.cc @@ -0,0 +1,449 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include <stdint.h> + +#include <memory> +#include <string> +#include <utility> +#include <vector> + +#include <gflags/gflags_declare.h> +#include <glog/logging.h> +#include <gtest/gtest.h> + +#include "kudu/client/client-test-util.h" +#include "kudu/client/client.h" +#include "kudu/client/client.pb.h" +#include "kudu/client/schema.h" +#include "kudu/client/shared_ptr.h" // IWYU pragma: keep +#include "kudu/client/write_op.h" +#include "kudu/common/partial_row.h" +#include "kudu/gutil/stl_util.h" +#include "kudu/mini-cluster/internal_mini_cluster.h" +#include "kudu/tablet/key_value_test_schema.h" +#include "kudu/util/monotime.h" +#include "kudu/util/status.h" +#include "kudu/util/test_macros.h" +#include "kudu/util/test_util.h" + +DECLARE_int32(heartbeat_interval_ms); +DECLARE_string(superuser_acl); +using std::string; +using std::unique_ptr; +using std::vector; + +DECLARE_bool(enable_table_write_limit); +DECLARE_bool(tserver_enforce_access_control); +DECLARE_double(table_write_limit_ratio); +DECLARE_int32(log_segment_size_mb); +DECLARE_int32(flush_threshold_mb); +DECLARE_int32(flush_threshold_secs); +DECLARE_int32(tablet_history_max_age_sec); +DECLARE_int64(authz_token_validity_seconds); +DECLARE_int64(table_disk_size_limit); +DECLARE_int64(table_row_count_limit); + +namespace kudu { + +using cluster::InternalMiniCluster; +using cluster::InternalMiniClusterOptions; +using client::AuthenticationCredentialsPB; +using client::sp::shared_ptr; +using client::KuduClient; +using client::KuduClientBuilder; +using client::KuduDelete; +using 
client::KuduError; +using client::KuduInsert; +using client::KuduScanner; +using client::KuduSchema; +using client::KuduSession; +using client::KuduTable; +using client::KuduTableAlterer; +using client::KuduTableCreator; +using client::KuduTableStatistics; +using client::KuduUpdate; + +namespace { + +// Relatively low timeout used so we don't have to wait too long for an +// "invalid token" error. +const int kRpcTimeoutSecs = 3; +const int kOperationTimeoutSecs = kRpcTimeoutSecs * 3; + +// Inserts a single row to the given key-value table for the given key. +Status InsertKeyToTable(KuduTable* table, KuduSession* session, int key) { + RETURN_NOT_OK(session->SetFlushMode(KuduSession::MANUAL_FLUSH)); + unique_ptr<KuduInsert> insert(table->NewInsert()); + KuduPartialRow* row = insert->mutable_row(); + RETURN_NOT_OK(row->SetInt32(0, key)); + RETURN_NOT_OK(row->SetInt32(1, key)); + return session->Apply(insert.release()); +} + +// Update table value according to key. +Status UpdateKeyToTable(KuduTable* table, KuduSession* session, int key, int value) { + RETURN_NOT_OK(session->SetFlushMode(KuduSession::MANUAL_FLUSH)); + unique_ptr<KuduUpdate> update(table->NewUpdate()); + KuduPartialRow* row = update->mutable_row(); + RETURN_NOT_OK(row->SetInt32("key", key)); + RETURN_NOT_OK(row->SetInt32("val", value)); + return session->Apply(update.release()); +} + +Status DeleteKeyToTable(KuduTable* table, KuduSession* session, int key) { + RETURN_NOT_OK(session->SetFlushMode(KuduSession::MANUAL_FLUSH)); + unique_ptr<KuduDelete> del(table->NewDelete()); + KuduPartialRow* row = del->mutable_row(); + RETURN_NOT_OK(row->SetInt32("key", key)); + return session->Apply(del.release()); +} + +vector<Status> GetSessionErrors(KuduSession* session) { + vector<KuduError*> errors; + session->GetPendingErrors(&errors, nullptr); + vector<Status> ret(errors.size()); + for (int i = 0; i < errors.size(); i++) { + ret[i] = errors[i]->status(); + } + ElementDeleter deleter(&errors); + return ret; +} + 
+Status ScanWholeTable(KuduTable* table, vector<string>* rows) { + KuduScanner scanner(table); + scanner.SetTimeoutMillis(kOperationTimeoutSecs * 1000); + return ScanToStrings(&scanner, rows); +} + +Status SetTableLimit(const string& table_name, + const shared_ptr<KuduClient>& client, + int64_t disk_size_limit, + int64_t row_count_limit) { + unique_ptr<KuduTableAlterer> alterer( + client->NewTableAlterer(table_name)); + return alterer->SetTableDiskSizeLimit(disk_size_limit) + ->SetTableRowCountLimit(row_count_limit) + ->Alter(); +} + +} // anonymous namespace + +class DisableWriteWhenExceedingQuotaTest : public KuduTest { + public: + DisableWriteWhenExceedingQuotaTest() + : schema_(KuduSchema::FromSchema(CreateKeyValueTestSchema())) {} + const char* const kTableName = "test-table"; + const char* const kUser = "token-user"; + const char* const kSuperUser = "super-user"; + const int64_t kRowCountLimit = 20; + + void SetUp() override { + KuduTest::SetUp(); + FLAGS_superuser_acl = kSuperUser; + FLAGS_tserver_enforce_access_control = true; + FLAGS_authz_token_validity_seconds = 1; + FLAGS_enable_table_write_limit = true; + FLAGS_table_row_count_limit = 2; + FLAGS_table_disk_size_limit = 1024 * 1024 * 2; + FLAGS_log_segment_size_mb = 1; + FLAGS_flush_threshold_mb = 0; + FLAGS_tablet_history_max_age_sec = 1; + FLAGS_flush_threshold_secs = 1; + + // Create a table with a basic schema. + cluster_.reset(new InternalMiniCluster(env_, InternalMiniClusterOptions())); + ASSERT_OK(cluster_->Start()); + ASSERT_OK(SetupClientAndTable()); + } + + // Sets up the client_ and client_table_ members. + Status SetupClient(const string& user) { + RETURN_NOT_OK(CreateClientForUser(user, &client_)); + RETURN_NOT_OK(client_->OpenTable(kTableName, &client_table_)); + table_id_ = client_table_->id(); + return Status::OK(); + } + + // Sets up the client_ and client_table_ members. 
+ Status SetupClientAndTable() { + RETURN_NOT_OK(CreateClientForUser(kUser, &client_)); + unique_ptr<KuduTableCreator> table_creator(client_->NewTableCreator()); + RETURN_NOT_OK(table_creator->table_name(kTableName) + .schema(&schema_) + .num_replicas(1) + .set_range_partition_columns({ "key" }) + .Create()); + RETURN_NOT_OK(client_->OpenTable(kTableName, &client_table_)); + table_id_ = client_table_->id(); + return Status::OK(); + } + + // Creates a client for the given user. + Status CreateClientForUser(const string& user, shared_ptr<KuduClient>* client) const { + // Many tests will expect operations to fail, so let's get there quicker by + // setting a low timeout. + KuduClientBuilder client_builder; + client_builder.default_rpc_timeout(MonoDelta::FromSeconds(kRpcTimeoutSecs)); + string authn_creds; + AuthenticationCredentialsPB authn_pb; + authn_pb.set_real_user(user); + CHECK(authn_pb.SerializeToString(&authn_creds)); + client_builder.import_authentication_credentials(std::move(authn_creds)); + return cluster_->CreateClient(&client_builder, client); + } + + // Wait for the tservers update table statisitcs to master. 
+ static void WaitForTServerUpdatesStatisticsToMaster(int ms = FLAGS_heartbeat_interval_ms * 10) { + SleepFor(MonoDelta::FromMilliseconds(ms)); + } + + // Disable write privilege through authz token when exceeding size quota + void TestSizeLimit() { + shared_ptr<KuduSession> good_session(client_->NewSession()); + Status s = Status::OK(); + int k = 0; + for (k = 0; k < kRowCountLimit; k++) { + ASSERT_OK(InsertKeyToTable(client_table_.get(), good_session.get(), k)); + s = good_session->Flush(); + if (!s.ok()) { + // break the loop once the write is blocked + break; + } + WaitForTServerUpdatesStatisticsToMaster(1000); + } + ASSERT_TRUE(s.IsIOError()) << s.ToString(); + if (!s.ok()) { + vector<Status> errors = GetSessionErrors(good_session.get()); + for (const auto& e : errors) { + ASSERT_TRUE(e.IsRemoteError()) << e.ToString(); + ASSERT_STR_CONTAINS(e.ToString(), "Not authorized"); + } + } + // Scanning still works. + vector<string> rows; + ASSERT_OK(ScanWholeTable(client_table_.get(), &rows)); + ASSERT_EQ(rows.size(), k); + // Update also blocked + ASSERT_OK(UpdateKeyToTable(client_table_.get(), good_session.get(), 0, 1000)); + s = good_session->Flush(); + ASSERT_TRUE(s.IsIOError()) << s.ToString(); + vector<Status> errors = GetSessionErrors(good_session.get()); + for (const auto& e : errors) { + ASSERT_TRUE(e.IsRemoteError()) << e.ToString(); + ASSERT_STR_CONTAINS(e.ToString(), "Not authorized"); + } + // Delete operation is taken as UPDATE, which will also increase table size. 
+ // So, remove all rows in order to reclaim the space + for (k--; k >= 0; k--) { + good_session = client_->NewSession(); + ASSERT_OK(DeleteKeyToTable(client_table_.get(), good_session.get(), k)); + ASSERT_OK(good_session->Flush()); + WaitForTServerUpdatesStatisticsToMaster(1000); + } + // Suppose the table is empty + rows.clear(); + ASSERT_OK(ScanWholeTable(client_table_.get(), &rows)); + ASSERT_EQ(rows.size(), 0); + // Insertion should work again + ASSERT_OK(InsertKeyToTable(client_table_.get(), good_session.get(), 0)); + ASSERT_OK(good_session->Flush()); + WaitForTServerUpdatesStatisticsToMaster(1000); + // Remove the just inserted row to clean the table + good_session = client_->NewSession(); + ASSERT_OK(DeleteKeyToTable(client_table_.get(), good_session.get(), 0)); + ASSERT_OK(good_session->Flush()); + WaitForTServerUpdatesStatisticsToMaster(1000); + } + + // Disable write privilege through authz token when exceeding rows quota + void TestRowLimit() { + // insert 2 rows to fill the quota + shared_ptr<KuduSession> good_session(client_->NewSession()); + auto key = next_row_key_++; + ASSERT_OK(InsertKeyToTable(client_table_.get(), good_session.get(), key)); + ASSERT_OK(good_session->Flush()); + WaitForTServerUpdatesStatisticsToMaster(); + + ASSERT_OK(InsertKeyToTable(client_table_.get(), good_session.get(), key + 1)); + Status s = good_session->Flush(); + WaitForTServerUpdatesStatisticsToMaster(); + // exceeds row quota and failed to update + shared_ptr<KuduSession> bad_session(client_->NewSession()); + ASSERT_OK(InsertKeyToTable(client_table_.get(), bad_session.get(), key + 2)); + s = bad_session->Flush(); + ASSERT_TRUE(s.IsIOError()) << s.ToString(); + vector<Status> errors = GetSessionErrors(bad_session.get()); + for (const auto& e : errors) { + ASSERT_TRUE(e.IsRemoteError()) << e.ToString(); + ASSERT_STR_CONTAINS(e.ToString(), "Not authorized"); + } + + ASSERT_OK(UpdateKeyToTable(client_table_.get(), bad_session.get(), key, key + 2)); + s = 
bad_session->Flush(); + ASSERT_TRUE(s.IsIOError()) << s.ToString(); + errors = GetSessionErrors(bad_session.get()); + for (const auto& e : errors) { + ASSERT_TRUE(e.IsRemoteError()) << e.ToString(); + ASSERT_STR_CONTAINS(e.ToString(), "Not authorized"); + } + // Scans still works. + vector<string> rows; + ASSERT_OK(ScanWholeTable(client_table_.get(), &rows)); + ASSERT_EQ(rows.size(), 2); + // remove one row to avoid reaching quota + good_session = client_->NewSession(); + ASSERT_OK(DeleteKeyToTable(client_table_.get(), good_session.get(), key)); + ASSERT_OK(good_session->Flush()); + WaitForTServerUpdatesStatisticsToMaster(); + // Check rows + rows.clear(); + ASSERT_OK(ScanWholeTable(client_table_.get(), &rows)); + ASSERT_EQ(rows.size(), 1); + // reinsert should be allowed + ASSERT_OK(InsertKeyToTable(client_table_.get(), good_session.get(), key + 3)); + ASSERT_OK(good_session->Flush()); + + // empty the table by deleting all rows + ASSERT_OK(DeleteKeyToTable(client_table_.get(), good_session.get(), key + 3)); + ASSERT_OK(good_session->Flush()); + WaitForTServerUpdatesStatisticsToMaster(); + + ASSERT_OK(DeleteKeyToTable(client_table_.get(), good_session.get(), key + 1)); + ASSERT_OK(good_session->Flush()); + } + + // change the table limit through admin user + void ModifyLimit(const int64_t disk_size_limit, const int64_t row_count_limit) { + ASSERT_OK(SetupClient(kSuperUser)); + ASSERT_OK(SetTableLimit(kTableName, client_, disk_size_limit, row_count_limit)); + } + + protected: + const KuduSchema schema_; + unique_ptr<InternalMiniCluster> cluster_; + + // Client authenticated as the default user. + shared_ptr<KuduClient> client_; + + // Table created with 'client_'. + shared_ptr<KuduTable> client_table_; + string table_id_; + + // The next row key to insert. + int next_row_key_ = 0; +}; + +// Refuse the table limit change if the alteration contains any other non-table-limit +// related operation, like renaming, changing owner, adding/altering/dropping column, etc. 
+TEST_F(DisableWriteWhenExceedingQuotaTest, TestDisallowedToChangeLimitMixedWithOtherOps) { + const int64_t on_disk_size_limit = 100L * 1024 * 1024; + const int64_t row_count_limit = 10L; + ASSERT_OK(SetupClient(kSuperUser)); + // Failed to change table disk size limit because it contains another + // non-table-limit related operation: rename the table + unique_ptr<KuduTableAlterer> alterer(client_->NewTableAlterer(kTableName)); + Status s = alterer->RenameTo("failedRenaming") + ->SetTableDiskSizeLimit(on_disk_size_limit) + ->Alter(); + ASSERT_TRUE(s.IsConfigurationError()) << s.ToString(); + + // Failed to change the row count limit of the table because it contains + // another non-table-limit related operation: set owner. + s = alterer->SetOwner("failedSetOwner") + ->SetTableRowCountLimit(row_count_limit) + ->Alter(); + ASSERT_TRUE(s.IsConfigurationError()) << s.ToString(); + + // It is successful only changing the table 'limit'. + ASSERT_OK(SetTableLimit(kTableName, client_, on_disk_size_limit, row_count_limit)); + + // restart the cluster to verify it again + cluster_->Shutdown(); + ASSERT_OK(cluster_->Start()); + unique_ptr<KuduTableStatistics> statistics; + KuduTableStatistics *table_statistics; + ASSERT_OK(client_->GetTableStatistics(kTableName, &table_statistics)); + statistics.reset(table_statistics); + ASSERT_EQ(on_disk_size_limit, statistics->on_disk_size_limit()); + ASSERT_EQ(row_count_limit, statistics->live_row_count_limit()); +} + +// Refuse the table limit change if the user is not superuser +TEST_F(DisableWriteWhenExceedingQuotaTest, TestOnlySuperUserAllowedToChangeTableLimit) { + const int64_t on_disk_size_limit = 100L * 1024 * 1024; + const int64_t row_count_limit = 10L; + Status s = SetTableLimit(kTableName, client_, on_disk_size_limit, row_count_limit); + ASSERT_TRUE(s.IsNotAuthorized()) << s.ToString(); + NO_FATALS(ModifyLimit(on_disk_size_limit, row_count_limit)); + ASSERT_OK(SetupClient(kSuperUser)); + + ASSERT_OK(SetTableLimit(kTableName, 
client_, on_disk_size_limit, row_count_limit)); + // restart the cluster to verify it again + cluster_->Shutdown(); + ASSERT_OK(cluster_->Start()); + unique_ptr<KuduTableStatistics> statistics; + KuduTableStatistics *table_statistics; + ASSERT_OK(client_->GetTableStatistics(kTableName, &table_statistics)); + statistics.reset(table_statistics); + ASSERT_EQ(on_disk_size_limit, statistics->on_disk_size_limit()); + ASSERT_EQ(row_count_limit, statistics->live_row_count_limit()); +} + +// Verify the table's row limit +TEST_F(DisableWriteWhenExceedingQuotaTest, TestDisableWritePrivilegeWhenExceedingRowsQuota) { + NO_FATALS(TestRowLimit()); + // restart the cluster to verify it again + cluster_->Shutdown(); + ASSERT_OK(cluster_->Start()); + ASSERT_OK(SetupClient(kUser)); + NO_FATALS(TestRowLimit()); +} + +// Verify the table's disk size limit +TEST_F(DisableWriteWhenExceedingQuotaTest, TestDisableWritePrivilegeWhenExceedingSizeQuota) { + // modify the table limit to allow more rows but small size + NO_FATALS(ModifyLimit(1024L * 1024 + 120L * 1024, kRowCountLimit)); + // refresh the client + ASSERT_OK(SetupClient(kUser)); + NO_FATALS(TestSizeLimit()); + + // restart the cluster to verify it again + cluster_->Shutdown(); + ASSERT_OK(cluster_->Start()); + ASSERT_OK(SetupClient(kUser)); + NO_FATALS(TestSizeLimit()); +} + +TEST_F(DisableWriteWhenExceedingQuotaTest, TestDisableWriteWhenExceedingRowsQuotaWithFactor) { + FLAGS_table_write_limit_ratio = 0.5; + // restart the cluster to make the limit factor take effect + cluster_->Shutdown(); + ASSERT_OK(cluster_->Start()); + // modify the table limit to allow more rows but small size + NO_FATALS(ModifyLimit(10L * 1024 * 1024, 4L)); + // refresh the client + ASSERT_OK(SetupClient(kUser)); + NO_FATALS(TestRowLimit()); + + // modify the table limit to allow more rows but small size + NO_FATALS(ModifyLimit(2L * 1024 * 1024 + 120L * 1024, kRowCountLimit)); + // refresh the client + ASSERT_OK(SetupClient(kUser)); + 
NO_FATALS(TestSizeLimit()); +} +} // namespace kudu diff --git a/src/kudu/master/catalog_manager.cc b/src/kudu/master/catalog_manager.cc index dd7ce71..c213461 100644 --- a/src/kudu/master/catalog_manager.cc +++ b/src/kudu/master/catalog_manager.cc @@ -332,6 +332,29 @@ DEFINE_bool(enable_per_range_hash_schemas, false, "Whether the ability to specify different hash schemas per range is enabled"); TAG_FLAG(enable_per_range_hash_schemas, unsafe); +DEFINE_bool(enable_table_write_limit, false, + "Enable the table write limit. " + "When the table's size or row count is approaching the limit, " + "the write may be forbidden."); +TAG_FLAG(enable_table_write_limit, experimental); +TAG_FLAG(enable_table_write_limit, runtime); + +DEFINE_int64(table_disk_size_limit, -1, + "Set the target size in bytes of a table to write. " + "This is a system wide configuration for every newly " + "created table."); +TAG_FLAG(table_disk_size_limit, experimental); + +DEFINE_int64(table_row_count_limit, -1, + "Set the target row count of a table to write. 
" + "This is a system wide configuration for every newly " + "created table."); +TAG_FLAG(table_row_count_limit, experimental); + +DEFINE_double(table_write_limit_ratio, 0.95, + "Set the ratio of how much write limit can be reached"); +TAG_FLAG(table_write_limit_ratio, experimental); + DECLARE_bool(raft_prepare_replacement_before_eviction); DECLARE_int64(tsk_rotation_seconds); @@ -399,6 +422,30 @@ using strings::Substitute; namespace kudu { namespace master { +static bool ValidateTableWriteLimitRatio(const char* flagname, double value) { + if (value > 1.0) { + LOG(ERROR) << Substitute("$0 must be less than or equal to 1.0, value $1 is invalid.", + flagname, value); + return false; + } + if (value < 0) { + LOG(ERROR) << Substitute("$0 must be greater than 0, value $1 is invalid", + flagname, value); + } + return true; +} +DEFINE_validator(table_write_limit_ratio, &ValidateTableWriteLimitRatio); + +static bool ValidateTableLimit(const char* flag, int64_t limit) { + if (limit != -1 && limit < 0) { + LOG(ERROR) << Substitute("$0 must be greater than or equal to -1, " + "$1 is invalid", flag, limit); + return false; + } + return true; +} +DEFINE_validator(table_disk_size_limit, &ValidateTableLimit); +DEFINE_validator(table_row_count_limit, &ValidateTableLimit); //////////////////////////////////////////////////////////// // Table Loader //////////////////////////////////////////////////////////// @@ -2023,6 +2070,23 @@ scoped_refptr<TableInfo> CatalogManager::CreateTableInfo( if (req.has_owner()) { metadata->set_owner(req.owner()); } + // Set the table limit + if (FLAGS_enable_table_write_limit) { + if (FLAGS_table_disk_size_limit != TableInfo::TABLE_WRITE_DEFAULT_LIMIT) { + metadata->set_table_disk_size_limit(FLAGS_table_disk_size_limit); + } else { + metadata->clear_table_disk_size_limit(); + } + if (FLAGS_table_row_count_limit != TableInfo::TABLE_WRITE_DEFAULT_LIMIT) { + metadata->set_table_row_count_limit(FLAGS_table_row_count_limit); + } else { + 
metadata->clear_table_row_count_limit(); + } + + LOG(INFO) << Substitute("table size write limit: $0, table row write limit: $1", + FLAGS_table_disk_size_limit, + FLAGS_table_row_count_limit); + } return table; } @@ -2655,7 +2719,6 @@ Status CatalogManager::AlterTableRpc(const AlterTableRequestPB& req, if (rpc) { user = rpc->remote_user().username(); } - // If the HMS integration is enabled, the alteration includes a table // rename and the table should be altered in the HMS, then don't directly // rename the table in the Kudu catalog. Instead, rename the table @@ -2797,6 +2860,21 @@ Status CatalogManager::AlterTable(const AlterTableRequestPB& req, } } + // Pre-check the modifications' validity: + // Alterations done by admin should not be combined with other table alterations. + bool table_limit_change = req.has_disk_size_limit() || + req.has_row_count_limit(); + bool other_schema_change = req.has_new_table_name() || + req.has_new_table_owner() || + !req.new_extra_configs().empty() || + !alter_schema_steps.empty() || + !alter_partitioning_steps.empty(); + if (table_limit_change && other_schema_change) { + return SetupError(Status::ConfigurationError( + "Alter table limit cannot be combined with other alterations"), + resp, MasterErrorPB::UNKNOWN_ERROR); + } + // 2. Lookup the table, verify if it exists, lock it for modification, and then // checks that the user is authorized to operate on the table. scoped_refptr<TableInfo> table; @@ -2815,7 +2893,16 @@ Status CatalogManager::AlterTable(const AlterTableRequestPB& req, username == owner), resp, MasterErrorPB::NOT_AUTHORIZED); } - + if (req.has_disk_size_limit() || req.has_row_count_limit()) { + // Table limit is used to stop writing from the table owner, + // so, the owner is disallowed to change the table limit. 
+ if (user && !master_->IsServiceUserOrSuperUser(*user)) { + return SetupError( + Status::NotAuthorized("must be a service user or " + "a super user to modify table limit"), + resp, MasterErrorPB::NOT_AUTHORIZED); + } + } return SetupError(authz_provider_->AuthorizeAlterTable(table_name, new_table, username, username == owner), resp, MasterErrorPB::NOT_AUTHORIZED); @@ -2832,6 +2919,32 @@ Status CatalogManager::AlterTable(const AlterTableRequestPB& req, string normalized_table_name = NormalizeTableName(l.data().name()); *resp->mutable_table_id() = table->id(); + // Modify the table limit. + if (table_limit_change) { + if (req.has_disk_size_limit()) { + if (req.disk_size_limit() == TableInfo::TABLE_WRITE_DEFAULT_LIMIT) { + l.mutable_data()->pb.clear_table_disk_size_limit(); + } else if (req.disk_size_limit() >= 0) { + l.mutable_data()->pb.set_table_disk_size_limit(req.disk_size_limit()); + } else { + return SetupError(Status::InvalidArgument("disk size limit must " + "be greater than or equal to -1"), + resp, MasterErrorPB::UNKNOWN_ERROR); + } + } + if (req.has_row_count_limit()) { + if (req.row_count_limit() == TableInfo::TABLE_WRITE_DEFAULT_LIMIT) { + l.mutable_data()->pb.clear_table_row_count_limit(); + } else if (req.row_count_limit() >= 0) { + l.mutable_data()->pb.set_table_row_count_limit(req.row_count_limit()); + } else { + return SetupError(Status::InvalidArgument("row count limit must " + "be greater than or equal to -1"), + resp, MasterErrorPB::UNKNOWN_ERROR); + } + } + } + // 3. Calculate and validate new schema for the on-disk state, not persisted yet. Schema new_schema; ColumnId next_col_id = ColumnId(l.data().pb.next_column_id()); @@ -2946,7 +3059,8 @@ Status CatalogManager::AlterTable(const AlterTableRequestPB& req, // or extra configuration has changed. 
bool has_metadata_changes = has_schema_changes || req.has_new_table_name() || req.has_new_table_owner() || - !req.new_extra_configs().empty(); + !req.new_extra_configs().empty() || req.has_disk_size_limit() || + req.has_row_count_limit(); // Set to true if there are partitioning changes. bool has_partitioning_changes = !alter_partitioning_steps.empty(); // Set to true if metadata changes need to be applied to existing tablets. @@ -3174,6 +3288,11 @@ Status CatalogManager::GetTableSchema(const GetTableSchemaRequestPB* req, *user == l.data().owner(), schema_pb, &table_privilege), resp, MasterErrorPB::UNKNOWN_ERROR)); + if (FLAGS_enable_table_write_limit && + PREDICT_FALSE(IsTableWriteDisabled(table, l.data().name()))) { + table_privilege.clear_insert_privilege(); + table_privilege.clear_update_privilege(); + } security::SignedTokenPB authz_token; RETURN_NOT_OK(token_signer->GenerateAuthzToken( *user, std::move(table_privilege), &authz_token)); @@ -3308,10 +3427,66 @@ Status CatalogManager::GetTableStatistics(const GetTableStatisticsRequestPB* req resp->set_live_row_count(table->GetMetrics()->live_row_count->value()); } } - + if (FLAGS_enable_table_write_limit) { + resp->set_disk_size_limit(l.data().pb.table_disk_size_limit()); + resp->set_row_count_limit(l.data().pb.table_row_count_limit()); + } return Status::OK(); } +bool CatalogManager::IsTableWriteDisabled(const scoped_refptr<TableInfo>& table, + const std::string& table_name) { + uint64_t table_disk_size = 0; + uint64_t table_rows = 0; + if (table->GetMetrics()->TableSupportsOnDiskSize()) { + table_disk_size = table->GetMetrics()->on_disk_size->value(); + } + if (table->GetMetrics()->TableSupportsLiveRowCount()) { + table_rows = table->GetMetrics()->live_row_count->value(); + } + bool disallow_write = false; + int64_t table_disk_size_limit = TableInfo::TABLE_WRITE_DEFAULT_LIMIT; + int64_t table_rows_limit = TableInfo::TABLE_WRITE_DEFAULT_LIMIT; + { + // Release the table_lock in time + TableMetadataLock 
table_lock(table.get(), LockMode::READ); + const auto& pb = table_lock.data().pb; + + // If we are approaching the limit target of the table, we treat it + // as limit reached, because here depends on authz token to disable + // writing, and authz token has a fixed expiration time. We cannot + // disable write immediately. + if (pb.has_table_disk_size_limit()) { + table_disk_size_limit = pb.table_disk_size_limit(); + if (static_cast<double>(table_disk_size) >= + (static_cast<double>(table_disk_size_limit) * + FLAGS_table_write_limit_ratio)) { + disallow_write = true; + } + } + if (!disallow_write && pb.has_table_row_count_limit()) { + table_rows_limit = pb.table_row_count_limit(); + if (static_cast<double>(table_rows) >= + (static_cast<double>(table_rows_limit) * + FLAGS_table_write_limit_ratio)) { + disallow_write = true; + } + } + } + + if (disallow_write) { + // The writing into the table is disallowed. + LOG(INFO) << Substitute("table $0 row count is $1, on disk size is $2, " + "row count limit is $3, size limit is $4, writing is forbidden", + table_name, + table_rows, + table_disk_size, + table_rows_limit, + table_disk_size_limit); + } + return disallow_write; +} + Status CatalogManager::GetTableInfo(const string& table_id, scoped_refptr<TableInfo> *table) { leader_lock_.AssertAcquiredForReading(); diff --git a/src/kudu/master/catalog_manager.h b/src/kudu/master/catalog_manager.h index 73bcb0f..565ee6c 100644 --- a/src/kudu/master/catalog_manager.h +++ b/src/kudu/master/catalog_manager.h @@ -270,6 +270,14 @@ struct PersistentTableInfo { // spin-lock. class TableInfo : public RefCountedThreadSafe<TableInfo> { public: + enum { + // It is useful to place size limits on tables that may be ingested to + // by long-lived applications. To avoid breaking existing tables, there + // is no limit enforced. + // The default value -1 means no limit. 
+ TABLE_WRITE_DEFAULT_LIMIT = -1L + }; + typedef PersistentTableInfo cow_state; typedef std::map<std::string, scoped_refptr<TabletInfo>> TabletInfoMap; @@ -837,6 +845,11 @@ class CatalogManager : public tserver::TabletReplicaLookupIf { typedef std::unordered_map<std::string, scoped_refptr<TableInfo>> TableInfoMap; typedef std::unordered_map<std::string, scoped_refptr<TabletInfo>> TabletInfoMap; + // Check whether the table's write limit is reached, + // if true, the write permission should be disabled. + static bool IsTableWriteDisabled(const scoped_refptr<TableInfo>& table, + const std::string& table_name); + // Delete the specified table in the catalog. If 'user' is provided, // checks that the user is authorized to delete the table. Otherwise, // it indicates its an internal operation (originates from catalog diff --git a/src/kudu/master/master.proto b/src/kudu/master/master.proto index 51d57e6..f1e8a48 100644 --- a/src/kudu/master/master.proto +++ b/src/kudu/master/master.proto @@ -206,6 +206,11 @@ message SysTablesEntryPB { // The table type. If not set, it is assumed this table is a user-defined // table, rather than a system table. optional TableTypePB table_type = 14; + + // Table disk size limit. + optional int64 table_disk_size_limit = 15; + // Table row count limit. + optional int64 table_row_count_limit = 16; } // The on-disk entry in the sys.catalog table ("metadata" column) to represent @@ -595,6 +600,10 @@ message GetTableStatisticsResponsePB { // The table statistics from table metrics. optional int64 on_disk_size = 2; optional int64 live_row_count = 3; + + // The table limit + optional int64 disk_size_limit = 4; + optional int64 row_count_limit = 5; } message GetTableLocationsRequestPB { @@ -713,6 +722,9 @@ message AlterTableRequestPB { map<string, string> new_extra_configs = 6; optional string new_table_owner = 7; + + optional int64 disk_size_limit = 8; + optional int64 row_count_limit = 9; } message AlterTableResponsePB {