http://git-wip-us.apache.org/repos/asf/geode-native/blob/da389793/src/cppcache/src/SerializationRegistry.cpp ---------------------------------------------------------------------- diff --git a/src/cppcache/src/SerializationRegistry.cpp b/src/cppcache/src/SerializationRegistry.cpp index a4e593c..4f308b4 100644 --- a/src/cppcache/src/SerializationRegistry.cpp +++ b/src/cppcache/src/SerializationRegistry.cpp @@ -49,304 +49,80 @@ #include "ThinClientPoolDM.hpp" #include "PdxType.hpp" #include <geode/PdxWrapper.hpp> -#include <geode/PdxSerializable.hpp> #include "EnumInfo.hpp" #include "VersionTag.hpp" #include "DiskStoreId.hpp" #include "DiskVersionTag.hpp" #include "CachedDeserializableHelper.hpp" - -#include "NonCopyable.hpp" - #include <mutex> -#include "util/concurrent/spinlock_mutex.hpp" +#include <functional> namespace apache { namespace geode { namespace client { -/* adongre - * CID 28729: Other violation (MISSING_COPY) - * Class "apache::geode::client::TheTypeMap" owns resources that are managed in - * its - * constructor and destructor but has no user-written copy constructor. - * - * CID 28715: Other violation (MISSING_ASSIGN) - * Class "apache::geode::client::TheTypeMap" owns resources that are managed - * in its constructor and destructor but has no user-written assignment - * operator. - * - * FIX : Make the class NonCopyable - */ - -class TheTypeMap : private NonCopyable, private NonAssignable { - private: - IdToFactoryMap* m_map; - IdToFactoryMap* m_map2; // to hold Fixed IDs since GFE 5.7. - StrToPdxFactoryMap* m_pdxTypemap; - spinlock_mutex m_mapLock; - spinlock_mutex m_map2Lock; - spinlock_mutex m_pdxTypemapLock; - - public: - TheTypeMap(); - - virtual ~TheTypeMap() { - if (m_map != nullptr) { - delete m_map; - } - - if (m_map2 != nullptr) { - delete m_map2; - } - - if (m_pdxTypemap != nullptr) { - delete m_pdxTypemap; - } - } - - inline void setup() { - // Register Geode builtins here!! 
- // update type ids in GeodeTypeIds.hpp - - bind(CacheableByte::createDeserializable); - bind(CacheableBoolean::createDeserializable); - bind(BooleanArray::createDeserializable); - bind(CacheableBytes::createDeserializable); - bind(CacheableFloat::createDeserializable); - bind(CacheableFloatArray::createDeserializable); - bind(CacheableDouble::createDeserializable); - bind(CacheableDoubleArray::createDeserializable); - bind(CacheableDate::createDeserializable); - bind(CacheableFileName::createDeserializable); - bind(CacheableHashMap::createDeserializable); - bind(CacheableHashSet::createDeserializable); - bind(CacheableHashTable::createDeserializable); - bind(CacheableIdentityHashMap::createDeserializable); - bind(CacheableLinkedHashSet::createDeserializable); - bind(CacheableInt16::createDeserializable); - bind(CacheableInt16Array::createDeserializable); - bind(CacheableInt32::createDeserializable); - bind(CacheableInt32Array::createDeserializable); - bind(CacheableInt64::createDeserializable); - bind(CacheableInt64Array::createDeserializable); - bind(CacheableObjectArray::createDeserializable); - bind(CacheableString::createDeserializable); - bind(CacheableString::createDeserializableHuge); - bind(CacheableString::createUTFDeserializable); - bind(CacheableString::createUTFDeserializableHuge); - bind(CacheableStringArray::createDeserializable); - bind(CacheableVector::createDeserializable); - bind(CacheableArrayList::createDeserializable); - bind(CacheableLinkedList::createDeserializable); - bind(CacheableStack::createDeserializable); - bind(CacheableWideChar::createDeserializable); - bind(CharArray::createDeserializable); - bind(CacheableToken::createDeserializable); - bind(RegionAttributes::createDeserializable); - bind(Properties::createDeserializable); - // bind(CacheableObjectPartList::createDeserializable); - // bind internal/fixed classes - since GFE 5.7 - bind2(CacheableUndefined::createDeserializable); - bind2(EventId::createDeserializable); - 
bind2(Struct::createDeserializable); - bind2(ClientConnectionResponse::create); - bind2(QueueConnectionResponse::create); - bind2(LocatorListResponse::create); - bind2(ClientProxyMembershipID::createDeserializable); - bind2(GatewayEventCallbackArgument::createDeserializable); - bind2(GatewaySenderEventCallbackArgument::createDeserializable); - bind2(GetAllServersResponse::create); - bind2(TXCommitMessage::create); - bind2(EnumInfo::createDeserializable); - bind2(VersionTag::createDeserializable); - rebind2(GeodeTypeIdsImpl::DiskStoreId, DiskStoreId::createDeserializable); - rebind2(GeodeTypeIdsImpl::DiskVersionTag, - DiskVersionTag::createDeserializable); - bind2(CachedDeserializableHelper::createForVmCachedDeserializable); - bind2(CachedDeserializableHelper::createForPreferBytesDeserializable); - // bind2(VersionedCacheableObjectPartList::createDeserializable); - } - - inline void clear() { - std::lock_guard<spinlock_mutex> guard(m_mapLock); - m_map->unbind_all(); - - std::lock_guard<spinlock_mutex> guard2(m_map2Lock); - m_map2->unbind_all(); - - std::lock_guard<spinlock_mutex> guard3(m_pdxTypemapLock); - m_pdxTypemap->unbind_all(); - } - - inline void find(int64_t id, TypeFactoryMethod& func) { - std::lock_guard<spinlock_mutex> guard(m_mapLock); - m_map->find(id, func); - } - - inline void find2(int64_t id, TypeFactoryMethod& func) { - std::lock_guard<spinlock_mutex> guard(m_map2Lock); - m_map2->find(id, func); - } - - inline void bind(TypeFactoryMethod func) { - Serializable* obj = func(); - std::lock_guard<spinlock_mutex> guard(m_mapLock); - int64_t compId = static_cast<int64_t>(obj->typeId()); - if (compId == GeodeTypeIdsImpl::CacheableUserData || - compId == GeodeTypeIdsImpl::CacheableUserData2 || - compId == GeodeTypeIdsImpl::CacheableUserData4) { - compId |= ((static_cast<int64_t>(obj->classId())) << 32); - } - delete obj; - int bindRes = m_map->bind(compId, func); - if (bindRes == 1) { - LOGERROR( - "A class with " - "ID %d is already registered.", - 
compId); - throw IllegalStateException( - "A class with " - "given ID is already registered."); - } else if (bindRes == -1) { - LOGERROR( - "Unknown error " - "while adding class ID %d to map.", - compId); - throw IllegalStateException( - "Unknown error " - "while adding type to map."); - } - } - inline void rebind(int64_t compId, TypeFactoryMethod func) { - std::lock_guard<spinlock_mutex> guard(m_mapLock); - int bindRes = m_map->rebind(compId, func); - if (bindRes == -1) { - LOGERROR( - "Unknown error " - "while adding class ID %d to map.", - compId); - throw IllegalStateException( - "Unknown error " - "while adding type to map."); - } - } - - inline void unbind(int64_t compId) { - std::lock_guard<spinlock_mutex> guard(m_mapLock); - m_map->unbind(compId); - } - - inline void bind2(TypeFactoryMethod func) { - Serializable* obj = func(); - std::lock_guard<spinlock_mutex> guard(m_map2Lock); - int8_t dsfid = obj->DSFID(); - - int64_t compId = 0; - if (dsfid == GeodeTypeIdsImpl::FixedIDShort) { - compId = compId = static_cast<int64_t>(obj->classId()); - } else { - compId = static_cast<int64_t>(obj->typeId()); - } - delete obj; - int bindRes = m_map2->bind(compId, func); - if (bindRes == 1) { - LOGERROR( - "A fixed class with " - "ID %d is already registered.", - compId); - throw IllegalStateException( - "A fixed class with " - "given ID is already registered."); - } else if (bindRes == -1) { - LOGERROR( - "Unknown error " - "while adding class ID %d to map2.", - compId); - throw IllegalStateException( - "Unknown error " - "while adding to map2."); - } - } - - inline void rebind2(int64_t compId, TypeFactoryMethod func) { - std::lock_guard<spinlock_mutex> guard(m_map2Lock); - m_map2->rebind(compId, func); - } - - inline void unbind2(int64_t compId) { - std::lock_guard<spinlock_mutex> guard(m_map2Lock); - m_map2->unbind(compId); - } - - inline void bindPdxType(TypeFactoryMethodPdx func) { - PdxSerializable* obj = func(); - std::lock_guard<spinlock_mutex> 
guard(m_pdxTypemapLock); - const char* objFullName = obj->getClassName(); - - int bindRes = m_pdxTypemap->bind(objFullName, func); - - delete obj; - - if (bindRes == 1) { - LOGERROR("A object with FullName %s is already registered.", objFullName); - throw IllegalStateException( - "A Object with " - "given FullName is already registered."); - } else if (bindRes == -1) { - LOGERROR( - "Unknown error " - "while adding Pdx Object named %s to map.", - objFullName); - throw IllegalStateException( - "Unknown error " - "while adding type to map."); - } - } - - inline void findPdxType(const char* objFullName, TypeFactoryMethodPdx& func) { - std::lock_guard<spinlock_mutex> guard(m_pdxTypemapLock); - m_pdxTypemap->find(objFullName, func); - } - - inline void rebindPdxType(const char* objFullName, - TypeFactoryMethodPdx func) { - std::lock_guard<spinlock_mutex> guard(m_pdxTypemapLock); - int bindRes = m_pdxTypemap->rebind(objFullName, func); - if (bindRes == -1) { - LOGERROR( - "Unknown error " - "while adding Pdx Object FullName %s to map.", - objFullName); - throw IllegalStateException( - "Unknown error " - "while adding type to map."); - } - } - - inline void unbindPdxType(const char* objFullName) { - std::lock_guard<spinlock_mutex> guard(m_pdxTypemapLock); - m_pdxTypemap->unbind(objFullName); - } -}; - -TheTypeMap::TheTypeMap() { - m_map = new IdToFactoryMap(); - - // second map to hold internal Data Serializable Fixed IDs - since GFE 5.7 - m_map2 = new IdToFactoryMap(); - - // map to hold PDX types <string, funptr>. - m_pdxTypemap = new StrToPdxFactoryMap(); +void TheTypeMap::setup() { + // Register Geode builtins here!! 
+ // update type ids in GeodeTypeIds.hpp + + bind(CacheableByte::createDeserializable); + bind(CacheableBoolean::createDeserializable); + bind(BooleanArray::createDeserializable); + bind(CacheableBytes::createDeserializable); + bind(CacheableFloat::createDeserializable); + bind(CacheableFloatArray::createDeserializable); + bind(CacheableDouble::createDeserializable); + bind(CacheableDoubleArray::createDeserializable); + bind(CacheableDate::createDeserializable); + bind(CacheableFileName::createDeserializable); + bind(CacheableHashMap::createDeserializable); + bind(CacheableHashSet::createDeserializable); + bind(CacheableHashTable::createDeserializable); + bind(CacheableIdentityHashMap::createDeserializable); + bind(CacheableLinkedHashSet::createDeserializable); + bind(CacheableInt16::createDeserializable); + bind(CacheableInt16Array::createDeserializable); + bind(CacheableInt32::createDeserializable); + bind(CacheableInt32Array::createDeserializable); + bind(CacheableInt64::createDeserializable); + bind(CacheableInt64Array::createDeserializable); + bind(CacheableObjectArray::createDeserializable); + bind(CacheableString::createDeserializable); + bind(CacheableString::createDeserializableHuge); + bind(CacheableString::createUTFDeserializable); + bind(CacheableString::createUTFDeserializableHuge); + bind(CacheableStringArray::createDeserializable); + bind(CacheableVector::createDeserializable); + bind(CacheableArrayList::createDeserializable); + bind(CacheableLinkedList::createDeserializable); + bind(CacheableStack::createDeserializable); + bind(CacheableWideChar::createDeserializable); + bind(CharArray::createDeserializable); + bind(CacheableToken::createDeserializable); + bind(RegionAttributes::createDeserializable); + bind(Properties::createDeserializable); + + bind2(CacheableUndefined::createDeserializable); + bind2(EventId::createDeserializable); + bind2(Struct::createDeserializable); + bind2(ClientConnectionResponse::create); + 
bind2(QueueConnectionResponse::create); + bind2(LocatorListResponse::create); + bind2(ClientProxyMembershipID::createDeserializable); + bind2(GatewayEventCallbackArgument::createDeserializable); + bind2(GatewaySenderEventCallbackArgument::createDeserializable); + bind2(GetAllServersResponse::create); + bind2(EnumInfo::createDeserializable); + + rebind2(GeodeTypeIdsImpl::DiskStoreId, DiskStoreId::createDeserializable); + + bind2(CachedDeserializableHelper::createForVmCachedDeserializable); + bind2(CachedDeserializableHelper::createForPreferBytesDeserializable); } -typedef ACE_Singleton<TheTypeMap, ACE_Thread_Mutex> theTypeMap; - -PdxSerializerPtr SerializationRegistry::m_pdxSerializer = nullptr; - /** This starts at reading the typeid.. assumes the length has been read. */ SerializablePtr SerializationRegistry::deserialize(DataInput& input, - int8_t typeId) { + int8_t typeId) const { bool findinternal = false; int8_t currentTypeId = typeId; @@ -393,9 +169,9 @@ SerializablePtr SerializationRegistry::deserialize(DataInput& input, } if (findinternal) { - theTypeMap::instance()->find2(compId, createType); + theTypeMap.find2(compId, createType); } else { - theTypeMap::instance()->find(compId, createType); + theTypeMap.find(compId, createType); } if (createType == nullptr) { if (findinternal) { @@ -425,47 +201,41 @@ SerializablePtr SerializationRegistry::deserialize(DataInput& input, } void SerializationRegistry::addType(TypeFactoryMethod func) { - theTypeMap::instance()->bind(func); + theTypeMap.bind(func); } void SerializationRegistry::addPdxType(TypeFactoryMethodPdx func) { - theTypeMap::instance()->bindPdxType(func); + theTypeMap.bindPdxType(func); } void SerializationRegistry::addType(int64_t compId, TypeFactoryMethod func) { - theTypeMap::instance()->rebind(compId, func); + theTypeMap.rebind(compId, func); } void SerializationRegistry::removeType(int64_t compId) { - theTypeMap::instance()->unbind(compId); + theTypeMap.unbind(compId); } void 
SerializationRegistry::addType2(TypeFactoryMethod func) { - theTypeMap::instance()->bind2(func); + theTypeMap.bind2(func); } void SerializationRegistry::addType2(int64_t compId, TypeFactoryMethod func) { - theTypeMap::instance()->rebind2(compId, func); + theTypeMap.rebind2(compId, func); } void SerializationRegistry::removeType2(int64_t compId) { - theTypeMap::instance()->unbind2(compId); + theTypeMap.unbind2(compId); } -void SerializationRegistry::init() { - // Everything here is done in the constructor for TheTypeMap... - theTypeMap::instance(); - theTypeMap::instance()->clear(); - theTypeMap::instance()->setup(); -} - -PdxSerializablePtr SerializationRegistry::getPdxType(const char* className) { +PdxSerializablePtr SerializationRegistry::getPdxType(char* className) { TypeFactoryMethodPdx objectType = nullptr; - theTypeMap::instance()->findPdxType(className, objectType); + theTypeMap.findPdxType(className, objectType); PdxSerializablePtr pdxObj; if (nullptr == objectType) { try { - pdxObj = std::make_shared<PdxWrapper>(className); + pdxObj = + std::make_shared<PdxWrapper>((const char*)className, m_pdxSerializer); } catch (const Exception&) { LOGERROR( "Unregistered class %s during PDX deserialization: Did the " @@ -488,19 +258,8 @@ PdxSerializerPtr SerializationRegistry::getPdxSerializer() { return m_pdxSerializer; } -int32_t SerializationRegistry::GetPDXIdForType(const char* poolName, - SerializablePtr pdxType) { - PoolPtr pool = nullptr; - - if (poolName == nullptr) { - for (const auto& iter : PoolManager::getAll()) { - pool = iter.second; - break; - } - } else { - pool = PoolManager::find(poolName); - } - +int32_t SerializationRegistry::GetPDXIdForType(PoolPtr pool, + SerializablePtr pdxType) const { if (pool == nullptr) { throw IllegalStateException("Pool not found, Pdx operation failed"); } @@ -508,19 +267,8 @@ int32_t SerializationRegistry::GetPDXIdForType(const char* poolName, return static_cast<ThinClientPoolDM*>(pool.get())->GetPDXIdForType(pdxType); } 
-SerializablePtr SerializationRegistry::GetPDXTypeById(const char* poolName, - int32_t typeId) { - PoolPtr pool = nullptr; - - if (poolName == nullptr) { - for (const auto& iter : PoolManager::getAll()) { - pool = iter.second; - break; - } - } else { - pool = PoolManager::find(poolName); - } - +SerializablePtr SerializationRegistry::GetPDXTypeById(PoolPtr pool, + int32_t typeId) const { if (pool == nullptr) { throw IllegalStateException("Pool not found, Pdx operation failed"); } @@ -528,16 +276,16 @@ SerializablePtr SerializationRegistry::GetPDXTypeById(const char* poolName, return static_cast<ThinClientPoolDM*>(pool.get())->GetPDXTypeById(typeId); } -int32_t SerializationRegistry::GetEnumValue(SerializablePtr enumInfo) { - PoolPtr pool = getPool(); +int32_t SerializationRegistry::GetEnumValue(PoolPtr pool, + SerializablePtr enumInfo) const { if (pool == nullptr) { throw IllegalStateException("Pool not found, Pdx operation failed"); } return static_cast<ThinClientPoolDM*>(pool.get())->GetEnumValue(enumInfo); } -SerializablePtr SerializationRegistry::GetEnum(int32_t val) { - PoolPtr pool = getPool(); +SerializablePtr SerializationRegistry::GetEnum(PoolPtr pool, + int32_t val) const { if (pool == nullptr) { throw IllegalStateException("Pool not found, Pdx operation failed"); } @@ -545,14 +293,167 @@ SerializablePtr SerializationRegistry::GetEnum(int32_t val) { return static_cast<ThinClientPoolDM*>(pool.get())->GetEnum(val); } -PoolPtr SerializationRegistry::getPool() { - PoolPtr pool = nullptr; - for (const auto& iter: PoolManager::getAll()) { - pool = iter.second; - break; +void TheTypeMap::clear() { + std::lock_guard<util::concurrent::spinlock_mutex> guard(m_mapLock); + m_map->unbind_all(); + + std::lock_guard<util::concurrent::spinlock_mutex> guard2(m_map2Lock); + m_map2->unbind_all(); + + std::lock_guard<util::concurrent::spinlock_mutex> guard3(m_pdxTypemapLock); + m_pdxTypemap->unbind_all(); +} + +void TheTypeMap::find(int64_t id, TypeFactoryMethod& func) const 
{ + std::lock_guard<util::concurrent::spinlock_mutex> guard(m_mapLock); + m_map->find(id, func); +} + +void TheTypeMap::find2(int64_t id, TypeFactoryMethod& func) const { + std::lock_guard<util::concurrent::spinlock_mutex> guard(m_map2Lock); + m_map2->find(id, func); +} + +void TheTypeMap::bind(TypeFactoryMethod func) { + Serializable* obj = func(); + std::lock_guard<util::concurrent::spinlock_mutex> guard(m_mapLock); + int64_t compId = static_cast<int64_t>(obj->typeId()); + if (compId == GeodeTypeIdsImpl::CacheableUserData || + compId == GeodeTypeIdsImpl::CacheableUserData2 || + compId == GeodeTypeIdsImpl::CacheableUserData4) { + compId |= ((static_cast<int64_t>(obj->classId())) << 32); + } + delete obj; + int bindRes = m_map->bind(compId, func); + if (bindRes == 1) { + LOGERROR( + "A class with " + "ID %d is already registered.", + compId); + throw IllegalStateException( + "A class with " + "given ID is already registered."); + } else if (bindRes == -1) { + LOGERROR( + "Unknown error " + "while adding class ID %d to map.", + compId); + throw IllegalStateException( + "Unknown error " + "while adding type to map."); + } +} + +void TheTypeMap::rebind(int64_t compId, TypeFactoryMethod func) { + std::lock_guard<util::concurrent::spinlock_mutex> guard(m_mapLock); + int bindRes = m_map->rebind(compId, func); + if (bindRes == -1) { + LOGERROR( + "Unknown error " + "while adding class ID %d to map.", + compId); + throw IllegalStateException( + "Unknown error " + "while adding type to map."); + } +} + +void TheTypeMap::unbind(int64_t compId) { + std::lock_guard<util::concurrent::spinlock_mutex> guard(m_mapLock); + m_map->unbind(compId); +} + +void TheTypeMap::bind2(TypeFactoryMethod func) { + Serializable* obj = func(); + std::lock_guard<util::concurrent::spinlock_mutex> guard(m_map2Lock); + int8_t dsfid = obj->DSFID(); + + int64_t compId = 0; + if (dsfid == GeodeTypeIdsImpl::FixedIDShort) { + compId = compId = static_cast<int64_t>(obj->classId()); + } else { + compId = 
static_cast<int64_t>(obj->typeId()); + } + delete obj; + int bindRes = m_map2->bind(compId, func); + if (bindRes == 1) { + LOGERROR( + "A fixed class with " + "ID %d is already registered.", + compId); + throw IllegalStateException( + "A fixed class with " + "given ID is already registered."); + } else if (bindRes == -1) { + LOGERROR( + "Unknown error " + "while adding class ID %d to map2.", + compId); + throw IllegalStateException( + "Unknown error " + "while adding to map2."); } - return pool; } + +void TheTypeMap::rebind2(int64_t compId, TypeFactoryMethod func) { + std::lock_guard<util::concurrent::spinlock_mutex> guard(m_map2Lock); + m_map2->rebind(compId, func); +} + +void TheTypeMap::unbind2(int64_t compId) { + std::lock_guard<util::concurrent::spinlock_mutex> guard(m_map2Lock); + m_map2->unbind(compId); +} + +void TheTypeMap::bindPdxType(TypeFactoryMethodPdx func) { + PdxSerializable* obj = func(); + std::lock_guard<util::concurrent::spinlock_mutex> guard(m_pdxTypemapLock); + const char* objFullName = obj->getClassName(); + + int bindRes = m_pdxTypemap->bind(objFullName, func); + + delete obj; + + if (bindRes == 1) { + LOGERROR("A object with FullName %s is already registered.", objFullName); + throw IllegalStateException( + "A Object with " + "given FullName is already registered."); + } else if (bindRes == -1) { + LOGERROR( + "Unknown error " + "while adding Pdx Object named %s to map.", + objFullName); + throw IllegalStateException( + "Unknown error " + "while adding type to map."); + } +} + +void TheTypeMap::findPdxType(char* objFullName, TypeFactoryMethodPdx& func) { + std::lock_guard<util::concurrent::spinlock_mutex> guard(m_pdxTypemapLock); + m_pdxTypemap->find(objFullName, func); +} + +void TheTypeMap::rebindPdxType(char* objFullName, TypeFactoryMethodPdx func) { + std::lock_guard<util::concurrent::spinlock_mutex> guard(m_pdxTypemapLock); + int bindRes = m_pdxTypemap->rebind(objFullName, func); + if (bindRes == -1) { + LOGERROR( + "Unknown error " + 
"while adding Pdx Object FullName %s to map.", + objFullName); + throw IllegalStateException( + "Unknown error " + "while adding type to map."); + } +} + +void TheTypeMap::unbindPdxType(char* objFullName) { + std::lock_guard<util::concurrent::spinlock_mutex> guard(m_pdxTypemapLock); + m_pdxTypemap->unbind(objFullName); +} + } // namespace client } // namespace geode } // namespace apache
http://git-wip-us.apache.org/repos/asf/geode-native/blob/da389793/src/cppcache/src/SerializationRegistry.hpp ---------------------------------------------------------------------- diff --git a/src/cppcache/src/SerializationRegistry.hpp b/src/cppcache/src/SerializationRegistry.hpp index 33cc86b..9803a8a 100644 --- a/src/cppcache/src/SerializationRegistry.hpp +++ b/src/cppcache/src/SerializationRegistry.hpp @@ -33,6 +33,10 @@ #include <geode/ExceptionTypes.hpp> #include <geode/Delta.hpp> #include <string> +#include "util/concurrent/spinlock_mutex.hpp" +#include "NonCopyable.hpp" +#include <geode/PdxSerializable.hpp> +#include "MemberListForVersionStamp.hpp" #if defined(_MACOSX) ACE_BEGIN_VERSIONED_NAMESPACE_DECL @@ -58,14 +62,75 @@ typedef ACE_Hash_Map_Manager<int64_t, TypeFactoryMethod, ACE_Null_Mutex> typedef ACE_Hash_Map_Manager<std::string, TypeFactoryMethodPdx, ACE_Null_Mutex> StrToPdxFactoryMap; +class TheTypeMap : private NonCopyable { + private: + IdToFactoryMap* m_map; + IdToFactoryMap* m_map2; // to hold Fixed IDs since GFE 5.7. + StrToPdxFactoryMap* m_pdxTypemap; + mutable util::concurrent::spinlock_mutex m_mapLock; + mutable util::concurrent::spinlock_mutex m_map2Lock; + mutable util::concurrent::spinlock_mutex m_pdxTypemapLock; + + public: + TheTypeMap() { + m_map = new IdToFactoryMap(); + + // second map to hold internal Data Serializable Fixed IDs - since GFE 5.7 + m_map2 = new IdToFactoryMap(); + + // map to hold PDX types <string, funptr>. 
+ m_pdxTypemap = new StrToPdxFactoryMap(); + + setup(); + } + + virtual ~TheTypeMap() { + if (m_map != nullptr) { + delete m_map; + } + + if (m_map2 != nullptr) { + delete m_map2; + } + + if (m_pdxTypemap != nullptr) { + delete m_pdxTypemap; + } + } + + void setup(); + + void clear(); + + void find(int64_t id, TypeFactoryMethod& func) const; + void find2(int64_t id, TypeFactoryMethod& func) const; + + void bind(TypeFactoryMethod func); + + inline void rebind(int64_t compId, TypeFactoryMethod func); + inline void unbind(int64_t compId); + inline void bind2(TypeFactoryMethod func); + + inline void rebind2(int64_t compId, TypeFactoryMethod func); + + inline void unbind2(int64_t compId); + inline void bindPdxType(TypeFactoryMethodPdx func); + inline void findPdxType(char* objFullName, TypeFactoryMethodPdx& func); + inline void unbindPdxType(char* objFullName); + + void rebindPdxType(char* objFullName, TypeFactoryMethodPdx func); +}; + class CPPCACHE_EXPORT SerializationRegistry { public: + SerializationRegistry() : theTypeMap() {} + /** write the length of the serialization, write the typeId of the object, * then write whatever the object's toData requires. The length at the * front is backfilled after the serialization. */ - inline static void serialize(const Serializable* obj, DataOutput& output, - bool isDelta = false) { + inline void serialize(const Serializable* obj, DataOutput& output, + bool isDelta = false) const { if (obj == nullptr) { output.write(static_cast<int8_t>(GeodeTypeIds::NullObj)); } else { @@ -106,7 +171,7 @@ class CPPCACHE_EXPORT SerializationRegistry { } } - inline static void serialize(const SerializablePtr& obj, DataOutput& output) { + inline void serialize(const SerializablePtr& obj, DataOutput& output) const { serialize(obj.get(), output); } @@ -114,45 +179,44 @@ class CPPCACHE_EXPORT SerializationRegistry { * Read the length, typeid, and run the objs fromData. Returns the New * object. 
*/ - static SerializablePtr deserialize(DataInput& input, int8_t typeId = -1); - - static void addType(TypeFactoryMethod func); + SerializablePtr deserialize(DataInput& input, int8_t typeId = -1) const; - static void addType(int64_t compId, TypeFactoryMethod func); + void addType(TypeFactoryMethod func); - static void addPdxType(TypeFactoryMethodPdx func); + void addType(int64_t compId, TypeFactoryMethod func); - static void setPdxSerializer(PdxSerializerPtr pdxSerializer); + void addPdxType(TypeFactoryMethodPdx func); - static PdxSerializerPtr getPdxSerializer(); + void setPdxSerializer(PdxSerializerPtr pdxSerializer); - static void removeType(int64_t compId); + PdxSerializerPtr getPdxSerializer(); - static void init(); + void removeType(int64_t compId); // following for internal types with Data Serializable Fixed IDs - since GFE // 5.7 - static void addType2(TypeFactoryMethod func); + void addType2(TypeFactoryMethod func); - static void addType2(int64_t compId, TypeFactoryMethod func); + void addType2(int64_t compId, TypeFactoryMethod func); - static void removeType2(int64_t compId); + void removeType2(int64_t compId); - static int32_t GetPDXIdForType(const char* poolName, SerializablePtr pdxType); + int32_t GetPDXIdForType(PoolPtr pool, SerializablePtr pdxType) const; - static SerializablePtr GetPDXTypeById(const char* poolName, int32_t typeId); + SerializablePtr GetPDXTypeById(PoolPtr pool, int32_t typeId) const; - static int32_t GetEnumValue(SerializablePtr enumInfo); - static SerializablePtr GetEnum(int32_t val); + int32_t GetEnumValue(PoolPtr pool, SerializablePtr enumInfo) const; + SerializablePtr GetEnum(PoolPtr pool, int32_t val) const; - static PdxSerializablePtr getPdxType(const char* className); + PdxSerializablePtr getPdxType(char* className); private: - static PoolPtr getPool(); - static IdToFactoryMap* s_typeMap; - static PdxSerializerPtr m_pdxSerializer; + PdxSerializerPtr m_pdxSerializer; + TheTypeMap theTypeMap; }; + +typedef 
std::shared_ptr<SerializationRegistry> SerializationRegistryPtr; } // namespace client } // namespace geode } // namespace apache http://git-wip-us.apache.org/repos/asf/geode-native/blob/da389793/src/cppcache/src/TXCommitMessage.cpp ---------------------------------------------------------------------- diff --git a/src/cppcache/src/TXCommitMessage.cpp b/src/cppcache/src/TXCommitMessage.cpp index 211450f..933d980 100644 --- a/src/cppcache/src/TXCommitMessage.cpp +++ b/src/cppcache/src/TXCommitMessage.cpp @@ -33,7 +33,7 @@ namespace apache { namespace geode { namespace client { -TXCommitMessage::TXCommitMessage() +TXCommitMessage::TXCommitMessage(MemberListForVersionStamp & memberListForVersionStamp) : m_memberListForVersionStamp(memberListForVersionStamp) // UNUSED : m_processorId(0) {} @@ -88,7 +88,7 @@ m_processorId = -1; int32_t regionSize; input.readInt(&regionSize); for (int32_t i = 0; i < regionSize; i++) { - auto rc = std::make_shared<RegionCommit>(); + auto rc = std::make_shared<RegionCommit>(m_memberListForVersionStamp); rc->fromData(input); m_regions.push_back(rc); } @@ -159,7 +159,7 @@ int8_t TXCommitMessage::typeId() const { return static_cast<int8_t>(GeodeTypeIdsImpl::TXCommitMessage); } -Serializable* TXCommitMessage::create() { return new TXCommitMessage(); } +Serializable* TXCommitMessage::create(MemberListForVersionStamp & memberListForVersionStamp) { return new TXCommitMessage(memberListForVersionStamp); } void TXCommitMessage::apply(Cache* cache) { for (std::vector<RegionCommitPtr>::iterator iter = m_regions.begin(); http://git-wip-us.apache.org/repos/asf/geode-native/blob/da389793/src/cppcache/src/TXCommitMessage.hpp ---------------------------------------------------------------------- diff --git a/src/cppcache/src/TXCommitMessage.hpp b/src/cppcache/src/TXCommitMessage.hpp index eb1b89b..09bc702 100644 --- a/src/cppcache/src/TXCommitMessage.hpp +++ b/src/cppcache/src/TXCommitMessage.hpp @@ -32,14 +32,14 @@ _GF_PTR_DEF_(TXCommitMessage,
TXCommitMessagePtr); class TXCommitMessage : public apache::geode::client::Cacheable { public: - TXCommitMessage(); + TXCommitMessage(MemberListForVersionStamp & memberListForVersionStamp); virtual ~TXCommitMessage(); virtual Serializable* fromData(DataInput& input); virtual void toData(DataOutput& output) const; virtual int32_t classId() const; int8_t typeId() const; - static Serializable* create(); + static Serializable* create(MemberListForVersionStamp & memberListForVersionStamp); // VectorOfEntryEvent getEvents(Cache* cache); void apply(Cache* cache); @@ -47,7 +47,7 @@ class TXCommitMessage : public apache::geode::client::Cacheable { private: // UNUSED int32_t m_processorId; bool isAckRequired(); - + MemberListForVersionStamp & m_memberListForVersionStamp; std::vector<RegionCommitPtr> m_regions; }; } // namespace client http://git-wip-us.apache.org/repos/asf/geode-native/blob/da389793/src/cppcache/src/TcpConn.cpp ---------------------------------------------------------------------- diff --git a/src/cppcache/src/TcpConn.cpp b/src/cppcache/src/TcpConn.cpp index b80aa51..581d267 100644 --- a/src/cppcache/src/TcpConn.cpp +++ b/src/cppcache/src/TcpConn.cpp @@ -27,11 +27,9 @@ #include <ace/SOCK_Connector.h> #include <ace/SOCK_Acceptor.h> #include <ace/OS.h> - +#include "CacheImpl.hpp" using namespace apache::geode::client; -int TcpConn::m_chunkSize = TcpConn::setChunkSize(); - void TcpConn::clearNagle(ACE_SOCKET sock) { int32_t val = 1; #ifdef WIN32 @@ -60,19 +58,10 @@ int32_t TcpConn::maxSize(ACE_SOCKET sock, int32_t flag, int32_t size) { socklen_t plen = sizeof(val); socklen_t clen = sizeof(val); - static int32_t max = 32000; - if (m_maxBuffSizePool <= 0) { - SystemProperties *props = DistributedSystem::getSystemProperties(); - if (props != nullptr) { - max = props->maxSocketBufferSize(); - } - } else { - max = m_maxBuffSizePool; - } int32_t inc = 32120; val = size - (3 * inc); if (val < 0) val = 0; - if (size == 0) size = max; + if (size == 0) size = 
m_maxBuffSizePool; int32_t red = 0; int32_t lastRed = -1; while (lastRed != red) { @@ -92,7 +81,7 @@ int32_t TcpConn::maxSize(ACE_SOCKET sock, int32_t flag, int32_t size) { #ifdef _LINUX val /= 2; #endif - if ((val >= max) || (val >= size)) continue; + if ((val >= m_maxBuffSizePool) || (val >= size)) continue; red = val; } return val; @@ -105,17 +94,7 @@ void TcpConn::createSocket(ACE_SOCKET sock) { } void TcpConn::init() { - /* adongre - * CID 28736: Improper use of negative value (NEGATIVE_RETURNS) - * Function "socket(2, 1, 0)" returns a negative number. - * Assigning: unsigned variable "sock" = "socket". - * - * CID 28737: Unsigned compared against 0 (NO_EFFECT) - * This less-than-zero comparison of an unsigned value is never true. "sock < - * 0U". - */ ACE_SOCKET sock = socket(AF_INET, SOCK_STREAM, 0); - // if ( sock < 0 ) { if (sock == -1) { int32_t lastError = ACE_OS::last_error(); LOGERROR("Failed to create socket. Errno: %d: %s", lastError, @@ -128,19 +107,19 @@ void TcpConn::init() { clearNagle(sock); - static int32_t readSize = 0; - static int32_t writeSize = 0; + int32_t readSize = 0; + int32_t writeSize = 0; int32_t originalReadSize = readSize; readSize = maxSize(sock, SO_SNDBUF, readSize); if (originalReadSize != readSize) { // This should get logged once at startup and again only if it changes - LOGINFO("Using socket send buffer size of %d.", readSize); + LOGFINEST("Using socket send buffer size of %d.", readSize); } int32_t originalWriteSize = writeSize; writeSize = maxSize(sock, SO_RCVBUF, writeSize); if (originalWriteSize != writeSize) { // This should get logged once at startup and again only if it changes - LOGINFO("Using socket receive buffer size of %d.", writeSize); + LOGFINEST("Using socket receive buffer size of %d.", writeSize); } createSocket(sock); @@ -148,21 +127,21 @@ void TcpConn::init() { connect(); } -TcpConn::TcpConn() : m_io(nullptr), m_waitSeconds(0), m_maxBuffSizePool(0) {} - TcpConn::TcpConn(const char *ipaddr, uint32_t 
waitSeconds, int32_t maxBuffSizePool) : m_io(nullptr), m_addr(ipaddr), - m_waitSeconds(waitSeconds), - m_maxBuffSizePool(maxBuffSizePool) {} + m_waitMilliSeconds(waitSeconds * 1000), + m_maxBuffSizePool(maxBuffSizePool), + m_chunkSize(getDefaultChunkSize()) {} TcpConn::TcpConn(const char *hostname, int32_t port, uint32_t waitSeconds, int32_t maxBuffSizePool) : m_io(nullptr), m_addr(port, hostname), - m_waitSeconds(waitSeconds), - m_maxBuffSizePool(maxBuffSizePool) {} + m_waitMilliSeconds(waitSeconds * 1000), + m_maxBuffSizePool(maxBuffSizePool), + m_chunkSize(getDefaultChunkSize()) {} void TcpConn::listen(const char *hostname, int32_t port, uint32_t waitSeconds) { ACE_INET_Addr addr(port, hostname); @@ -216,14 +195,14 @@ void TcpConn::connect(const char *hostname, int32_t port, uint32_t waitSeconds) { ACE_INET_Addr addr(port, hostname); m_addr = addr; - m_waitSeconds = waitSeconds; + m_waitMilliSeconds = waitSeconds * 1000; connect(); } void TcpConn::connect(const char *ipaddr, uint32_t waitSeconds) { ACE_INET_Addr addr(ipaddr); m_addr = addr; - m_waitSeconds = waitSeconds; + m_waitMilliSeconds = waitSeconds * 1000; connect(); } @@ -231,25 +210,18 @@ void TcpConn::connect() { GF_DEV_ASSERT(m_io != nullptr); ACE_INET_Addr ipaddr = m_addr; - uint32_t waitSeconds = m_waitSeconds; + uint32_t waitMicroSeconds = m_waitMilliSeconds * 1000; ACE_OS::signal(SIGPIPE, SIG_IGN); // Ignore broken pipe - // passing waittime as microseconds - if (DistributedSystem::getSystemProperties()->readTimeoutUnitInMillis()) { - waitSeconds = waitSeconds * 1000; - } else { - waitSeconds = waitSeconds * (1000 * 1000); - } - LOGFINER("Connecting plain socket stream to %s:%d waiting %d micro sec", - ipaddr.get_host_name(), ipaddr.get_port_number(), waitSeconds); + ipaddr.get_host_name(), ipaddr.get_port_number(), waitMicroSeconds); ACE_SOCK_Connector conn; int32_t retVal = 0; - if (waitSeconds > 0) { + if (waitMicroSeconds > 0) { // passing waittime as microseconds - 
ACE_Time_Value wtime(0, waitMicroSeconds); retVal = conn.connect(*m_io, ipaddr, &wtime); } else { retVal = conn.connect(*m_io, ipaddr); @@ -258,10 +230,10 @@ void TcpConn::connect() { char msg[256]; int32_t lastError = ACE_OS::last_error(); if (lastError == ETIME || lastError == ETIMEDOUT) { - ACE_OS::snprintf( - msg, 256, - "TcpConn::connect Attempt to connect timed out after %d seconds.", - waitSeconds); + ACE_OS::snprintf(msg, 256, + "TcpConn::connect Attempt to connect timed out after %d " + "microseconds.", + waitMicroSeconds); // this is only called by constructor, so we must delete m_io GF_SAFE_DELETE(m_io); throw TimeoutException(msg); @@ -269,7 +241,7 @@ void TcpConn::connect() { ACE_OS::snprintf(msg, 256, "TcpConn::connect failed with errno: %d: %s", lastError, ACE_OS::strerror(lastError)); // this is only called by constructor, so we must delete m_io - close(); + close(); throw GeodeIOException(msg); } int rc = this->m_io->enable(ACE_NONBLOCK); http://git-wip-us.apache.org/repos/asf/geode-native/blob/da389793/src/cppcache/src/TcpConn.hpp ---------------------------------------------------------------------- diff --git a/src/cppcache/src/TcpConn.hpp b/src/cppcache/src/TcpConn.hpp index 60ee0a9..13964d4 100644 --- a/src/cppcache/src/TcpConn.hpp +++ b/src/cppcache/src/TcpConn.hpp @@ -50,7 +50,7 @@ class CPPCACHE_EXPORT TcpConn : public Connector { protected: ACE_INET_Addr m_addr; - uint32_t m_waitSeconds; + uint32_t m_waitMilliSeconds; int32_t m_maxBuffSizePool; @@ -65,9 +65,9 @@ class CPPCACHE_EXPORT TcpConn : public Connector { virtual void createSocket(ACE_SOCKET sock); public: - static int m_chunkSize; + int m_chunkSize; - static int setChunkSize() { + static int getDefaultChunkSize() { // Attempt to set chunk size to nearest OS page size // for perf improvement int pageSize = ACE_OS::getpagesize(); @@ -80,12 +80,9 @@ class CPPCACHE_EXPORT TcpConn : public Connector { return 16000000; } - TcpConn(); - TcpConn(const char* hostname, int32_t port, - 
uint32_t waitSeconds = DEFAULT_CONNECT_TIMEOUT, - int32_t maxBuffSizePool = 0); - TcpConn(const char* ipaddr, uint32_t waitSeconds = DEFAULT_CONNECT_TIMEOUT, - int32_t maxBuffSizePool = 0); + TcpConn(const char* hostname, int32_t port, uint32_t waitSeconds, + int32_t maxBuffSizePool); + TcpConn(const char* ipaddr, uint32_t waitSeconds, int32_t maxBuffSizePool); virtual ~TcpConn() { close(); } http://git-wip-us.apache.org/repos/asf/geode-native/blob/da389793/src/cppcache/src/TcpSslConn.cpp ---------------------------------------------------------------------- diff --git a/src/cppcache/src/TcpSslConn.cpp b/src/cppcache/src/TcpSslConn.cpp index 3f4935f..74b999d 100644 --- a/src/cppcache/src/TcpSslConn.cpp +++ b/src/cppcache/src/TcpSslConn.cpp @@ -19,7 +19,7 @@ #include <geode/SystemProperties.hpp> #include <geode/DistributedSystem.hpp> #include "../../cryptoimpl/Ssl.hpp" - +#include "CacheImpl.hpp" using namespace apache::geode::client; Ssl* TcpSslConn::getSSLImpl(ACE_SOCKET sock, const char* pubkeyfile, @@ -42,20 +42,13 @@ Ssl* TcpSslConn::getSSLImpl(ACE_SOCKET sock, const char* pubkeyfile, LOGERROR(msg); throw IllegalStateException(msg); } - // adongre: Added for Ticket #758 - const char* pemPassword = - DistributedSystem::getSystemProperties()->sslKeystorePassword(); - return reinterpret_cast<Ssl*>( - func(sock, pubkeyfile, privkeyfile, pemPassword)); + func(sock, pubkeyfile, privkeyfile, m_pemPassword)); } void TcpSslConn::createSocket(ACE_SOCKET sock) { - SystemProperties* props = DistributedSystem::getSystemProperties(); - const char* pubkeyfile = props->sslTrustStore(); - const char* privkeyfile = props->sslKeyStore(); LOGDEBUG("Creating SSL socket stream"); - m_ssl = getSSLImpl(sock, pubkeyfile, privkeyfile); + m_ssl = getSSLImpl(sock, m_pubkeyfile, m_privkeyfile); } void TcpSslConn::listen(ACE_INET_Addr addr, uint32_t waitSeconds) { @@ -101,28 +94,21 @@ void TcpSslConn::connect() { // m_ssl->init(); - uint32_t waitSeconds = m_waitSeconds; - - // passing 
waittime as microseconds - if (DistributedSystem::getSystemProperties()->readTimeoutUnitInMillis()) { - waitSeconds = waitSeconds * 1000; - } else { - waitSeconds = waitSeconds * (1000 * 1000); - } + uint32_t waitMicroSeconds = m_waitMilliSeconds * 1000; - LOGDEBUG("Connecting SSL socket stream to %s:%d waiting %d sec", - m_addr.get_host_name(), m_addr.get_port_number(), m_waitSeconds); + LOGDEBUG("Connecting SSL socket stream to %s:%d waiting %d micro sec", + m_addr.get_host_name(), m_addr.get_port_number(), waitMicroSeconds); - int32_t retVal = m_ssl->connect(m_addr, waitSeconds); + int32_t retVal = m_ssl->connect(m_addr, waitMicroSeconds); if (retVal == -1) { char msg[256]; int32_t lastError = ACE_OS::last_error(); if (lastError == ETIME || lastError == ETIMEDOUT) { - ACE_OS::snprintf( - msg, 256, - "TcpSslConn::connect Attempt to connect timed out after %d seconds.", - m_waitSeconds); + ACE_OS::snprintf(msg, 256, + "TcpSslConn::connect Attempt to connect timed out after " + "%d micro-seconds.", + waitMicroSeconds); // this is only called by constructor, so we must delete m_ssl GF_SAFE_DELETE(m_ssl); throw TimeoutException(msg); http://git-wip-us.apache.org/repos/asf/geode-native/blob/da389793/src/cppcache/src/TcpSslConn.hpp ---------------------------------------------------------------------- diff --git a/src/cppcache/src/TcpSslConn.hpp b/src/cppcache/src/TcpSslConn.hpp index 387a95c..1ebf3db 100644 --- a/src/cppcache/src/TcpSslConn.hpp +++ b/src/cppcache/src/TcpSslConn.hpp @@ -32,6 +32,9 @@ class TcpSslConn : public TcpConn { private: Ssl* m_ssl; ACE_DLL m_dll; + const char* m_pubkeyfile; + const char* m_privkeyfile; + const char* m_pemPassword; // adongre: Added for Ticket #758 // Pass extra parameter for the password typedef void* (*gf_create_SslImpl)(ACE_SOCKET, const char*, const char*, @@ -47,16 +50,23 @@ class TcpSslConn : public TcpConn { void createSocket(ACE_SOCKET sock); public: - TcpSslConn() : TcpConn(), m_ssl(nullptr){}; - - TcpSslConn(const 
char* hostname, int32_t port, - uint32_t waitSeconds = DEFAULT_CONNECT_TIMEOUT, - int32_t maxBuffSizePool = 0) - : TcpConn(hostname, port, waitSeconds, maxBuffSizePool), m_ssl(nullptr){}; - - TcpSslConn(const char* ipaddr, uint32_t waitSeconds = DEFAULT_CONNECT_TIMEOUT, - int32_t maxBuffSizePool = 0) - : TcpConn(ipaddr, waitSeconds, maxBuffSizePool), m_ssl(nullptr){}; + TcpSslConn(const char* hostname, int32_t port, uint32_t waitSeconds, + int32_t maxBuffSizePool, const char* pubkeyfile, + const char* privkeyfile, const char* pemPassword) + : TcpConn(hostname, port, waitSeconds, maxBuffSizePool), + m_ssl(nullptr), + m_pubkeyfile(pubkeyfile), + m_privkeyfile(privkeyfile), + m_pemPassword(pemPassword){}; + + TcpSslConn(const char* ipaddr, uint32_t waitSeconds, int32_t maxBuffSizePool, + const char* pubkeyfile, const char* privkeyfile, + const char* pemPassword) + : TcpConn(ipaddr, waitSeconds, maxBuffSizePool), + m_ssl(nullptr), + m_pubkeyfile(pubkeyfile), + m_privkeyfile(privkeyfile), + m_pemPassword(pemPassword){}; // TODO: Watch out for virt dtor calling virt methods! 
http://git-wip-us.apache.org/repos/asf/geode-native/blob/da389793/src/cppcache/src/TcrChunkedContext.hpp ---------------------------------------------------------------------- diff --git a/src/cppcache/src/TcrChunkedContext.hpp b/src/cppcache/src/TcrChunkedContext.hpp index c07c22c..068bb09 100644 --- a/src/cppcache/src/TcrChunkedContext.hpp +++ b/src/cppcache/src/TcrChunkedContext.hpp @@ -53,7 +53,8 @@ class TcrChunkedResult { /** handle a chunk of response message from server */ virtual void handleChunk(const uint8_t* bytes, int32_t len, - uint8_t isLastChunkWithSecurity) = 0; + uint8_t isLastChunkWithSecurity, + const Cache* cache) = 0; public: inline TcrChunkedResult() @@ -75,13 +76,14 @@ class TcrChunkedResult { virtual void reset() = 0; void fireHandleChunk(const uint8_t* bytes, int32_t len, - uint8_t isLastChunkWithSecurity) { + uint8_t isLastChunkWithSecurity, const Cache* cache) { if (appDomainContext) { - appDomainContext->run([this, bytes, len, isLastChunkWithSecurity]() { - handleChunk(bytes, len, isLastChunkWithSecurity); - }); + appDomainContext->run( + [this, bytes, len, isLastChunkWithSecurity, &cache]() { + handleChunk(bytes, len, isLastChunkWithSecurity, cache); + }); } else { - handleChunk(bytes, len, isLastChunkWithSecurity); + handleChunk(bytes, len, isLastChunkWithSecurity, cache); } } @@ -135,15 +137,17 @@ class TcrChunkedContext { const uint8_t* m_bytes; const int32_t m_len; const uint8_t m_isLastChunkWithSecurity; + const Cache* m_cache; TcrChunkedResult* m_result; public: inline TcrChunkedContext(const uint8_t* bytes, int32_t len, TcrChunkedResult* result, - uint8_t isLastChunkWithSecurity) + uint8_t isLastChunkWithSecurity, const Cache* cache) : m_bytes(bytes), m_len(len), m_isLastChunkWithSecurity(isLastChunkWithSecurity), + m_cache(cache), m_result(result) {} inline ~TcrChunkedContext() { GF_SAFE_DELETE_ARRAY(m_bytes); } @@ -158,7 +162,8 @@ class TcrChunkedContext { m_result->finalize(inSameThread); } else if 
(!m_result->exceptionOccurred()) { try { - m_result->fireHandleChunk(m_bytes, m_len, m_isLastChunkWithSecurity); + m_result->fireHandleChunk(m_bytes, m_len, m_isLastChunkWithSecurity, + m_cache); } catch (Exception& ex) { LOGERROR("HandleChunk error message %s, name = %s", ex.getMessage(), ex.getName()); http://git-wip-us.apache.org/repos/asf/geode-native/blob/da389793/src/cppcache/src/TcrConnection.cpp ---------------------------------------------------------------------- diff --git a/src/cppcache/src/TcrConnection.cpp b/src/cppcache/src/TcrConnection.cpp index f11db0c..dafaed2 100644 --- a/src/cppcache/src/TcrConnection.cpp +++ b/src/cppcache/src/TcrConnection.cpp @@ -62,6 +62,9 @@ bool TcrConnection::InitTcrConnection( m_creationTime = ACE_OS::gettimeofday(); connectionId = INITIAL_CONNECTION_ID; m_lastAccessed = ACE_OS::gettimeofday(); + const auto& distributedSystem = + m_poolDM->getConnectionManager().getCacheImpl()->getDistributedSystem(); + const auto& sysProp = distributedSystem.getSystemProperties(); LOGDEBUG( "Tcrconnection const isSecondary = %d and isClientNotification = %d, " @@ -77,8 +80,6 @@ bool TcrConnection::InitTcrConnection( GF_DEV_ASSERT(!isSecondary || isClientNotification); - DistributedSystemPtr dsys = DistributedSystem::getInstance(); - // Create TcpConn object which manages a socket connection with the endpoint. 
if (endpointObj && endpointObj->getPoolHADM()) { m_conn = createConnection( @@ -87,33 +88,34 @@ bool TcrConnection::InitTcrConnection( endpointObj->getPoolHADM()->getSocketBufferSize())); isPool = true; } else { - m_conn = createConnection(m_endpoint, connectTimeout, 0); + m_conn = createConnection(m_endpoint, connectTimeout, + sysProp.maxSocketBufferSize()); } GF_DEV_ASSERT(m_conn != nullptr); - DataOutput handShakeMsg; + auto handShakeMsg = m_poolDM->getConnectionManager().getCacheImpl()->getCache()->createDataOutput(); bool isNotificationChannel = false; // Send byte Acceptor.CLIENT_TO_SERVER = (byte) 100; // Send byte Acceptor.SERVER_TO_CLIENT = (byte) 101; if (isClientNotification) { isNotificationChannel = true; if (isSecondary) { - handShakeMsg.write(static_cast<int8_t>(SECONDARY_SERVER_TO_CLIENT)); + handShakeMsg->write(static_cast<int8_t>(SECONDARY_SERVER_TO_CLIENT)); } else { - handShakeMsg.write(static_cast<int8_t>(PRIMARY_SERVER_TO_CLIENT)); + handShakeMsg->write(static_cast<int8_t>(PRIMARY_SERVER_TO_CLIENT)); } } else { - handShakeMsg.write(static_cast<int8_t>(CLIENT_TO_SERVER)); + handShakeMsg->write(static_cast<int8_t>(CLIENT_TO_SERVER)); } // added for versioned client int8_t versionOrdinal = Version::getOrdinal(); - handShakeMsg.write(versionOrdinal); + handShakeMsg->write(versionOrdinal); LOGFINE("Client version ordinal is %d", versionOrdinal); - handShakeMsg.write(static_cast<int8_t>(REPLY_OK)); + handShakeMsg->write(static_cast<int8_t>(REPLY_OK)); // Send byte REPLY_OK = (byte)58; if (!isClientNotification) { @@ -122,9 +124,9 @@ bool TcrConnection::InitTcrConnection( } else { // add the local ports to message Set<uint16_t>::Iterator iter = ports.iterator(); - handShakeMsg.writeInt(static_cast<int32_t>(ports.size())); + handShakeMsg->writeInt(static_cast<int32_t>(ports.size())); while (iter.hasNext()) { - handShakeMsg.writeInt(static_cast<int32_t>(iter.next())); + handShakeMsg->writeInt(static_cast<int32_t>(iter.next())); } } @@ -134,21 +136,21 
@@ bool TcrConnection::InitTcrConnection( // permissible value for bug #232 for now. // minus 10 sec because the GFE 5.7 gridDev branch adds a // 5 sec buffer which was causing an int overflow. - handShakeMsg.writeInt((int32_t)0x7fffffff - 10000); + handShakeMsg->writeInt((int32_t)0x7fffffff - 10000); } // Write header for byte FixedID since GFE 5.7 - handShakeMsg.write(static_cast<int8_t>(GeodeTypeIdsImpl::FixedIDByte)); + handShakeMsg->write(static_cast<int8_t>(GeodeTypeIdsImpl::FixedIDByte)); // Writing byte for ClientProxyMembershipID class id=38 as registered on the // java server. - handShakeMsg.write( + handShakeMsg->write( static_cast<int8_t>(GeodeTypeIdsImpl::ClientProxyMembershipId)); if (endpointObj->getPoolHADM()) { ClientProxyMembershipID* memId = endpointObj->getPoolHADM()->getMembershipId(); uint32_t memIdBufferLength; const char* memIdBuffer = memId->getDSMemberId(memIdBufferLength); - handShakeMsg.writeBytes((int8_t*)memIdBuffer, memIdBufferLength); + handShakeMsg->writeBytes((int8_t*)memIdBuffer, memIdBufferLength); } else { ACE_TCHAR hostName[256]; ACE_OS::hostname(hostName, sizeof(hostName) - 1); @@ -158,42 +160,37 @@ bool TcrConnection::InitTcrConnection( uint16_t hostPort = 0; // Add 3 durable Subcription properties to ClientProxyMembershipID - SystemProperties* sysProp = DistributedSystem::getSystemProperties(); - const char* durableId = - (sysProp != nullptr) ? sysProp->durableClientId() : nullptr; - const uint32_t durableTimeOut = - (sysProp != nullptr) ? sysProp->durableTimeout() : 0; + const char* durableId = sysProp.durableClientId(); + const uint32_t durableTimeOut = sysProp.durableTimeout(); // Write ClientProxyMembershipID serialized object. 
uint32_t memIdBufferLength; - ClientProxyMembershipID memId(hostName, hostAddr, hostPort, durableId, - durableTimeOut); - const char* memIdBuffer = memId.getDSMemberId(memIdBufferLength); - handShakeMsg.writeBytes((int8_t*)memIdBuffer, memIdBufferLength); + const auto memId = + m_connectionManager->getCacheImpl() + ->getClientProxyMembershipIDFactory() + .create(hostName, hostAddr, hostPort, durableId, durableTimeOut); + const auto memIdBuffer = memId->getDSMemberId(memIdBufferLength); + handShakeMsg->writeBytes((int8_t*)memIdBuffer, memIdBufferLength); } - handShakeMsg.writeInt((int32_t)1); + handShakeMsg->writeInt((int32_t)1); bool isDhOn = false; bool requireServerAuth = false; PropertiesPtr credentials; CacheableBytesPtr serverChallenge; - SystemProperties* tmpSystemProperties = - DistributedSystem::getSystemProperties(); - // Write overrides (just conflation for now) - handShakeMsg.write(getOverrides(tmpSystemProperties)); + handShakeMsg->write(getOverrides(&sysProp)); - bool tmpIsSecurityOn = tmpSystemProperties->isSecurityOn(); - isDhOn = tmpSystemProperties->isDhOn(); + bool tmpIsSecurityOn = sysProp.isSecurityOn(); + isDhOn = sysProp.isDhOn(); if (m_endpointObj) { - tmpIsSecurityOn = tmpSystemProperties->isSecurityOn() || - this->m_endpointObj->isMultiUserMode(); + tmpIsSecurityOn = + sysProp.isSecurityOn() || this->m_endpointObj->isMultiUserMode(); CacheableStringPtr dhalgo = - tmpSystemProperties->getSecurityProperties()->find( - "security-client-dhalgo"); + sysProp.getSecurityProperties()->find("security-client-dhalgo"); LOGDEBUG("TcrConnection this->m_endpointObj->isMultiUserMode() = %d ", this->m_endpointObj->isMultiUserMode()); @@ -205,7 +202,7 @@ bool TcrConnection::InitTcrConnection( LOGDEBUG( "TcrConnection algo name %s tmpIsSecurityOn = %d isDhOn = %d " "isNotificationChannel = %d ", - tmpSystemProperties->securityClientDhAlgo(), tmpIsSecurityOn, isDhOn, + sysProp.securityClientDhAlgo(), tmpIsSecurityOn, isDhOn, isNotificationChannel); bool 
doIneedToSendCreds = true; if (isNotificationChannel && m_endpointObj && @@ -216,48 +213,35 @@ bool TcrConnection::InitTcrConnection( } if (isNotificationChannel && !doIneedToSendCreds) { - handShakeMsg.write( + handShakeMsg->write( static_cast<uint8_t>(SECURITY_MULTIUSER_NOTIFICATIONCHANNEL)); } else if (isDhOn) { m_dh = new DiffieHellman(); - m_dh->initDhKeys(tmpSystemProperties->getSecurityProperties()); - handShakeMsg.write(static_cast<uint8_t>(SECURITY_CREDENTIALS_DHENCRYPT)); + m_dh->initDhKeys(sysProp.getSecurityProperties()); + handShakeMsg->write(static_cast<uint8_t>(SECURITY_CREDENTIALS_DHENCRYPT)); } else if (tmpIsSecurityOn) { - handShakeMsg.write(static_cast<uint8_t>(SECURITY_CREDENTIALS_NORMAL)); + handShakeMsg->write(static_cast<uint8_t>(SECURITY_CREDENTIALS_NORMAL)); } else { - handShakeMsg.write(static_cast<uint8_t>(SECURITY_CREDENTIALS_NONE)); + handShakeMsg->write(static_cast<uint8_t>(SECURITY_CREDENTIALS_NONE)); } if (tmpIsSecurityOn) { try { LOGFINER("TcrConnection: about to invoke authloader"); - PropertiesPtr tmpSecurityProperties = - tmpSystemProperties->getSecurityProperties(); + const auto& tmpSecurityProperties = sysProp.getSecurityProperties(); if (tmpSecurityProperties == nullptr) { LOGWARN("TcrConnection: security properties not found."); } - // AuthInitializePtr authInitialize = - // tmpSystemProperties->getAuthLoader(); - //:only for backward connection + // only for backward connection if (isClientNotification) { - AuthInitializePtr authInitialize = - DistributedSystem::m_impl->getAuthLoader(); - if (authInitialize != nullptr) { + if (const auto& authInitialize = + distributedSystem.m_impl->getAuthLoader()) { LOGFINER( "TcrConnection: acquired handle to authLoader, " "invoking getCredentials"); - /* adongre - * CID 28898: Copy into fixed size buffer (STRING_OVERFLOW) - * You might overrun the 100 byte fixed-size string "tmpEndpoint" by - * copying "this->m_endpoint" without checking the length. 
- * Note: This defect has an elevated risk because the source argument - * is a parameter of the current function. - */ - // char tmpEndpoint[100] = { '\0' } ; - // strcpy(tmpEndpoint, m_endpoint); - PropertiesPtr tmpAuthIniSecurityProperties = - authInitialize->getCredentials(tmpSecurityProperties, - /*tmpEndpoint*/ m_endpoint); + + const auto& tmpAuthIniSecurityProperties = + authInitialize->getCredentials(tmpSecurityProperties, m_endpoint); LOGFINER("TcrConnection: after getCredentials "); credentials = tmpAuthIniSecurityProperties; } @@ -267,20 +251,20 @@ bool TcrConnection::InitTcrConnection( CacheableStringPtr ksPath = tmpSecurityProperties->find("security-client-kspath"); requireServerAuth = (ksPath != nullptr && ksPath->length() > 0); - handShakeMsg.writeBoolean(requireServerAuth); + handShakeMsg->writeBoolean(requireServerAuth); LOGFINE( "HandShake: Server authentication using RSA signature %s required", requireServerAuth ? "is" : "not"); // Send the symmetric key algorithm name string - handShakeMsg.write(static_cast<int8_t>(GeodeTypeIds::CacheableString)); - handShakeMsg.writeASCII(tmpSystemProperties->securityClientDhAlgo()); + handShakeMsg->write(static_cast<int8_t>(GeodeTypeIds::CacheableString)); + handShakeMsg->writeASCII(sysProp.securityClientDhAlgo()); // Send the client's DH public key to the server // CacheableBytesPtr dhPubKey = DiffieHellman::getPublicKey(); CacheableBytesPtr dhPubKey = m_dh->getPublicKey(); LOGDEBUG("DH pubkey send len is %d", dhPubKey->length()); - dhPubKey->toData(handShakeMsg); + dhPubKey->toData(*handShakeMsg); if (requireServerAuth) { char serverChallengeBytes[64] = {0}; @@ -290,11 +274,11 @@ bool TcrConnection::InitTcrConnection( } serverChallenge = CacheableBytes::create( reinterpret_cast<const uint8_t*>(serverChallengeBytes), 64); - serverChallenge->toData(handShakeMsg); + serverChallenge->toData(*handShakeMsg); } } else { // if isDhOn if (isClientNotification) { //:only for backward connection - 
credentials->toData(handShakeMsg); + credentials->toData(*handShakeMsg); } } // else isDhOn } catch (const AuthenticationRequiredException&) { @@ -314,7 +298,7 @@ bool TcrConnection::InitTcrConnection( } uint32_t msgLengh; - char* data = (char*)handShakeMsg.getBuffer(&msgLengh); + char* data = (char*)handShakeMsg->getBuffer(&msgLengh); LOGFINE("Attempting handshake with endpoint %s for %s%s connection", endpoint, isClientNotification ? (isSecondary ? "secondary " : "primary ") : "", isClientNotification ? "subscription" : "client"); @@ -325,8 +309,7 @@ bool TcrConnection::InitTcrConnection( LOGDEBUG(" Handshake: Got Accept Code %d", (*acceptanceCode)[0]); /* adongre */ - if ((*acceptanceCode)[0] == REPLY_SSL_ENABLED && - !tmpSystemProperties->sslEnabled()) { + if ((*acceptanceCode)[0] == REPLY_SSL_ENABLED && !sysProp.sslEnabled()) { LOGERROR("SSL is enabled on server, enable SSL in client as well"); AuthenticationRequiredException ex( "SSL is enabled on server, enable SSL in client as well"); @@ -370,18 +353,18 @@ bool TcrConnection::InitTcrConnection( LOGDEBUG("Handshake: Got challengeSize %d", challengeBytes->length()); // encrypt the credentials and challenge bytes - DataOutput cleartext; + auto cleartext = m_poolDM->getConnectionManager().getCacheImpl()->getCache()->createDataOutput(); if (isClientNotification) { //:only for backward connection - credentials->toData(cleartext); + credentials->toData(*cleartext); } - challengeBytes->toData(cleartext); + challengeBytes->toData(*cleartext); CacheableBytesPtr ciphertext = - m_dh->encrypt(cleartext.getBuffer(), cleartext.getBufferLength()); + m_dh->encrypt(cleartext->getBuffer(), cleartext->getBufferLength()); - DataOutput sendCreds; - ciphertext->toData(sendCreds); + auto sendCreds = m_poolDM->getConnectionManager().getCacheImpl()->getCache()->createDataOutput(); + ciphertext->toData(*sendCreds); uint32_t credLen; - char* credData = (char*)sendCreds.getBuffer(&credLen); + char* credData = 
(char*)sendCreds->getBuffer(&credLen); // send the encrypted bytes and check the response error = sendData(credData, credLen, connectTimeout, false); @@ -418,9 +401,9 @@ bool TcrConnection::InitTcrConnection( m_hasServerQueue = NON_REDUNDANT_SERVER; } CacheableBytesPtr queueSizeMsg = readHandshakeData(4, connectTimeout); - DataInput dI(queueSizeMsg->value(), queueSizeMsg->length()); + auto dI = m_connectionManager->getCacheImpl()->getCache()->createDataInput(queueSizeMsg->value(), queueSizeMsg->length()); int32_t queueSize = 0; - dI.readInt(&queueSize); + dI->readInt(&queueSize); m_queueSize = queueSize > 0 ? queueSize : 0; m_endpointObj->setServerQueueStatus(m_hasServerQueue, m_queueSize); @@ -449,43 +432,49 @@ bool TcrConnection::InitTcrConnection( if (static_cast<int8_t>((*arrayLenHeader)[0]) == -2) { CacheableBytesPtr recvMsgLenBytes = readHandshakeData(2, connectTimeout); - DataInput dI2(recvMsgLenBytes->value(), recvMsgLenBytes->length()); + auto dI2 = m_connectionManager->getCacheImpl()->getCache()->createDataInput( + recvMsgLenBytes->value(), recvMsgLenBytes->length()); int16_t recvMsgLenShort = 0; - dI2.readInt(&recvMsgLenShort); + dI2->readInt(&recvMsgLenShort); recvMsgLen = recvMsgLenShort; } else if (static_cast<int8_t>((*arrayLenHeader)[0]) == -3) { CacheableBytesPtr recvMsgLenBytes = readHandshakeData(4, connectTimeout); - DataInput dI2(recvMsgLenBytes->value(), recvMsgLenBytes->length()); - dI2.readInt(&recvMsgLen); + auto dI2 = m_connectionManager->getCacheImpl()->getCache()->createDataInput( + recvMsgLenBytes->value(), recvMsgLenBytes->length()); + dI2->readInt(&recvMsgLen); } - CacheableBytesPtr recvMessage = - readHandshakeData(recvMsgLen, connectTimeout); + auto recvMessage = readHandshakeData(recvMsgLen, connectTimeout); // If the distributed member has not been set yet, set it. 
if (getEndpointObject()->getDistributedMemberID() == 0) { LOGDEBUG("Deserializing distributed member Id"); - DataInput diForClient(recvMessage->value(), recvMessage->length()); + auto diForClient = m_connectionManager->getCacheImpl()->getCache()->createDataInput( + recvMessage->value(), recvMessage->length()); ClientProxyMembershipIDPtr member; - diForClient.readObject(member); - uint16_t memId = CacheImpl::getMemberListForVersionStamp()->add( - (DSMemberForVersionStampPtr)member); + diForClient->readObject(member); + auto memId = m_poolDM->getConnectionManager() + .getCacheImpl() + ->getMemberListForVersionStamp() + ->add(member); getEndpointObject()->setDistributedMemberID(memId); LOGDEBUG("Deserialized distributed member Id %d", memId); } } CacheableBytesPtr recvMsgLenBytes = readHandshakeData(2, connectTimeout); - DataInput dI3(recvMsgLenBytes->value(), recvMsgLenBytes->length()); + auto dI3 = m_connectionManager->getCacheImpl()->getCache()->createDataInput( + recvMsgLenBytes->value(), recvMsgLenBytes->length()); uint16_t recvMsgLen2 = 0; - dI3.readInt(&recvMsgLen2); + dI3->readInt(&recvMsgLen2); CacheableBytesPtr recvMessage = readHandshakeData(recvMsgLen2, connectTimeout); if (!isClientNotification) { CacheableBytesPtr deltaEnabledMsg = readHandshakeData(1, connectTimeout); - DataInput di(deltaEnabledMsg->value(), 1); + auto di = m_connectionManager->getCacheImpl()->getCache()->createDataInput( + deltaEnabledMsg->value(), 1); bool isDeltaEnabledOnServer; - di.readBoolean(&isDeltaEnabledOnServer); + di->readBoolean(&isDeltaEnabledOnServer); ThinClientBaseDM::setDeltaEnabledOnServer(isDeltaEnabledOnServer); } @@ -579,8 +568,14 @@ Connector* TcrConnection::createConnection(const char* endpoint, uint32_t connectTimeout, int32_t maxBuffSizePool) { Connector* socket = nullptr; - if (DistributedSystem::getSystemProperties()->sslEnabled()) { - socket = new TcpSslConn(endpoint, connectTimeout, maxBuffSizePool); + auto& systemProperties = 
m_connectionManager->getCacheImpl() + ->getDistributedSystem() + .getSystemProperties(); + if (systemProperties.sslEnabled()) { + socket = new TcpSslConn(endpoint, connectTimeout, maxBuffSizePool, + systemProperties.sslTrustStore(), + systemProperties.sslKeyStore(), + systemProperties.sslKeystorePassword()); } else { socket = new TcpConn(endpoint, connectTimeout, maxBuffSizePool); } @@ -611,7 +606,11 @@ inline ConnErrType TcrConnection::receiveData(char* buffer, int32_t length, // if gfcpp property unit set then sendTimeoutSec will be in millisecond // otherwise it will be in second - if (DistributedSystem::getSystemProperties()->readTimeoutUnitInMillis()) { + if (m_poolDM->getConnectionManager() + .getCacheImpl() + ->getDistributedSystem() + .getSystemProperties() + .readTimeoutUnitInMillis()) { LOGFINER("recieveData %d %d ", receiveTimeoutSec, notPublicApiWithTimeout); if (notPublicApiWithTimeout == TcrMessage::QUERY || notPublicApiWithTimeout == TcrMessage::QUERY_WITH_PARAMETERS || @@ -694,7 +693,11 @@ inline ConnErrType TcrConnection::sendData(uint32_t& timeSpent, bool isPublicApiTimeout = false; // if gfcpp property unit set then sendTimeoutSec will be in millisecond // otherwise it will be in second - if (DistributedSystem::getSystemProperties()->readTimeoutUnitInMillis()) { + if (m_poolDM->getConnectionManager() + .getCacheImpl() + ->getDistributedSystem() + .getSystemProperties() + .readTimeoutUnitInMillis()) { LOGFINER("sendData %d %d", sendTimeoutSec, notPublicApiWithTimeout); if (notPublicApiWithTimeout == TcrMessage::QUERY || notPublicApiWithTimeout == TcrMessage::QUERY_WITH_PARAMETERS || @@ -950,9 +953,10 @@ char* TcrConnection::readMessage(size_t* recvLen, uint32_t receiveTimeoutSec, m_endpoint, Utils::convertBytesToString(msg_header, HEADER_LENGTH)->asChar()); - DataInput input(reinterpret_cast<uint8_t*>(msg_header), HEADER_LENGTH); - input.readInt(&msgType); - input.readInt(&msgLen); + auto input = 
m_connectionManager->getCacheImpl()->getCache()->createDataInput( + reinterpret_cast<uint8_t*>(msg_header), HEADER_LENGTH); + input->readInt(&msgType); + input->readInt(&msgLen); // check that message length is valid. if (!(msgLen > 0) && request == TcrMessage::GET_CLIENT_PR_METADATA) { char* fullMessage; @@ -1059,17 +1063,18 @@ void TcrConnection::readMessageChunked(TcrMessageReply& reply, m_endpoint, Utils::convertBytesToString(msg_header, HDR_LEN_12)->asChar()); - DataInput input(msg_header, HDR_LEN_12); + auto input = m_connectionManager->getCacheImpl()->getCache()->createDataInput( + msg_header, HDR_LEN_12); int32_t msgType; - input.readInt(&msgType); + input->readInt(&msgType); reply.setMessageType(msgType); int32_t txId; int32_t numOfParts; - input.readInt(&numOfParts); + input->readInt(&numOfParts); LOGDEBUG("TcrConnection::readMessageChunked numberof parts = %d ", numOfParts); - // input.advanceCursor(4); - input.readInt(&txId); + // input->advanceCursor(4); + input->readInt(&txId); reply.setTransId(txId); // bool isLastChunk = false; @@ -1125,13 +1130,14 @@ void TcrConnection::readMessageChunked(TcrMessageReply& reply, Utils::convertBytesToString((msg_header + HDR_LEN_12), HDR_LEN) ->asChar()); - DataInput inp((msg_header + HDR_LEN_12), HDR_LEN); + auto input = m_connectionManager->getCacheImpl()->getCache()->createDataInput( + msg_header + HDR_LEN_12, HDR_LEN); int32_t chunkLen; - inp.readInt(&chunkLen); + input->readInt(&chunkLen); // check that chunk length is valid. 
GF_DEV_ASSERT(chunkLen > 0); - // inp.readBoolean(&isLastChunk); - inp.read(&isLastChunk); + // input->readBoolean(&isLastChunk); + input->read(&isLastChunk); uint8_t* chunk_body; GF_NEW(chunk_body, uint8_t[chunkLen]); @@ -1172,17 +1178,19 @@ void TcrConnection::readMessageChunked(TcrMessageReply& reply, void TcrConnection::close() { // If this is a short lived grid client, don't bother with this close ack // message - if (DistributedSystem::getSystemProperties()->isGridClient()) { + if (m_poolDM->getConnectionManager() + .getCacheImpl() + ->getDistributedSystem() + .getSystemProperties() + .isGridClient()) { return; } - TcrMessage* closeMsg = TcrMessage::getCloseConnMessage(); + TcrMessage* closeMsg = TcrMessage::getCloseConnMessage( + m_poolDM->getConnectionManager().getCacheImpl()->getCache()); try { - // LOGINFO("TcrConnection::close DC = %d; netdown = %d endpoint %s", - // TcrConnectionManager::TEST_DURABLE_CLIENT_CRASH, - // TcrConnectionManager::isNetDown, m_endpoint); if (!TcrConnectionManager::TEST_DURABLE_CLIENT_CRASH && - !TcrConnectionManager::isNetDown) { + !m_connectionManager->isNetDown()) { send(closeMsg->getMsgData(), closeMsg->getMsgLength(), 2, false); } } catch (Exception& e) { @@ -1285,9 +1293,10 @@ CacheableBytesPtr TcrConnection::readHandshakeByteArray( // read a byte array uint32_t TcrConnection::readHandshakeArraySize(uint32_t connectTimeout) { CacheableBytesPtr codeBytes = readHandshakeData(1, connectTimeout); - DataInput codeDI(codeBytes->value(), codeBytes->length()); + auto codeDI = m_connectionManager->getCacheImpl()->getCache()->createDataInput( + codeBytes->value(), codeBytes->length()); uint8_t code = 0; - codeDI.read(&code); + codeDI->read(&code); uint32_t arraySize = 0; if (code == 0xFF) { return 0; @@ -1296,15 +1305,15 @@ uint32_t TcrConnection::readHandshakeArraySize(uint32_t connectTimeout) { if (tempLen > 252) { // 252 is java's ((byte)-4 && 0xFF) if (code == 0xFE) { CacheableBytesPtr lenBytes = readHandshakeData(2, 
connectTimeout); - DataInput lenDI(lenBytes->value(), lenBytes->length()); + auto lenDI = m_connectionManager->getCacheImpl()->getCache()->createDataInput(lenBytes->value(), lenBytes->length()); uint16_t val; - lenDI.readInt(&val); + lenDI->readInt(&val); tempLen = val; } else if (code == 0xFD) { CacheableBytesPtr lenBytes = readHandshakeData(4, connectTimeout); - DataInput lenDI(lenBytes->value(), lenBytes->length()); + auto lenDI = m_connectionManager->getCacheImpl()->getCache()->createDataInput(lenBytes->value(), lenBytes->length()); uint32_t val; - lenDI.readInt(&val); + lenDI->readInt(&val); tempLen = val; } else { GF_SAFE_DELETE_CON(m_conn); @@ -1393,9 +1402,9 @@ int32_t TcrConnection::readHandShakeInt(uint32_t connectTimeout) { } } - DataInput di(recvMessage, 4); + auto di = m_connectionManager->getCacheImpl()->getCache()->createDataInput(recvMessage, 4); int32_t val; - di.readInt(&val); + di->readInt(&val); GF_SAFE_DELETE_ARRAY(recvMessage); @@ -1431,8 +1440,8 @@ CacheableStringPtr TcrConnection::readHandshakeString(uint32_t connectTimeout) { case GF_STRING: { uint16_t shortLen = 0; CacheableBytesPtr lenBytes = readHandshakeData(2, connectTimeout); - DataInput lenDI(lenBytes->value(), lenBytes->length()); - lenDI.readInt(&shortLen); + auto lenDI = m_connectionManager->getCacheImpl()->getCache()->createDataInput(lenBytes->value(), lenBytes->length()); + lenDI->readInt(&shortLen); length = shortLen; break; } @@ -1512,7 +1521,7 @@ void TcrConnection::touch() { m_lastAccessed = ACE_OS::gettimeofday(); } ACE_Time_Value TcrConnection::getLastAccessed() { return m_lastAccessed; } -uint8_t TcrConnection::getOverrides(SystemProperties* props) { +uint8_t TcrConnection::getOverrides(const SystemProperties* props) { const char* conflate = props->conflateEvents(); uint8_t conflateByte = 0; if (conflate != nullptr) { @@ -1522,27 +1531,7 @@ uint8_t TcrConnection::getOverrides(SystemProperties* props) { conflateByte = 2; } } - /* - const char * removeUnresponsive = 
props->removeUnresponsiveClientOverride(); - uint8_t removeByte = 0; - if (removeUnresponsive != nullptr ) { - if ( ACE_OS::strcasecmp(removeUnresponsive, "true") == 0 ) { - removeByte = 1; - } else if ( ACE_OS::strcasecmp(removeUnresponsive, "false") == 0 ) { - removeByte = 2; - } - } - const char * notify = props->notifyBySubscriptionOverride(); - uint8_t notifyByte = 0; - if (notify != nullptr ) { - if ( ACE_OS::strcasecmp(notify, "true") == 0 ) { - notifyByte = 1; - } else if ( ACE_OS::strcasecmp(notify, "false") == 0 ) { - notifyByte = 2; - } - } - return (((notifyByte << 2) | removeByte) << 2) | conflateByte; - */ + return conflateByte; } http://git-wip-us.apache.org/repos/asf/geode-native/blob/da389793/src/cppcache/src/TcrConnection.hpp ---------------------------------------------------------------------- diff --git a/src/cppcache/src/TcrConnection.hpp b/src/cppcache/src/TcrConnection.hpp index 9e8d873..fc8f54f 100644 --- a/src/cppcache/src/TcrConnection.hpp +++ b/src/cppcache/src/TcrConnection.hpp @@ -79,6 +79,7 @@ enum ServerQueueStatus { class TcrEndpoint; class SystemProperties; class ThinClientPoolDM; +class TcrConnectionManager; class CPPCACHE_EXPORT TcrConnection { public: /** Create one connection, endpoint is in format of hostname:portno @@ -111,8 +112,10 @@ class CPPCACHE_EXPORT TcrConnection { bool isSecondary = false, uint32_t connectTimeout = DEFAULT_CONNECT_TIMEOUT); - TcrConnection(volatile const bool& isConnected) + TcrConnection(const TcrConnectionManager& connectionManager, + volatile const bool& isConnected) : connectionId(0), + m_connectionManager(&connectionManager), m_dh(nullptr), m_endpoint(nullptr), m_endpointObj(nullptr), @@ -279,6 +282,10 @@ class CPPCACHE_EXPORT TcrConnection { connectionId = id; } + const TcrConnectionManager& getConnectionManager() { + return *m_connectionManager; + } + CacheableBytesPtr encryptBytes(CacheableBytesPtr data) { if (m_dh != nullptr) { return m_dh->encrypt(data); @@ -297,6 +304,7 @@ class 
CPPCACHE_EXPORT TcrConnection { private: int64_t connectionId; + const TcrConnectionManager* m_connectionManager; DiffieHellman* m_dh; /** * To read Intantiator message(which meant for java client), here we are @@ -308,7 +316,7 @@ class CPPCACHE_EXPORT TcrConnection { * Packs the override settings bits into bytes - currently a single byte for * conflation, remove-unresponsive-client and notify-by-subscription. */ - uint8_t getOverrides(SystemProperties* props); + uint8_t getOverrides(const SystemProperties* props); /** * To read the from stream http://git-wip-us.apache.org/repos/asf/geode-native/blob/da389793/src/cppcache/src/TcrConnectionManager.cpp ---------------------------------------------------------------------- diff --git a/src/cppcache/src/TcrConnectionManager.cpp b/src/cppcache/src/TcrConnectionManager.cpp index 71d2347..714f479 100644 --- a/src/cppcache/src/TcrConnectionManager.cpp +++ b/src/cppcache/src/TcrConnectionManager.cpp @@ -39,7 +39,6 @@ namespace apache { namespace geode { namespace client { -volatile bool TcrConnectionManager::isNetDown = false; volatile bool TcrConnectionManager::TEST_DURABLE_CLIENT_CRASH = false; const char *TcrConnectionManager::NC_Redundancy = "NC Redundancy"; @@ -59,7 +58,8 @@ TcrConnectionManager::TcrConnectionManager(CacheImpl *cache) m_notifyCleanupSemaList(false), m_redundancySema(0), m_redundancyTask(nullptr), - m_isDurable(false) { + m_isDurable(false), + m_isNetDown(false) { m_redundancyManager = new ThinClientRedundancyManager(this); } @@ -70,14 +70,14 @@ void TcrConnectionManager::init(bool isPool) { } else { return; } - SystemProperties *props = DistributedSystem::getSystemProperties(); - m_isDurable = strlen(props->durableClientId()) > 0; - int32_t pingInterval = (props->pingInterval() / 2); - if (!props->isGridClient() && !isPool) { + auto &props = m_cache->getDistributedSystem().getSystemProperties(); + m_isDurable = strlen(props.durableClientId()) > 0; + int32_t pingInterval = (props.pingInterval() / 2); + 
if (!props.isGridClient() && !isPool) { ACE_Event_Handler *connectionChecker = new ExpiryHandler_T<TcrConnectionManager>( this, &TcrConnectionManager::checkConnection); - m_pingTaskId = CacheImpl::expiryTaskManager->scheduleExpiryTask( + m_pingTaskId = m_cache->getExpiryTaskManager().scheduleExpiryTask( connectionChecker, 10, pingInterval, false); LOGFINE( "TcrConnectionManager::TcrConnectionManager Registered ping " @@ -105,9 +105,9 @@ void TcrConnectionManager::init(bool isPool) { ACE_Event_Handler *redundancyChecker = new ExpiryHandler_T<TcrConnectionManager>( this, &TcrConnectionManager::checkRedundancy); - int32_t redundancyMonitorInterval = props->redundancyMonitorInterval(); + int32_t redundancyMonitorInterval = props.redundancyMonitorInterval(); - m_servermonitorTaskId = CacheImpl::expiryTaskManager->scheduleExpiryTask( + m_servermonitorTaskId = m_cache->getExpiryTaskManager().scheduleExpiryTask( redundancyChecker, 1, redundancyMonitorInterval, false); LOGFINE( "TcrConnectionManager::TcrConnectionManager Registered server " @@ -125,7 +125,7 @@ void TcrConnectionManager::init(bool isPool) { m_redundancyManager->m_HAenabled = true; } - if (!props->isGridClient()) { + if (!props.isGridClient()) { startFailoverAndCleanupThreads(isPool); } } @@ -152,7 +152,7 @@ void TcrConnectionManager::startFailoverAndCleanupThreads(bool isPool) { void TcrConnectionManager::close() { LOGFINE("TcrConnectionManager is closing"); if (m_pingTaskId > 0) { - CacheImpl::expiryTaskManager->cancelTask(m_pingTaskId); + m_cache->getExpiryTaskManager().cancelTask(m_pingTaskId); } if (m_failoverTask != nullptr) { @@ -166,7 +166,7 @@ void TcrConnectionManager::close() { if (cacheAttributes != nullptr && (cacheAttributes->getRedundancyLevel() > 0 || m_isDurable)) { if (m_servermonitorTaskId > 0) { - CacheImpl::expiryTaskManager->cancelTask(m_servermonitorTaskId); + m_cache->getExpiryTaskManager().cancelTask(m_servermonitorTaskId); } if (m_redundancyTask != nullptr) { 
m_redundancyTask->stopNoblock(); @@ -344,7 +344,7 @@ int TcrConnectionManager::checkConnection(const ACE_Time_Value &, ACE_Recursive_Thread_Mutex>::iterator currItr = m_endpoints.begin(); while (currItr != m_endpoints.end()) { - if ((*currItr).int_id_->connected() && !isNetDown) { + if ((*currItr).int_id_->connected() && !m_isNetDown) { (*currItr).int_id_->pingServer(); } currItr++; @@ -362,7 +362,7 @@ int TcrConnectionManager::failover(volatile bool &isRunning) { LOGFINE("TcrConnectionManager: starting failover thread"); while (isRunning) { m_failoverSema.acquire(); - if (isRunning && !isNetDown) { + if (isRunning && !m_isNetDown) { try { ACE_Guard<ACE_Recursive_Thread_Mutex> guard(m_distMngrsLock); for (std::list<ThinClientBaseDM *>::iterator it = m_distMngrs.begin(); @@ -477,7 +477,7 @@ void TcrConnectionManager::removeHAEndpoints() { } void TcrConnectionManager::netDown() { - isNetDown = true; + m_isNetDown = true; // sleep for 15 seconds to allow ping and redundancy threads to pause. std::this_thread::sleep_for(std::chrono::seconds(15)); @@ -499,7 +499,7 @@ void TcrConnectionManager::netDown() { /* Need to do a get on unknown key after calling this Fn to restablish all * connection */ void TcrConnectionManager::revive() { - isNetDown = false; + m_isNetDown = false; // sleep for 15 seconds to allow redundancy thread to reestablish // connections. 
@@ -510,7 +510,7 @@ int TcrConnectionManager::redundancy(volatile bool &isRunning) { LOGFINE("Starting subscription maintain redundancy thread."); while (isRunning) { m_redundancySema.acquire(); - if (isRunning && !isNetDown) { + if (isRunning && !m_isNetDown) { m_redundancyManager->maintainRedundancyLevel(); while (m_redundancySema.tryacquire() != -1) { ; http://git-wip-us.apache.org/repos/asf/geode-native/blob/da389793/src/cppcache/src/TcrConnectionManager.hpp ---------------------------------------------------------------------- diff --git a/src/cppcache/src/TcrConnectionManager.hpp b/src/cppcache/src/TcrConnectionManager.hpp index c902286..bb9afbc 100644 --- a/src/cppcache/src/TcrConnectionManager.hpp +++ b/src/cppcache/src/TcrConnectionManager.hpp @@ -76,7 +76,6 @@ class CPPCACHE_EXPORT TcrConnectionManager { void netDown(); void revive(); void setClientCrashTEST() { TEST_DURABLE_CLIENT_CRASH = true; } - volatile static bool isNetDown; volatile static bool TEST_DURABLE_CLIENT_CRASH; inline ACE_Map_Manager<std::string, TcrEndpoint*, ACE_Recursive_Thread_Mutex>& @@ -106,7 +105,7 @@ class CPPCACHE_EXPORT TcrConnectionManager { bool isDurable() { return m_isDurable; }; bool haEnabled() { return m_redundancyManager->m_HAenabled; }; - CacheImpl* getCacheImpl() { return m_cache; }; + CacheImpl* getCacheImpl() const { return m_cache; }; GfErrType sendSyncRequestCq(TcrMessage& request, TcrMessageReply& reply, TcrHADistributionManager* theHADM); @@ -140,6 +139,8 @@ class CPPCACHE_EXPORT TcrConnectionManager { return m_redundancyManager->sendRequestToPrimary(request, reply); } + bool isNetDown() const { return m_isNetDown; } + private: CacheImpl* m_cache; volatile bool m_initGuard; @@ -175,6 +176,8 @@ class CPPCACHE_EXPORT TcrConnectionManager { ACE_Recursive_Thread_Mutex m_notificationLock; bool m_isDurable; + bool m_isNetDown; + ThinClientRedundancyManager* m_redundancyManager; int failover(volatile bool& isRunning);