See <https://ci.trafficserver.apache.org/job/clang-format/42/changes>
Changes: [jplev] clang-format and move copyrights to NOTICES for RAFT code. ------------------------------------------ [...truncated 5348 lines...] - replicas_.clear(); - for (auto &n : config_.node()) - if (n != node_) { - other_nodes_.insert(n); - other_config_nodes_.insert(n); - } - for (auto &n : pending_config_.node()) - if (n != node_) - other_config_nodes_.insert(n); - replicas_.insert(config_.replica().begin(), config_.replica().end()); - replicas_.insert(pending_config_.replica().begin(), pending_config_.replica().end()); - replicas_.insert(other_nodes_.begin(), other_nodes_.end()); - replicas_.insert(other_config_nodes_.begin(), other_config_nodes_.end()); - ::std::string old_leader = leader_; - if (!other_nodes_.size()) - leader_ = node_; - else if (!i_am_in_nodes() && other_nodes_.size() == 1) - leader_ = *other_nodes_.begin(); - else if (leader_ == node_ && !i_am_in_nodes()) - leader_ = ""; - return leader_ != old_leader; - } + bool + ConfigChanged() + { // Returns: true if the leader_ changed. + other_nodes_.clear(); + other_config_nodes_.clear(); + replicas_.clear(); + for (auto &n : config_.node()) + if (n != node_) { + other_nodes_.insert(n); + other_config_nodes_.insert(n); + } + for (auto &n : pending_config_.node()) + if (n != node_) + other_config_nodes_.insert(n); + replicas_.insert(config_.replica().begin(), config_.replica().end()); + replicas_.insert(pending_config_.replica().begin(), pending_config_.replica().end()); + replicas_.insert(other_nodes_.begin(), other_nodes_.end()); + replicas_.insert(other_config_nodes_.begin(), other_config_nodes_.end()); + ::std::string old_leader = leader_; + if (!other_nodes_.size()) + leader_ = node_; + else if (!i_am_in_nodes() && other_nodes_.size() == 1) + leader_ = *other_nodes_.begin(); + else if (leader_ == node_ && !i_am_in_nodes()) + leader_ = ""; + return leader_ != old_leader; + } - bool - SendReplicationMessage(const ::std::string &n, const LogEntry &entry, NodeState *s) - { - Message m(InitializeMessage()); - m.mutable_entry()->CopyFrom(entry); - if (!server_->SendMessage(this, n, m)) - return false; - s->sent_index = entry.index() + entry.extent(); - s->sent_term = entry.term(); - return true; - } + bool + SendReplicationMessage(const ::std::string &n, const LogEntry &entry, NodeState *s) + { + Message m(InitializeMessage()); + m.mutable_entry()->CopyFrom(entry); + if (!server_->SendMessage(this, n, m)) + return false; + s->sent_index = entry.index() + entry.extent(); + s->sent_term = entry.term(); + return true; + } - void - Replicate(const ::std::string &n, bool heartbeat) - { - bool sent = false; - auto &s = node_state_[n]; - if (s.term == term_) { // Replica has acknowledged me as leader. - int64_t end = index_; - if (waiting_commits_.size()) - end = waiting_commits_.front()->index() - 1; - while (s.sent_index < end) { // Get from server. - LogEntry entry; - server_->GetLogEntry(this, s.sent_term, s.sent_index + 1, end, &entry); - if (!entry.has_term()) { - // A summary log entry from the server with historical information. - entry.set_term(last_log_term_); - entry.set_index(s.sent_index + 1); - } - entry.set_previous_log_term(s.sent_term); - entry.set_previous_log_index(s.sent_index); - assert(entry.index() > s.sent_index); - int64_t x = s.sent_index; - if (!SendReplicationMessage(n, entry, &s)) - break; - assert(s.sent_index > x); - sent = true; - } - for (auto &e : waiting_commits_) { - if (e->index() <= s.sent_index) // Skip those already sent. - continue; - if (!SendReplicationMessage(n, *e, &s)) - break; - sent = true; + void + Replicate(const ::std::string &n, bool heartbeat) + { + bool sent = false; + auto &s = node_state_[n]; + if (s.term == term_) { // Replica has acknowledged me as leader. + int64_t end = index_; + if (waiting_commits_.size()) + end = waiting_commits_.front()->index() - 1; + while (s.sent_index < end) { // Get from server. + LogEntry entry; + server_->GetLogEntry(this, s.sent_term, s.sent_index + 1, end, &entry); + if (!entry.has_term()) { + // A summary log entry from the server with historical information. + entry.set_term(last_log_term_); + entry.set_index(s.sent_index + 1); } + entry.set_previous_log_term(s.sent_term); + entry.set_previous_log_index(s.sent_index); + assert(entry.index() > s.sent_index); + int64_t x = s.sent_index; + if (!SendReplicationMessage(n, entry, &s)) + break; + assert(s.sent_index > x); + sent = true; } - if (heartbeat && !sent) { - Message m(InitializeMessage()); - server_->SendMessage(this, n, m); + for (auto &e : waiting_commits_) { + if (e->index() <= s.sent_index) // Skip those already sent. + continue; + if (!SendReplicationMessage(n, *e, &s)) + break; + sent = true; } } - - void - ReplicateAll(bool heartbeat) - { - for (auto &n : replicas_) - Replicate(n, heartbeat); - } - - bool - i_am_leader() - { - return node_ == leader_; - } - bool - i_am_in_nodes() - { - auto &n = config_.node(); - return std::find(n.begin(), n.end(), node_) != n.end(); + if (heartbeat && !sent) { + Message m(InitializeMessage()); + server_->SendMessage(this, n, m); } + } - Server *const server_; - struct drand48_data rand_; - const ::std::string node_; - int64_t term_ = 0; // Current term. - int64_t last_log_term_ = -1; // Term of last log entry this node has. - int64_t index_ = 0; // Index of last log entry this node has. - int64_t config_committed_ = -1; - int64_t data_committed_ = -1; - int64_t last_log_committed_index_ = -1; - int64_t last_log_committed_term_ = -1; - double election_timeout_ = 1.0; - double last_heartbeat_ = -1.0e10; - double last_heartbeat_sent_ = -1.0e10; - double random_election_delay_ = 0.0; - ::std::string leader_; // The current leader. "" if there is no leader. - ::std::string vote_; // My vote this term. - Config config_; - Config pending_config_; - ::std::map< ::std::string, NodeState> node_state_; - ::std::deque<std::unique_ptr<LogEntry> > waiting_commits_; - bool seen_term_ = true; - // Cached values. - ::std::set< ::std::string> other_nodes_; // Nodes required for consensus on log entries. - ::std::set< ::std::string> other_config_nodes_; // Nodes required for config changes. - ::std::set< ::std::string> replicas_; // All nodes receiving the replication stream. - }; + void + ReplicateAll(bool heartbeat) + { + for (auto &n : replicas_) + Replicate(n, heartbeat); + } - template <typename Server> Raft<Server> *NewRaft(Server * server, const ::std::string &node) + bool + i_am_leader() { - return new RaftImpl<Server>(server, node); + return node_ == leader_; } + bool + i_am_in_nodes() + { + auto &n = config_.node(); + return std::find(n.begin(), n.end(), node_) != n.end(); + } + + Server *const server_; + struct drand48_data rand_; + const ::std::string node_; + int64_t term_ = 0; // Current term. + int64_t last_log_term_ = -1; // Term of last log entry this node has. + int64_t index_ = 0; // Index of last log entry this node has. + int64_t config_committed_ = -1; + int64_t data_committed_ = -1; + int64_t last_log_committed_index_ = -1; + int64_t last_log_committed_term_ = -1; + double election_timeout_ = 1.0; + double last_heartbeat_ = -1.0e10; + double last_heartbeat_sent_ = -1.0e10; + double random_election_delay_ = 0.0; + ::std::string leader_; // The current leader. "" if there is no leader. + ::std::string vote_; // My vote this term. + Config config_; + Config pending_config_; + ::std::map< ::std::string, NodeState> node_state_; + ::std::deque<std::unique_ptr<LogEntry> > waiting_commits_; + bool seen_term_ = true; + // Cached values. + ::std::set< ::std::string> other_nodes_; // Nodes required for consensus on log entries. + ::std::set< ::std::string> other_config_nodes_; // Nodes required for config changes. + ::std::set< ::std::string> replicas_; // All nodes receiving the replication stream. +}; + +template <typename Server> +Raft<Server> * +NewRaft(Server *server, const ::std::string &node) +{ + return new RaftImpl<Server>(server, node); +} } // namespace raft #endif // CONSENSUS_IMPL_H_ Build step 'Execute shell' marked build as failure