http://git-wip-us.apache.org/repos/asf/geode-native/blob/da389793/src/cppcache/src/LocalRegion.cpp ---------------------------------------------------------------------- diff --git a/src/cppcache/src/LocalRegion.cpp b/src/cppcache/src/LocalRegion.cpp index 2e465cb..1f31cad 100644 --- a/src/cppcache/src/LocalRegion.cpp +++ b/src/cppcache/src/LocalRegion.cpp @@ -14,15 +14,17 @@ * See the License for the specific language governing permissions and * limitations under the License. */ +#include <vector> #include "LocalRegion.hpp" #include <geode/Log.hpp> #include <geode/SystemProperties.hpp> +#include <geode/PoolManager.hpp> + #include "CacheImpl.hpp" #include "CacheRegionHelper.hpp" #include "CacheableToken.hpp" #include "Utils.hpp" - #include "EntryExpiryHandler.hpp" #include "RegionExpiryHandler.hpp" #include "ExpiryTaskManager.hpp" @@ -30,8 +32,7 @@ #include "RegionGlobalLocks.hpp" #include "TXState.hpp" #include "VersionTag.hpp" -#include <vector> -#include <geode/PoolManager.hpp> +#include "statistics/StatisticsManager.hpp" namespace apache { namespace geode { @@ -40,8 +41,9 @@ namespace client { LocalRegion::LocalRegion(const std::string& name, CacheImpl* cache, const RegionInternalPtr& rPtr, const RegionAttributesPtr& attributes, - const CacheStatisticsPtr& stats, bool shared) - : RegionInternal(attributes), + const CacheStatisticsPtr& stats, bool shared, + bool enableTimeStatistics) + : RegionInternal(cache->getCache()->shared_from_this(), attributes), m_name(name), m_parentRegion(rPtr), m_cacheImpl(cache), @@ -55,7 +57,8 @@ LocalRegion::LocalRegion(const std::string& name, CacheImpl* cache, m_transactionEnabled(false), m_isPRSingleHopEnabled(false), m_attachedPool(nullptr), - m_persistenceManager(nullptr) { + m_persistenceManager(nullptr), + m_enableTimeStatistics(enableTimeStatistics) { if (m_parentRegion != nullptr) { ((m_fullPath = m_parentRegion->getFullPath()) += "/") += m_name; } else { @@ -83,8 +86,12 @@ LocalRegion::LocalRegion(const std::string& name, CacheImpl* cache, (m_fullPath = "/") += m_name; } - m_regionStats = new RegionStats(m_fullPath.c_str()); - PoolPtr p = PoolManager::find(getAttributes()->getPoolName()); + m_regionStats = new RegionStats(cache->getDistributedSystem() + .getStatisticsManager() + ->getStatisticsFactory(), + m_fullPath); + PoolPtr p = + cache->getCache()->getPoolManager().find(getAttributes()->getPoolName()); // m_attachedPool = p; setPool(p); } @@ -121,11 +128,8 @@ void LocalRegion::updateAccessAndModifiedTime(bool modified) { CacheStatisticsPtr LocalRegion::getStatistics() const { CHECK_DESTROY_PENDING(TryReadGuard, LocalRegion::getStatistics); bool m_statisticsEnabled = true; - SystemProperties* props = - m_cacheImpl->getCache()->getDistributedSystem()->getSystemProperties(); - if (props) { - m_statisticsEnabled = props->statisticsEnabled(); - } + auto& props = m_cacheImpl->getDistributedSystem().getSystemProperties(); + m_statisticsEnabled = props.statisticsEnabled(); if (!m_statisticsEnabled) { throw StatisticsDisabledException( "LocalRegion::getStatistics statistics disabled for this region"); @@ -307,11 +311,10 @@ void LocalRegion::getEntry(const CacheableKeyPtr& key, CacheablePtr& valuePtr) { CacheablePtr LocalRegion::get(const CacheableKeyPtr& key, const UserDataPtr& aCallbackArgument) { CacheablePtr rptr; - int64_t sampleStartNanos = Utils::startStatOpTime(); + int64_t sampleStartNanos = startStatOpTime(); GfErrType err = getNoThrow(key, rptr, aCallbackArgument); - Utils::updateStatOpTime(m_regionStats->getStat(), - RegionStatType::getInstance()->getGetTimeId(), - sampleStartNanos); + updateStatOpTime(m_regionStats->getStat(), m_regionStats->getGetTimeId(), + sampleStartNanos); // rptr = handleReplay(err, rptr); @@ -323,13 +326,12 @@ CacheablePtr LocalRegion::get(const CacheableKeyPtr& key, void LocalRegion::put(const CacheableKeyPtr& key, const CacheablePtr& value, const UserDataPtr& aCallbackArgument) { CacheablePtr oldValue; - int64_t sampleStartNanos = Utils::startStatOpTime(); + int64_t sampleStartNanos = startStatOpTime(); VersionTagPtr versionTag; GfErrType err = putNoThrow(key, value, aCallbackArgument, oldValue, -1, CacheEventFlags::NORMAL, versionTag); - Utils::updateStatOpTime(m_regionStats->getStat(), - RegionStatType::getInstance()->getPutTimeId(), - sampleStartNanos); + updateStatOpTime(m_regionStats->getStat(), m_regionStats->getPutTimeId(), + sampleStartNanos); // handleReplay(err, nullptr); GfErrTypeToException("Region::put", err); } @@ -351,11 +353,10 @@ void LocalRegion::putAll(const HashMapOfCacheable& map, uint32_t timeout, "Region::putAll: timeout parameter " "greater than maximum allowed (2^31/1000 i.e 2147483)."); } - int64_t sampleStartNanos = Utils::startStatOpTime(); + int64_t sampleStartNanos = startStatOpTime(); GfErrType err = putAllNoThrow(map, timeout, aCallbackArgument); - Utils::updateStatOpTime(m_regionStats->getStat(), - RegionStatType::getInstance()->getPutAllTimeId(), - sampleStartNanos); + updateStatOpTime(m_regionStats->getStat(), m_regionStats->getPutAllTimeId(), + sampleStartNanos); // handleReplay(err, nullptr); GfErrTypeToException("Region::putAll", err); } @@ -365,11 +366,10 @@ void LocalRegion::removeAll(const VectorOfCacheableKey& keys, if (keys.size() == 0) { throw IllegalArgumentException("Region::removeAll: zero keys provided"); } - int64_t sampleStartNanos = Utils::startStatOpTime(); + int64_t sampleStartNanos = startStatOpTime(); GfErrType err = removeAllNoThrow(keys, aCallbackArgument); - Utils::updateStatOpTime(m_regionStats->getStat(), - RegionStatType::getInstance()->getRemoveAllTimeId(), - sampleStartNanos); + updateStatOpTime(m_regionStats->getStat(), + m_regionStats->getRemoveAllTimeId(), sampleStartNanos); GfErrTypeToException("Region::removeAll", err); } @@ -537,17 +537,17 @@ void LocalRegion::getAll(const VectorOfCacheableKey& keys, !(addToLocalCache && m_regionAttributes->getCachingEnabled())) { throw IllegalArgumentException( "Region::getAll: either output \"values\"" - " parameter should be non-null, or \"addToLocalCache\" should be true " + " parameter should be non-null, or \"addToLocalCache\" should be " + "true " "and caching should be enabled for the region [%s]", getFullPath()); } - int64_t sampleStartNanos = Utils::startStatOpTime(); + int64_t sampleStartNanos = startStatOpTime(); GfErrType err = getAllNoThrow(keys, values, exceptions, addToLocalCache, aCallbackArgument); - Utils::updateStatOpTime(m_regionStats->getStat(), - RegionStatType::getInstance()->getGetAllTimeId(), - sampleStartNanos); + updateStatOpTime(m_regionStats->getStat(), m_regionStats->getGetAllTimeId(), + sampleStartNanos); // handleReplay(err, nullptr); GfErrTypeToException("Region::getAll", err); } @@ -650,8 +650,9 @@ void LocalRegion::setRegionExpiryTask() { uint32_t duration = getRegionExpiryDuration(); RegionExpiryHandler* handler = new RegionExpiryHandler(rptr, getRegionExpiryAction(), duration); - int64_t expiryTaskId = - CacheImpl::expiryTaskManager->scheduleExpiryTask(handler, duration, 0); + long expiryTaskId = + rptr->getCacheImpl()->getExpiryTaskManager().scheduleExpiryTask( + handler, duration, 0); handler->setExpiryTaskId(expiryTaskId); LOGFINE( "expiry for region [%s], expiry task id = %d, duration = %d, " @@ -670,8 +671,8 @@ void LocalRegion::registerEntryExpiryTask(MapEntryImplPtr& entry) { uint32_t duration = getEntryExpiryDuration(); EntryExpiryHandler* handler = new EntryExpiryHandler(rptr, entry, getEntryExpirationAction(), duration); - int64_t id = - CacheImpl::expiryTaskManager->scheduleExpiryTask(handler, duration, 0); + long id = rptr->getCacheImpl()->getExpiryTaskManager().scheduleExpiryTask( + handler, duration, 0); if (Log::finestEnabled()) { CacheableKeyPtr key; entry->getKeyI(key); @@ -740,7 +741,8 @@ void LocalRegion::release(bool invokeCallbacks) { /** Returns whether the specified key currently exists in this region. * This method is equivalent to <code>getEntry(key) != null</code>. * - * @param keyPtr the key to check for an existing entry, type is CacheableString + * @param keyPtr the key to check for an existing entry, type is + *CacheableString *& * @return true if there is an entry in this region for the specified key *@throw RegionDestroyedException, if region is destroyed. @@ -788,6 +790,7 @@ GfErrType LocalRegion::getNoThrow(const CacheableKeyPtr& keyPtr, const UserDataPtr& aCallbackArgument) { CHECK_DESTROY_PENDING_NOTHROW(TryReadGuard); GfErrType err = GF_NOERR; + if (keyPtr == nullptr) { return GF_CACHE_ILLEGAL_ARGUMENT_EXCEPTION; } @@ -809,7 +812,8 @@ GfErrType LocalRegion::getNoThrow(const CacheableKeyPtr& keyPtr, } m_regionStats->incGets(); - m_cacheImpl->m_cacheStats->incGets(); + auto& cachePerfStats = m_cacheImpl->getCachePerfStats(); + cachePerfStats.incGets(); // TODO: CacheableToken::isInvalid should be completely hidden // inside MapSegment; this should be done both for the value obtained @@ -824,7 +828,7 @@ GfErrType LocalRegion::getNoThrow(const CacheableKeyPtr& keyPtr, isLocal = m_entries->get(keyPtr, value, me); if (isLocal && (value != nullptr && !CacheableToken::isInvalid(value))) { m_regionStats->incHits(); - m_cacheImpl->m_cacheStats->incHits(); + cachePerfStats.incHits(); updateAccessAndModifiedTimeForEntry(me, false); updateAccessAndModifiedTime(false); return err; // found it in local cache... @@ -867,7 +871,8 @@ GfErrType LocalRegion::getNoThrow(const CacheableKeyPtr& keyPtr, // access times. updateAccessAndModifiedTime(false); m_regionStats->incMisses(); - m_cacheImpl->m_cacheStats->incMisses(); + + cachePerfStats.incMisses(); VersionTagPtr versionTag; // Get from some remote source (e.g. external java server) if required. err = getNoThrow_remote(keyPtr, value, aCallbackArgument, versionTag); @@ -880,12 +885,10 @@ GfErrType LocalRegion::getNoThrow(const CacheableKeyPtr& keyPtr, try { isLoaderInvoked = true; /*Update the statistics*/ - int64_t sampleStartNanos = Utils::startStatOpTime(); + int64_t sampleStartNanos = startStatOpTime(); value = m_loader->load(shared_from_this(), keyPtr, aCallbackArgument); - Utils::updateStatOpTime( - m_regionStats->getStat(), - RegionStatType::getInstance()->getLoaderCallTimeId(), - sampleStartNanos); + updateStatOpTime(m_regionStats->getStat(), + m_regionStats->getLoaderCallTimeId(), sampleStartNanos); m_regionStats->incLoaderCallsCompleted(); } catch (const Exception& ex) { LOGERROR("Error in CacheLoader::load: %s: %s", ex.getName(), @@ -909,7 +912,8 @@ GfErrType LocalRegion::getNoThrow(const CacheableKeyPtr& keyPtr, // try to create the entry and if that returns an existing value // (e.g. from another thread or notification) then return that LOGDEBUG( - "Region::get: creating entry with tracking update counter [%d] for key " + "Region::get: creating entry with tracking update counter [%d] for " + "key " "[%s]", updateCount, Utils::getCacheableKeyString(keyPtr)->asChar()); if ((err = putLocal("Region::get", false, keyPtr, value, oldValue, @@ -975,13 +979,15 @@ GfErrType LocalRegion::getAllNoThrow(const VectorOfCacheableKey& keys, } // if(!txState->isReplay()) // { - // auto args = std::make_shared<VectorOfCacheable>(); + // auto args = + // std::make_shared<VectorOfCacheable>(); // args->push_back(VectorOfCacheableKeyPtr(new // VectorOfCacheableKey(keys))); // args->push_back(values); // args->push_back(exceptions); // args->push_back(CacheableBoolean::create(addToLocalCache)); - // txState->recordTXOperation(GF_GET_ALL, getFullPath(), + // txState->recordTXOperation(GF_GET_ALL, + // getFullPath(), // nullptr, // args); // } @@ -998,18 +1004,19 @@ GfErrType LocalRegion::getAllNoThrow(const VectorOfCacheableKey& keys, VectorOfCacheableKey serverKeys; bool cachingEnabled = m_regionAttributes->getCachingEnabled(); bool regionAccessed = false; + auto& cachePerfStats = m_cacheImpl->getCachePerfStats(); for (int32_t index = 0; index < keys.size(); ++index) { const CacheableKeyPtr& key = keys[index]; MapEntryImplPtr me; value = nullptr; m_regionStats->incGets(); - m_cacheImpl->m_cacheStats->incGets(); + cachePerfStats.incGets(); if (values && cachingEnabled) { if (m_entries->get(key, value, me) && value && !CacheableToken::isInvalid(value)) { m_regionStats->incHits(); - m_cacheImpl->m_cacheStats->incHits(); + cachePerfStats.incHits(); updateAccessAndModifiedTimeForEntry(me, false); regionAccessed = true; values->emplace(key, value); @@ -1022,7 +1029,7 @@ GfErrType LocalRegion::getAllNoThrow(const VectorOfCacheableKey& keys, serverKeys.push_back(key); m_regionStats->incMisses(); - m_cacheImpl->m_cacheStats->incMisses(); + cachePerfStats.incMisses(); } // TODO: No support for loaders in getAll for now. } @@ -1259,6 +1266,8 @@ class DestroyActions { DataInput* delta = nullptr, EventIdPtr eventId = nullptr, bool afterRemote = false) { + auto& cachePerfStats = m_region.m_cacheImpl->getCachePerfStats(); + if (cachingEnabled) { MapEntryImplPtr entry; // for notification invoke the listener even if the key does @@ -1281,6 +1290,7 @@ class DestroyActions { } return err; } + if (oldValue != nullptr) { LOGDEBUG( "Region::destroy: region [%s] destroyed key [%s] having " @@ -1297,12 +1307,12 @@ class DestroyActions { } // update the stats m_region.m_regionStats->setEntries(m_region.m_entries->size()); - m_region.m_cacheImpl->m_cacheStats->incEntries(-1); + cachePerfStats.incEntries(-1); } } // update the stats m_region.m_regionStats->incDestroys(); - m_region.m_cacheImpl->m_cacheStats->incDestroys(); + cachePerfStats.incDestroys(); return GF_NOERR; } @@ -1361,8 +1371,10 @@ class RemoveActions { GfErrType err = GF_NOERR; if (!allowNULLValue && m_region.getAttributes()->getCachingEnabled()) { m_region.getEntry(key, valuePtr); - DataOutput out1; - DataOutput out2; + std::unique_ptr<DataOutput> out1 = + m_region.getCacheImpl()->getCache()->createDataOutput(); + std::unique_ptr<DataOutput> out2 = + m_region.getCacheImpl()->getCache()->createDataOutput(); if (valuePtr != nullptr && value != nullptr) { if (valuePtr->classId() != value->classId() || @@ -1370,14 +1382,14 @@ class RemoveActions { err = GF_ENOENT; return err; } - valuePtr->toData(out1); - value->toData(out2); - if (out1.getBufferLength() != out2.getBufferLength()) { + valuePtr->toData(*out1); + value->toData(*out2); + if (out1->getBufferLength() != out2->getBufferLength()) { err = GF_ENOENT; return err; } - if (memcmp(out1.getBuffer(), out2.getBuffer(), - out1.getBufferLength()) != 0) { + if (memcmp(out1->getBuffer(), out2->getBuffer(), + out1->getBufferLength()) != 0) { err = GF_ENOENT; return err; } @@ -1432,22 +1444,24 @@ class RemoveActions { GfErrType err = GF_NOERR; if (!allowNULLValue && cachingEnabled) { m_region.getEntry(key, valuePtr); - DataOutput out1; - DataOutput out2; + std::unique_ptr<DataOutput> out1 = + m_region.getCacheImpl()->getCache()->createDataOutput(); + std::unique_ptr<DataOutput> out2 = + m_region.getCacheImpl()->getCache()->createDataOutput(); if (valuePtr != nullptr && value != nullptr) { if (valuePtr->classId() != value->classId() || valuePtr->typeId() != value->typeId()) { err = GF_ENOENT; return err; } - valuePtr->toData(out1); - value->toData(out2); - if (out1.getBufferLength() != out2.getBufferLength()) { + valuePtr->toData(*out1); + value->toData(*out2); + if (out1->getBufferLength() != out2->getBufferLength()) { err = GF_ENOENT; return err; } - if (memcmp(out1.getBuffer(), out2.getBuffer(), - out1.getBufferLength()) != 0) { + if (memcmp(out1->getBuffer(), out2->getBuffer(), + out1->getBufferLength()) != 0) { err = GF_ENOENT; return err; } @@ -1458,9 +1472,9 @@ class RemoveActions { if (updateCount >= 0 && !m_region.getAttributes() ->getConcurrencyChecksEnabled()) { // This means server has - // deleted an entry & same - // entry has been destroyed - // locally + // deleted an entry & + // same entry has been + // destroyed locally // So call removeTrackerForEntry to remove key that was added in the // map during addTrackerForEntry call. m_region.m_entries->removeTrackerForEntry(key); @@ -1472,6 +1486,7 @@ class RemoveActions { return err; } } + auto& cachePerfStats = m_region.m_cacheImpl->getCachePerfStats(); if (cachingEnabled) { MapEntryImplPtr entry; @@ -1511,12 +1526,12 @@ class RemoveActions { } // update the stats m_region.m_regionStats->setEntries(m_region.m_entries->size()); - m_region.m_cacheImpl->m_cacheStats->incEntries(-1); + cachePerfStats.incEntries(-1); } } // update the stats m_region.m_regionStats->incDestroys(); - m_region.m_cacheImpl->m_cacheStats->incDestroys(); + cachePerfStats.incDestroys(); return GF_NOERR; } @@ -1704,10 +1719,11 @@ GfErrType LocalRegion::updateNoThrow(const CacheableKeyPtr& key, return GF_NOERR; } else if (err == GF_INVALID_DELTA) { LOGDEBUG( - "Region::localUpdate: updateNoThrow<%s> for key [%s] failed because " + "Region::localUpdate: updateNoThrow<%s> for key [%s] failed " + "because " "of invalid delta.", TAction::name(), Utils::getCacheableKeyString(key)->asChar()); - m_cacheImpl->m_cacheStats->incFailureOnDeltaReceived(); + m_cacheImpl->getCachePerfStats().incFailureOnDeltaReceived(); // Get full object from server. CacheablePtr& newValue1 = const_cast<CacheablePtr&>(value); VersionTagPtr versionTag1; @@ -1963,9 +1979,10 @@ GfErrType LocalRegion::putAllNoThrow(const HashMapOfCacheable& map, std::make_pair(key, std::make_pair(oldValue, updateCount))); } if (m_writer != nullptr) { - // invokeCacheWriterForEntryEvent method has the check that if oldValue - // is a CacheableToken then it sets it to nullptr; also determines if it - // should be BEFORE_UPDATE or BEFORE_CREATE depending on oldValue + // invokeCacheWriterForEntryEvent method has the check that if + // oldValue is a CacheableToken then it sets it to nullptr; also + // determines if it should be BEFORE_UPDATE or BEFORE_CREATE depending + // on oldValue if (!invokeCacheWriterForEntryEvent( key, oldValue, iter.second, aCallbackArgument, CacheEventFlags::LOCAL, BEFORE_UPDATE)) { @@ -2002,7 +2019,8 @@ GfErrType LocalRegion::putAllNoThrow(const HashMapOfCacheable& map, } else { // ThrowERROR LOGERROR( - "ERROR :: LocalRegion::putAllNoThrow() Key must be found in the " + "ERROR :: LocalRegion::putAllNoThrow() Key must be found in " + "the " "usermap"); } @@ -2022,7 +2040,8 @@ GfErrType LocalRegion::putAllNoThrow(const HashMapOfCacheable& map, versionTag)) == GF_CACHE_ENTRY_UPDATED) { LOGFINEST( "Region::putAll: did not change local value for key [%s] " - "since it has been updated by another thread while operation was " + "since it has been updated by another thread while operation " + "was " "in progress", Utils::getCacheableKeyString(key)->asChar()); } else if (localErr == GF_CACHE_LISTENER_EXCEPTION) { @@ -2058,7 +2077,8 @@ GfErrType LocalRegion::putAllNoThrow(const HashMapOfCacheable& map, versionTag)) == GF_CACHE_ENTRY_UPDATED) { LOGFINEST( "Region::putAll: did not change local value for key [%s] " - "since it has been updated by another thread while operation was " + "since it has been updated by another thread while operation " + "was " "in progress", Utils::getCacheableKeyString(key)->asChar()); } else if (localErr == GF_CACHE_LISTENER_EXCEPTION) { @@ -2161,11 +2181,10 @@ GfErrType LocalRegion::removeAllNoThrow(const VectorOfCacheableKey& keys, void LocalRegion::clear(const UserDataPtr& aCallbackArgument) { /*update the stats */ - int64_t sampleStartNanos = Utils::startStatOpTime(); + int64_t sampleStartNanos = startStatOpTime(); localClear(aCallbackArgument); - Utils::updateStatOpTime(m_regionStats->getStat(), - RegionStatType::getInstance()->getClearsId(), - sampleStartNanos); + updateStatOpTime(m_regionStats->getStat(), m_regionStats->getClearsId(), + sampleStartNanos); } void LocalRegion::localClear(const UserDataPtr& aCallbackArgument) { GfErrType err = localClearNoThrow(aCallbackArgument, CacheEventFlags::LOCAL); @@ -2326,7 +2345,8 @@ GfErrType LocalRegion::destroyRegionNoThrow( if (eventFlags == CacheEventFlags::LOCAL) { if (unregisterKeysBeforeDestroyRegion() != GF_NOERR) { LOGDEBUG( - "DEBUG :: LocalRegion::destroyRegionNoThrow UnregisteredKeys Failed"); + "DEBUG :: LocalRegion::destroyRegionNoThrow UnregisteredKeys " + "Failed"); } } @@ -2436,6 +2456,8 @@ GfErrType LocalRegion::putLocal(const char* name, bool isCreate, EventIdPtr eventId) { GfErrType err = GF_NOERR; bool isUpdate = !isCreate; + auto& cachePerfStats = m_cacheImpl->getCachePerfStats(); + if (cachingEnabled) { MapEntryImplPtr entry; LOGDEBUG("%s: region [%s] putting key [%s], value [%s]", name, @@ -2448,7 +2470,7 @@ GfErrType LocalRegion::putLocal(const char* name, bool isCreate, err = m_entries->put(key, value, entry, oldValue, updateCount, destroyTracker, versionTag, isUpdate, delta); if (err == GF_INVALID_DELTA) { - m_cacheImpl->m_cacheStats->incFailureOnDeltaReceived(); + cachePerfStats.incFailureOnDeltaReceived(); // PXR: Get full object from server. CacheablePtr& newValue1 = const_cast<CacheablePtr&>(value); VersionTagPtr versionTag1; @@ -2461,7 +2483,7 @@ GfErrType LocalRegion::putLocal(const char* name, bool isCreate, } if (delta != nullptr && err == GF_NOERR) { // Means that delta is on and there is no failure. - m_cacheImpl->m_cacheStats->incDeltaReceived(); + cachePerfStats.incDeltaReceived(); } } if (err != GF_NOERR) { @@ -2484,14 +2506,14 @@ GfErrType LocalRegion::putLocal(const char* name, bool isCreate, // update the stats if (isUpdate) { m_regionStats->incPuts(); - m_cacheImpl->m_cacheStats->incPuts(); + cachePerfStats.incPuts(); } else { if (cachingEnabled) { m_regionStats->setEntries(m_entries->size()); - m_cacheImpl->m_cacheStats->incEntries(1); + cachePerfStats.incEntries(1); } m_regionStats->incCreates(); - m_cacheImpl->m_cacheStats->incCreates(); + cachePerfStats.incCreates(); } return err; } @@ -2545,7 +2567,7 @@ bool LocalRegion::invokeCacheWriterForEntryEvent( try { bool updateStats = true; /*Update the CacheWriter Stats*/ - int64_t sampleStartNanos = Utils::startStatOpTime(); + int64_t sampleStartNanos = startStatOpTime(); switch (type) { case BEFORE_UPDATE: { if (oldValue != nullptr) { @@ -2572,10 +2594,9 @@ bool LocalRegion::invokeCacheWriterForEntryEvent( } if (updateStats) { - Utils::updateStatOpTime( - m_regionStats->getStat(), - RegionStatType::getInstance()->getWriterCallTimeId(), - sampleStartNanos); + updateStatOpTime(m_regionStats->getStat(), + m_regionStats->getWriterCallTimeId(), + sampleStartNanos); m_regionStats->incWriterCallsCompleted(); } @@ -2603,7 +2624,7 @@ bool LocalRegion::invokeCacheWriterForRegionEvent( try { bool updateStats = true; /*Update the CacheWriter Stats*/ - int64_t sampleStartNanos = Utils::startStatOpTime(); + int64_t sampleStartNanos = startStatOpTime(); switch (type) { case BEFORE_REGION_DESTROY: { eventStr = "beforeRegionDestroy"; @@ -2621,10 +2642,9 @@ bool LocalRegion::invokeCacheWriterForRegionEvent( } } if (updateStats) { - Utils::updateStatOpTime( - m_regionStats->getStat(), - RegionStatType::getInstance()->getWriterCallTimeId(), - sampleStartNanos); + updateStatOpTime(m_regionStats->getStat(), + m_regionStats->getWriterCallTimeId(), + sampleStartNanos); m_regionStats->incWriterCallsCompleted(); } } catch (const Exception& ex) { @@ -2656,7 +2676,7 @@ GfErrType LocalRegion::invokeCacheListenerForEntryEvent( try { bool updateStats = true; /*Update the CacheWriter Stats*/ - int64_t sampleStartNanos = Utils::startStatOpTime(); + int64_t sampleStartNanos = startStatOpTime(); switch (type) { case AFTER_UPDATE: { // when CREATE is received from server for notification @@ -2690,11 +2710,10 @@ GfErrType LocalRegion::invokeCacheListenerForEntryEvent( } } if (updateStats) { - m_cacheImpl->m_cacheStats->incListenerCalls(); - Utils::updateStatOpTime( - m_regionStats->getStat(), - RegionStatType::getInstance()->getListenerCallTimeId(), - sampleStartNanos); + m_cacheImpl->getCachePerfStats().incListenerCalls(); + updateStatOpTime(m_regionStats->getStat(), + m_regionStats->getListenerCallTimeId(), + sampleStartNanos); m_regionStats->incListenerCallsCompleted(); } } catch (const Exception& ex) { @@ -2729,18 +2748,18 @@ GfErrType LocalRegion::invokeCacheListenerForRegionEvent( case AFTER_REGION_DESTROY: { eventStr = "afterRegionDestroy"; m_listener->afterRegionDestroy(event); - m_cacheImpl->m_cacheStats->incListenerCalls(); + m_cacheImpl->getCachePerfStats().incListenerCalls(); if (eventFlags.isCacheClose()) { eventStr = "close"; m_listener->close(shared_from_this()); - m_cacheImpl->m_cacheStats->incListenerCalls(); + m_cacheImpl->getCachePerfStats().incListenerCalls(); } break; } case AFTER_REGION_INVALIDATE: { eventStr = "afterRegionInvalidate"; m_listener->afterRegionInvalidate(event); - m_cacheImpl->m_cacheStats->incListenerCalls(); + m_cacheImpl->getCachePerfStats().incListenerCalls(); break; } case AFTER_REGION_CLEAR: { @@ -2754,10 +2773,9 @@ GfErrType LocalRegion::invokeCacheListenerForRegionEvent( } } if (updateStats) { - Utils::updateStatOpTime( - m_regionStats->getStat(), - RegionStatType::getInstance()->getListenerCallTimeId(), - sampleStartNanos); + updateStatOpTime(m_regionStats->getStat(), + m_regionStats->getListenerCallTimeId(), + sampleStartNanos); m_regionStats->incListenerCallsCompleted(); } } catch (const Exception& ex) { @@ -2847,7 +2865,8 @@ ExpirationAction::Action LocalRegion::adjustEntryExpiryAction( bool hadExpiry = (getEntryExpiryDuration() != 0); if (!hadExpiry) { throw IllegalStateException( - "Cannot change entry ExpirationAction for region created without entry " + "Cannot change entry ExpirationAction for region created without " + "entry " "expiry."); } ExpirationAction::Action oldValue = getEntryExpirationAction(); @@ -2865,7 +2884,8 @@ int32_t LocalRegion::adjustRegionExpiryDuration(int32_t duration) { bool hadExpiry = (getEntryExpiryDuration() != 0); if (!hadExpiry) { throw IllegalStateException( - "Cannot change region expiration duration for region created without " + "Cannot change region expiration duration for region created " + "without " "region expiry."); } int32_t oldValue = getRegionExpiryDuration(); @@ -2899,14 +2919,9 @@ bool LocalRegion::isStatisticsEnabled() { if (m_cacheImpl == nullptr) { return false; } - if (m_cacheImpl->getCache() != nullptr) { - SystemProperties* props = - m_cacheImpl->getCache()->getDistributedSystem()->getSystemProperties(); - if (props) { - status = props->statisticsEnabled(); - } - } - return status; + return m_cacheImpl->getDistributedSystem() + .getSystemProperties() + .statisticsEnabled(); } bool LocalRegion::useModifiedTimeForRegionExpiry() { @@ -3106,7 +3121,7 @@ void LocalRegion::evict(int32_t percentage) { } void LocalRegion::invokeAfterAllEndPointDisconnected() { if (m_listener != nullptr) { - int64_t sampleStartNanos = Utils::startStatOpTime(); + int64_t sampleStartNanos = startStatOpTime(); try { m_listener->afterRegionDisconnected(shared_from_this()); } catch (const Exception& ex) { @@ -3115,10 +3130,8 @@ void LocalRegion::invokeAfterAllEndPointDisconnected() { } catch (...) { LOGERROR("Unknown exception in CacheListener::afterRegionDisconnected"); } - Utils::updateStatOpTime( - m_regionStats->getStat(), - RegionStatType::getInstance()->getListenerCallTimeId(), - sampleStartNanos); + updateStatOpTime(m_regionStats->getStat(), + m_regionStats->getListenerCallTimeId(), sampleStartNanos); m_regionStats->incListenerCallsCompleted(); } } @@ -3150,6 +3163,16 @@ CacheablePtr LocalRegion::handleReplay(GfErrType& err, TombstoneListPtr LocalRegion::getTombstoneList() { return m_tombstoneList; } +int64_t LocalRegion::startStatOpTime() { + return m_enableTimeStatistics ? Utils::startStatOpTime() : 0; +} +void LocalRegion::updateStatOpTime(Statistics* statistics, int32_t statId, + int64_t start) { + if (m_enableTimeStatistics) { + Utils::updateStatOpTime(statistics, statId, start); + } +} + } // namespace client } // namespace geode } // namespace apache
http://git-wip-us.apache.org/repos/asf/geode-native/blob/da389793/src/cppcache/src/LocalRegion.hpp ---------------------------------------------------------------------- diff --git a/src/cppcache/src/LocalRegion.hpp b/src/cppcache/src/LocalRegion.hpp index 7d5a139..fee2518 100644 --- a/src/cppcache/src/LocalRegion.hpp +++ b/src/cppcache/src/LocalRegion.hpp @@ -131,7 +131,8 @@ class CPPCACHE_EXPORT LocalRegion : public RegionInternal { LocalRegion(const std::string& name, CacheImpl* cache, const RegionInternalPtr& rPtr, const RegionAttributesPtr& attributes, - const CacheStatisticsPtr& stats, bool shared = false); + const CacheStatisticsPtr& stats, bool shared = false, + bool enableTimeStatistics = true); virtual ~LocalRegion(); const char* getName() const; @@ -424,6 +425,10 @@ class CPPCACHE_EXPORT LocalRegion : public RegionInternal { DataInput* delta = nullptr, EventIdPtr eventId = nullptr); + int64_t startStatOpTime(); + void updateStatOpTime(Statistics* m_regionStats, int32_t statId, + int64_t start); + /* protected attributes */ std::string m_name; RegionPtr m_parentRegion; @@ -442,6 +447,7 @@ class CPPCACHE_EXPORT LocalRegion : public RegionInternal { TombstoneListPtr m_tombstoneList; bool m_isPRSingleHopEnabled; PoolPtr m_attachedPool; + bool m_enableTimeStatistics; mutable ACE_RW_Thread_Mutex m_rwLock; void keys_internal(VectorOfCacheableKey& v); http://git-wip-us.apache.org/repos/asf/geode-native/blob/da389793/src/cppcache/src/MapEntry.cpp ---------------------------------------------------------------------- diff --git a/src/cppcache/src/MapEntry.cpp b/src/cppcache/src/MapEntry.cpp index b8f1854..13a4ff3 100644 --- a/src/cppcache/src/MapEntry.cpp +++ b/src/cppcache/src/MapEntry.cpp @@ -14,20 +14,18 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + #include "MapEntry.hpp" #include "MapEntryT.hpp" -using namespace apache::geode::client; +namespace apache { +namespace geode { +namespace client { -EntryFactory* EntryFactory::singleton = nullptr; MapEntryPtr MapEntry::MapEntry_NullPointer(nullptr); -/** - * @brief called when library is initialized... see CppCacheLibrary. - */ -void EntryFactory::init() { singleton = new EntryFactory(); } - -void EntryFactory::newMapEntry(const CacheableKeyPtr& key, +void EntryFactory::newMapEntry(ExpiryTaskManager* expiryTaskManager, + const CacheableKeyPtr& key, MapEntryImplPtr& result) const { if (m_concurrencyChecksEnabled) { result = MapEntryT<VersionedMapEntryImpl, 0, 0>::create(key); @@ -35,3 +33,7 @@ void EntryFactory::newMapEntry(const CacheableKeyPtr& key, result = MapEntryT<MapEntryImpl, 0, 0>::create(key); } } + +} // namespace client +} // namespace geode +} // namespace apache http://git-wip-us.apache.org/repos/asf/geode-native/blob/da389793/src/cppcache/src/MapEntry.hpp ---------------------------------------------------------------------- diff --git a/src/cppcache/src/MapEntry.hpp b/src/cppcache/src/MapEntry.hpp index 52fb299..270a8b8 100644 --- a/src/cppcache/src/MapEntry.hpp +++ b/src/cppcache/src/MapEntry.hpp @@ -1,8 +1,3 @@ -#pragma once - -#ifndef GEODE_MAPENTRY_H_ -#define GEODE_MAPENTRY_H_ - /* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with @@ -20,8 +15,11 @@ * limitations under the License. */ -#include <atomic> +#pragma once +#ifndef GEODE_MAPENTRY_H_ +#define GEODE_MAPENTRY_H_ +#include <atomic> #include <geode/geode_globals.hpp> #include <geode/Cacheable.hpp> #include <geode/CacheableKey.hpp> @@ -45,6 +43,7 @@ class CPPCACHE_EXPORT MapEntryImpl; typedef std::shared_ptr<MapEntryImpl> MapEntryImplPtr; class CPPCACHE_EXPORT LRUEntryProperties; +class CacheImpl; /** * @brief This class encapsulates expiration specific properties for @@ -52,8 +51,11 @@ class CPPCACHE_EXPORT LRUEntryProperties; */ class CPPCACHE_EXPORT ExpEntryProperties { public: - inline ExpEntryProperties() - : m_lastAccessTime(0), m_lastModifiedTime(0), m_expiryTaskId(-1) { + inline ExpEntryProperties(ExpiryTaskManager* expiryTaskManager) + : m_lastAccessTime(0), + m_lastModifiedTime(0), + m_expiryTaskId(-1), + m_expiryTaskManager(expiryTaskManager) { // The reactor always gives +ve id while scheduling. // -1 will indicate that an expiry task has not been scheduled // for this entry. // TODO confirm @@ -86,7 +88,7 @@ class CPPCACHE_EXPORT ExpEntryProperties { inline void cancelExpiryTaskId(const CacheableKeyPtr& key) const { LOGDEBUG("Cancelling expiration task for key [%s] with id [%d]", Utils::getCacheableKeyString(key)->asChar(), m_expiryTaskId); - CacheImpl::expiryTaskManager->cancelTask(m_expiryTaskId); + m_expiryTaskManager->cancelTask(m_expiryTaskId); } protected: @@ -100,6 +102,7 @@ class CPPCACHE_EXPORT ExpEntryProperties { std::atomic<uint32_t> m_lastModifiedTime; /** The expiry task id for this particular entry.. **/ long m_expiryTaskId; + ExpiryTaskManager* m_expiryTaskManager; }; /** @@ -270,20 +273,15 @@ typedef std::shared_ptr<VersionedMapEntryImpl> VersionedMapEntryImplPtr; class CPPCACHE_EXPORT EntryFactory { public: - static EntryFactory* singleton; - static void init(); - - EntryFactory() { m_concurrencyChecksEnabled = true; } + EntryFactory(const bool concurrencyChecksEnabled) + : m_concurrencyChecksEnabled(concurrencyChecksEnabled) {} virtual ~EntryFactory() {} - virtual void newMapEntry(const CacheableKeyPtr& key, + virtual void newMapEntry(ExpiryTaskManager* expiryTaskManager, + const CacheableKeyPtr& key, MapEntryImplPtr& result) const; - virtual void setConcurrencyChecksEnabled(bool enabled) { - m_concurrencyChecksEnabled = enabled; - } - protected: bool m_concurrencyChecksEnabled; }; http://git-wip-us.apache.org/repos/asf/geode-native/blob/da389793/src/cppcache/src/MapEntryT.hpp ---------------------------------------------------------------------- diff --git a/src/cppcache/src/MapEntryT.hpp b/src/cppcache/src/MapEntryT.hpp index 86db403..4bd354e 100644 --- a/src/cppcache/src/MapEntryT.hpp +++ b/src/cppcache/src/MapEntryT.hpp @@ -1,8 +1,3 @@ -#pragma once - -#ifndef GEODE_MAPENTRYT_H_ -#define GEODE_MAPENTRYT_H_ - /* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with @@ -20,6 +15,11 @@ * limitations under the License. */ +#pragma once + +#ifndef GEODE_MAPENTRYT_H_ +#define GEODE_MAPENTRYT_H_ + #include <geode/geode_globals.hpp> #include "MapEntry.hpp" #include "TrackedMapEntry.hpp" @@ -116,8 +116,16 @@ class MapEntryT : public TBase { return std::make_shared<MapEntryT>(key); } + inline static std::shared_ptr<MapEntryT> create( + ExpiryTaskManager* expiryTaskManager, const CacheableKeyPtr& key) { + return std::make_shared<MapEntryT>(expiryTaskManager, key); + } + protected: inline MapEntryT(const CacheableKeyPtr& key) : TBase(key) {} + inline MapEntryT(ExpiryTaskManager* expiryTaskManager, + const CacheableKeyPtr& key) + : TBase(expiryTaskManager, key) {} private: // disabled http://git-wip-us.apache.org/repos/asf/geode-native/blob/da389793/src/cppcache/src/MapSegment.cpp ---------------------------------------------------------------------- diff --git a/src/cppcache/src/MapSegment.cpp b/src/cppcache/src/MapSegment.cpp index efa5f92..fe1ae59 100644 --- a/src/cppcache/src/MapSegment.cpp +++ b/src/cppcache/src/MapSegment.cpp @@ -14,6 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + #include "MapSegment.hpp" #include "MapEntry.hpp" #include "TrackedMapEntry.hpp" @@ -42,7 +43,8 @@ MapSegment::~MapSegment() { } void MapSegment::open(RegionInternal* region, const EntryFactory* entryFactory, - uint32_t size, std::atomic<int32_t>* destroyTrackers, + ExpiryTaskManager* expiryTaskManager, uint32_t size, + std::atomic<int32_t>* destroyTrackers, bool concurrencyChecksEnabled) { m_map = new CacheableKeyHashMap(); uint32_t mapSize = TableOfPrimes::nextLargerPrime(size, m_primeIndex); @@ -51,6 +53,9 @@ void MapSegment::open(RegionInternal* region, const EntryFactory* entryFactory, m_map->open(mapSize); m_entryFactory = entryFactory; m_region = region; + m_tombstoneList = + std::make_shared<TombstoneList>(this, m_region->getCacheImpl()); + m_expiryTaskManager = expiryTaskManager; m_numDestroyTrackers = destroyTrackers; m_concurrencyChecksEnabled = concurrencyChecksEnabled; } @@ -123,7 +128,7 @@ GfErrType MapSegment::create(const CacheableKeyPtr& key, } } if (taskid != -1) { - CacheImpl::expiryTaskManager->cancelTask(taskid); + m_expiryTaskManager->cancelTask(taskid); if (handler != nullptr) delete handler; } return err; @@ -194,7 +199,7 @@ GfErrType MapSegment::put(const CacheableKeyPtr& key, } } if (taskid != -1) { - CacheImpl::expiryTaskManager->cancelTask(taskid); + m_expiryTaskManager->cancelTask(taskid); if (handler != nullptr) delete handler; } return err; @@ -279,7 +284,7 @@ GfErrType MapSegment::removeWhenConcurrencyEnabled( if ((err = putForTrackedEntry(key, CacheableToken::tombstone(), entry, entryImpl, updateCount, versionStamp)) == GF_NOERR) { - m_tombstoneList->add(m_region, entryImpl, handler, expiryTaskID); + m_tombstoneList->add(entryImpl, handler, expiryTaskID); expTaskSet = true; } if (CacheableToken::isTombstone(oldValue)) { @@ -298,8 +303,7 @@ GfErrType MapSegment::removeWhenConcurrencyEnabled( if (_VERSION_TAG_NULL_CHK) { MapEntryImplPtr mapEntry; putNoEntry(key, CacheableToken::tombstone(), mapEntry, -1, 0, versionTag); - m_tombstoneList->add(m_region, mapEntry->getImplPtr(), handler, - expiryTaskID); + m_tombstoneList->add(mapEntry->getImplPtr(), handler, expiryTaskID); expTaskSet = true; } oldValue = nullptr; @@ -335,7 +339,7 @@ GfErrType MapSegment::remove(const CacheableKeyPtr& key, CacheablePtr& oldValue, } if (!expTaskSet) { - CacheImpl::expiryTaskManager->cancelTask(id); + m_expiryTaskManager->cancelTask(id); delete handler; } return err; @@ -369,7 +373,7 @@ GfErrType MapSegment::remove(const CacheableKeyPtr& key, CacheablePtr& oldValue, bool MapSegment::unguardedRemoveActualEntry(const CacheableKeyPtr& key, bool cancelTask) { MapEntryPtr entry; - m_tombstoneList->eraseEntryFromTombstoneList(key, m_region, cancelTask); + m_tombstoneList->eraseEntryFromTombstoneList(key, cancelTask); if (m_map->unbind(key, entry) == -1) { return false; } @@ -381,7 +385,7 @@ bool MapSegment::unguardedRemoveActualEntryWithoutCancelTask( int64_t& taskid) { MapEntryPtr entry; taskid = m_tombstoneList->eraseEntryFromTombstoneListWithoutCancelTask( - key, m_region, handler); + key, handler); if (m_map->unbind(key, entry) == -1) { return false; } @@ -522,7 +526,7 @@ int MapSegment::addTrackerForEntry(const CacheableKeyPtr& key, if (addIfAbsent) { MapEntryImplPtr entryImpl; // add a new entry with value as destroyed - m_entryFactory->newMapEntry(key, entryImpl); + m_entryFactory->newMapEntry(m_expiryTaskManager, key, entryImpl); entryImpl->setValueI(CacheableToken::destroyed()); entry = entryImpl; newEntry = entryImpl; http://git-wip-us.apache.org/repos/asf/geode-native/blob/da389793/src/cppcache/src/MapSegment.hpp ---------------------------------------------------------------------- diff --git a/src/cppcache/src/MapSegment.hpp b/src/cppcache/src/MapSegment.hpp index 94609d6..6483c61 100644 --- a/src/cppcache/src/MapSegment.hpp +++ b/src/cppcache/src/MapSegment.hpp @@ -1,8 +1,3 @@ -#pragma once - -#ifndef GEODE_MAPSEGMENT_H_ -#define GEODE_MAPSEGMENT_H_ - /* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with @@ -20,6 +15,11 @@ * limitations under the License. */ +#pragma once + +#ifndef GEODE_MAPSEGMENT_H_ +#define GEODE_MAPSEGMENT_H_ + #include <geode/geode_globals.hpp> #include <geode/CacheableKey.hpp> @@ -82,6 +82,7 @@ class CPPCACHE_EXPORT MapSegment { // does not need deletion here. const EntryFactory* m_entryFactory; RegionInternal* m_region; + ExpiryTaskManager* m_expiryTaskManager; // index of the current prime in the primes table uint32_t m_primeIndex; @@ -163,7 +164,7 @@ class CPPCACHE_EXPORT MapSegment { } } } - m_entryFactory->newMapEntry(key, newEntry); + m_entryFactory->newMapEntry(m_expiryTaskManager, key, newEntry); newEntry->setValueI(newValue); if (m_concurrencyChecksEnabled) { if (versionTag != nullptr && versionTag.get() != nullptr) { @@ -195,15 +196,14 @@ class CPPCACHE_EXPORT MapSegment { : m_map(nullptr), m_entryFactory(nullptr), m_region(nullptr), + m_expiryTaskManager(nullptr), m_primeIndex(0), m_spinlock(), m_segmentMutex(), m_concurrencyChecksEnabled(false), m_numDestroyTrackers(nullptr), - m_rehashCount(0) // COVERITY --> 30303 Uninitialized scalar field - { - m_tombstoneList = std::make_shared<TombstoneList>(this); - } + m_rehashCount(0), + m_tombstoneList(nullptr) {} ~MapSegment(); @@ -217,8 +217,8 @@ class CPPCACHE_EXPORT MapSegment { * Used when allocated in arrays by EntriesMap implementations. */ void open(RegionInternal* region, const EntryFactory* entryFactory, - uint32_t size, std::atomic<int32_t>* destroyTrackers, - bool concurrencyChecksEnabled); + ExpiryTaskManager* expiryTaskManager, uint32_t size, + std::atomic<int32_t>* destroyTrackers, bool concurrencyChecksEnabled); void close(); void clear(); http://git-wip-us.apache.org/repos/asf/geode-native/blob/da389793/src/cppcache/src/PdxHelper.cpp ---------------------------------------------------------------------- diff --git a/src/cppcache/src/PdxHelper.cpp b/src/cppcache/src/PdxHelper.cpp index 80ad913..7da7086 100644 --- a/src/cppcache/src/PdxHelper.cpp +++ b/src/cppcache/src/PdxHelper.cpp @@ -33,7 +33,10 @@ #include "Utils.hpp" #include "PdxRemoteWriter.hpp" #include "CacheRegionHelper.hpp" +#include "ThinClientPoolDM.hpp" #include <geode/Cache.hpp> +#include <geode/DataInput.hpp> +#include <geode/PoolManager.hpp> namespace apache { namespace geode { @@ -44,18 +47,6 @@ PdxHelper::PdxHelper() {} PdxHelper::~PdxHelper() {} -CacheImpl* PdxHelper::getCacheImpl() { - CachePtr cache = CacheFactory::getAnyInstance(); - if (cache == nullptr) { - throw IllegalStateException("cache has not been created yet."); - ; - } - if (cache->isClosed()) { - throw IllegalStateException("cache has been closed. "); - } - return CacheRegionHelper::getCacheImpl(cache.get()); -} - void PdxHelper::serializePdx(DataOutput& output, const PdxSerializable& pdxObject) { serializePdx( @@ -69,6 +60,9 @@ void PdxHelper::serializePdx(DataOutput& output, const char* pdxClassname = nullptr; auto pdxII = std::dynamic_pointer_cast<PdxInstanceImpl>(pdxObject); + auto cacheImpl = CacheRegionHelper::getCacheImpl(output.getCache()); + auto pdxTypeRegistry = cacheImpl->getPdxTypeRegistry(); + auto& cachePerfStats = cacheImpl->getCachePerfStats(); if (pdxII != nullptr) { PdxTypePtr piPt = pdxII->getPdxType(); @@ -76,10 +70,10 @@ void PdxHelper::serializePdx(DataOutput& output, piPt->getTypeId() == 0) // from pdxInstance factory need to get typeid from server { - int typeId = PdxTypeRegistry::getPDXIdForType(piPt, output.getPoolName()); + int typeId = pdxTypeRegistry->getPDXIdForType(piPt, output.getPoolName()); pdxII->setPdxId(typeId); } - auto plw = std::make_shared<PdxLocalWriter>(output, piPt); + auto plw = std::make_shared<PdxLocalWriter>(output, piPt, pdxTypeRegistry); pdxII->toData(plw); plw->endObjectWriting(); // now write typeid int len = 0; @@ -93,34 +87,31 @@ void PdxHelper::serializePdx(DataOutput& output, const char* pdxType = pdxObject->getClassName(); pdxClassname = pdxType; - PdxTypePtr localPdxType = PdxTypeRegistry::getLocalPdxType(pdxType); + PdxTypePtr localPdxType = pdxTypeRegistry->getLocalPdxType(pdxType); if (localPdxType == nullptr) { // need to grab type info, as fromdata is not called yet PdxWriterWithTypeCollectorPtr ptc = - std::make_shared<PdxWriterWithTypeCollector>(output, pdxType); + std::make_shared<PdxWriterWithTypeCollector>(output, pdxType, + pdxTypeRegistry); pdxObject->toData(std::dynamic_pointer_cast<PdxWriter>(ptc)); PdxTypePtr nType = ptc->getPdxLocalType(); nType->InitializeType(); - - // SerializationRegistry::GetPDXIdForType(output.getPoolName(), nType); - int32_t nTypeId = PdxTypeRegistry::getPDXIdForType( + int32_t nTypeId = pdxTypeRegistry->getPDXIdForType( pdxType, output.getPoolName(), nType, true); nType->setTypeId(nTypeId); ptc->endObjectWriting(); - PdxTypeRegistry::addLocalPdxType(pdxType, nType); - PdxTypeRegistry::addPdxType(nTypeId, nType); + pdxTypeRegistry->addLocalPdxType(pdxType, nType); + pdxTypeRegistry->addPdxType(nTypeId, nType); - //[ToDo] need to write bytes for stats - CacheImpl* cacheImpl = PdxHelper::getCacheImpl(); if (cacheImpl != nullptr) { uint8_t* stPos = const_cast<uint8_t*>(output.getBuffer()) + ptc->getStartPositionOffset(); int pdxLen = PdxHelper::readInt32(stPos); - cacheImpl->m_cacheStats->incPdxSerialization( + cachePerfStats.incPdxSerialization( pdxLen + 1 + 2 * 4); // pdxLen + 93 DSID + len + typeID } @@ -129,7 +120,7 @@ void PdxHelper::serializePdx(DataOutput& output, // if object got from server than create instance of RemoteWriter otherwise // local writer. - PdxRemotePreservedDataPtr pd = PdxTypeRegistry::getPreserveData(pdxObject); + PdxRemotePreservedDataPtr pd = pdxTypeRegistry->getPreserveData(pdxObject); // now always remotewriter as we have API Read/WriteUnreadFields // so we don't know whether user has used those or not;; Can we do some @@ -138,21 +129,22 @@ void PdxHelper::serializePdx(DataOutput& output, if (pd != nullptr) { PdxTypePtr mergedPdxType = - PdxTypeRegistry::getPdxType(pd->getMergedTypeId()); - prw = std::make_shared<PdxRemoteWriter>(output, mergedPdxType, pd); + pdxTypeRegistry->getPdxType(pd->getMergedTypeId()); + prw = std::make_shared<PdxRemoteWriter>(output, mergedPdxType, pd, + pdxTypeRegistry); } else { - prw = std::make_shared<PdxRemoteWriter>(output, pdxClassname); + prw = std::make_shared<PdxRemoteWriter>(output, pdxClassname, + pdxTypeRegistry); } pdxObject->toData(std::dynamic_pointer_cast<PdxWriter>(prw)); prw->endObjectWriting(); //[ToDo] need to write bytes for stats - CacheImpl* cacheImpl = PdxHelper::getCacheImpl(); if (cacheImpl != nullptr) { uint8_t* stPos = const_cast<uint8_t*>(output.getBuffer()) + prw->getStartPositionOffset(); int pdxLen = PdxHelper::readInt32(stPos); - cacheImpl->m_cacheStats->incPdxSerialization( + cachePerfStats.incPdxSerialization( pdxLen + 1 + 2 * 4); // pdxLen + 93 DSID + len + typeID } } @@ -165,9 +157,13 @@ PdxSerializablePtr PdxHelper::deserializePdx(DataInput& dataInput, PdxSerializablePtr pdxObjectptr = nullptr; PdxTypePtr pdxLocalType = nullptr; - PdxTypePtr pType = PdxTypeRegistry::getPdxType(typeId); + auto cacheImpl = CacheRegionHelper::getCacheImpl(dataInput.getCache()); + auto pdxTypeRegistry = cacheImpl->getPdxTypeRegistry(); + auto serializationRegistry = cacheImpl->getSerializationRegistry(); + + PdxTypePtr pType = pdxTypeRegistry->getPdxType(typeId); if (pType != nullptr) { // this may happen with PdxInstanceFactory { - pdxLocalType = PdxTypeRegistry::getLocalPdxType( + pdxLocalType = pdxTypeRegistry->getLocalPdxType( pType->getPdxClassName()); // this should be fine for IPdxTypeMapper } if (pType != nullptr && pdxLocalType != nullptr) // type found @@ -176,25 +172,27 @@ PdxSerializablePtr PdxHelper::deserializePdx(DataInput& dataInput, LOGDEBUG("deserializePdx ClassName = %s, isLocal = %d ", pType->getPdxClassName(), pType->isLocal()); - pdxObjectptr = SerializationRegistry::getPdxType(pdxClassname); + pdxObjectptr = serializationRegistry->getPdxType(pdxClassname); if (pType->isLocal()) // local type no need to read Unread data { - PdxLocalReaderPtr plr = - std::make_shared<PdxLocalReader>(dataInput, pType, length); + PdxLocalReaderPtr plr = std::make_shared<PdxLocalReader>( + dataInput, pType, length, pdxTypeRegistry); pdxObjectptr->fromData(std::dynamic_pointer_cast<PdxReader>(plr)); plr->MoveStream(); } else { - PdxRemoteReaderPtr prr = - std::make_shared<PdxRemoteReader>(dataInput, pType, length); + PdxRemoteReaderPtr prr = std::make_shared<PdxRemoteReader>( + dataInput, pType, length, pdxTypeRegistry); pdxObjectptr->fromData(std::dynamic_pointer_cast<PdxReader>(prr)); PdxTypePtr mergedVersion = - PdxTypeRegistry::getMergedType(pType->getTypeId()); + pdxTypeRegistry->getMergedType(pType->getTypeId()); PdxRemotePreservedDataPtr preserveData = prr->getPreservedData(mergedVersion, pdxObjectptr); if (preserveData != nullptr) { - PdxTypeRegistry::setPreserveData( - pdxObjectptr, preserveData); // it will set data in weakhashmap + pdxTypeRegistry->setPreserveData( + pdxObjectptr, preserveData, + cacheImpl + ->getExpiryTaskManager()); // it will set data in weakhashmap } prr->MoveStream(); } @@ -202,9 +200,11 @@ PdxSerializablePtr PdxHelper::deserializePdx(DataInput& dataInput, // type not found; need to get from server if (pType == nullptr) { pType = std::static_pointer_cast<PdxType>( - SerializationRegistry::GetPDXTypeById(dataInput.getPoolName(), - typeId)); - pdxLocalType = PdxTypeRegistry::getLocalPdxType(pType->getPdxClassName()); + serializationRegistry->GetPDXTypeById( + cacheImpl->getCache()->getPoolManager().find( + dataInput.getPoolName()), + typeId)); + pdxLocalType = pdxTypeRegistry->getLocalPdxType(pType->getPdxClassName()); } /* adongre - Coverity II * CID 29298: Unused pointer value (UNUSED_VALUE) @@ -213,13 +213,13 @@ PdxSerializablePtr PdxHelper::deserializePdx(DataInput& dataInput, * Fix : Commented the line */ // pdxClassname = pType->getPdxClassName(); - pdxObjectptr = SerializationRegistry::getPdxType(pType->getPdxClassName()); + pdxObjectptr = serializationRegistry->getPdxType(pType->getPdxClassName()); PdxSerializablePtr pdxRealObject = pdxObjectptr; if (pdxLocalType == nullptr) // need to know local type { PdxReaderWithTypeCollectorPtr prtc = - std::make_shared<PdxReaderWithTypeCollector>(dataInput, pType, - length); + std::make_shared<PdxReaderWithTypeCollector>(dataInput, pType, length, + pdxTypeRegistry); pdxObjectptr->fromData(std::dynamic_pointer_cast<PdxReader>(prtc)); // Check for the PdxWrapper @@ -227,44 +227,45 @@ PdxSerializablePtr PdxHelper::deserializePdx(DataInput& dataInput, pdxLocalType = prtc->getLocalType(); if (pType->Equals(pdxLocalType)) { - PdxTypeRegistry::addLocalPdxType(pdxRealObject->getClassName(), pType); - PdxTypeRegistry::addPdxType(pType->getTypeId(), pType); + pdxTypeRegistry->addLocalPdxType(pdxRealObject->getClassName(), pType); + pdxTypeRegistry->addPdxType(pType->getTypeId(), pType); pType->setLocal(true); } else { // Need to know local type and then merge type pdxLocalType->InitializeType(); - pdxLocalType->setTypeId(PdxTypeRegistry::getPDXIdForType( + pdxLocalType->setTypeId(pdxTypeRegistry->getPDXIdForType( pdxObjectptr->getClassName(), dataInput.getPoolName(), pdxLocalType, true)); pdxLocalType->setLocal(true); - PdxTypeRegistry::addLocalPdxType(pdxRealObject->getClassName(), + pdxTypeRegistry->addLocalPdxType(pdxRealObject->getClassName(), pdxLocalType); // added local type - PdxTypeRegistry::addPdxType(pdxLocalType->getTypeId(), pdxLocalType); + pdxTypeRegistry->addPdxType(pdxLocalType->getTypeId(), pdxLocalType); pType->InitializeType(); - PdxTypeRegistry::addPdxType(pType->getTypeId(), + pdxTypeRegistry->addPdxType(pType->getTypeId(), pType); // adding remote type // create merge type createMergedType(pdxLocalType, pType, dataInput); PdxTypePtr mergedVersion = - PdxTypeRegistry::getMergedType(pType->getTypeId()); + pdxTypeRegistry->getMergedType(pType->getTypeId()); PdxRemotePreservedDataPtr preserveData = prtc->getPreservedData(mergedVersion, pdxObjectptr); if (preserveData != nullptr) { - PdxTypeRegistry::setPreserveData(pdxObjectptr, preserveData); + pdxTypeRegistry->setPreserveData(pdxObjectptr, preserveData, + cacheImpl->getExpiryTaskManager()); } } prtc->MoveStream(); } else { // remote reader will come here as local type is there pType->InitializeType(); LOGDEBUG("Adding type %d ", pType->getTypeId()); - PdxTypeRegistry::addPdxType(pType->getTypeId(), + pdxTypeRegistry->addPdxType(pType->getTypeId(), pType); // adding remote type - PdxRemoteReaderPtr prr = - std::make_shared<PdxRemoteReader>(dataInput, pType, length); + PdxRemoteReaderPtr prr = std::make_shared<PdxRemoteReader>( + dataInput, pType, length, pdxTypeRegistry); pdxObjectptr->fromData(std::dynamic_pointer_cast<PdxReader>(prr)); // Check for PdxWrapper to getObject. @@ -272,12 +273,13 @@ PdxSerializablePtr PdxHelper::deserializePdx(DataInput& dataInput, createMergedType(pdxLocalType, pType, dataInput); PdxTypePtr mergedVersion = - PdxTypeRegistry::getMergedType(pType->getTypeId()); + pdxTypeRegistry->getMergedType(pType->getTypeId()); PdxRemotePreservedDataPtr preserveData = prr->getPreservedData(mergedVersion, pdxObjectptr); if (preserveData != nullptr) { - PdxTypeRegistry::setPreserveData(pdxObjectptr, preserveData); + pdxTypeRegistry->setPreserveData(pdxObjectptr, preserveData, + cacheImpl->getExpiryTaskManager()); } prr->MoveStream(); } @@ -287,7 +289,11 @@ PdxSerializablePtr PdxHelper::deserializePdx(DataInput& dataInput, PdxSerializablePtr PdxHelper::deserializePdx(DataInput& dataInput, bool forceDeserialize) { - if (PdxTypeRegistry::getPdxReadSerialized() == false || forceDeserialize) { + auto cacheImpl = CacheRegionHelper::getCacheImpl(dataInput.getCache()); + auto pdxTypeRegistry = cacheImpl->getPdxTypeRegistry(); + auto serializationRegistry = cacheImpl->getSerializationRegistry(); + auto& cachePerfStats = cacheImpl->getCachePerfStats(); + if (pdxTypeRegistry->getPdxReadSerialized() == false || forceDeserialize) { // Read Length int32_t len; dataInput.readInt(&len); @@ -296,11 +302,8 @@ PdxSerializablePtr PdxHelper::deserializePdx(DataInput& dataInput, // read typeId dataInput.readInt(&typeId); - auto cacheImpl = PdxHelper::getCacheImpl(); - if (cacheImpl != nullptr) { - cacheImpl->m_cacheStats->incPdxDeSerialization(len + - 9); // pdxLen + 1 + 2*4 - } + cachePerfStats.incPdxDeSerialization(len + 9); // pdxLen + 1 + 2*4 + return PdxHelper::deserializePdx(dataInput, forceDeserialize, (int32_t)typeId, (int32_t)len); @@ -313,26 +316,31 @@ PdxSerializablePtr PdxHelper::deserializePdx(DataInput& dataInput, // read typeId dataInput.readInt(&typeId); - auto pType = PdxTypeRegistry::getPdxType(typeId); + auto pType = pdxTypeRegistry->getPdxType(typeId); if (pType == nullptr) { + // TODO shared_ptr why redef? auto pType = std::static_pointer_cast<PdxType>( - SerializationRegistry::GetPDXTypeById(dataInput.getPoolName(), - typeId)); - PdxTypeRegistry::addLocalPdxType(pType->getPdxClassName(), pType); - PdxTypeRegistry::addPdxType(pType->getTypeId(), pType); + serializationRegistry->GetPDXTypeById( + cacheImpl->getCache()->getPoolManager().find( + dataInput.getPoolName()), + typeId)); + pdxTypeRegistry->addLocalPdxType(pType->getPdxClassName(), pType); + pdxTypeRegistry->addPdxType(pType->getTypeId(), pType); } + cachePerfStats.incPdxInstanceCreations(); + // TODO::Enable it once the PdxInstanceImple is CheckedIn. auto pdxObject = std::make_shared<PdxInstanceImpl>( - const_cast<uint8_t*>(dataInput.currentBufferPosition()), len, typeId); + const_cast<uint8_t*>(dataInput.currentBufferPosition()), len, typeId, + &cachePerfStats, pdxTypeRegistry, dataInput.getCache(), + cacheImpl->getDistributedSystem() + .getSystemProperties() + .getEnableTimeStatistics()); dataInput.advanceCursor(len); - auto cacheImpl = PdxHelper::getCacheImpl(); - if (cacheImpl != nullptr) { - cacheImpl->m_cacheStats->incPdxInstanceCreations(); - } return pdxObject; } } @@ -340,22 +348,26 @@ PdxSerializablePtr PdxHelper::deserializePdx(DataInput& dataInput, void PdxHelper::createMergedType(PdxTypePtr localType, PdxTypePtr remoteType, DataInput& dataInput) { PdxTypePtr mergedVersion = localType->mergeVersion(remoteType); + auto cacheImpl = CacheRegionHelper::getCacheImpl(dataInput.getCache()); + auto pdxTypeRegistry = cacheImpl->getPdxTypeRegistry(); + auto serializaionRegistry = cacheImpl->getSerializationRegistry(); if (mergedVersion->Equals(localType)) { - PdxTypeRegistry::setMergedType(remoteType->getTypeId(), localType); + pdxTypeRegistry->setMergedType(remoteType->getTypeId(), localType); } else if (mergedVersion->Equals(remoteType)) { - PdxTypeRegistry::setMergedType(remoteType->getTypeId(), remoteType); + pdxTypeRegistry->setMergedType(remoteType->getTypeId(), remoteType); } else { // need to create new version mergedVersion->InitializeType(); if (mergedVersion->getTypeId() == 0) { - mergedVersion->setTypeId(SerializationRegistry::GetPDXIdForType( - dataInput.getPoolName(), mergedVersion)); + mergedVersion->setTypeId(serializaionRegistry->GetPDXIdForType( + dataInput.getCache()->getPoolManager().find(dataInput.getPoolName()), + mergedVersion)); } // PdxTypeRegistry::AddPdxType(remoteType->TypeId, mergedVersion); - PdxTypeRegistry::addPdxType(mergedVersion->getTypeId(), mergedVersion); - PdxTypeRegistry::setMergedType(remoteType->getTypeId(), mergedVersion); - PdxTypeRegistry::setMergedType(mergedVersion->getTypeId(), mergedVersion); + pdxTypeRegistry->addPdxType(mergedVersion->getTypeId(), mergedVersion); + pdxTypeRegistry->setMergedType(remoteType->getTypeId(), mergedVersion); + pdxTypeRegistry->setMergedType(mergedVersion->getTypeId(), mergedVersion); } } @@ -414,13 +426,15 @@ int32_t PdxHelper::readInt(uint8_t* offsetPosition, int size) { } int32_t PdxHelper::getEnumValue(const char* enumClassName, const char* enumName, - int hashcode) { - auto ei = std::make_shared<EnumInfo>(enumClassName, enumName, hashcode); - return PdxTypeRegistry::getEnumValue(ei); + int hashcode, + PdxTypeRegistryPtr pdxTypeRegistry) { + const auto& ei = + std::make_shared<EnumInfo>(enumClassName, enumName, hashcode); + return pdxTypeRegistry->getEnumValue(ei); } -EnumInfoPtr PdxHelper::getEnum(int enumId) { - EnumInfoPtr ei = PdxTypeRegistry::getEnum(enumId); +EnumInfoPtr PdxHelper::getEnum(int enumId, PdxTypeRegistryPtr pdxTypeRegistry) { + const auto& ei = pdxTypeRegistry->getEnum(enumId); return ei; } } // namespace client http://git-wip-us.apache.org/repos/asf/geode-native/blob/da389793/src/cppcache/src/PdxHelper.hpp ---------------------------------------------------------------------- diff --git a/src/cppcache/src/PdxHelper.hpp b/src/cppcache/src/PdxHelper.hpp index 546278e..d83d6b1 100644 --- a/src/cppcache/src/PdxHelper.hpp +++ b/src/cppcache/src/PdxHelper.hpp @@ -77,9 +77,9 @@ class PdxHelper { static int32_t readInt(uint8_t* offsetPosition, int size); static int32_t getEnumValue(const char* enumClassName, const char* enumName, - int hashcode); + int hashcode, PdxTypeRegistryPtr pdxTypeRegistry); - static EnumInfoPtr getEnum(int enumId); + static EnumInfoPtr getEnum(int enumId, PdxTypeRegistryPtr pdxTypeRegistry); static CacheImpl* getCacheImpl(); }; http://git-wip-us.apache.org/repos/asf/geode-native/blob/da389793/src/cppcache/src/PdxInstanceFactoryImpl.cpp ---------------------------------------------------------------------- diff --git a/src/cppcache/src/PdxInstanceFactoryImpl.cpp b/src/cppcache/src/PdxInstanceFactoryImpl.cpp index 166f742..b75c9fe 100644 --- a/src/cppcache/src/PdxInstanceFactoryImpl.cpp +++ b/src/cppcache/src/PdxInstanceFactoryImpl.cpp @@ -26,13 +26,20 @@ namespace client { PdxInstanceFactoryImpl::~PdxInstanceFactoryImpl() {} -PdxInstanceFactoryImpl::PdxInstanceFactoryImpl(const char* className) { +PdxInstanceFactoryImpl::PdxInstanceFactoryImpl( + const char* className, CachePerfStats* cachePerfStats, + PdxTypeRegistryPtr pdxTypeRegistry, const Cache* cache, + bool enableTimeStatistics) + : m_pdxType(std::make_shared<PdxType>(pdxTypeRegistry, className, false)), + m_created(false), + m_cachePerfStats(cachePerfStats), + m_pdxTypeRegistry(pdxTypeRegistry), + m_cache(cache), + m_enableTimeStatistics(enableTimeStatistics) { if (className == nullptr || *className == '\0') { // COVERITY ---> 30289 Same on both sides throw IllegalStateException("className should not be null."); } - m_pdxType = std::make_shared<PdxType>(className, false); - m_created = false; } std::unique_ptr<PdxInstance> PdxInstanceFactoryImpl::create() { @@ -41,7 +48,8 @@ std::unique_ptr<PdxInstance> PdxInstanceFactoryImpl::create() { "The PdxInstanceFactory.Create() method can only be called once."); } auto pi = std::unique_ptr<PdxInstance>( - new PdxInstanceImpl(m_FieldVsValues, m_pdxType)); + new PdxInstanceImpl(m_FieldVsValues, m_pdxType, m_cachePerfStats, + m_pdxTypeRegistry, m_cache, m_enableTimeStatistics)); m_created = true; return pi; } http://git-wip-us.apache.org/repos/asf/geode-native/blob/da389793/src/cppcache/src/PdxInstanceFactoryImpl.hpp ---------------------------------------------------------------------- diff --git a/src/cppcache/src/PdxInstanceFactoryImpl.hpp b/src/cppcache/src/PdxInstanceFactoryImpl.hpp index 05a6b85..03eccd4 100644 --- a/src/cppcache/src/PdxInstanceFactoryImpl.hpp +++ b/src/cppcache/src/PdxInstanceFactoryImpl.hpp @@ -24,6 +24,8 @@ #include <geode/CacheableBuiltins.hpp> #include <map> #include "PdxType.hpp" +#include "PdxTypeRegistry.hpp" +#include "CachePerfStats.hpp" namespace apache { namespace geode { @@ -446,13 +448,18 @@ class CPPCACHE_EXPORT PdxInstanceFactoryImpl */ virtual PdxInstanceFactoryPtr markIdentityField(const char* fieldName); - PdxInstanceFactoryImpl(const char* className); + PdxInstanceFactoryImpl(const char* className, CachePerfStats* cachePerfStats, + PdxTypeRegistryPtr m_pdxTypeRegistry, + const Cache* cache, bool enableTimeStatistics); private: bool m_created; PdxTypePtr m_pdxType; FieldVsValues m_FieldVsValues; - + CachePerfStats* m_cachePerfStats; + PdxTypeRegistryPtr m_pdxTypeRegistry; + const Cache* m_cache; + bool m_enableTimeStatistics; void isFieldAdded(const char* fieldName); }; } // namespace client