Repository: kudu Updated Branches: refs/heads/master b5f3d1a10 -> 5f1ca32f3
http://git-wip-us.apache.org/repos/asf/kudu/blob/5f1ca32f/src/kudu/master/hms_notification_log_listener.h ---------------------------------------------------------------------- diff --git a/src/kudu/master/hms_notification_log_listener.h b/src/kudu/master/hms_notification_log_listener.h new file mode 100644 index 0000000..c4e152d --- /dev/null +++ b/src/kudu/master/hms_notification_log_listener.h @@ -0,0 +1,157 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include <cstdint> +#include <vector> + +#include "kudu/gutil/port.h" +#include "kudu/gutil/ref_counted.h" +#include "kudu/util/condition_variable.h" +#include "kudu/util/mutex.h" +#include "kudu/util/status.h" +#include "kudu/util/status_callback.h" + +namespace hive { +class NotificationEvent; +} + +namespace kudu { + +class MonoTime; +class Thread; + +namespace master { + +class CatalogManager; + +// A CatalogManager background task which listens for events occurring in the +// Hive Metastore, and synchronizes the Kudu catalog accordingly. +// +// As a background task, the lifetime of an instance of this class must be less +// than the catalog manager it belongs to. 
+// +// The notification log listener task continuously wakes up according to its +// configured poll period, however it performs no work when the master is a +// follower. +// +// When a change to the Kudu catalog is performed in response to a notification +// log event, the corresponding event ID is recorded in the sys catalog as the +// latest handled event. This ensures that masters do not double-apply +// notification events as leadership changes. +// +// The notification log listener listens for two types of events on Kudu tables: +// +// - ALTER TABLE RENAME +// Table rename is a special case of ALTER TABLE. The notification log +// listener listens for rename event notifications for Kudu tables, and +// renames the corresponding Kudu table. See below for why renames can be +// applied back to Kudu, but not other types of alterations. +// +// - DROP TABLE +// The notification log listener listens for drop table events for Kudu +// tables, and drops the corresponding Kudu table. This allows the catalogs +// to stay synchronized when DROP TABLE and DROP DATABASE CASCADE Hive +// commands are executed. +// +// The notification log listener can support renaming and dropping tables in a +// safe manner because the Kudu table ID is stored in the HMS table entry. Using +// the Kudu table ID, the exact table which the event applies to can always be +// identified. For other changes made in ALTER TABLE statements, such as ALTER +// TABLE DROP COLUMN, there is no way to identify with certainty which column +// has been dropped, since we do not store column IDs in the HMS table entries. +class HmsNotificationLogListenerTask { + public: + + explicit HmsNotificationLogListenerTask(CatalogManager* catalog_manager); + ~HmsNotificationLogListenerTask(); + + // Initializes the HMS notification log listener. When invoking this method, + // the catalog manager must be in the process of initializing. 
+ Status Init() WARN_UNUSED_RESULT; + + // Shuts down the HMS notification log listener. This must be called before + // shutting down the catalog manager. + void Shutdown(); + + // Waits for the notification log listener to process the latest notification + // log event. + // + // Note: an error will be returned if the listener is unable to retrieve the + // latest notifications from the HMS. If individual notifications are unable + // to be processed, no error will be returned. + Status WaitForCatchUp(const MonoTime& deadline) WARN_UNUSED_RESULT; + + private: + + // Runs the main loop of the listening thread. + void RunLoop(); + + // Polls the Hive Metastore for notification events, and handles them. + Status Poll(); + + // Handles an ALTER TABLE event. Must only be called on the listening thread. + // + // The event is parsed, and if it is a rename table event for a Kudu table, + // the table is renamed in the local catalog. All other events are ignored. + Status HandleAlterTableEvent(const hive::NotificationEvent& event, + int64_t* durable_event_id) WARN_UNUSED_RESULT; + + // Handles a DROP TABLE event. Must only be called on the listening thread. + // + // The event is parsed, and if it is a drop table event for a Kudu table, the + // table is deleted in the local catalog. All other events are ignored. + Status HandleDropTableEvent(const hive::NotificationEvent& event, + int64_t* durable_event_id) WARN_UNUSED_RESULT; + + // The associated catalog manager. + // + // May be initialized to nullptr in the constructor to facilitate unit + // testing. In this case all interactions with the catalog manager and HMS + // are skipped. + CatalogManager* catalog_manager_; + + // The listening thread. + scoped_refptr<kudu::Thread> thread_; + + // Protects access to fields below. + mutable Mutex lock_; + + // Set to true if the task is in the process of shutting down. + // + // Protected by lock_. 
+ bool closing_; + + // Manages waking the notification log listener thread when the catalog + // manager needs to ensure that all recent notification log events have been + // handled. + // + // Protected by lock_. + ConditionVariable wake_up_cv_; + + // Queue of callbacks to execute when the notification log listener is caught + // up. These callbacks enable the catalog manager to wait for the notification + // log listener to have processed the latest events before proceeding with + // metadata ops involving the HMS table namespace. + // + // Protected by lock_. + std::vector<StatusCallback> catch_up_callbacks_; +}; + +} // namespace master +} // namespace kudu http://git-wip-us.apache.org/repos/asf/kudu/blob/5f1ca32f/src/kudu/master/master.proto ---------------------------------------------------------------------- diff --git a/src/kudu/master/master.proto b/src/kudu/master/master.proto index 17fc02f..1b59ef0 100644 --- a/src/kudu/master/master.proto +++ b/src/kudu/master/master.proto @@ -204,6 +204,12 @@ message SysTskEntryPB { required security.TokenSigningPrivateKeyPB tsk = 1; } +// The on-disk entry in the sys.catalog table ("metadata" column) to represent +// the latest processed Hive Metastore notification log event ID. 
+message SysNotificationLogEventIdPB { + optional int64 latest_notification_log_event_id = 1; +} + //////////////////////////////////////////////////////////// // RPCs //////////////////////////////////////////////////////////// http://git-wip-us.apache.org/repos/asf/kudu/blob/5f1ca32f/src/kudu/master/sys_catalog.cc ---------------------------------------------------------------------- diff --git a/src/kudu/master/sys_catalog.cc b/src/kudu/master/sys_catalog.cc index 3cc8aa4..dd36fae 100644 --- a/src/kudu/master/sys_catalog.cc +++ b/src/kudu/master/sys_catalog.cc @@ -134,6 +134,8 @@ const char* const SysCatalogTable::kSysCertAuthorityEntryId = "root-ca-info"; const char* const SysCatalogTable::kInjectedFailureStatusMsg = "INJECTED FAILURE"; +const char* const SysCatalogTable::kLatestNotificationLogEntryIdRowId = + "latest_notification_log_entry_id"; namespace { @@ -506,6 +508,10 @@ Status SysCatalogTable::Write(const Actions& actions) { ReqUpdateTablets(&req, actions.tablets_to_update); ReqDeleteTablets(&req, actions.tablets_to_delete); + if (actions.hms_notification_log_event_id) { + ReqSetNotificationLogEventId(&req, *actions.hms_notification_log_event_id); + } + if (req.row_operations().rows().empty()) { // No actual changes were written (i.e the data to be updated matched the // previous version of the data). @@ -648,6 +654,24 @@ Status SysCatalogTable::VisitTskEntries(TskEntryVisitor* visitor) { return ProcessRows<SysTskEntryPB, TSK_ENTRY>(processor); } +Status SysCatalogTable::GetLatestNotificationLogEventId(int64_t* event_id) { + TRACE_EVENT0("master", "SysCatalogTable::GetLatestNotificationLogEventId"); + + *event_id = -1; + auto processor = [&](const string& entry_id, const SysNotificationLogEventIdPB& entry_data) { + if (entry_id != kLatestNotificationLogEntryIdRowId) { + // This is not the row we're looking for. 
+ return Status::OK(); + } + DCHECK(entry_data.has_latest_notification_log_event_id()); + DCHECK(entry_data.latest_notification_log_event_id() >= 0); + *event_id = entry_data.latest_notification_log_event_id(); + return Status::OK(); + }; + + return ProcessRows<SysNotificationLogEventIdPB, HMS_NOTIFICATION_LOG>(processor); +} + Status SysCatalogTable::GetCertAuthorityEntry(SysCertAuthorityEntryPB* entry) { CHECK(entry); vector<SysCertAuthorityEntryPB> entries; @@ -789,6 +813,20 @@ void SysCatalogTable::ReqDeleteTablets(WriteRequestPB* req, } } +void SysCatalogTable::ReqSetNotificationLogEventId(WriteRequestPB* req, int64_t event_id) { + SysNotificationLogEventIdPB pb; + pb.set_latest_notification_log_event_id(event_id); + faststring metadata_buf; + pb_util::SerializeToString(pb, &metadata_buf); + + KuduPartialRow row(&schema_); + RowOperationsPBEncoder enc(req->mutable_row_operations()); + CHECK_OK(row.SetInt8(kSysCatalogTableColType, HMS_NOTIFICATION_LOG)); + CHECK_OK(row.SetStringNoCopy(kSysCatalogTableColId, kLatestNotificationLogEntryIdRowId)); + CHECK_OK(row.SetStringNoCopy(kSysCatalogTableColMetadata, metadata_buf)); + enc.Add(RowOperationsPB::UPSERT, row); +} + Status SysCatalogTable::VisitTablets(TabletVisitor* visitor) { TRACE_EVENT0("master", "SysCatalogTable::VisitTablets"); auto processor = [&]( http://git-wip-us.apache.org/repos/asf/kudu/blob/5f1ca32f/src/kudu/master/sys_catalog.h ---------------------------------------------------------------------- diff --git a/src/kudu/master/sys_catalog.h b/src/kudu/master/sys_catalog.h index 8aa92eb..2b0f56f 100644 --- a/src/kudu/master/sys_catalog.h +++ b/src/kudu/master/sys_catalog.h @@ -23,12 +23,14 @@ #include <string> #include <vector> +#include <boost/optional/optional.hpp> #include <gtest/gtest_prod.h> #include "kudu/common/schema.h" #include "kudu/consensus/metadata.pb.h" #include "kudu/gutil/callback.h" #include "kudu/gutil/macros.h" +#include "kudu/gutil/port.h" #include "kudu/gutil/ref_counted.h" 
#include "kudu/master/catalog_manager.h" #include "kudu/tablet/tablet_replica.h" @@ -97,6 +99,7 @@ class TskEntryVisitor { // * root CA (certificate authority) certificate of the Kudu IPKI // * Kudu IPKI root CA cert's private key // * TSK (Token Signing Key) entries +// * Latest handled Hive Metastore notification log event ID // // The essential properties of the SysCatalogTable are: // * SysCatalogTable has only one tablet. @@ -111,6 +114,9 @@ class SysCatalogTable { // There should be no more than one entry of this type in the system table. static const char* const kSysCertAuthorityEntryId; + // The row ID of the latest notification log entry in the sys catalog table. + static const char* const kLatestNotificationLogEntryIdRowId; + typedef Callback<Status()> ElectedLeaderCallback; enum CatalogEntryType { @@ -118,6 +124,7 @@ class SysCatalogTable { TABLETS_ENTRY = 2, CERT_AUTHORITY_INFO = 3, // Kudu's root certificate authority entry. TSK_ENTRY = 4, // Token Signing Key entry. + HMS_NOTIFICATION_LOG = 5, // HMS notification log latest event ID. }; // 'leader_cb_' is invoked whenever this node is elected as a leader @@ -152,6 +159,7 @@ class SysCatalogTable { std::vector<scoped_refptr<TabletInfo>> tablets_to_add; std::vector<scoped_refptr<TabletInfo>> tablets_to_update; std::vector<scoped_refptr<TabletInfo>> tablets_to_delete; + boost::optional<int64_t> hms_notification_log_event_id; }; Status Write(const Actions& actions); @@ -164,6 +172,9 @@ class SysCatalogTable { // Scan for TSK-related entries in the system table. Status VisitTskEntries(TskEntryVisitor* visitor); + // Get the latest processed HMS notification log event ID. + Status GetLatestNotificationLogEventId(int64_t* event_id) WARN_UNUSED_RESULT; + // Retrive the CA entry (private key and certificate) from the system table. 
Status GetCertAuthorityEntry(SysCertAuthorityEntryPB* entry); @@ -253,6 +264,9 @@ class SysCatalogTable { void ReqDeleteTablets(tserver::WriteRequestPB* req, const std::vector<scoped_refptr<TabletInfo>>& tablets); + // Overwrite (upsert) the latest event ID in the table with the provided ID. + void ReqSetNotificationLogEventId(tserver::WriteRequestPB* req, int64_t event_id); + static std::string TskSeqNumberToEntryId(int64_t seq_number); // Special string injected into SyncWrite() random failures (if enabled). http://git-wip-us.apache.org/repos/asf/kudu/blob/5f1ca32f/src/kudu/util/test_util.cc ---------------------------------------------------------------------- diff --git a/src/kudu/util/test_util.cc b/src/kudu/util/test_util.cc index f612393..c960441 100644 --- a/src/kudu/util/test_util.cc +++ b/src/kudu/util/test_util.cc @@ -419,9 +419,10 @@ Status WaitForBind(pid_t pid, uint16_t* port, const char* kind, MonoDelta timeou // The first line is the pid. We ignore it. // The second line is the file descriptor number. We ignore it. // The third line has the bind address and port. + // Subsequent lines show active connections. vector<string> lines = strings::Split(lsof_out, "\n"); int32_t p = -1; - if (lines.size() != 3 || + if (lines.size() < 3 || lines[2].substr(0, 3) != "n*:" || !safe_strto32(lines[2].substr(3), &p) || p <= 0) {