This is an automated email from the ASF dual-hosted git repository. bbender pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/geode-native.git
The following commit(s) were added to refs/heads/develop by this push: new 78d2fbf GEODE-8887: Refactor EventIdTSS class (#733) 78d2fbf is described below commit 78d2fbf67e81152b1a61a4ac411a30121e9f87ce Author: Blake Bender <bben...@pivotal.io> AuthorDate: Fri Jan 29 10:15:56 2021 -0800 GEODE-8887: Refactor EventIdTSS class (#733) - Use Meyers singleton pattern for global singleton (threadId), and thread local singleton EventIdTSS - rename all member variables in EventId and EventIdTSS classes - clean up logic slightly around global threadId atomic var - rename EventId getter methods for clarity --- cppcache/src/EventId.cpp | 140 +++++++++++++++++++++++++++----------------- cppcache/src/EventId.hpp | 30 ++++++---- cppcache/src/EventIdMap.cpp | 4 +- 3 files changed, 105 insertions(+), 69 deletions(-) diff --git a/cppcache/src/EventId.cpp b/cppcache/src/EventId.cpp index a6db548..62c0539 100644 --- a/cppcache/src/EventId.cpp +++ b/cppcache/src/EventId.cpp @@ -17,82 +17,97 @@ #include "EventId.hpp" #include <atomic> +#include <cstdint> #include <cstring> #include <geode/DataInput.hpp> #include "ClientProxyMembershipID.hpp" +#include "util/Log.hpp" namespace apache { namespace geode { namespace client { -class EventIdTSS { - private: - static std::atomic<int64_t> s_eidThrId; - - int64_t m_eidThrTSS; - int64_t m_eidSeqTSS; +class ThreadIdCounter { + public: + static std::atomic<int64_t>& instance() { + static std::atomic<int64_t> threadId_(0); + return threadId_; + } - ~EventIdTSS() = default; - EventIdTSS(const EventIdTSS&) = delete; - EventIdTSS& operator=(const EventIdTSS&) = delete; + static int64_t next() { return ++instance(); } +}; +class EventIdTSS { public: - // this should get called just once per thread due to first access to TSS - EventIdTSS() { - m_eidThrTSS = ++s_eidThrId; - m_eidSeqTSS = 0; + static EventIdTSS& instance() { + thread_local EventIdTSS eventId_; + return eventId_; } - inline int64_t getEidThr() { return m_eidThrTSS; } + int64_t nextSequenceId() { + sequenceId_++; + return sequenceId_; + } - inline int64_t getAndIncEidSeq() { return m_eidSeqTSS++; } + int64_t currentSequenceId() { return sequenceId_; } - inline int64_t getSeqNum() { return m_eidSeqTSS - 1; } + int64_t threadId() { return threadId_; } - static thread_local EventIdTSS s_eventId; + private: + EventIdTSS(); -}; // class EventIdTSS + int64_t threadId_; + int64_t sequenceId_; +}; -std::atomic<int64_t> EventIdTSS::s_eidThrId; -thread_local EventIdTSS EventIdTSS::s_eventId; +EventIdTSS::EventIdTSS() : threadId_(ThreadIdCounter::next()), sequenceId_(0) { + LOGDEBUG("EventIdTSS::EventIdTSS(%p): threadId_=%" PRId64 + ", sequenceId_=%" PRId64, + this, threadId_, sequenceId_); +} void EventId::toData(DataOutput& output) const { // This method is always expected to write out nonstatic distributed - // memberid. - output.writeBytes(reinterpret_cast<const int8_t*>(m_eidMem), m_eidMemLen); + // memberid. Note that binary representation of EventId is NOT THE + // SAME here as when serialized into part of a message (via the writeIdsData + // method). + LOGDEBUG("EventId::toData(%p) - called", this); + output.writeBytes(reinterpret_cast<const int8_t*>(clientId_), + clientIdLength_); output.writeArrayLen(18); char longCode = 3; output.write(static_cast<uint8_t>(longCode)); - output.writeInt(m_eidThr); + output.writeInt(threadId_); output.write(static_cast<uint8_t>(longCode)); - output.writeInt(m_eidSeq); - output.writeInt(m_bucketId); - output.write(m_breadcrumbCounter); + output.writeInt(sequenceId_); + output.writeInt(bucketId_); + output.write(breadcrumbCounter_); } void EventId::fromData(DataInput& input) { - // TODO: statics being assigned; not thread-safe?? - m_eidMemLen = input.readArrayLength(); - input.readBytesOnly(reinterpret_cast<int8_t*>(m_eidMem), m_eidMemLen); - input.readArrayLength(); // ignore arrayLen - m_eidThr = getEventIdData(input, input.read()); - m_eidSeq = getEventIdData(input, input.read()); - m_bucketId = input.readInt32(); - m_breadcrumbCounter = input.read(); + LOGDEBUG("EventId::fromData(%p) - called", this); + clientIdLength_ = input.readArrayLength(); + input.readBytesOnly(reinterpret_cast<int8_t*>(clientId_), clientIdLength_); + input.readArrayLength(); + threadId_ = getEventIdData(input, input.read()); + sequenceId_ = getEventIdData(input, input.read()); + bucketId_ = input.readInt32(); + breadcrumbCounter_ = input.read(); } -const char* EventId::getMemId() const { return m_eidMem; } +const char* EventId::clientId() const { return clientId_; } -int32_t EventId::getMemIdLen() const { return m_eidMemLen; } +int32_t EventId::clientIdLength() const { return clientIdLength_; } -int64_t EventId::getThrId() const { return m_eidThr; } +int64_t EventId::threadId() const { return threadId_; } -int64_t EventId::getSeqNum() const { return m_eidSeq; } +int64_t EventId::sequenceNumber() const { return sequenceId_; } int64_t EventId::getEventIdData(DataInput& input, char numberCode) { int64_t retVal = 0; + LOGDEBUG("EventId::getEventIdData(%p) - called", this); // Read number based on numeric code written by java server. if (numberCode == 0) { @@ -113,19 +128,23 @@ int64_t EventId::getEventIdData(DataInput& input, char numberCode) { } std::shared_ptr<Serializable> EventId::createDeserializable() { - return std::make_shared<EventId>(false); + LOGDEBUG("EventId::createDeserializable - called"); // use false since we dont want to inc sequence // (for de-serialization) + return std::make_shared<EventId>(false); } EventId::EventId(char* memId, uint32_t memIdLen, int64_t thr, int64_t seq) { + LOGDEBUG("EventId::EventId(%p) - memId=%s, memIdLen=%d, thr=%" PRId64 + ", seq=%" PRId64, + this, memId, memIdLen, thr, seq); // TODO: statics being assigned; not thread-safe?? - std::memcpy(m_eidMem, memId, memIdLen); - m_eidMemLen = memIdLen; - m_eidThr = thr; - m_eidSeq = seq; - m_bucketId = -1; - m_breadcrumbCounter = 0; + std::memcpy(clientId_, memId, memIdLen); + clientIdLength_ = memIdLen; + threadId_ = thr; + sequenceId_ = seq; + bucketId_ = -1; + breadcrumbCounter_ = 0; } EventId::EventId(bool doInit, uint32_t reserveSize, @@ -133,11 +152,16 @@ EventId::EventId(bool doInit, uint32_t reserveSize, : /* adongre * CID 28934: Uninitialized scalar field (UNINIT_CTOR) */ - m_eidMemLen(0), - m_eidThr(0), - m_eidSeq(0), - m_bucketId(-1), - m_breadcrumbCounter(0) { + clientIdLength_(0), + threadId_(0), + sequenceId_(0), + bucketId_(-1), + breadcrumbCounter_(0) { + LOGDEBUG( + "EventId::EventId(%p) - doInit=%s, reserveSize=%d, " + "fullValueAfterDeltaFail=%s", + this, doInit ? "true" : "false", reserveSize, + fullValueAfterDeltaFail ? "true" : "false"); if (!doInit) return; if (fullValueAfterDeltaFail) { @@ -148,18 +172,24 @@ EventId::EventId(bool doInit, uint32_t reserveSize, } for (uint32_t i = 0; i < reserveSize; i++) { - EventIdTSS::s_eventId.getAndIncEidSeq(); + EventIdTSS::instance().nextSequenceId(); } } void EventId::initFromTSS() { - m_eidThr = EventIdTSS::s_eventId.getEidThr(); - m_eidSeq = EventIdTSS::s_eventId.getAndIncEidSeq(); + threadId_ = EventIdTSS::instance().threadId(); + sequenceId_ = EventIdTSS::instance().nextSequenceId(); + LOGDEBUG("EventId::initFromTSS(%p) - called, tid=%" PRId64 ", seqid=%" PRId64, + this, threadId_, sequenceId_); } void EventId::initFromTSS_SameThreadIdAndSameSequenceId() { - m_eidThr = EventIdTSS::s_eventId.getEidThr(); - m_eidSeq = EventIdTSS::s_eventId.getSeqNum(); + threadId_ = EventIdTSS::instance().threadId(); + sequenceId_ = EventIdTSS::instance().currentSequenceId(); + LOGDEBUG( + "EventId::initFromTSS_SameThreadIdAndSameSequenceId(%p) - called, " + "tid=%" PRId64 ", seqid=%" PRId64, + this, threadId_, sequenceId_); } } // namespace client diff --git a/cppcache/src/EventId.hpp b/cppcache/src/EventId.hpp index 19828e7..e301e4a 100644 --- a/cppcache/src/EventId.hpp +++ b/cppcache/src/EventId.hpp @@ -20,12 +20,16 @@ #ifndef GEODE_EVENTID_H_ #define GEODE_EVENTID_H_ +#include <inttypes.h> + #include <string> #include <geode/DataOutput.hpp> #include <geode/internal/DataSerializableFixedId.hpp> #include <geode/internal/geode_globals.hpp> +#include "util/Log.hpp" + /** @file */ @@ -42,21 +46,21 @@ using internal::DSFid; class APACHE_GEODE_EXPORT EventId : public internal::DataSerializableFixedId_t<DSFid::EventId> { private: - char m_eidMem[512]; - int32_t m_eidMemLen; - int64_t m_eidThr; - int64_t m_eidSeq; - int32_t m_bucketId; - int8_t m_breadcrumbCounter; + char clientId_[512]; + int32_t clientIdLength_; + int64_t threadId_; + int64_t sequenceId_; + int32_t bucketId_; + int8_t breadcrumbCounter_; public: /** *@brief Accessor methods **/ - const char* getMemId() const; - int32_t getMemIdLen() const; - int64_t getThrId() const; - int64_t getSeqNum() const; + const char* clientId() const; + int32_t clientIdLength() const; + int64_t threadId() const; + int64_t sequenceNumber() const; void toData(DataOutput& output) const override; @@ -96,9 +100,11 @@ class APACHE_GEODE_EXPORT EventId output.write(static_cast<uint8_t>(0)); char longCode = 3; output.write(static_cast<uint8_t>(longCode)); - output.writeInt(m_eidThr); + output.writeInt(threadId_); output.write(static_cast<uint8_t>(longCode)); - output.writeInt(m_eidSeq); + output.writeInt(sequenceId_); + LOGDEBUG("%s(%p): Wrote tid=%" PRId64 ", seqid=%" PRId64, __FUNCTION__, + this, threadId_, sequenceId_); } /** Constructor, given the values. */ diff --git a/cppcache/src/EventIdMap.cpp b/cppcache/src/EventIdMap.cpp index 6fe069e..ec83b84 100644 --- a/cppcache/src/EventIdMap.cpp +++ b/cppcache/src/EventIdMap.cpp @@ -35,8 +35,8 @@ void EventIdMap::clear() { EventIdMapEntry EventIdMap::make(std::shared_ptr<EventId> eventid) { auto sid = std::make_shared<EventSource>( - eventid->getMemId(), eventid->getMemIdLen(), eventid->getThrId()); - auto seq = std::make_shared<EventSequence>(eventid->getSeqNum()); + eventid->clientId(), eventid->clientIdLength(), eventid->threadId()); + auto seq = std::make_shared<EventSequence>(eventid->sequenceNumber()); return std::make_pair(sid, seq); }