This is an automated email from the ASF dual-hosted git repository. alexey pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/kudu.git
The following commit(s) were added to refs/heads/master by this push: new 3d496deec [www] add slow scans section 3d496deec is described below commit 3d496deec61b5d44ef133b553e282f5014a8b36f Author: kedeng <kdeng...@gmail.com> AuthorDate: Wed Dec 28 15:25:39 2022 +0800 [www] add slow scans section Currently, the history of completed scans is available on the /scans page, but it is not easy to distinguish the slow scans among them. This patch adds an extra section to the /scans page that shows slow scans separately. A scan is considered 'slow' if it takes longer than the threshold defined by --slow_scanner_threshold_ms. The number of entries in the slow scans history is limited by --slow_scan_history_count. Change-Id: Ibd1dcac8b81c5eefd306e7020c9c52b3f28e603c Reviewed-on: http://gerrit.cloudera.org:8080/19392 Tested-by: Kudu Jenkins Reviewed-by: Alexey Serbin <ale...@apache.org> --- src/kudu/client/scan_token-test.cc | 73 ++++++++++- src/kudu/tserver/scanners.cc | 211 ++++++++++++++++++++++-------- src/kudu/tserver/scanners.h | 50 +++++-- src/kudu/tserver/tablet_server-test.cc | 8 +- src/kudu/tserver/tablet_server.cc | 6 +- src/kudu/tserver/tserver_path_handlers.cc | 64 +++++---- www/scans.mustache | 69 +++++++++- 7 files changed, 379 insertions(+), 102 deletions(-) diff --git a/src/kudu/client/scan_token-test.cc b/src/kudu/client/scan_token-test.cc index 1cf0ccde0..d46bfc12b 100644 --- a/src/kudu/client/scan_token-test.cc +++ b/src/kudu/client/scan_token-test.cc @@ -73,6 +73,8 @@ #include "kudu/util/test_util.h" DECLARE_bool(tserver_enforce_access_control); +DECLARE_int32(scanner_inject_latency_on_each_batch_ms); +DECLARE_int32(slow_scanner_threshold_ms); METRIC_DECLARE_histogram(handler_latency_kudu_master_MasterService_GetTableSchema); METRIC_DECLARE_histogram(handler_latency_kudu_master_MasterService_GetTableLocations); @@ -85,6 +87,7 @@ using kudu::master::CatalogManager; using kudu::master::TabletInfo; using kudu::tablet::TabletReplica; using kudu::tserver::MiniTabletServer; +using 
kudu::tserver::SharedScanDescriptor; using std::atomic; using std::map; using std::string; @@ -328,7 +331,7 @@ class ScanTokenTest : public KuduTest { void GetScannerCount(map<string, int32_t>* scanner_count_by_ts_uuid) { scanner_count_by_ts_uuid->clear(); for (int i = 0; i < cluster_->num_tablet_servers(); i++) { - vector<tserver::ScanDescriptor> scanners = + vector<SharedScanDescriptor> scanners = cluster_->mini_tablet_server(i)->server()->scanner_manager()->ListScans(); scanner_count_by_ts_uuid->insert( {cluster_->mini_tablet_server(i)->server()->instance_pb().permanent_uuid(), @@ -336,6 +339,20 @@ class ScanTokenTest : public KuduTest { } } + uint32_t GetSlowScansCount() { + // Just use for the cluster with only 1 tserver. + vector<SharedScanDescriptor> scans = + cluster_->mini_tablet_server(0)->server()->scanner_manager()->ListSlowScans(); + return scans.size(); + } + + uint32_t GetCompletedScansCount() { + // Just use for the cluster with only 1 tserver. + vector<SharedScanDescriptor> scans = + cluster_->mini_tablet_server(0)->server()->scanner_manager()->ListScans(); + return scans.size(); + } + KuduSchema GetTokenProjectionSchema(const KuduScanToken& token) { string serialized_token; CHECK_OK(token.Serialize(&serialized_token)); @@ -349,6 +366,60 @@ class ScanTokenTest : public KuduTest { unique_ptr<InternalMiniCluster> cluster_; }; +TEST_F(ScanTokenTest, SlowScansListTest) { + constexpr const char* const kTableName = "slow_scans_show"; + // Create schema + KuduSchema schema; + { + KuduSchemaBuilder builder; + builder.AddColumn("key")->NotNull()->Type(KuduColumnSchema::INT64)->PrimaryKey(); + ASSERT_OK(builder.Build(&schema)); + } + + // Create table + shared_ptr<KuduTable> table; + ASSERT_OK(CreateAndOpenTable(kTableName, schema, &table)); + + // Only 1 tserver is OK. 
+ ASSERT_EQ(1, cluster_->num_tablet_servers()); + + // Create session + shared_ptr<KuduSession> session = client_->NewSession(); + session->SetTimeoutMillis(10000); + ASSERT_OK(session->SetFlushMode(KuduSession::AUTO_FLUSH_BACKGROUND)); + + // Insert rows + for (int i = 0; i < 200; i++) { + unique_ptr<KuduInsert> insert(table->NewInsert()); + ASSERT_OK(insert->mutable_row()->SetInt64("key", i)); + ASSERT_OK(session->Apply(insert.release())); + } + ASSERT_OK(session->Flush()); + + { + vector<KuduScanToken*> tokens; + ElementDeleter deleter(&tokens); + ASSERT_OK(KuduScanTokenBuilder(table.get()).Build(&tokens)); + + ASSERT_EQ(200, CountRows(tokens)); + ASSERT_EQ(1, GetCompletedScansCount()); + ASSERT_EQ(0, GetSlowScansCount()); + } + + { + vector<KuduScanToken*> tokens; + ElementDeleter deleter(&tokens); + ASSERT_OK(KuduScanTokenBuilder(table.get()).Build(&tokens)); + + // Create a slow scan scenarios. + FLAGS_scanner_inject_latency_on_each_batch_ms = 50; + FLAGS_slow_scanner_threshold_ms = 40; + ASSERT_EQ(200, CountRows(tokens)); + ASSERT_EQ(2, GetCompletedScansCount()); + ASSERT_EQ(1, GetSlowScansCount()); + } +} + TEST_F(ScanTokenTest, TestScanTokens) { // Create schema KuduSchema schema; diff --git a/src/kudu/tserver/scanners.cc b/src/kudu/tserver/scanners.cc index 30bf4f7be..d68aa161b 100644 --- a/src/kudu/tserver/scanners.cc +++ b/src/kudu/tserver/scanners.cc @@ -56,14 +56,27 @@ DEFINE_int32(scanner_ttl_ms, 60000, TAG_FLAG(scanner_ttl_ms, advanced); TAG_FLAG(scanner_ttl_ms, runtime); -DEFINE_int32(scanner_gc_check_interval_us, 5 * 1000L *1000L, // 5 seconds +DEFINE_int32(scanner_gc_check_interval_us, 5 * 1000L * 1000L, // 5 seconds "Number of microseconds in the interval at which we remove expired scanners"); TAG_FLAG(scanner_gc_check_interval_us, hidden); -DEFINE_int32(scan_history_count, 20, - "Number of completed scans to keep history for. 
Determines how many historical " - "scans will be shown on the tablet server's scans dashboard."); -TAG_FLAG(scan_history_count, experimental); +DEFINE_int32(completed_scan_history_count, 10, + "Number of latest scans to keep history for. Determines how many historical " + "latest scans will be shown on the tablet server's scans dashboard."); +TAG_FLAG(completed_scan_history_count, experimental); + +// TODO(kedeng) : Add flag to control the display of slow scans, and avoid full scanning +// affecting normal Kudu service without perception. +DEFINE_int32(slow_scanner_threshold_ms, 60 * 1000L, // 1 minute + "Number of milliseconds for the threshold of slow scan."); +TAG_FLAG(slow_scanner_threshold_ms, advanced); +TAG_FLAG(slow_scanner_threshold_ms, runtime); + +DEFINE_int32(slow_scan_history_count, 10, + "Number of slow scans to keep history for. Determines how many historical " + "slow scans will be shown on the tablet server's scans dashboard. The " + "threshold for a slow scan is defined with --slow_scanner_threshold_ms."); +TAG_FLAG(slow_scan_history_count, experimental); DECLARE_int32(rpc_default_keepalive_time_ms); @@ -108,7 +121,8 @@ namespace tserver { ScannerManager::ScannerManager(const scoped_refptr<MetricEntity>& metric_entity) : shutdown_(false), shutdown_cv_(&shutdown_lock_), - completed_scans_offset_(0) { + completed_scans_offset_(0), + slow_scans_offset_(0) { if (metric_entity) { metrics_.reset(new ScannerMetrics(metric_entity)); METRIC_active_scanners.InstantiateFunctionGauge( @@ -119,8 +133,12 @@ ScannerManager::ScannerManager(const scoped_refptr<MetricEntity>& metric_entity) scanner_maps_.push_back(new ScannerMapStripe()); } - if (FLAGS_scan_history_count > 0) { - completed_scans_.reserve(FLAGS_scan_history_count); + if (FLAGS_completed_scan_history_count > 0) { + completed_scans_.reserve(FLAGS_completed_scan_history_count); + } + + if (FLAGS_slow_scan_history_count > 0) { + slow_scans_.reserve(FLAGS_slow_scan_history_count); } } @@ -136,14 +154,14 
@@ ScannerManager::~ScannerManager() { STLDeleteElements(&scanner_maps_); } -Status ScannerManager::StartRemovalThread() { - RETURN_NOT_OK(Thread::Create("scanners", "removal_thread", - [this]() { this->RunRemovalThread(); }, +Status ScannerManager::StartCollectAndRemovalThread() { + RETURN_NOT_OK(Thread::Create("scanners", "collect_and_removal_thread", + [this]() { this->RunCollectAndRemovalThread(); }, &removal_thread_)); return Status::OK(); } -void ScannerManager::RunRemovalThread() { +void ScannerManager::RunCollectAndRemovalThread() { while (true) { // Loop until we are shutdown. { @@ -153,6 +171,7 @@ void ScannerManager::RunRemovalThread() { } shutdown_cv_.WaitFor(MonoDelta::FromMicroseconds(FLAGS_scanner_gc_check_interval_us)); } + CollectSlowScanners(); RemoveExpiredScanners(); } } @@ -205,7 +224,7 @@ Status ScannerManager::LookupScanner(const string& scanner_id, } bool ScannerManager::UnregisterScanner(const string& scanner_id) { - ScanDescriptor descriptor; + SharedScanDescriptor descriptor; ScannerMapStripe& stripe = GetStripeByScannerId(scanner_id); { std::lock_guard<RWMutex> l(stripe.lock_); @@ -217,7 +236,7 @@ bool ScannerManager::UnregisterScanner(const string& scanner_id) { bool is_initted = it->second->is_initted(); if (is_initted) { descriptor = it->second->Descriptor(); - descriptor.state = it->second->iter()->HasNext() ? ScanState::kFailed : ScanState::kComplete; + descriptor->state = it->second->iter()->HasNext() ? 
ScanState::kFailed : ScanState::kComplete; } stripe.scanners_by_id_.erase(it); if (!is_initted) { @@ -225,8 +244,19 @@ bool ScannerManager::UnregisterScanner(const string& scanner_id) { } } - std::lock_guard<RWMutex> l(completed_scans_lock_); - RecordCompletedScanUnlocked(std::move(descriptor)); + { + std::lock_guard<percpu_rwlock> l(completed_scans_lock_); + RecordCompletedScanUnlocked(descriptor); + } + + { + const MonoTime start_time = descriptor->start_time; + if (start_time + MonoDelta::FromMilliseconds(FLAGS_slow_scanner_threshold_ms) + < MonoTime::Now()) { + std::lock_guard<percpu_rwlock> l(slow_scans_lock_); + RecordSlowScanUnlocked(descriptor); + } + } return true; } @@ -248,46 +278,110 @@ void ScannerManager::ListScanners(std::vector<SharedScanner>* scanners) const { } } -vector<ScanDescriptor> ScannerManager::ListScans() const { - unordered_map<string, ScanDescriptor> scans; +vector<SharedScanDescriptor> ScannerManager::ListScans() const { + unordered_map<string, SharedScanDescriptor> scans; for (const ScannerMapStripe* stripe : scanner_maps_) { shared_lock<RWMutex> l(stripe->lock_); for (const auto& se : stripe->scanners_by_id_) { if (se.second->is_initted()) { - ScanDescriptor desc = se.second->Descriptor(); - desc.state = ScanState::kActive; + SharedScanDescriptor desc = se.second->Descriptor(); + desc->state = ScanState::kActive; EmplaceOrDie(&scans, se.first, std::move(desc)); } } } { - shared_lock<RWMutex> l(completed_scans_lock_); + kudu::shared_lock<rw_spinlock> l(completed_scans_lock_.get_lock()); // A scanner in 'scans' may have completed between the above loop and here. // As we'd rather have the finalized descriptor of the completed scan, // update over the old descriptor in this case. 
for (const auto& scan : completed_scans_) { - InsertOrUpdate(&scans, scan.scanner_id, scan); + InsertOrUpdate(&scans, scan->scanner_id, scan); } } - vector<ScanDescriptor> ret; + vector<SharedScanDescriptor> ret; ret.reserve(scans.size()); AppendValuesFromMap(scans, &ret); // Sort oldest to newest, so that the ordering is consistent across calls. - std::sort(ret.begin(), ret.end(), [] (const ScanDescriptor& a, const ScanDescriptor& b) { - return a.start_time > b.start_time; + std::sort(ret.begin(), ret.end(), [] (const SharedScanDescriptor& a, + const SharedScanDescriptor& b) { + return a->start_time > b->start_time; }); return ret; } +vector<SharedScanDescriptor> ScannerManager::ListSlowScans() const { + // Get all the scans first. + unordered_map<string, SharedScanDescriptor> scans; + { + kudu::shared_lock<rw_spinlock> l(slow_scans_lock_.get_lock()); + for (const auto& scan : slow_scans_) { + InsertOrUpdate(&scans, scan->scanner_id, scan); + } + } + + vector<SharedScanDescriptor> ret; + ret.reserve(scans.size()); + AppendValuesFromMap(scans, &ret); + + // Sort oldest to newest, so that the ordering is consistent across calls. + std::sort(ret.begin(), ret.end(), [] (const SharedScanDescriptor& a, + const SharedScanDescriptor& b) { + return a->start_time > b->start_time; + }); + + return ret; +} + +void ScannerManager::CollectSlowScanners() { + const MonoTime now = MonoTime::Now(); + + vector<SharedScanDescriptor> descriptors; + int32_t slow_scanner_threshold = FLAGS_slow_scanner_threshold_ms; + for (ScannerMapStripe* stripe : scanner_maps_) { + std::lock_guard<RWMutex> l(stripe->lock_); + for (auto it = stripe->scanners_by_id_.begin(); it != stripe->scanners_by_id_.end(); ++it) { + const SharedScanner& scanner = it->second; + if (!scanner->is_initted()) { + // Ignore uninitialized scans. 
+ continue; + } + const MonoTime start_time = scanner->start_time(); + if (start_time + MonoDelta::FromMilliseconds(slow_scanner_threshold) >= now) { + continue; + } + + MonoDelta delta_time = now - start_time - + MonoDelta::FromMilliseconds(slow_scanner_threshold); + // TODO(kedeng) : Add flag to control whether to print this log. + LOG(INFO) << Substitute( + "Slow scanner id: $0, of tablet $1, " + "exceed the time threshold $2 ms for $3 ms.", + it->first, + scanner->tablet_id(), + slow_scanner_threshold, + delta_time.ToMilliseconds()); + descriptors.emplace_back(scanner->Descriptor()); + } + } + + std::lock_guard<percpu_rwlock> l(slow_scans_lock_); + for (auto& descriptor : descriptors) { + if (std::find(slow_scans_.begin(), slow_scans_.end(), descriptor) == slow_scans_.end()) { + RecordSlowScanUnlocked(descriptor); + } + } +} + void ScannerManager::RemoveExpiredScanners() { MonoDelta scanner_ttl = MonoDelta::FromMilliseconds(FLAGS_scanner_ttl_ms); const MonoTime now = MonoTime::Now(); - vector<ScanDescriptor> descriptors; + vector<SharedScanDescriptor> descriptors; for (ScannerMapStripe* stripe : scanner_maps_) { std::lock_guard<RWMutex> l(stripe->lock_); for (auto it = stripe->scanners_by_id_.begin(); it != stripe->scanners_by_id_.end();) { @@ -316,28 +410,39 @@ void ScannerManager::RemoveExpiredScanners() { } } - std::lock_guard<RWMutex> l(completed_scans_lock_); + std::lock_guard<percpu_rwlock> l(completed_scans_lock_); for (auto& descriptor : descriptors) { - descriptor.last_access_time = now; - descriptor.state = ScanState::kExpired; - RecordCompletedScanUnlocked(std::move(descriptor)); + descriptor->last_access_time = now; + descriptor->state = ScanState::kExpired; + RecordCompletedScanUnlocked(descriptor); } } -void ScannerManager::RecordCompletedScanUnlocked(ScanDescriptor descriptor) { - if (completed_scans_.capacity() == 0) { +void ScannerManager::CircularUpdateRecordInFifo(vector<SharedScanDescriptor>& scans_vec, + size_t& scans_offset, + const 
SharedScanDescriptor& descriptor) { + if (scans_vec.capacity() == 0) { return; } - if (completed_scans_.size() == completed_scans_.capacity()) { - completed_scans_[completed_scans_offset_++] = std::move(descriptor); - if (completed_scans_offset_ == completed_scans_.capacity()) { - completed_scans_offset_ = 0; + + if (scans_vec.size() == scans_vec.capacity()) { + scans_vec[scans_offset++] = descriptor; + if (scans_offset == scans_vec.capacity()) { + scans_offset = 0; } } else { - completed_scans_.emplace_back(std::move(descriptor)); + scans_vec.emplace_back(descriptor); } } +void ScannerManager::RecordCompletedScanUnlocked(const SharedScanDescriptor& descriptor) { + return CircularUpdateRecordInFifo(completed_scans_, completed_scans_offset_, descriptor); +} + +void ScannerManager::RecordSlowScanUnlocked(const SharedScanDescriptor& descriptor) { + return CircularUpdateRecordInFifo(slow_scans_, slow_scans_offset_, descriptor); +} + const std::string Scanner::kNullTabletId = "null tablet"; Scanner::Scanner(string id, const scoped_refptr<TabletReplica>& tablet_replica, @@ -407,39 +512,39 @@ IteratorStats Scanner::UpdateStatsAndGetDelta() { return delta_stats; } -ScanDescriptor Scanner::Descriptor() const { +SharedScanDescriptor Scanner::Descriptor() const { // Ignore non-initialized scans. The initializing state is transient, and // handling it correctly is complicated. Since the scanner is initialized we // can assume iter_, spec_, and client_projection_schema_ are valid // pointers. 
CHECK(is_initted()); - ScanDescriptor descriptor; - descriptor.tablet_id = tablet_id(); - descriptor.scanner_id = id(); - descriptor.remote_user = remote_user(); - descriptor.start_time = start_time_; + SharedScanDescriptor descriptor(new ScanDescriptor); + descriptor->tablet_id = tablet_id(); + descriptor->scanner_id = id(); + descriptor->remote_user = remote_user(); + descriptor->start_time = start_time_; for (const auto& column : client_projection_schema_->columns()) { - descriptor.projected_columns.emplace_back(column.name()); + descriptor->projected_columns.emplace_back(column.name()); } const auto& tablet_metadata = tablet_replica_->tablet_metadata(); - descriptor.table_name = tablet_metadata->table_name(); + descriptor->table_name = tablet_metadata->table_name(); SchemaPtr schema_ptr = tablet_metadata->schema(); if (spec().lower_bound_key()) { - descriptor.predicates.emplace_back( + descriptor->predicates.emplace_back( Substitute("PRIMARY KEY >= $0", KUDU_REDACT( spec().lower_bound_key()->Stringify(*schema_ptr)))); } if (spec().exclusive_upper_bound_key()) { - descriptor.predicates.emplace_back( + descriptor->predicates.emplace_back( Substitute("PRIMARY KEY < $0", KUDU_REDACT( spec().exclusive_upper_bound_key()->Stringify(*schema_ptr)))); } for (const auto& predicate : spec().predicates()) { - descriptor.predicates.emplace_back(predicate.second.ToString()); + descriptor->predicates.emplace_back(predicate.second.ToString()); } vector<IteratorStats> iterator_stats; @@ -447,13 +552,13 @@ ScanDescriptor Scanner::Descriptor() const { DCHECK_EQ(iterator_stats.size(), iter_->schema().num_columns()); for (int col_idx = 0; col_idx < iterator_stats.size(); col_idx++) { - descriptor.iterator_stats.emplace_back(iter_->schema().column(col_idx).name(), - iterator_stats[col_idx]); + descriptor->iterator_stats.emplace_back(iter_->schema().column(col_idx).name(), + iterator_stats[col_idx]); } - descriptor.last_call_seq_id = ANNOTATE_UNPROTECTED_READ(call_seq_id_); - 
descriptor.last_access_time = last_access_time_.load(std::memory_order_relaxed); - descriptor.cpu_times = cpu_times(); + descriptor->last_call_seq_id = ANNOTATE_UNPROTECTED_READ(call_seq_id_); + descriptor->last_access_time = last_access_time_.load(std::memory_order_relaxed); + descriptor->cpu_times = cpu_times(); return descriptor; } diff --git a/src/kudu/tserver/scanners.h b/src/kudu/tserver/scanners.h index 5fa2c1a15..531a33f68 100644 --- a/src/kudu/tserver/scanners.h +++ b/src/kudu/tserver/scanners.h @@ -38,6 +38,7 @@ #include "kudu/tablet/tablet_replica.h" #include "kudu/tserver/tserver.pb.h" #include "kudu/util/condition_variable.h" +#include "kudu/util/locks.h" #include "kudu/util/memory/arena.h" #include "kudu/util/metrics.h" #include "kudu/util/monotime.h" @@ -62,6 +63,7 @@ struct ScanDescriptor; struct ScannerMetrics; typedef std::shared_ptr<Scanner> SharedScanner; +typedef scoped_refptr<ScanDescriptor> SharedScanDescriptor; // Manages the live scanners within a Tablet Server. // @@ -76,8 +78,8 @@ class ScannerManager { explicit ScannerManager(const scoped_refptr<MetricEntity>& metric_entity); ~ScannerManager(); - // Starts the expired scanner removal thread. - Status StartRemovalThread(); + // Starts the slow scans collect and the expired scanner removal thread. + Status StartCollectAndRemovalThread(); // Create a new scanner with a unique ID, inserting it into the map. Further // lookups for the scanner must provide the username associated with @@ -111,11 +113,21 @@ class ScannerManager { void ListScanners(std::vector<SharedScanner>* scanners) const; // List active and recently completed scans. - std::vector<ScanDescriptor> ListScans() const; + std::vector<SharedScanDescriptor> ListScans() const; + + // List recent slow scans. + // A scan is 'slow' if it takes more than --slow_scanner_threshold_ms to + // complete. + // The number of elements in the result vector is limited by + // --slow_scan_history_count. 
+ std::vector<SharedScanDescriptor> ListSlowScans() const; // Iterate through scanners and remove any which are past their TTL. void RemoveExpiredScanners(); + // Collect slow scanners whose scan times exceed the threshold. + void CollectSlowScanners(); + private: FRIEND_TEST(ScannerTest, TestExpire); @@ -132,13 +144,23 @@ class ScannerManager { ScannerMap scanners_by_id_; }; - // Periodically call RemoveExpiredScanners(). - void RunRemovalThread(); + // Periodically call CollectSlowScanners() and RemoveExpiredScanners(). + void RunCollectAndRemovalThread(); ScannerMapStripe& GetStripeByScannerId(const std::string& scanner_id); // Adds the scan descriptor to the completed scans FIFO. - void RecordCompletedScanUnlocked(ScanDescriptor descriptor); + void RecordCompletedScanUnlocked(const SharedScanDescriptor& descriptor); + + // Adds the scan descriptor to the slow scans FIFO. + void RecordSlowScanUnlocked(const SharedScanDescriptor& descriptor); + + // Update the record in the vector with the fifo method. + // If the container is full when updating, update the data inserted first. + // The number of elements in the vector is init at the vector initialization. + static void CircularUpdateRecordInFifo(std::vector<SharedScanDescriptor>& scans_vec, + size_t& scans_offset, + const SharedScanDescriptor& descriptor); // (Optional) scanner metrics for this instance. std::unique_ptr<ScannerMetrics> metrics_; @@ -152,10 +174,15 @@ class ScannerManager { std::vector<ScannerMapStripe*> scanner_maps_; // completed_scans_ is a FIFO ring buffer of completed scans. - mutable RWMutex completed_scans_lock_; - std::vector<ScanDescriptor> completed_scans_; + mutable percpu_rwlock completed_scans_lock_; + std::vector<SharedScanDescriptor> completed_scans_; size_t completed_scans_offset_; + // slow_scans_ is a FIFO ring buffer of slow scans. 
+ mutable percpu_rwlock slow_scans_lock_; + std::vector<SharedScanDescriptor> slow_scans_; + size_t slow_scans_offset_; + // Generator for scanner IDs. ObjectIdGenerator oid_generator_; @@ -358,7 +385,7 @@ class Scanner { // Does not require the AccessLock. // // REQUIRES: is_initted() must be true. - ScanDescriptor Descriptor() const; + SharedScanDescriptor Descriptor() const; // Returns the amount of CPU time accounted to this scanner. // Does not require the AccessLock. @@ -456,7 +483,10 @@ enum class ScanState { // ScanDescriptor holds information about a scan. The ScanDescriptor can outlive // the associated scanner without holding open any of the scanner's resources. -struct ScanDescriptor { +// +// These are ref-counted so that ScanDescriptor is copyable, with this we can avoid +// duplicating data for elements in slow_scans_ and completed_scans_ in ScannerManager. +struct ScanDescriptor : public RefCountedThreadSafe<ScanDescriptor> { // The tablet ID. std::string tablet_id; // The scanner ID. 
diff --git a/src/kudu/tserver/tablet_server-test.cc b/src/kudu/tserver/tablet_server-test.cc index 9d2787505..7ddbb3836 100644 --- a/src/kudu/tserver/tablet_server-test.cc +++ b/src/kudu/tserver/tablet_server-test.cc @@ -3195,10 +3195,10 @@ TEST_F(TabletServerTest, TestScanWithSimplifiablePredicates) { auto scan_descriptors = mini_server_->server()->scanner_manager()->ListScans(); ASSERT_EQ(1, projection.columns().size()); ASSERT_EQ(1, scan_descriptors.size()); - ASSERT_EQ(projection.columns().size(), scan_descriptors[0].projected_columns.size()); - ASSERT_EQ(2, scan_descriptors[0].predicates.size()); - ASSERT_EQ(projection.columns().size(), scan_descriptors[0].iterator_stats.size()); - ASSERT_EQ(projection.column(0).name(), scan_descriptors[0].iterator_stats[0].first); + ASSERT_EQ(projection.columns().size(), scan_descriptors[0]->projected_columns.size()); + ASSERT_EQ(2, scan_descriptors[0]->predicates.size()); + ASSERT_EQ(projection.columns().size(), scan_descriptors[0]->iterator_stats.size()); + ASSERT_EQ(projection.column(0).name(), scan_descriptors[0]->iterator_stats[0].first); } // Drain all the rows from the scanner. 
diff --git a/src/kudu/tserver/tablet_server.cc b/src/kudu/tserver/tablet_server.cc index 43f60140a..694a4710f 100644 --- a/src/kudu/tserver/tablet_server.cc +++ b/src/kudu/tserver/tablet_server.cc @@ -20,6 +20,8 @@ #include <functional> #include <memory> #include <ostream> +#include <type_traits> +#include <unordered_set> #include <utility> #include <vector> @@ -117,8 +119,8 @@ Status TabletServer::Init() { RETURN_NOT_OK_PREPEND(tablet_manager_->Init(start_tablets, tablets_processed, total_tablets), "Could not init Tablet Manager"); - RETURN_NOT_OK_PREPEND(scanner_manager_->StartRemovalThread(), - "Could not start expired Scanner removal thread"); + RETURN_NOT_OK_PREPEND(scanner_manager_->StartCollectAndRemovalThread(), + "Could not start slow scans collect and expired Scanner removal thread"); state_ = kInitialized; return Status::OK(); diff --git a/src/kudu/tserver/tserver_path_handlers.cc b/src/kudu/tserver/tserver_path_handlers.cc index abfaef3f6..37e925650 100644 --- a/src/kudu/tserver/tserver_path_handlers.cc +++ b/src/kudu/tserver/tserver_path_handlers.cc @@ -96,8 +96,8 @@ using std::string; using std::vector; using strings::Substitute; -DECLARE_int32(scan_history_count); - +DECLARE_int32(completed_scan_history_count); +DECLARE_int32(slow_scan_history_count); namespace kudu { namespace tserver { @@ -504,29 +504,29 @@ const char* ScanStateToString(const ScanState& scan_state) { } // Formats the scan descriptor's pseudo-SQL query string as HTML. 
-string ScanQueryHtml(const ScanDescriptor& scan) { +string ScanQueryHtml(const SharedScanDescriptor& scan) { string query = "<b>SELECT</b> "; - if (scan.projected_columns.empty()) { + if (scan->projected_columns.empty()) { query.append("COUNT(*)"); } else { - query.append(JoinMapped(scan.projected_columns, EscapeForHtmlToString, ",<br> ")); + query.append(JoinMapped(scan->projected_columns, EscapeForHtmlToString, ",<br> ")); } query.append("<br> <b>FROM</b> "); - if (scan.table_name.empty()) { + if (scan->table_name.empty()) { query.append("<unknown>"); } else { - query.append(EscapeForHtmlToString(scan.table_name)); + query.append(EscapeForHtmlToString(scan->table_name)); } - if (!scan.predicates.empty()) { + if (!scan->predicates.empty()) { query.append("<br> <b>WHERE</b> "); - query.append(JoinMapped(scan.predicates, EscapeForHtmlToString, "<br> <b>AND</b> ")); + query.append(JoinMapped(scan->predicates, EscapeForHtmlToString, "<br> <b>AND</b> ")); } return query; } -void IteratorStatsToJson(const ScanDescriptor& scan, EasyJson* json) { +void IteratorStatsToJson(const SharedScanDescriptor& scan, EasyJson* json) { auto fill_stats = [] (EasyJson& row, const string& column, const IteratorStats& stats) { row["column"] = column; @@ -541,7 +541,7 @@ void IteratorStatsToJson(const ScanDescriptor& scan, EasyJson* json) { }; IteratorStats total_stats; - for (const auto& column : scan.iterator_stats) { + for (const auto& column : scan->iterator_stats) { EasyJson row = json->PushBack(EasyJson::kObject); fill_stats(row, column.first, column.second); total_stats += column.second; @@ -551,28 +551,28 @@ void IteratorStatsToJson(const ScanDescriptor& scan, EasyJson* json) { fill_stats(total_row, "total", total_stats); } -void ScanToJson(const ScanDescriptor& scan, EasyJson* json) { +void ScanToJson(const SharedScanDescriptor& scan, EasyJson* json) { MonoTime now = MonoTime::Now(); MonoDelta duration; - if (scan.state == ScanState::kActive) { - duration = now - scan.start_time; 
+ if (scan->state == ScanState::kActive) { + duration = now - scan->start_time; } else { - duration = scan.last_access_time - scan.start_time; + duration = scan->last_access_time - scan->start_time; } - MonoDelta time_since_start = now - scan.start_time; + MonoDelta time_since_start = now - scan->start_time; - json->Set("tablet_id", scan.tablet_id); - json->Set("scanner_id", scan.scanner_id); - json->Set("state", ScanStateToString(scan.state)); + json->Set("tablet_id", scan->tablet_id); + json->Set("scanner_id", scan->scanner_id); + json->Set("state", ScanStateToString(scan->state)); json->Set("query", ScanQueryHtml(scan)); - json->Set("requestor", scan.remote_user.username()); + json->Set("requestor", scan->remote_user.username()); json->Set("duration", HumanReadableElapsedTime::ToShortString(duration.ToSeconds())); - json->Set("num_round_trips", scan.last_call_seq_id); + json->Set("num_round_trips", scan->last_call_seq_id); json->Set("time_since_start", HumanReadableElapsedTime::ToShortString(time_since_start.ToSeconds())); - const auto& cpu_times = scan.cpu_times; + const auto& cpu_times = scan->cpu_times; json->Set("wall_secs", HumanReadableElapsedTime::ToShortString(cpu_times.wall_seconds())); json->Set("user_secs", @@ -598,14 +598,24 @@ const char* kLongTimingTitle = "wall time, user cpu time, and system cpu time " void TabletServerPathHandlers::HandleScansPage(const Webserver::WebRequest& /*req*/, Webserver::WebResponse* resp) { EasyJson* output = &resp->output; - (*output)["scan_history_count"] = FLAGS_scan_history_count; + (*output)["completed_scan_history_count"] = FLAGS_completed_scan_history_count; output->Set("timing_title", kLongTimingTitle); - EasyJson scans = output->Set("scans", EasyJson::kArray); - vector<ScanDescriptor> descriptors = tserver_->scanner_manager()->ListScans(); + EasyJson completed_scans = output->Set("completed_scans", EasyJson::kArray); + vector<SharedScanDescriptor> descriptors = tserver_->scanner_manager()->ListScans(); for 
(const auto& descriptor : descriptors) { - EasyJson scan = scans.PushBack(EasyJson::kObject); - ScanToJson(descriptor, &scan); + EasyJson completed_scan = completed_scans.PushBack(EasyJson::kObject); + ScanToJson(descriptor, &completed_scan); + } + + (*output)["slow_scan_history_count"] = FLAGS_slow_scan_history_count; + EasyJson slow_scans = output->Set("slow_scans", EasyJson::kArray); + vector<SharedScanDescriptor> slow_descriptors = + tserver_->scanner_manager()->ListSlowScans(); + + for (const auto& slow_descriptor : slow_descriptors) { + EasyJson slow_scan = slow_scans.PushBack(EasyJson::kObject); + ScanToJson(slow_descriptor, &slow_scan); } } diff --git a/www/scans.mustache b/www/scans.mustache index 064ec09a6..f567f9629 100644 --- a/www/scans.mustache +++ b/www/scans.mustache @@ -17,9 +17,9 @@ specific language governing permissions and limitations under the License. }}{{#raw}}{{{.}}}{{/raw}}{{^raw}} -<h1>Scans</h1> -<p class="lead">This page lists all running scans and last {{scan_history_count}} completed scans. -The number of historical scans is configurable with the <samp>--scan_history_count</samp> flag.</p> +<h1>Completed Scans</h1> +<p class="lead">This table lists all running scans and last {{completed_scan_history_count}} completed scans. 
+The number of historical completed scans is configurable with the <samp>--completed_scan_history_count</samp> flag.</p> <table class="table table-striped"> <thead> <tr> @@ -36,7 +36,7 @@ The number of historical scans is configurable with the <samp>--scan_history_cou </tr> </thead> <tbody> - {{#scans}} + {{#completed_scans}} <tr> <td><a href="{{base_url}}/tablet?id={{tablet_id}}"><samp>{{tablet_id}}</samp></a></td> <td><samp>{{scanner_id}}</samp></td> @@ -72,7 +72,66 @@ The number of historical scans is configurable with the <samp>--scan_history_cou </table> </td> </tr> - {{/scans}} + {{/completed_scans}} + </tbody> +</table> + +<h1>Slow Scans</h1> +<p class="lead">This table lists all slow scans and last {{slow_scan_history_count}} slow scans. +The number of historical slow scans is configurable with the <samp>--slow_scan_history_count</samp> flag.</p> +<table class="table table-striped"> + <thead> + <tr> + <th>Tablet id</th> + <th>Scanner id</th> + <th>State</th> + <th title="pseudo-SQL query description">Query</th> + <th>Requestor</th> + <th title="amount of time that the scanner was/has been open on the server">Duration</th> + <th title="number of round trips">Round trips</th> + <th title="elapsed time since the scan started">Time since start</th> + <th title="{{timing_title}}">Timing</th> + <th>Column Stats</th> + </tr> + </thead> + <tbody> + {{#slow_scans}} + <tr> + <td><a href="{{base_url}}/tablet?id={{tablet_id}}"><samp>{{tablet_id}}</samp></a></td> + <td><samp>{{scanner_id}}</samp></td> + <td>{{state}}</td> + {{! The query string is pre-formatted HTML, so don't escape it (triple-brace). 
}} + <td><pre>{{{query}}}</pre></td> + <td><samp>{{requestor}}</samp></td> + <td title="{{duration_title}}">{{duration}}</td> + <td>{{num_round_trips}}</td> + <td title="{{time_since_start_title}}">{{time_since_start}}</td> + <td>real: {{wall_secs}} user: {{user_secs}} sys: {{sys_secs}}</td> + + <td> + <table class="table table-striped"> + <thead> + <tr> + <th>column</th> + <th title="cells read from the column (disk or cache), exclusive of the MRS">cells read</th> + <th title="bytes read from the column (disk or cache), exclusive of the MRS">bytes read</th> + <th title="CFile data blocks read from the column (disk or cache)">blocks read</th> + </tr> + </thead> + <tbody> + {{#stats}} + <tr> + <td>{{column}}</td> + <td title="{{cells_read_title}}">{{cells_read}}</td> + <td title="{{bytes_read_title}}">{{bytes_read}}</td> + <td title="{{blocks_read_title}}">{{blocks_read}}</td> + </tr> + {{/stats}} + </tbody> + </table> + </td> + </tr> + {{/slow_scans}} </tbody> </table> {{/raw}}