This is an automated email from the ASF dual-hosted git repository.

alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit a9b8a772bfe4f7be827068b0cf571d8f5f1d362e
Author: Hongjiang Zhang <hongjizh...@ebay.com>
AuthorDate: Tue Apr 6 15:07:53 2021 +0800

    KUDU-3223: Management of per-table level limit
    
    When the table's size or row count exceeds a threshold, the insert and
    update privileges are removed from the authz token; only the scan and
    delete privileges remain.
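
    Concretely, the master applies roughly the following check when minting
    authz tokens (a sketch of the catalog_manager.cc change included below,
    with variable names simplified):

      if (FLAGS_enable_table_write_limit &&
          PREDICT_FALSE(IsTableWriteDisabled(table, table_name))) {
        // Keep scan/delete; drop insert/update from the token.
        table_privilege.clear_insert_privilege();
        table_privilege.clear_update_privilege();
      }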
    
    An admin is responsible for changing the per-table on_disk_size_limit or
    row_count_limit; -1 means no limit.
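
    A minimal usage sketch, assuming a superuser client and a hypothetical
    table name (illustration only, not part of this patch):

      // Hypothetical helper: caps a table at 10 GiB on disk and 1M live rows.
      Status SetExampleLimits(const client::sp::shared_ptr<KuduClient>& client) {
        unique_ptr<KuduTableAlterer> alterer(client->NewTableAlterer("db.events"));
        return alterer->SetTableDiskSizeLimit(10LL * 1024 * 1024 * 1024)
                      ->SetTableRowCountLimit(1000000)
                      ->Alter();
      }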
    
    This feature depends on authz token expiration, so there is a delay before
    writes are actually forbidden. As a result, it provides only approximate
    enforcement of the write limit.
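
    For reference, the configured limits can be read back through the
    statistics API extended by this patch (sketch only; the table name is
    hypothetical and error handling is minimal):

      // Hypothetical helper: logs the configured limits; -1 means no limit.
      Status LogTableLimits(const client::sp::shared_ptr<KuduClient>& client) {
        KuduTableStatistics* raw_stats = nullptr;
        RETURN_NOT_OK(client->GetTableStatistics("db.events", &raw_stats));
        unique_ptr<KuduTableStatistics> stats(raw_stats);
        LOG(INFO) << "disk size limit: " << stats->on_disk_size_limit()
                  << ", row count limit: " << stats->live_row_count_limit();
        return Status::OK();
      }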
    
    Change-Id: I2dbf365ad59f17c0a4e2e7ea6a5afaa7680724b0
    Reviewed-on: http://gerrit.cloudera.org:8080/17273
    Tested-by: Kudu Jenkins
    Reviewed-by: Andrew Wong <aw...@cloudera.com>
---
 src/kudu/client/client.cc                       |  23 +-
 src/kudu/client/client.h                        |  46 +++
 src/kudu/client/table_alterer-internal.cc       |  11 +-
 src/kudu/client/table_alterer-internal.h        |   4 +
 src/kudu/client/table_statistics-internal.h     |  22 +-
 src/kudu/integration-tests/CMakeLists.txt       |   1 +
 src/kudu/integration-tests/write_limit-itest.cc | 449 ++++++++++++++++++++++++
 src/kudu/master/catalog_manager.cc              | 183 +++++++++-
 src/kudu/master/catalog_manager.h               |  13 +
 src/kudu/master/master.proto                    |  12 +
 10 files changed, 753 insertions(+), 11 deletions(-)

diff --git a/src/kudu/client/client.cc b/src/kudu/client/client.cc
index 40ee5f8..7246cab 100644
--- a/src/kudu/client/client.cc
+++ b/src/kudu/client/client.cc
@@ -65,6 +65,7 @@
 #include "kudu/common/partition.h"
 #include "kudu/common/partition_pruner.h"
 #include "kudu/common/row_operations.h"
+#include "kudu/common/row_operations.pb.h"
 #include "kudu/common/scan_spec.h"
 #include "kudu/common/schema.h"
 #include "kudu/common/txn_id.h"
@@ -690,7 +691,9 @@ Status KuduClient::GetTableStatistics(const string& table_name,
   unique_ptr<KuduTableStatistics> table_statistics(new KuduTableStatistics);
   table_statistics->data_ = new KuduTableStatistics::Data(
       resp.has_on_disk_size() ? boost::optional<int64_t>(resp.on_disk_size()) : boost::none,
-      resp.has_live_row_count() ? boost::optional<int64_t>(resp.live_row_count()) : boost::none);
+      resp.has_live_row_count() ? boost::optional<int64_t>(resp.live_row_count()) : boost::none,
+      resp.has_disk_size_limit() ? boost::optional<int64_t>(resp.disk_size_limit()) : boost::none,
+      resp.has_row_count_limit() ? boost::optional<int64_t>(resp.row_count_limit()) : boost::none);
 
   *statistics = table_statistics.release();
   return Status::OK();
@@ -993,6 +996,14 @@ int64_t KuduTableStatistics::live_row_count() const {
   return data_->live_row_count_ ? *data_->live_row_count_ : -1;
 }
 
+int64_t KuduTableStatistics::on_disk_size_limit() const {
+  return data_->on_disk_size_limit_ ? *data_->on_disk_size_limit_ : -1;
+}
+
+int64_t KuduTableStatistics::live_row_count_limit() const {
+  return data_->live_row_count_limit_ ? *data_->live_row_count_limit_ : -1;
+}
+
 std::string KuduTableStatistics::ToString() const {
   return data_->ToString();
 }
@@ -1467,6 +1478,16 @@ KuduTableAlterer* KuduTableAlterer::AlterExtraConfig(const map<string, string>&
   return this;
 }
 
+KuduTableAlterer* KuduTableAlterer::SetTableDiskSizeLimit(int64_t disk_size_limit) {
+  data_->disk_size_limit_ = disk_size_limit;
+  return this;
+}
+
+KuduTableAlterer* KuduTableAlterer::SetTableRowCountLimit(int64_t row_count_limit) {
+  data_->row_count_limit_ = row_count_limit;
+  return this;
+}
+
 KuduTableAlterer* KuduTableAlterer::timeout(const MonoDelta& timeout) {
   data_->timeout_ = timeout;
   return this;
diff --git a/src/kudu/client/client.h b/src/kudu/client/client.h
index 55d8705..437b0c5 100644
--- a/src/kudu/client/client.h
+++ b/src/kudu/client/client.h
@@ -55,6 +55,7 @@ namespace kudu {
 
 class AuthzTokenTest;
 class ClientStressTest_TestUniqueClientIds_Test;
+class DisableWriteWhenExceedingQuotaTest;
 class KuduPartialRow;
 class MonoDelta;
 class Partition;
@@ -921,6 +922,7 @@ class KUDU_EXPORT KuduClient : public sp::enable_shared_from_this<KuduClient> {
   friend class internal::RetrieveAuthzTokenRpc;
   friend class internal::WriteRpc;
   friend class kudu::AuthzTokenTest;
+  friend class kudu::DisableWriteWhenExceedingQuotaTest;
   friend class kudu::SecurityUnknownTskTest;
   friend class transactions::CoordinatorRpc;
   friend class transactions::ParticipantRpc;
@@ -1306,6 +1308,22 @@ class KUDU_EXPORT KuduTableStatistics {
   /// @note This statistic is pre-replication.
   int64_t live_row_count() const;
 
+  /// @return The table's on-disk size limit.
+  ///  -1 is returned if there is no disk size limit on this table.
+  ///
+  /// @note This API is experimental and may change or disappear in the future.
+  /// This feature currently applies a size limit to a single table, but
+  /// it should eventually also support a database-level size limit.
+  int64_t on_disk_size_limit() const;
+
+  /// @return The table's live row count limit.
+  ///  -1 is returned if there is no row count limit on this table.
+  ///
+  /// @note This API is experimental and may change or disappear in the future.
+  /// This feature currently applies a row count limit to a single table,
+  /// but it should eventually also support a database-level row count limit.
+  int64_t live_row_count_limit() const;
+
   /// Stringify this Statistics.
   ///
   /// @return A string describing this statistics
@@ -1785,6 +1803,34 @@ class KUDU_EXPORT KuduTableAlterer {
   /// @return Raw pointer to this alterer object.
   KuduTableAlterer* AlterExtraConfig(const std::map<std::string, std::string>& extra_configs);
 
+  /// Set the disk size limit of the table. Only a super user may do this.
+  ///
+  /// @note The table limit alterations, including disk_size_limit and row_count_limit,
+  /// cannot be combined with other alterations in the same request, because the
+  /// table 'limit' alteration requires super user permission.
+  ///
+  /// @note This API is experimental and may change or disappear in the future.
+  /// This feature currently applies a size limit to a single table.
+  ///
+  /// @param [in] disk_size_limit
+  ///   The maximum table disk size in bytes; -1 means no limit.
+  /// @return Raw pointer to this alterer object.
+  KuduTableAlterer* SetTableDiskSizeLimit(int64_t disk_size_limit);
+
+  /// Set the row count limit of the table. Only a super user may do this.
+  ///
+  /// @note The table limit alterations, including disk_size_limit and row_count_limit,
+  /// cannot be combined with other alterations in the same request, because the
+  /// table 'limit' alteration requires super user permission.
+  ///
+  /// @note This API is experimental and may change or disappear in the future.
+  /// This feature currently applies a row count limit to a single table.
+  ///
+  /// @param [in] row_count_limit
+  ///   The maximum row count of the table; -1 means no limit.
+  /// @return Raw pointer to this alterer object.
+  KuduTableAlterer* SetTableRowCountLimit(int64_t row_count_limit);
+
   /// Set a timeout for the alteration operation.
   ///
   /// This includes any waiting after the alter has been submitted
diff --git a/src/kudu/client/table_alterer-internal.cc b/src/kudu/client/table_alterer-internal.cc
index 4f4db75..874d02b 100644
--- a/src/kudu/client/table_alterer-internal.cc
+++ b/src/kudu/client/table_alterer-internal.cc
@@ -27,9 +27,9 @@
 #include "kudu/client/schema-internal.h"
 #include "kudu/client/schema.h"
 #include "kudu/common/row_operations.h"
+#include "kudu/common/row_operations.pb.h"
 #include "kudu/common/schema.h"
 #include "kudu/common/wire_protocol.h"
-#include "kudu/common/wire_protocol.pb.h"
 #include "kudu/master/master.pb.h"
 
 using std::string;
@@ -60,6 +60,8 @@ Status KuduTableAlterer::Data::ToRequest(AlterTableRequestPB* req) {
   if (!rename_to_.is_initialized() &&
       !new_extra_configs_ &&
       !set_owner_to_.is_initialized() &&
+      !disk_size_limit_ &&
+      !row_count_limit_ &&
       steps_.empty()) {
     return Status::InvalidArgument("No alter steps provided");
   }
@@ -85,6 +87,13 @@ Status KuduTableAlterer::Data::ToRequest(AlterTableRequestPB* req) {
                              SCHEMA_PB_WITHOUT_COMMENT));
   }
 
+  if (disk_size_limit_) {
+    req->set_disk_size_limit(disk_size_limit_.get());
+  }
+  if (row_count_limit_) {
+    req->set_row_count_limit(row_count_limit_.get());
+  }
+
   for (const Step& s : steps_) {
     AlterTableRequestPB::Step* pb_step = req->add_alter_schema_steps();
     pb_step->set_type(s.step_type);
diff --git a/src/kudu/client/table_alterer-internal.h b/src/kudu/client/table_alterer-internal.h
index 4945a75..641daea 100644
--- a/src/kudu/client/table_alterer-internal.h
+++ b/src/kudu/client/table_alterer-internal.h
@@ -17,6 +17,7 @@
 #ifndef KUDU_CLIENT_TABLE_ALTERER_INTERNAL_H
 #define KUDU_CLIENT_TABLE_ALTERER_INTERNAL_H
 
+#include <cstdint>
 #include <map>
 #include <memory>
 #include <string>
@@ -78,6 +79,9 @@ class KuduTableAlterer::Data {
 
   boost::optional<std::map<std::string, std::string>> new_extra_configs_;
 
+  boost::optional<int64_t> disk_size_limit_;
+  boost::optional<int64_t> row_count_limit_;
+
   // Set to true if there are alter partition steps.
   bool has_alter_partitioning_steps = false;
 
diff --git a/src/kudu/client/table_statistics-internal.h b/src/kudu/client/table_statistics-internal.h
index 2f4f33b..2414f29 100644
--- a/src/kudu/client/table_statistics-internal.h
+++ b/src/kudu/client/table_statistics-internal.h
@@ -33,9 +33,14 @@ using strings::Substitute;
 
 class KuduTableStatistics::Data {
  public:
-  Data(boost::optional<int64_t> on_disk_size, boost::optional<int64_t> live_row_count)
-      : on_disk_size_(std::move(on_disk_size)),
-        live_row_count_(std::move(live_row_count)) {
+  Data(boost::optional<int64_t> on_disk_size,
+       boost::optional<int64_t> live_row_count,
+       boost::optional<int64_t> on_disk_size_limit,
+       boost::optional<int64_t> live_row_count_limit)
+      : on_disk_size_(on_disk_size),
+        live_row_count_(live_row_count),
+        on_disk_size_limit_(on_disk_size_limit),
+        live_row_count_limit_(live_row_count_limit) {
   }
 
   ~Data() {
@@ -43,13 +48,20 @@ class KuduTableStatistics::Data {
 
   string ToString() const {
     return Substitute("on disk size: $0\n"
-                      "live row count: $1\n",
+                      "live row count: $1\n"
+                      "on disk size limit: $2\n"
+                      "live row count limit: $3\n",
                       on_disk_size_ ? std::to_string(*on_disk_size_) : "N/A",
-                      live_row_count_ ? std::to_string(*live_row_count_) : "N/A");
+                      live_row_count_ ? std::to_string(*live_row_count_) : "N/A",
+                      on_disk_size_limit_ ? std::to_string(*on_disk_size_limit_) : "N/A",
+                      live_row_count_limit_ ? std::to_string(*live_row_count_limit_) : "N/A");
+
   }
 
   const boost::optional<int64_t> on_disk_size_;
   const boost::optional<int64_t> live_row_count_;
+  const boost::optional<int64_t> on_disk_size_limit_;
+  const boost::optional<int64_t> live_row_count_limit_;
 
  private:
   DISALLOW_COPY_AND_ASSIGN(Data);
diff --git a/src/kudu/integration-tests/CMakeLists.txt b/src/kudu/integration-tests/CMakeLists.txt
index 3b4c184..f7208c3 100644
--- a/src/kudu/integration-tests/CMakeLists.txt
+++ b/src/kudu/integration-tests/CMakeLists.txt
@@ -145,6 +145,7 @@ ADD_KUDU_TEST(ts_tablet_manager-itest)
 ADD_KUDU_TEST(update_scan_delta_compact-test RUN_SERIAL true)
 ADD_KUDU_TEST(webserver-crawl-itest LABELS no_dist_test)
 ADD_KUDU_TEST(webserver-stress-itest RUN_SERIAL true)
+ADD_KUDU_TEST(write_limit-itest)
 ADD_KUDU_TEST(write_throttling-itest)
 
 if (NOT APPLE)
diff --git a/src/kudu/integration-tests/write_limit-itest.cc b/src/kudu/integration-tests/write_limit-itest.cc
new file mode 100644
index 0000000..b46a868
--- /dev/null
+++ b/src/kudu/integration-tests/write_limit-itest.cc
@@ -0,0 +1,449 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <stdint.h>
+
+#include <memory>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include <gflags/gflags_declare.h>
+#include <glog/logging.h>
+#include <gtest/gtest.h>
+
+#include "kudu/client/client-test-util.h"
+#include "kudu/client/client.h"
+#include "kudu/client/client.pb.h"
+#include "kudu/client/schema.h"
+#include "kudu/client/shared_ptr.h" // IWYU pragma: keep
+#include "kudu/client/write_op.h"
+#include "kudu/common/partial_row.h"
+#include "kudu/gutil/stl_util.h"
+#include "kudu/mini-cluster/internal_mini_cluster.h"
+#include "kudu/tablet/key_value_test_schema.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/status.h"
+#include "kudu/util/test_macros.h"
+#include "kudu/util/test_util.h"
+
+DECLARE_int32(heartbeat_interval_ms);
+DECLARE_string(superuser_acl);
+using std::string;
+using std::unique_ptr;
+using std::vector;
+
+DECLARE_bool(enable_table_write_limit);
+DECLARE_bool(tserver_enforce_access_control);
+DECLARE_double(table_write_limit_ratio);
+DECLARE_int32(log_segment_size_mb);
+DECLARE_int32(flush_threshold_mb);
+DECLARE_int32(flush_threshold_secs);
+DECLARE_int32(tablet_history_max_age_sec);
+DECLARE_int64(authz_token_validity_seconds);
+DECLARE_int64(table_disk_size_limit);
+DECLARE_int64(table_row_count_limit);
+
+namespace kudu {
+
+using cluster::InternalMiniCluster;
+using cluster::InternalMiniClusterOptions;
+using client::AuthenticationCredentialsPB;
+using client::sp::shared_ptr;
+using client::KuduClient;
+using client::KuduClientBuilder;
+using client::KuduDelete;
+using client::KuduError;
+using client::KuduInsert;
+using client::KuduScanner;
+using client::KuduSchema;
+using client::KuduSession;
+using client::KuduTable;
+using client::KuduTableAlterer;
+using client::KuduTableCreator;
+using client::KuduTableStatistics;
+using client::KuduUpdate;
+
+namespace {
+
+// Relatively low timeout used so we don't have to wait too long for an
+// "invalid token" error.
+const int kRpcTimeoutSecs = 3;
+const int kOperationTimeoutSecs = kRpcTimeoutSecs * 3;
+
+// Inserts a single row to the given key-value table for the given key.
+Status InsertKeyToTable(KuduTable* table, KuduSession* session, int key) {
+  RETURN_NOT_OK(session->SetFlushMode(KuduSession::MANUAL_FLUSH));
+  unique_ptr<KuduInsert> insert(table->NewInsert());
+  KuduPartialRow* row = insert->mutable_row();
+  RETURN_NOT_OK(row->SetInt32(0, key));
+  RETURN_NOT_OK(row->SetInt32(1, key));
+  return session->Apply(insert.release());
+}
+
+// Updates the value column for the given key.
+Status UpdateKeyToTable(KuduTable* table, KuduSession* session, int key, int value) {
+  RETURN_NOT_OK(session->SetFlushMode(KuduSession::MANUAL_FLUSH));
+  unique_ptr<KuduUpdate> update(table->NewUpdate());
+  KuduPartialRow* row = update->mutable_row();
+  RETURN_NOT_OK(row->SetInt32("key", key));
+  RETURN_NOT_OK(row->SetInt32("val", value));
+  return session->Apply(update.release());
+}
+
+Status DeleteKeyToTable(KuduTable* table, KuduSession* session, int key) {
+  RETURN_NOT_OK(session->SetFlushMode(KuduSession::MANUAL_FLUSH));
+  unique_ptr<KuduDelete> del(table->NewDelete());
+  KuduPartialRow* row = del->mutable_row();
+  RETURN_NOT_OK(row->SetInt32("key", key));
+  return session->Apply(del.release());
+}
+
+vector<Status> GetSessionErrors(KuduSession* session) {
+  vector<KuduError*> errors;
+  session->GetPendingErrors(&errors, nullptr);
+  vector<Status> ret(errors.size());
+  for (int i = 0; i < errors.size(); i++) {
+    ret[i] = errors[i]->status();
+  }
+  ElementDeleter deleter(&errors);
+  return ret;
+}
+
+Status ScanWholeTable(KuduTable* table, vector<string>* rows) {
+  KuduScanner scanner(table);
+  scanner.SetTimeoutMillis(kOperationTimeoutSecs * 1000);
+  return ScanToStrings(&scanner, rows);
+}
+
+Status SetTableLimit(const string& table_name,
+                     const shared_ptr<KuduClient>& client,
+                     int64_t disk_size_limit,
+                     int64_t row_count_limit) {
+  unique_ptr<KuduTableAlterer> alterer(
+      client->NewTableAlterer(table_name));
+  return alterer->SetTableDiskSizeLimit(disk_size_limit)
+                ->SetTableRowCountLimit(row_count_limit)
+                ->Alter();
+}
+
+} // anonymous namespace
+
+class DisableWriteWhenExceedingQuotaTest : public KuduTest {
+ public:
+  DisableWriteWhenExceedingQuotaTest()
+      : schema_(KuduSchema::FromSchema(CreateKeyValueTestSchema())) {}
+  const char* const kTableName = "test-table";
+  const char* const kUser = "token-user";
+  const char* const kSuperUser = "super-user";
+  const int64_t kRowCountLimit = 20;
+
+  void SetUp() override {
+    KuduTest::SetUp();
+    FLAGS_superuser_acl = kSuperUser;
+    FLAGS_tserver_enforce_access_control = true;
+    FLAGS_authz_token_validity_seconds = 1;
+    FLAGS_enable_table_write_limit = true;
+    FLAGS_table_row_count_limit = 2;
+    FLAGS_table_disk_size_limit = 1024 * 1024 * 2;
+    FLAGS_log_segment_size_mb = 1;
+    FLAGS_flush_threshold_mb = 0;
+    FLAGS_tablet_history_max_age_sec = 1;
+    FLAGS_flush_threshold_secs = 1;
+
+    // Create a table with a basic schema.
+    cluster_.reset(new InternalMiniCluster(env_, InternalMiniClusterOptions()));
+    ASSERT_OK(cluster_->Start());
+    ASSERT_OK(SetupClientAndTable());
+  }
+
+  // Sets up the client_ and client_table_ members.
+  Status SetupClient(const string& user) {
+    RETURN_NOT_OK(CreateClientForUser(user, &client_));
+    RETURN_NOT_OK(client_->OpenTable(kTableName, &client_table_));
+    table_id_ = client_table_->id();
+    return Status::OK();
+  }
+
+  // Sets up the client_ and client_table_ members.
+  Status SetupClientAndTable() {
+    RETURN_NOT_OK(CreateClientForUser(kUser, &client_));
+    unique_ptr<KuduTableCreator> table_creator(client_->NewTableCreator());
+    RETURN_NOT_OK(table_creator->table_name(kTableName)
+                               .schema(&schema_)
+                               .num_replicas(1)
+                               .set_range_partition_columns({ "key" })
+                               .Create());
+    RETURN_NOT_OK(client_->OpenTable(kTableName, &client_table_));
+    table_id_ = client_table_->id();
+    return Status::OK();
+  }
+
+  // Creates a client for the given user.
+  Status CreateClientForUser(const string& user, shared_ptr<KuduClient>* client) const {
+    // Many tests will expect operations to fail, so let's get there quicker by
+    // setting a low timeout.
+    KuduClientBuilder client_builder;
+    client_builder.default_rpc_timeout(MonoDelta::FromSeconds(kRpcTimeoutSecs));
+    string authn_creds;
+    AuthenticationCredentialsPB authn_pb;
+    authn_pb.set_real_user(user);
+    CHECK(authn_pb.SerializeToString(&authn_creds));
+    client_builder.import_authentication_credentials(std::move(authn_creds));
+    return cluster_->CreateClient(&client_builder, client);
+  }
+
+  // Wait for the tservers to report updated table statistics to the master.
+  static void WaitForTServerUpdatesStatisticsToMaster(int ms = FLAGS_heartbeat_interval_ms * 10) {
+    SleepFor(MonoDelta::FromMilliseconds(ms));
+  }
+
+  // Disable write privilege through authz token when exceeding size quota
+  void TestSizeLimit() {
+    shared_ptr<KuduSession> good_session(client_->NewSession());
+    Status s = Status::OK();
+    int k = 0;
+    for (k = 0; k < kRowCountLimit; k++) {
+      ASSERT_OK(InsertKeyToTable(client_table_.get(), good_session.get(), k));
+      s = good_session->Flush();
+      if (!s.ok()) {
+        // break the loop once the write is blocked
+        break;
+      }
+      WaitForTServerUpdatesStatisticsToMaster(1000);
+    }
+    ASSERT_TRUE(s.IsIOError()) << s.ToString();
+    if (!s.ok()) {
+      vector<Status> errors = GetSessionErrors(good_session.get());
+      for (const auto& e : errors) {
+        ASSERT_TRUE(e.IsRemoteError()) << e.ToString();
+        ASSERT_STR_CONTAINS(e.ToString(), "Not authorized");
+      }
+    }
+    // Scanning still works.
+    vector<string> rows;
+    ASSERT_OK(ScanWholeTable(client_table_.get(), &rows));
+    ASSERT_EQ(rows.size(), k);
+    // Updates are also blocked.
+    ASSERT_OK(UpdateKeyToTable(client_table_.get(), good_session.get(), 0, 1000));
+    s = good_session->Flush();
+    ASSERT_TRUE(s.IsIOError()) << s.ToString();
+    vector<Status> errors = GetSessionErrors(good_session.get());
+    for (const auto& e : errors) {
+      ASSERT_TRUE(e.IsRemoteError()) << e.ToString();
+      ASSERT_STR_CONTAINS(e.ToString(), "Not authorized");
+    }
+    // A delete operation is treated as an UPDATE, which also increases the table
+    // size. So, remove all rows in order to reclaim the space.
+    for (k--; k >= 0; k--) {
+      good_session = client_->NewSession();
+      ASSERT_OK(DeleteKeyToTable(client_table_.get(), good_session.get(), k));
+      ASSERT_OK(good_session->Flush());
+      WaitForTServerUpdatesStatisticsToMaster(1000);
+    }
+    // The table should now be empty.
+    rows.clear();
+    ASSERT_OK(ScanWholeTable(client_table_.get(), &rows));
+    ASSERT_EQ(rows.size(), 0);
+    // Insertion should work again
+    ASSERT_OK(InsertKeyToTable(client_table_.get(), good_session.get(), 0));
+    ASSERT_OK(good_session->Flush());
+    WaitForTServerUpdatesStatisticsToMaster(1000);
+    // Remove the just inserted row to clean the table
+    good_session = client_->NewSession();
+    ASSERT_OK(DeleteKeyToTable(client_table_.get(), good_session.get(), 0));
+    ASSERT_OK(good_session->Flush());
+    WaitForTServerUpdatesStatisticsToMaster(1000);
+  }
+
+  // Disable write privilege through authz token when exceeding rows quota
+  void TestRowLimit() {
+    // insert 2 rows to fill the quota
+    shared_ptr<KuduSession> good_session(client_->NewSession());
+    auto key = next_row_key_++;
+    ASSERT_OK(InsertKeyToTable(client_table_.get(), good_session.get(), key));
+    ASSERT_OK(good_session->Flush());
+    WaitForTServerUpdatesStatisticsToMaster();
+
+    ASSERT_OK(InsertKeyToTable(client_table_.get(), good_session.get(), key + 1));
+    Status s = good_session->Flush();
+    WaitForTServerUpdatesStatisticsToMaster();
+    // The row quota is now exceeded, so further inserts and updates should fail.
+    shared_ptr<KuduSession> bad_session(client_->NewSession());
+    ASSERT_OK(InsertKeyToTable(client_table_.get(), bad_session.get(), key + 2));
+    s = bad_session->Flush();
+    ASSERT_TRUE(s.IsIOError()) << s.ToString();
+    vector<Status> errors = GetSessionErrors(bad_session.get());
+    for (const auto& e : errors) {
+      ASSERT_TRUE(e.IsRemoteError()) << e.ToString();
+      ASSERT_STR_CONTAINS(e.ToString(), "Not authorized");
+    }
+
+    ASSERT_OK(UpdateKeyToTable(client_table_.get(), bad_session.get(), key, key + 2));
+    s = bad_session->Flush();
+    ASSERT_TRUE(s.IsIOError()) << s.ToString();
+    errors = GetSessionErrors(bad_session.get());
+    for (const auto& e : errors) {
+      ASSERT_TRUE(e.IsRemoteError()) << e.ToString();
+      ASSERT_STR_CONTAINS(e.ToString(), "Not authorized");
+    }
+    // Scans still work.
+    vector<string> rows;
+    ASSERT_OK(ScanWholeTable(client_table_.get(), &rows));
+    ASSERT_EQ(rows.size(), 2);
+    // remove one row to avoid reaching quota
+    good_session = client_->NewSession();
+    ASSERT_OK(DeleteKeyToTable(client_table_.get(), good_session.get(), key));
+    ASSERT_OK(good_session->Flush());
+    WaitForTServerUpdatesStatisticsToMaster();
+    // Check rows
+    rows.clear();
+    ASSERT_OK(ScanWholeTable(client_table_.get(), &rows));
+    ASSERT_EQ(rows.size(), 1);
+    // reinsert should be allowed
+    ASSERT_OK(InsertKeyToTable(client_table_.get(), good_session.get(), key + 3));
+    ASSERT_OK(good_session->Flush());
+
+    // empty the table by deleting all rows
+    ASSERT_OK(DeleteKeyToTable(client_table_.get(), good_session.get(), key + 3));
+    ASSERT_OK(good_session->Flush());
+    WaitForTServerUpdatesStatisticsToMaster();
+
+    ASSERT_OK(DeleteKeyToTable(client_table_.get(), good_session.get(), key + 1));
+    ASSERT_OK(good_session->Flush());
+  }
+
+  // Change the table limit as the admin (super) user.
+  void ModifyLimit(const int64_t disk_size_limit, const int64_t row_count_limit) {
+    ASSERT_OK(SetupClient(kSuperUser));
+    ASSERT_OK(SetTableLimit(kTableName, client_, disk_size_limit, row_count_limit));
+  }
+
+ protected:
+  const KuduSchema schema_;
+  unique_ptr<InternalMiniCluster> cluster_;
+
+  // Client authenticated as the default user.
+  shared_ptr<KuduClient> client_;
+
+  // Table created with 'client_'.
+  shared_ptr<KuduTable> client_table_;
+  string table_id_;
+
+  // The next row key to insert.
+  int next_row_key_ = 0;
+};
+
+// Refuse the table limit change if the alteration contains any other non-table-limit
+// related operation, like renaming, changing owner, adding/altering/dropping column, etc.
+TEST_F(DisableWriteWhenExceedingQuotaTest, TestDisallowedToChangeLimitMixedWithOtherOps) {
+  const int64_t on_disk_size_limit = 100L * 1024 * 1024;
+  const int64_t row_count_limit = 10L;
+  ASSERT_OK(SetupClient(kSuperUser));
+  // Failed to change table disk size limit because it contains another
+  // non-table-limit related operation: rename the table
+  unique_ptr<KuduTableAlterer> alterer(client_->NewTableAlterer(kTableName));
+  Status s = alterer->RenameTo("failedRenaming")
+                    ->SetTableDiskSizeLimit(on_disk_size_limit)
+                    ->Alter();
+  ASSERT_TRUE(s.IsConfigurationError()) << s.ToString();
+
+  // Failed to change the row count limit of the table because it contains
+  // another non-table-limit related operation: set owner.
+  s = alterer->SetOwner("failedSetOwner")
+             ->SetTableRowCountLimit(row_count_limit)
+             ->Alter();
+  ASSERT_TRUE(s.IsConfigurationError()) << s.ToString();
+
+  // It is successful only changing the table 'limit'.
+  ASSERT_OK(SetTableLimit(kTableName, client_, on_disk_size_limit, row_count_limit));
+
+  // restart the cluster to verify it again
+  cluster_->Shutdown();
+  ASSERT_OK(cluster_->Start());
+  unique_ptr<KuduTableStatistics> statistics;
+  KuduTableStatistics *table_statistics;
+  ASSERT_OK(client_->GetTableStatistics(kTableName, &table_statistics));
+  statistics.reset(table_statistics);
+  ASSERT_EQ(on_disk_size_limit, statistics->on_disk_size_limit());
+  ASSERT_EQ(row_count_limit, statistics->live_row_count_limit());
+}
+
+// Refuse the table limit change if the user is not superuser
+TEST_F(DisableWriteWhenExceedingQuotaTest, TestOnlySuperUserAllowedToChangeTableLimit) {
+  const int64_t on_disk_size_limit = 100L * 1024 * 1024;
+  const int64_t row_count_limit = 10L;
+  Status s = SetTableLimit(kTableName, client_, on_disk_size_limit, row_count_limit);
+  ASSERT_TRUE(s.IsNotAuthorized()) << s.ToString();
+  NO_FATALS(ModifyLimit(on_disk_size_limit, row_count_limit));
+  ASSERT_OK(SetupClient(kSuperUser));
+
+  ASSERT_OK(SetTableLimit(kTableName, client_, on_disk_size_limit, row_count_limit));
+  // restart the cluster to verify it again
+  cluster_->Shutdown();
+  ASSERT_OK(cluster_->Start());
+  unique_ptr<KuduTableStatistics> statistics;
+  KuduTableStatistics *table_statistics;
+  ASSERT_OK(client_->GetTableStatistics(kTableName, &table_statistics));
+  statistics.reset(table_statistics);
+  ASSERT_EQ(on_disk_size_limit, statistics->on_disk_size_limit());
+  ASSERT_EQ(row_count_limit, statistics->live_row_count_limit());
+}
+
+// Verify the table's row limit
+TEST_F(DisableWriteWhenExceedingQuotaTest, TestDisableWritePrivilegeWhenExceedingRowsQuota) {
+  NO_FATALS(TestRowLimit());
+  // restart the cluster to verify it again
+  cluster_->Shutdown();
+  ASSERT_OK(cluster_->Start());
+  ASSERT_OK(SetupClient(kUser));
+  NO_FATALS(TestRowLimit());
+}
+
+// Verify the table's disk size limit
+TEST_F(DisableWriteWhenExceedingQuotaTest, TestDisableWritePrivilegeWhenExceedingSizeQuota) {
+  // modify the table limit to allow more rows but small size
+  NO_FATALS(ModifyLimit(1024L * 1024 + 120L * 1024, kRowCountLimit));
+  // refresh the client
+  ASSERT_OK(SetupClient(kUser));
+  NO_FATALS(TestSizeLimit());
+
+  // restart the cluster to verify it again
+  cluster_->Shutdown();
+  ASSERT_OK(cluster_->Start());
+  ASSERT_OK(SetupClient(kUser));
+  NO_FATALS(TestSizeLimit());
+}
+
+TEST_F(DisableWriteWhenExceedingQuotaTest, TestDisableWriteWhenExceedingRowsQuotaWithFactor) {
+  FLAGS_table_write_limit_ratio = 0.5;
+  // restart the cluster to make the limit factor take effect
+  cluster_->Shutdown();
+  ASSERT_OK(cluster_->Start());
+  // modify the table limit to allow more rows but small size
+  NO_FATALS(ModifyLimit(10L * 1024 * 1024, 4L));
+  // refresh the client
+  ASSERT_OK(SetupClient(kUser));
+  NO_FATALS(TestRowLimit());
+
+  // modify the table limit to allow more rows but small size
+  NO_FATALS(ModifyLimit(2L * 1024 * 1024 + 120L * 1024, kRowCountLimit));
+  // refresh the client
+  ASSERT_OK(SetupClient(kUser));
+  NO_FATALS(TestSizeLimit());
+}
+} // namespace kudu
diff --git a/src/kudu/master/catalog_manager.cc b/src/kudu/master/catalog_manager.cc
index dd7ce71..c213461 100644
--- a/src/kudu/master/catalog_manager.cc
+++ b/src/kudu/master/catalog_manager.cc
@@ -332,6 +332,29 @@ DEFINE_bool(enable_per_range_hash_schemas, false,
             "Whether the ability to specify different hash schemas per range 
is enabled");
 TAG_FLAG(enable_per_range_hash_schemas, unsafe);
 
+DEFINE_bool(enable_table_write_limit, false,
+            "Enable the table write limit. "
+            "When the table's size or row count is approaching the limit, "
+            "the write may be forbidden.");
+TAG_FLAG(enable_table_write_limit, experimental);
+TAG_FLAG(enable_table_write_limit, runtime);
+
+DEFINE_int64(table_disk_size_limit, -1,
+             "Set the target maximum on-disk size, in bytes, at which writes "
+             "to a table are disabled. This is a system-wide default for "
+             "every newly created table.");
+TAG_FLAG(table_disk_size_limit, experimental);
+
+DEFINE_int64(table_row_count_limit, -1,
+             "Set the target maximum row count at which writes to a table "
+             "are disabled. This is a system-wide default for every newly "
+             "created table.");
+TAG_FLAG(table_row_count_limit, experimental);
+
+DEFINE_double(table_write_limit_ratio, 0.95,
+              "The fraction of the configured write limit at which writes to a table are disabled");
+TAG_FLAG(table_write_limit_ratio, experimental);
+
 DECLARE_bool(raft_prepare_replacement_before_eviction);
 DECLARE_int64(tsk_rotation_seconds);
 
@@ -399,6 +422,30 @@ using strings::Substitute;
 namespace kudu {
 namespace master {
 
+static bool ValidateTableWriteLimitRatio(const char* flagname, double value) {
+  if (value > 1.0) {
+    LOG(ERROR) << Substitute("$0 must be less than or equal to 1.0, value $1 
is invalid.",
+                             flagname, value);
+    return false;
+  }
+  if (value < 0) {
+    LOG(ERROR) << Substitute("$0 must be greater than 0, value $1 is invalid",
+                               flagname, value);
+  }
+  return true;
+}
+DEFINE_validator(table_write_limit_ratio, &ValidateTableWriteLimitRatio);
+
+static bool ValidateTableLimit(const char* flag, int64_t limit) {
+  if (limit != -1 && limit < 0) {
+     LOG(ERROR) << Substitute("$0 must be greater than or equal to -1, "
+                              "$1 is invalid", flag, limit);
+     return false;
+  }
+  return true;
+}
+DEFINE_validator(table_disk_size_limit, &ValidateTableLimit);
+DEFINE_validator(table_row_count_limit, &ValidateTableLimit);
 ////////////////////////////////////////////////////////////
 // Table Loader
 ////////////////////////////////////////////////////////////
@@ -2023,6 +2070,23 @@ scoped_refptr<TableInfo> CatalogManager::CreateTableInfo(
   if (req.has_owner()) {
     metadata->set_owner(req.owner());
   }
+  // Set the table limit
+  if (FLAGS_enable_table_write_limit) {
+    if (FLAGS_table_disk_size_limit != TableInfo::TABLE_WRITE_DEFAULT_LIMIT) {
+      metadata->set_table_disk_size_limit(FLAGS_table_disk_size_limit);
+    } else {
+      metadata->clear_table_disk_size_limit();
+    }
+    if (FLAGS_table_row_count_limit != TableInfo::TABLE_WRITE_DEFAULT_LIMIT) {
+      metadata->set_table_row_count_limit(FLAGS_table_row_count_limit);
+    } else {
+      metadata->clear_table_row_count_limit();
+    }
+
+    LOG(INFO) << Substitute("table size write limit: $0, table row write 
limit: $1",
+                            FLAGS_table_disk_size_limit,
+                            FLAGS_table_row_count_limit);
+  }
   return table;
 }
 
@@ -2655,7 +2719,6 @@ Status CatalogManager::AlterTableRpc(const AlterTableRequestPB& req,
   if (rpc) {
     user = rpc->remote_user().username();
   }
-
   // If the HMS integration is enabled, the alteration includes a table
   // rename and the table should be altered in the HMS, then don't directly
   // rename the table in the Kudu catalog. Instead, rename the table
@@ -2797,6 +2860,21 @@ Status CatalogManager::AlterTable(const AlterTableRequestPB& req,
     }
   }
 
+  // Pre-check the modifications' validity:
+  // Alterations done by admin should not be combined with other table alterations.
+  bool table_limit_change = req.has_disk_size_limit() ||
+                            req.has_row_count_limit();
+  bool other_schema_change = req.has_new_table_name() ||
+                             req.has_new_table_owner() ||
+                             !req.new_extra_configs().empty() ||
+                             !alter_schema_steps.empty() ||
+                             !alter_partitioning_steps.empty();
+  if (table_limit_change && other_schema_change) {
+    return SetupError(Status::ConfigurationError(
+                      "Alter table limit cannot be combined with other alterations"),
+                      resp, MasterErrorPB::UNKNOWN_ERROR);
+  }
+
   // 2. Lookup the table, verify if it exists, lock it for modification, and then
   //    checks that the user is authorized to operate on the table.
   scoped_refptr<TableInfo> table;
@@ -2815,7 +2893,16 @@ Status CatalogManager::AlterTable(const AlterTableRequestPB& req,
                                                               username == owner),
                         resp, MasterErrorPB::NOT_AUTHORIZED);
     }
-
+    if (req.has_disk_size_limit() || req.has_row_count_limit()) {
+      // Table limit is used to stop writing from the table owner,
+      // so, the owner is disallowed to change the table limit.
+      if (user && !master_->IsServiceUserOrSuperUser(*user)) {
+        return SetupError(
+              Status::NotAuthorized("must be a service user or "
+              "a super user to modify table limit"),
+              resp, MasterErrorPB::NOT_AUTHORIZED);
+      }
+    }
     return SetupError(authz_provider_->AuthorizeAlterTable(table_name, new_table, username,
                                                            username == owner),
                       resp, MasterErrorPB::NOT_AUTHORIZED);
@@ -2832,6 +2919,32 @@ Status CatalogManager::AlterTable(const AlterTableRequestPB& req,
   string normalized_table_name = NormalizeTableName(l.data().name());
   *resp->mutable_table_id() = table->id();
 
+  // Modify the table limit.
+  if (table_limit_change) {
+    if (req.has_disk_size_limit()) {
+      if (req.disk_size_limit() == TableInfo::TABLE_WRITE_DEFAULT_LIMIT) {
+        l.mutable_data()->pb.clear_table_disk_size_limit();
+      } else if (req.disk_size_limit() >= 0) {
+        l.mutable_data()->pb.set_table_disk_size_limit(req.disk_size_limit());
+      } else {
+        return SetupError(Status::InvalidArgument("disk size limit must "
+            "be greater than or equal to -1"),
+            resp, MasterErrorPB::UNKNOWN_ERROR);
+      }
+    }
+    if (req.has_row_count_limit()) {
+      if (req.row_count_limit() == TableInfo::TABLE_WRITE_DEFAULT_LIMIT) {
+        l.mutable_data()->pb.clear_table_row_count_limit();
+      } else if (req.row_count_limit() >= 0) {
+        l.mutable_data()->pb.set_table_row_count_limit(req.row_count_limit());
+      } else {
+        return SetupError(Status::InvalidArgument("row count limit must "
+            "be greater than or equal to -1"),
+            resp, MasterErrorPB::UNKNOWN_ERROR);
+      }
+    }
+  }
+
   // 3. Calculate and validate new schema for the on-disk state, not persisted yet.
   Schema new_schema;
   ColumnId next_col_id = ColumnId(l.data().pb.next_column_id());
@@ -2946,7 +3059,8 @@ Status CatalogManager::AlterTable(const AlterTableRequestPB& req,
   // or extra configuration has changed.
   bool has_metadata_changes = has_schema_changes ||
       req.has_new_table_name() || req.has_new_table_owner() ||
-      !req.new_extra_configs().empty();
+      !req.new_extra_configs().empty() || req.has_disk_size_limit() ||
+      req.has_row_count_limit();
   // Set to true if there are partitioning changes.
   bool has_partitioning_changes = !alter_partitioning_steps.empty();
   // Set to true if metadata changes need to be applied to existing tablets.
@@ -3174,6 +3288,11 @@ Status CatalogManager::GetTableSchema(const GetTableSchemaRequestPB* req,
                                                          *user == l.data().owner(),
                                                          schema_pb, &table_privilege),
                    resp, MasterErrorPB::UNKNOWN_ERROR));
+    if (FLAGS_enable_table_write_limit &&
+        PREDICT_FALSE(IsTableWriteDisabled(table, l.data().name()))) {
+      table_privilege.clear_insert_privilege();
+      table_privilege.clear_update_privilege();
+    }
     security::SignedTokenPB authz_token;
     RETURN_NOT_OK(token_signer->GenerateAuthzToken(
         *user, std::move(table_privilege), &authz_token));
@@ -3308,10 +3427,66 @@ Status CatalogManager::GetTableStatistics(const GetTableStatisticsRequestPB* req
       resp->set_live_row_count(table->GetMetrics()->live_row_count->value());
     }
   }
-
+  if (FLAGS_enable_table_write_limit) {
+    resp->set_disk_size_limit(l.data().pb.table_disk_size_limit());
+    resp->set_row_count_limit(l.data().pb.table_row_count_limit());
+  }
   return Status::OK();
 }
 
+bool CatalogManager::IsTableWriteDisabled(const scoped_refptr<TableInfo>& table,
+                                          const std::string& table_name) {
+  uint64_t table_disk_size = 0;
+  uint64_t table_rows = 0;
+  if (table->GetMetrics()->TableSupportsOnDiskSize()) {
+    table_disk_size = table->GetMetrics()->on_disk_size->value();
+  }
+  if (table->GetMetrics()->TableSupportsLiveRowCount()) {
+    table_rows = table->GetMetrics()->live_row_count->value();
+  }
+  bool disallow_write = false;
+  int64_t table_disk_size_limit = TableInfo::TABLE_WRITE_DEFAULT_LIMIT;
+  int64_t table_rows_limit = TableInfo::TABLE_WRITE_DEFAULT_LIMIT;
+  {
+    // Scope the table lock so it is released as soon as possible.
+    TableMetadataLock table_lock(table.get(), LockMode::READ);
+    const auto& pb = table_lock.data().pb;
+
+    // If we are approaching the table's limit, treat the limit as already
+    // reached: write disabling relies on authz tokens, which have a fixed
+    // expiration time, so writes cannot be disabled immediately once the
+    // limit is actually hit.
+    if (pb.has_table_disk_size_limit()) {
+      table_disk_size_limit = pb.table_disk_size_limit();
+      if (static_cast<double>(table_disk_size) >=
+           (static_cast<double>(table_disk_size_limit) *
+            FLAGS_table_write_limit_ratio)) {
+        disallow_write = true;
+      }
+    }
+    if (!disallow_write && pb.has_table_row_count_limit()) {
+      table_rows_limit = pb.table_row_count_limit();
+      if (static_cast<double>(table_rows) >=
+              (static_cast<double>(table_rows_limit) *
+               FLAGS_table_write_limit_ratio)) {
+        disallow_write = true;
+      }
+    }
+  }
+
+  if (disallow_write) {
+    // The writing into the table is disallowed.
+    LOG(INFO) << Substitute("table $0 row count is $1, on disk size is $2, "
+                            "row count limit is $3, size limit is $4, writing is forbidden",
+                            table_name,
+                            table_rows,
+                            table_disk_size,
+                            table_rows_limit,
+                            table_disk_size_limit);
+  }
+  return disallow_write;
+}
+
 Status CatalogManager::GetTableInfo(const string& table_id, 
scoped_refptr<TableInfo> *table) {
   leader_lock_.AssertAcquiredForReading();
 
diff --git a/src/kudu/master/catalog_manager.h b/src/kudu/master/catalog_manager.h
index 73bcb0f..565ee6c 100644
--- a/src/kudu/master/catalog_manager.h
+++ b/src/kudu/master/catalog_manager.h
@@ -270,6 +270,14 @@ struct PersistentTableInfo {
 // spin-lock.
 class TableInfo : public RefCountedThreadSafe<TableInfo> {
  public:
+  enum {
+    // It is useful to place size limits on tables that may be ingested into
+    // by long-lived applications. To avoid breaking existing tables, no
+    // limit is enforced by default.
+    // The default value -1 means no limit.
+    TABLE_WRITE_DEFAULT_LIMIT = -1L
+  };
+
   typedef PersistentTableInfo cow_state;
   typedef std::map<std::string, scoped_refptr<TabletInfo>> TabletInfoMap;
 
@@ -837,6 +845,11 @@ class CatalogManager : public tserver::TabletReplicaLookupIf {
   typedef std::unordered_map<std::string, scoped_refptr<TableInfo>> TableInfoMap;
   typedef std::unordered_map<std::string, scoped_refptr<TabletInfo>> TabletInfoMap;
 
+  // Check whether the table's write limit has been reached;
+  // if so, the write permission should be disabled.
+  static bool IsTableWriteDisabled(const scoped_refptr<TableInfo>& table,
+                                   const std::string& table_name);
+
   // Delete the specified table in the catalog. If 'user' is provided,
   // checks that the user is authorized to delete the table. Otherwise,
   // it indicates its an internal operation (originates from catalog
diff --git a/src/kudu/master/master.proto b/src/kudu/master/master.proto
index 51d57e6..f1e8a48 100644
--- a/src/kudu/master/master.proto
+++ b/src/kudu/master/master.proto
@@ -206,6 +206,11 @@ message SysTablesEntryPB {
   // The table type. If not set, it is assumed this table is a user-defined
   // table, rather than a system table.
   optional TableTypePB table_type = 14;
+
+  // Table disk size limit.
+  optional int64 table_disk_size_limit = 15;
+  // Table row count limit.
+  optional int64 table_row_count_limit = 16;
 }
 
 // The on-disk entry in the sys.catalog table ("metadata" column) to represent
@@ -595,6 +600,10 @@ message GetTableStatisticsResponsePB {
   // The table statistics from table metrics.
   optional int64 on_disk_size = 2;
   optional int64 live_row_count = 3;
+
+  // The table limit
+  optional int64 disk_size_limit = 4;
+  optional int64 row_count_limit = 5;
 }
 
 message GetTableLocationsRequestPB {
@@ -713,6 +722,9 @@ message AlterTableRequestPB {
   map<string, string> new_extra_configs = 6;
 
   optional string new_table_owner = 7;
+
+  optional int64 disk_size_limit = 8;
+  optional int64 row_count_limit = 9;
 }
 
 message AlterTableResponsePB {
