http://git-wip-us.apache.org/repos/asf/geode-native/blob/da389793/src/cppcache/src/TcrEndpoint.cpp ---------------------------------------------------------------------- diff --git a/src/cppcache/src/TcrEndpoint.cpp b/src/cppcache/src/TcrEndpoint.cpp index 8e99010..ac480ea 100644 --- a/src/cppcache/src/TcrEndpoint.cpp +++ b/src/cppcache/src/TcrEndpoint.cpp @@ -42,15 +42,16 @@ This is replaced by the connect-timeout (times 3) system property for SR # 6525. */ const char* TcrEndpoint::NC_Notification = "NC Notification"; -TcrEndpoint::TcrEndpoint(const std::string& name, CacheImpl* cache, +TcrEndpoint::TcrEndpoint(const std::string& name, CacheImpl* cacheImpl, ACE_Semaphore& failoverSema, ACE_Semaphore& cleanupSema, ACE_Semaphore& redundancySema, ThinClientBaseDM* DM, bool isMultiUserMode) : m_needToConnectInLock(false), m_connectLockCond(m_connectLock), - m_maxConnections( - DistributedSystem::getSystemProperties()->javaConnectionPoolSize()), + m_maxConnections(cacheImpl->getDistributedSystem() + .getSystemProperties() + .javaConnectionPoolSize()), m_notifyConnection(0), m_notifyReceiver(0), m_numRegionListener(0), @@ -67,7 +68,7 @@ TcrEndpoint::TcrEndpoint(const std::string& name, CacheImpl* cache, m_numRegions(0), m_pingTimeouts(0), m_notifyCount(0), - m_cache(cache), + m_cacheImpl(cacheImpl), m_failoverSema(failoverSema), m_cleanupSema(cleanupSema), m_notificationCleanupSema(0), @@ -115,7 +116,9 @@ TcrEndpoint::~TcrEndpoint() { inline bool TcrEndpoint::needtoTakeConnectLock() { #ifdef __linux - if (DistributedSystem::getSystemProperties()->connectWaitTimeout() > 0) { + if (m_cacheImpl->getDistributedSystem() + .getSystemProperties() + .connectWaitTimeout() > 0) { return m_needToConnectInLock; // once pipe or other socket error will take // lock to connect. 
} @@ -130,9 +133,10 @@ GfErrType TcrEndpoint::createNewConnectionWL(TcrConnection*& newConn, bool isSecondary, uint32_t connectTimeout) { LOGFINE("TcrEndpoint::createNewConnectionWL"); - uint32_t connectWaitTimeout = - DistributedSystem::getSystemProperties()->connectWaitTimeout() * - 1000; // need to change + uint32_t connectWaitTimeout = m_cacheImpl->getDistributedSystem() + .getSystemProperties() + .connectWaitTimeout() * + 1000; // need to change ACE_Time_Value interval(0, connectWaitTimeout); ACE_Time_Value stopAt(ACE_OS::gettimeofday()); stopAt += interval; @@ -148,7 +152,8 @@ GfErrType TcrEndpoint::createNewConnectionWL(TcrConnection*& newConn, if (ret != -1) { // got lock try { LOGFINE("TcrEndpoint::createNewConnectionWL got lock"); - newConn = new TcrConnection(m_connected); + newConn = + new TcrConnection(m_cacheImpl->tcrConnectionManager(), m_connected); newConn->InitTcrConnection(this, m_name.c_str(), m_ports, isClientNotification, isSecondary, connectTimeout); @@ -204,7 +209,8 @@ GfErrType TcrEndpoint::createNewConnection( try { if (newConn == nullptr) { if (!needtoTakeConnectLock() || !appThreadRequest) { - newConn = new TcrConnection(m_connected); + newConn = new TcrConnection(m_cacheImpl->tcrConnectionManager(), + m_connected); bool authenticate = newConn->InitTcrConnection( this, m_name.c_str(), m_ports, isClientNotification, isSecondary, connectTimeout); @@ -231,6 +237,10 @@ GfErrType TcrEndpoint::createNewConnection( LOGFINE("Sending update notification message to endpoint %s", m_name.c_str()); TcrMessageUpdateClientNotification updateNotificationMsg( + newConn->getConnectionManager() + .getCacheImpl() + ->getCache() + ->createDataOutput(), static_cast<int32_t>(newConn->getPort())); newConn->send(updateNotificationMsg.getMsgData(), updateNotificationMsg.getMsgLength()); @@ -290,12 +300,12 @@ void TcrEndpoint::authenticateEndpoint(TcrConnection*& conn) { LOGDEBUG( "TcrEndpoint::authenticateEndpoint m_isAuthenticated = %d " "this->m_baseDM = %d", - 
m_isAuthenticated, this->m_baseDM); - if (!m_isAuthenticated && this->m_baseDM) { + m_isAuthenticated, m_baseDM); + if (!m_isAuthenticated && m_baseDM) { this->setConnected(); ACE_Guard<ACE_Recursive_Thread_Mutex> guard(m_endpointAuthenticationLock); GfErrType err = GF_NOERR; - PropertiesPtr creds = this->getCredentials(); + PropertiesPtr creds = getCredentials(); if (creds != nullptr) { LOGDEBUG("TcrEndpoint::authenticateEndpoint got creds from app = %d", @@ -304,7 +314,8 @@ void TcrEndpoint::authenticateEndpoint(TcrConnection*& conn) { LOGDEBUG("TcrEndpoint::authenticateEndpoint no creds from app "); } - TcrMessageUserCredential request(creds, this->m_baseDM); + TcrMessageUserCredential request( + m_cacheImpl->getCache()->createDataOutput(), creds, m_baseDM); LOGDEBUG("request is created"); TcrMessageReply reply(true, this->m_baseDM); @@ -339,27 +350,17 @@ void TcrEndpoint::authenticateEndpoint(TcrConnection*& conn) { } PropertiesPtr TcrEndpoint::getCredentials() { - PropertiesPtr tmpSecurityProperties = - DistributedSystem::getSystemProperties()->getSecurityProperties(); + const auto& distributedSystem = m_cacheImpl->getDistributedSystem(); + const auto& tmpSecurityProperties = + distributedSystem.getSystemProperties().getSecurityProperties(); - AuthInitializePtr authInitialize = DistributedSystem::m_impl->getAuthLoader(); - - if (authInitialize != nullptr) { + if (const auto& authInitialize = distributedSystem.m_impl->getAuthLoader()) { LOGFINER( "Acquired handle to AuthInitialize plugin, " "getting credentials for %s", m_name.c_str()); - /* adongre - * CID 28899: Copy into fixed size buffer (STRING_OVERFLOW) - * You might overrun the 100 byte fixed-size string "tmpEndpoint" by copying - * the return value of - * "stlp_std::basic_string<char, stlp_std::char_traits<char>, - * stlp_std::allocator<char> >::c_str() const" without checking the length. 
- */ - // char tmpEndpoint[100] = { '\0' } ; - // strcpy(tmpEndpoint, m_name.c_str()); - PropertiesPtr tmpAuthIniSecurityProperties = authInitialize->getCredentials( - tmpSecurityProperties, /*tmpEndpoint*/ m_name.c_str()); + const auto& tmpAuthIniSecurityProperties = + authInitialize->getCredentials(tmpSecurityProperties, m_name.c_str()); LOGFINER("Done getting credentials"); return tmpAuthIniSecurityProperties; } @@ -372,9 +373,10 @@ ServerQueueStatus TcrEndpoint::getFreshServerQueueStatus( TcrConnection* newConn; ServerQueueStatus status = NON_REDUNDANT_SERVER; - err = createNewConnection( - newConn, false, false, - DistributedSystem::getSystemProperties()->connectTimeout()); + err = createNewConnection(newConn, false, false, + m_cacheImpl->getDistributedSystem() + .getSystemProperties() + .connectTimeout()); if (err == GF_NOERR) { status = newConn->getServerQueueStatus(queueSize); @@ -438,10 +440,11 @@ GfErrType TcrEndpoint::registerDM(bool clientNotification, bool isSecondary, m_name.c_str()); for (int connNum = 0; connNum < maxConnections; ++connNum) { TcrConnection* newConn; - if ((err = createNewConnection( - newConn, false, false, - DistributedSystem::getSystemProperties()->connectTimeout(), 0, - m_connected)) != GF_NOERR) { + if ((err = createNewConnection(newConn, false, false, + m_cacheImpl->getDistributedSystem() + .getSystemProperties() + .connectTimeout(), + 0, m_connected)) != GF_NOERR) { m_connected = false; m_isActiveEndpoint = false; closeConnections(); @@ -471,10 +474,12 @@ GfErrType TcrEndpoint::registerDM(bool clientNotification, bool isSecondary, // setup notification channel for the first region ACE_Guard<ACE_Recursive_Thread_Mutex> guard(m_notifyReceiverLock); if (m_numRegionListener == 0) { - if ((err = createNewConnection( - m_notifyConnection, true, isSecondary, - DistributedSystem::getSystemProperties()->connectTimeout() * 3, - 0)) != GF_NOERR) { + if ((err = createNewConnection(m_notifyConnection, true, isSecondary, + 
m_cacheImpl->getDistributedSystem() + .getSystemProperties() + .connectTimeout() * + 3, + 0)) != GF_NOERR) { m_connected = false; m_isActiveEndpoint = false; closeConnections(); @@ -545,7 +550,8 @@ void TcrEndpoint::pingServer(ThinClientPoolDM* poolDM) { } if (!m_msgSent && !m_pingSent) { - TcrMessagePing* pingMsg = TcrMessage::getPingMessage(); + TcrMessagePing* pingMsg = + TcrMessage::getPingMessage(m_cacheImpl->getCache()); TcrMessageReply reply(true, nullptr); LOGFINEST("Sending ping message to endpoint %s", m_name.c_str()); GfErrType error; @@ -584,19 +590,17 @@ void TcrEndpoint::pingServer(ThinClientPoolDM* poolDM) { } bool TcrEndpoint::checkDupAndAdd(EventIdPtr eventid) { - return m_cache->tcrConnectionManager().checkDupAndAdd(eventid); + return m_cacheImpl->tcrConnectionManager().checkDupAndAdd(eventid); } int TcrEndpoint::receiveNotification(volatile bool& isRunning) { - char* data = 0; - LOGFINE("Started subscription channel for endpoint %s", m_name.c_str()); while (isRunning) { TcrMessageReply* msg = nullptr; try { size_t dataLen; ConnErrType opErr = CONN_NOERR; - data = m_notifyConnection->receive(&dataLen, &opErr, 5); + auto data = m_notifyConnection->receive(&dataLen, &opErr, 5); if (opErr == CONN_IOERR) { // Endpoint is disconnected, this exception is expected @@ -616,11 +620,12 @@ int TcrEndpoint::receiveNotification(volatile bool& isRunning) { } if (data) { - msg = new TcrMessageReply(true, nullptr); + msg = new TcrMessageReply(true, m_baseDM); msg->initCqMap(); msg->setData(data, static_cast<int32_t>(dataLen), - this->getDistributedMemberID()); - data = nullptr; // memory is released by TcrMessage setData(). 
+ this->getDistributedMemberID(), + *(m_cacheImpl->getSerializationRegistry()), + *(m_cacheImpl->getMemberListForVersionStamp())); handleNotificationStats(static_cast<int64_t>(dataLen)); LOGDEBUG("receive notification %d", msg->getMessageType()); @@ -644,7 +649,7 @@ int TcrEndpoint::receiveNotification(volatile bool& isRunning) { if (msg->getMessageType() != TcrMessage::CLIENT_MARKER) { const std::string& regionFullPath1 = msg->getRegionName(); RegionPtr region1; - m_cache->getRegion(regionFullPath1.c_str(), region1); + m_cacheImpl->getRegion(regionFullPath1.c_str(), region1); if (region1 != nullptr && !static_cast<ThinClientRegion*>(region1.get()) ->getDistMgr() @@ -670,7 +675,7 @@ int TcrEndpoint::receiveNotification(volatile bool& isRunning) { if (isMarker) { LOGFINE("Got a marker message on endpont %s", m_name.c_str()); - m_cache->processMarker(); + m_cacheImpl->processMarker(); processMarker(); GF_SAFE_DELETE(msg); } else { @@ -678,14 +683,14 @@ int TcrEndpoint::receiveNotification(volatile bool& isRunning) { { const std::string& regionFullPath = msg->getRegionName(); RegionPtr region; - m_cache->getRegion(regionFullPath.c_str(), region); + m_cacheImpl->getRegion(regionFullPath.c_str(), region); if (region != nullptr) { static_cast<ThinClientRegion*>(region.get()) ->receiveNotification(msg); } else { LOGWARN( "Notification for region %s that does not exist in " - "client cache.", + "client cacheImpl.", regionFullPath.c_str()); } } else { @@ -842,13 +847,16 @@ GfErrType TcrEndpoint::sendRequestConn(const TcrMessage& request, } size_t dataLen; LOGDEBUG("sendRequestConn: calling sendRequest"); - char* data = conn->sendRequest( - request.getMsgData(), request.getMsgLength(), &dataLen, - request.getTimeout(), reply.getTimeout(), request.getMessageType()); + auto data = conn->sendRequest(request.getMsgData(), request.getMsgLength(), + &dataLen, request.getTimeout(), + reply.getTimeout(), request.getMessageType()); reply.setMessageTypeRequest(type); - 
reply.setData(data, static_cast<int32_t>(dataLen), - this->getDistributedMemberID()); // memory is released by - // TcrMessage setData(). + reply.setData( + data, static_cast<int32_t>(dataLen), this->getDistributedMemberID(), + *(m_cacheImpl->getSerializationRegistry()), + *(m_cacheImpl + ->getMemberListForVersionStamp())); // memory is released by + // TcrMessage setData(). } // reset idle timeout of the connection for pool connection manager @@ -923,10 +931,11 @@ GfErrType TcrEndpoint::sendRequestWithRetry( LOGFINE( "Creating a new connection when connection-pool-size system " "property set to 0"); - if ((error = - createNewConnection(conn, false, false, - DistributedSystem::getSystemProperties() - ->connectTimeout())) != GF_NOERR) { + if ((error = createNewConnection(conn, false, false, + m_cacheImpl->getDistributedSystem() + .getSystemProperties() + .connectTimeout())) != + GF_NOERR) { epFailure = true; continue; } @@ -940,11 +949,12 @@ GfErrType TcrEndpoint::sendRequestWithRetry( createNewConn = false; if (!m_connected) { return GF_NOTCON; - } else if ((error = createNewConnection( - conn, false, false, - DistributedSystem::getSystemProperties() - ->connectTimeout(), - 0, true)) != GF_NOERR) { + } else if ((error = + createNewConnection(conn, false, false, + m_cacheImpl->getDistributedSystem() + .getSystemProperties() + .connectTimeout(), + 0, true)) != GF_NOERR) { epFailure = true; continue; } @@ -1227,8 +1237,9 @@ void TcrEndpoint::closeConnection(TcrConnection*& conn) { void TcrEndpoint::closeConnections() { m_opConnections.close(); m_ports.clear(); - m_maxConnections = - DistributedSystem::getSystemProperties()->javaConnectionPoolSize(); + m_maxConnections = m_cacheImpl->getDistributedSystem() + .getSystemProperties() + .javaConnectionPoolSize(); } /* @@ -1245,7 +1256,7 @@ void TcrEndpoint::closeNotification() { LOGFINEST("Closing subscription channel for endpoint %s", m_name.c_str()); m_notifyConnection->close(); m_notifyReceiver->stopNoblock(); - 
TcrConnectionManager& tccm = m_cache->tcrConnectionManager(); + TcrConnectionManager& tccm = m_cacheImpl->tcrConnectionManager(); tccm.addNotificationForDeletion(m_notifyReceiver, m_notifyConnection, m_notificationCleanupSema); m_notifyCount++; @@ -1328,11 +1339,11 @@ void TcrEndpoint::setServerQueueStatus(ServerQueueStatus queueStatus, bool TcrEndpoint::isQueueHosted() { return m_isQueueHosted; } void TcrEndpoint::processMarker() { - m_cache->tcrConnectionManager().processMarker(); + m_cacheImpl->tcrConnectionManager().processMarker(); } QueryServicePtr TcrEndpoint::getQueryService() { - return m_cache->getQueryService(true); + return m_cacheImpl->getQueryService(true); } void TcrEndpoint::sendRequestForChunkedResponse(const TcrMessage& request, TcrMessageReply& reply,
http://git-wip-us.apache.org/repos/asf/geode-native/blob/da389793/src/cppcache/src/TcrEndpoint.hpp ---------------------------------------------------------------------- diff --git a/src/cppcache/src/TcrEndpoint.hpp b/src/cppcache/src/TcrEndpoint.hpp index 117535b..e259d40 100644 --- a/src/cppcache/src/TcrEndpoint.hpp +++ b/src/cppcache/src/TcrEndpoint.hpp @@ -44,9 +44,9 @@ class ThinClientPoolDM; class CPPCACHE_EXPORT TcrEndpoint { public: TcrEndpoint( - const std::string& name, CacheImpl* cache, ACE_Semaphore& failoverSema, - ACE_Semaphore& cleanupSema, ACE_Semaphore& redundancySema, - ThinClientBaseDM* dm = nullptr, + const std::string& name, CacheImpl* cacheImpl, + ACE_Semaphore& failoverSema, ACE_Semaphore& cleanupSema, + ACE_Semaphore& redundancySema, ThinClientBaseDM* dm = nullptr, bool isMultiUserMode = false); // TODO: need to look for endpoint case /* adongre @@ -206,6 +206,7 @@ class CPPCACHE_EXPORT TcrEndpoint { ACE_Recursive_Thread_Mutex m_notifyReceiverLock; virtual bool handleIOException(const std::string& message, TcrConnection*& conn, bool isBgThread = false); + CacheImpl* m_cacheImpl; private: int64_t m_uniqueId; @@ -232,7 +233,6 @@ class CPPCACHE_EXPORT TcrEndpoint { int m_notifyCount; - CacheImpl* m_cache; ACE_Semaphore& m_failoverSema; ACE_Semaphore& m_cleanupSema; ACE_Semaphore m_notificationCleanupSema; http://git-wip-us.apache.org/repos/asf/geode-native/blob/da389793/src/cppcache/src/TcrMessage.cpp ---------------------------------------------------------------------- diff --git a/src/cppcache/src/TcrMessage.cpp b/src/cppcache/src/TcrMessage.cpp index cfba9c7..10f4986 100644 --- a/src/cppcache/src/TcrMessage.cpp +++ b/src/cppcache/src/TcrMessage.cpp @@ -32,6 +32,7 @@ #include "TXState.hpp" #include "DiskStoreId.hpp" #include "DiskVersionTag.hpp" +#include "CacheRegionHelper.hpp" using namespace apache::geode::client; static const uint32_t REGULAR_EXPRESSION = @@ -42,49 +43,28 @@ uint32_t g_headerLen = 17; } // namespace // AtomicInc 
TcrMessage::m_transactionId = 0; -TcrMessagePing* TcrMessage::m_pingMsg = nullptr; -TcrMessage* TcrMessage::m_closeConnMsg = nullptr; -TcrMessage* TcrMessage::m_allEPDisconnected = nullptr; uint8_t* TcrMessage::m_keepalive = nullptr; const int TcrMessage::m_flag_empty = 0x01; const int TcrMessage::m_flag_concurrency_checks = 0x02; -bool TcrMessage::init() { - bool ret = true; - if (m_pingMsg == nullptr) { - try { - m_pingMsg = new TcrMessagePing(true); - m_closeConnMsg = new TcrMessageCloseConnection(true); - - } catch (std::exception& ex) { - ret = false; - LOGERROR(ex.what()); - } catch (Exception& ex) { - ret = false; - LOGERROR(ex.getMessage()); - } catch (...) { - ret = false; - LOGERROR("unknown exception"); - } - } - if (m_allEPDisconnected == nullptr) { - m_allEPDisconnected = new TcrMessageReply(true, nullptr); - } - return ret; +TcrMessagePing* TcrMessage::getPingMessage(Cache* cache) { + static auto pingMsg = new TcrMessagePing(cache->createDataOutput(), true); + return pingMsg; } -void TcrMessage::cleanup() { - GF_SAFE_DELETE(m_pingMsg); - GF_SAFE_DELETE(m_closeConnMsg); +TcrMessage* TcrMessage::getAllEPDisMess() { + static auto allEPDisconnected = new TcrMessageReply(true, nullptr); + return allEPDisconnected; } -/* we need a static method to generate ping */ -TcrMessagePing* TcrMessage::getPingMessage() { return m_pingMsg; } - -TcrMessage* TcrMessage::getAllEPDisMess() { return m_allEPDisconnected; } -TcrMessage* TcrMessage::getCloseConnMessage() { return m_closeConnMsg; } +TcrMessage* TcrMessage::getCloseConnMessage(Cache* cache) { + static auto closeConnMsg = + new TcrMessageCloseConnection(cache->createDataOutput(), true); + return closeConnMsg; +} void TcrMessage::setKeepAlive(bool keepalive) { + // TODO global if (TcrMessage::m_keepalive != nullptr) { *TcrMessage::m_keepalive = keepalive ? 
1 : 0; } @@ -170,8 +150,9 @@ void TcrMessage::readPrMetaData(DataInput& input) { } } -VersionTagPtr TcrMessage::readVersionTagPart(DataInput& input, - uint16_t endpointMemId) { +VersionTagPtr TcrMessage::readVersionTagPart( + DataInput& input, uint16_t endpointMemId, + MemberListForVersionStamp& memberListForVersionStamp) { int8_t isObj; input.read(&isObj); VersionTagPtr versionTag; @@ -179,7 +160,7 @@ VersionTagPtr TcrMessage::readVersionTagPart(DataInput& input, if (isObj == GeodeTypeIds::NullObj) return versionTag; if (isObj == GeodeTypeIdsImpl::FixedIDByte) { - versionTag = std::make_shared<VersionTag>(); + versionTag = std::make_shared<VersionTag>(memberListForVersionStamp); int8_t fixedId; input.read(&fixedId); if (fixedId == GeodeTypeIdsImpl::VersionTag) { @@ -191,7 +172,7 @@ VersionTagPtr TcrMessage::readVersionTagPart(DataInput& input, int16_t fixedId; input.readInt(&fixedId); if (fixedId == GeodeTypeIdsImpl::DiskVersionTag) { - DiskVersionTag* disk = new DiskVersionTag(); + DiskVersionTag* disk = new DiskVersionTag(memberListForVersionStamp); disk->fromData(input); versionTag.reset(disk); return versionTag; @@ -200,14 +181,17 @@ VersionTagPtr TcrMessage::readVersionTagPart(DataInput& input, return versionTag; } -void TcrMessage::readVersionTag(DataInput& input, uint16_t endpointMemId) { +void TcrMessage::readVersionTag( + DataInput& input, uint16_t endpointMemId, + MemberListForVersionStamp& memberListForVersionStamp) { int32_t lenObj; int8_t isObj; input.readInt(&lenObj); input.read(&isObj); if (lenObj == 0) return; - auto versionTag = TcrMessage::readVersionTagPart(input, endpointMemId); + auto versionTag = TcrMessage::readVersionTagPart(input, endpointMemId, + memberListForVersionStamp); this->setVersionTag(versionTag); } @@ -386,9 +370,10 @@ void TcrMessage::readUniqueIDObjectPart(DataInput& input) { int64_t TcrMessage::getConnectionId(TcrConnection* conn) { if (m_connectionIDBytes != nullptr) { CacheableBytesPtr tmp = 
conn->decryptBytes(m_connectionIDBytes); - DataInput di(tmp->value(), tmp->length()); + auto di = m_tcdm->getConnectionManager().getCacheImpl()->getCache()->createDataInput( + tmp->value(), tmp->length()); int64_t connid; - di.readInt(&connid); + di->readInt(&connid); return connid; } else { LOGWARN("Returning 0 as internal connection ID msgtype = %d ", m_msgType); @@ -402,9 +387,10 @@ int64_t TcrMessage::getUniqueId(TcrConnection* conn) { CacheableBytesPtr tmp = conn->decryptBytes(encryptBytes); - DataInput di(tmp->value(), tmp->length()); + auto di = m_tcdm->getConnectionManager().getCacheImpl()->getCache()->createDataInput( + tmp->value(), tmp->length()); int64_t uniqueid; - di.readInt(&uniqueid); + di->readInt(&uniqueid); return uniqueid; } @@ -859,7 +845,8 @@ void TcrMessage::processChunk(const uint8_t* bytes, int32_t len, } else if (m_msgTypeRequest == TcrMessage::PUTALL || m_msgTypeRequest == TcrMessage::PUT_ALL_WITH_CALLBACK) { TcrChunkedContext* chunk = new TcrChunkedContext( - bytes, len, m_chunkedResult, isLastChunkAndisSecurityHeader); + bytes, len, m_chunkedResult, isLastChunkAndisSecurityHeader, + m_tcdm->getConnectionManager().getCacheImpl()->getCache()); m_chunkedResult->setEndpointMemId(endpointmemId); m_tcdm->queueChunk(chunk); if (bytes == nullptr) { @@ -882,7 +869,8 @@ void TcrMessage::processChunk(const uint8_t* bytes, int32_t len, if (m_chunkedResult != nullptr) { LOGDEBUG("tcrmessage in case22 "); TcrChunkedContext* chunk = new TcrChunkedContext( - bytes, len, m_chunkedResult, isLastChunkAndisSecurityHeader); + bytes, len, m_chunkedResult, isLastChunkAndisSecurityHeader, + m_tcdm->getConnectionManager().getCacheImpl()->getCache()); m_chunkedResult->setEndpointMemId(endpointmemId); m_tcdm->queueChunk(chunk); if (bytes == nullptr) { @@ -930,9 +918,10 @@ void TcrMessage::processChunk(const uint8_t* bytes, int32_t len, case TcrMessage::EXCEPTION: { if (bytes != nullptr) { DeleteArray<const uint8_t> delChunk(bytes); - DataInput input(bytes, len); 
- readExceptionPart(input, isLastChunkAndisSecurityHeader); - readSecureObjectPart(input, false, true, + auto input = m_tcdm->getConnectionManager().getCacheImpl()->getCache()->createDataInput( + bytes, len); + readExceptionPart(*input, isLastChunkAndisSecurityHeader); + readSecureObjectPart(*input, false, true, isLastChunkAndisSecurityHeader); } break; @@ -995,27 +984,31 @@ void TcrMessage::chunkSecurityHeader(int skipPart, const uint8_t* bytes, uint8_t isLastChunkAndSecurityHeader) { LOGDEBUG("TcrMessage::chunkSecurityHeader:: skipParts = %d", skipPart); if ((isLastChunkAndSecurityHeader & 0x3) == 0x3) { - DataInput di(bytes, len); - skipParts(di, skipPart); - readSecureObjectPart(di, false, true, isLastChunkAndSecurityHeader); + auto di = m_tcdm->getConnectionManager().getCacheImpl()->getCache()->createDataInput( + bytes, len); + skipParts(*di, skipPart); + readSecureObjectPart(*di, false, true, isLastChunkAndSecurityHeader); } } -void TcrMessage::handleByteArrayResponse(const char* bytearray, int32_t len, - uint16_t endpointMemId) { - DataInput input((uint8_t*)bytearray, len); +void TcrMessage::handleByteArrayResponse( + const char* bytearray, int32_t len, uint16_t endpointMemId, + const SerializationRegistry& serializationRegistry, + MemberListForVersionStamp& memberListForVersionStamp) { + auto input = m_tcdm->getConnectionManager().getCacheImpl()->getCache()->createDataInput( + (uint8_t*)bytearray, len); // TODO:: this need to make sure that pool is there // if(m_tcdm == nullptr) // throw IllegalArgumentException("Pool is nullptr in TcrMessage"); - input.setPoolName(getPoolName()); - input.readInt(&m_msgType); + input->setPoolName(getPoolName()); + input->readInt(&m_msgType); int32_t msglen; - input.readInt(&msglen); + input->readInt(&msglen); int32_t numparts; - input.readInt(&numparts); - input.readInt(&m_txId); + input->readInt(&numparts); + input->readInt(&m_txId); int8_t earlyack; - input.read(&earlyack); + input->read(&earlyack); LOGDEBUG( 
"handleByteArrayResponse m_msgType = %d isSecurityOn = %d requesttype " "=%d", @@ -1032,46 +1025,46 @@ void TcrMessage::handleByteArrayResponse(const char* bytearray, int32_t len, switch (m_msgType) { case TcrMessage::RESPONSE: { if (m_msgTypeRequest == TcrMessage::CONTAINS_KEY) { - readBooleanPartAsObject(input, &m_boolValue); + readBooleanPartAsObject(*input, &m_boolValue); } else if (m_msgTypeRequest == TcrMessage::USER_CREDENTIAL_MESSAGE) { - readUniqueIDObjectPart(input); + readUniqueIDObjectPart(*input); } else if (m_msgTypeRequest == TcrMessage::GET_PDX_ID_FOR_TYPE || m_msgTypeRequest == TcrMessage::GET_PDX_ID_FOR_ENUM) { // int will come in response uint32_t typeId; - readIntPart(input, &typeId); + readIntPart(*input, &typeId); m_value = CacheableInt32::create(typeId); } else if (m_msgTypeRequest == TcrMessage::GET_PDX_TYPE_BY_ID) { // PdxType will come in response - input.advanceCursor(5); // part header + input->advanceCursor(5); // part header m_value = - SerializationRegistry::deserialize(input, GeodeTypeIds::PdxType); + serializationRegistry.deserialize(*input, GeodeTypeIds::PdxType); } else if (m_msgTypeRequest == TcrMessage::GET_PDX_ENUM_BY_ID) { // PdxType will come in response - input.advanceCursor(5); // part header - m_value = SerializationRegistry::deserialize(input); + input->advanceCursor(5); // part header + m_value = serializationRegistry.deserialize(*input); } else if (m_msgTypeRequest == TcrMessage::GET_FUNCTION_ATTRIBUTES) { int32_t lenObj; - input.readInt(&lenObj); + input->readInt(&lenObj); int8_t isObj; - input.read(&isObj); + input->read(&isObj); int8_t hR; - input.read(&hR); + input->read(&hR); int8_t isHA; - input.read(&isHA); + input->read(&isHA); int8_t oFW; - input.read(&oFW); + input->read(&oFW); m_functionAttributes = new std::vector<int8_t>(); m_functionAttributes->push_back(hR); m_functionAttributes->push_back(isHA); m_functionAttributes->push_back(oFW); } else if (m_msgTypeRequest == TcrMessage::REQUEST) { int32_t 
receivednumparts = 2; - readObjectPart(input); + readObjectPart(*input); uint32_t flag = 0; - readIntPart(input, &flag); + readIntPart(*input, &flag); if (flag & 0x01) { - readCallbackObjectPart(input); + readCallbackObjectPart(*input); receivednumparts++; } @@ -1080,7 +1073,7 @@ void TcrMessage::handleByteArrayResponse(const char* bytearray, int32_t len, } if (flag & 0x02) { - readVersionTag(input, endpointMemId); + readVersionTag(*input, endpointMemId, memberListForVersionStamp); receivednumparts++; } @@ -1088,37 +1081,37 @@ void TcrMessage::handleByteArrayResponse(const char* bytearray, int32_t len, m_value = CacheableToken::tombstone(); } - if (numparts > receivednumparts) readPrMetaData(input); + if (numparts > receivednumparts) readPrMetaData(*input); } else if (m_decodeAll) { - readObjectPart(input); + readObjectPart(*input); if (numparts == 2) { if (m_isCallBackArguement) { - readCallbackObjectPart(input); + readCallbackObjectPart(*input); } else { int32_t lenObj; - input.readInt(&lenObj); + input->readInt(&lenObj); bool isObj; - input.readBoolean(&isObj); - input.read(&m_metaDataVersion); + input->readBoolean(&isObj); + input->read(&m_metaDataVersion); if (lenObj == 2) { - input.read(&m_serverGroupVersion); + input->read(&m_serverGroupVersion); LOGDEBUG( "Single-hop m_serverGroupVersion in message response is %d", m_serverGroupVersion); } } } else if (numparts > 2) { - skipParts(input, 1); + skipParts(*input, 1); int32_t lenObj; - input.readInt(&lenObj); + input->readInt(&lenObj); bool isObj; - input.readBoolean(&isObj); - input.read(&m_metaDataVersion); + input->readBoolean(&isObj); + input->read(&m_metaDataVersion); LOGFINE("Single-hop metadata version in message response is %d", m_metaDataVersion); if (lenObj == 2) { - input.read(&m_serverGroupVersion); + input->read(&m_serverGroupVersion); LOGDEBUG( "Single-hop m_serverGroupVersion in message response is %d", m_serverGroupVersion); @@ -1131,16 +1124,16 @@ void TcrMessage::handleByteArrayResponse(const 
char* bytearray, int32_t len, case TcrMessage::EXCEPTION: { uint8_t lastChunk = static_cast<uint8_t>(numparts); lastChunk = (lastChunk << 5); - readExceptionPart(input, lastChunk); + readExceptionPart(*input, lastChunk); // if (isSecurityOn) - // readSecureObjectPart( input ); + // readSecureObjectPart( *input ); break; } case TcrMessage::INVALID: { // Read the string in the reply LOGWARN("Received invalid message type as reply from server"); - readObjectPart(input, true); + readObjectPart(*input, true); break; } @@ -1170,41 +1163,43 @@ void TcrMessage::handleByteArrayResponse(const char* bytearray, int32_t len, case TcrMessage::REPLY: { switch (m_msgTypeRequest) { case TcrMessage::PUT: { - readPrMetaData(input); + readPrMetaData(*input); uint32_t flags = 0; - readIntPart(input, &flags); + readIntPart(*input, &flags); if (flags & 0x01) { // has old value - readOldValue(input); + readOldValue(*input); } if (flags & 0x04) { - readVersionTag(input, endpointMemId); + readVersionTag(*input, endpointMemId, memberListForVersionStamp); } break; } case TcrMessage::INVALIDATE: { uint32_t flags = 0; - readIntPart(input, &flags); - if (flags & 0x01) readVersionTag(input, endpointMemId); - readPrMetaData(input); + readIntPart(*input, &flags); + if (flags & 0x01) + readVersionTag(*input, endpointMemId, memberListForVersionStamp); + readPrMetaData(*input); break; } case TcrMessage::DESTROY: { uint32_t flags = 0; - readIntPart(input, &flags); - if (flags & 0x01) readVersionTag(input, endpointMemId); - readPrMetaData(input); + readIntPart(*input, &flags); + if (flags & 0x01) + readVersionTag(*input, endpointMemId, memberListForVersionStamp); + readPrMetaData(*input); // skip the Destroy65.java response entryNotFound int part so // that the readSecureObjectPart() call below gets the security part - // skipParts(input, 1); - readIntPart(input, &m_entryNotFound); + // skipParts(*input, 1); + readIntPart(*input, &m_entryNotFound); LOGDEBUG("Inside TcrMessage::REPLY::DESTROY 
m_entryNotFound = %d ", m_entryNotFound); break; } case TcrMessage::PING: default: { - readPrMetaData(input); + readPrMetaData(*input); break; } } @@ -1213,36 +1208,36 @@ void TcrMessage::handleByteArrayResponse(const char* bytearray, int32_t len, case TcrMessage::LOCAL_INVALIDATE: case TcrMessage::LOCAL_DESTROY: { int32_t regionLen; - input.readInt(&regionLen); + input->readInt(&regionLen); int8_t isObj; - input.read(&isObj); + input->read(&isObj); char* regname = nullptr; regname = new char[regionLen + 1]; DeleteArray<char> delRegName(regname); - input.readBytesOnly(reinterpret_cast<int8_t*>(regname), regionLen); + input->readBytesOnly(reinterpret_cast<int8_t*>(regname), regionLen); regname[regionLen] = '\0'; m_regionName = regname; - readKeyPart(input); + readKeyPart(*input); - // skipParts(input, 1); // skip callbackarg parts - readCallbackObjectPart(input); - readVersionTag(input, endpointMemId); - readBooleanPartAsObject(input, &m_isInterestListPassed); - readBooleanPartAsObject(input, &m_hasCqsPart); + // skipParts(*input, 1); // skip callbackarg parts + readCallbackObjectPart(*input); + readVersionTag(*input, endpointMemId, memberListForVersionStamp); + readBooleanPartAsObject(*input, &m_isInterestListPassed); + readBooleanPartAsObject(*input, &m_hasCqsPart); if (m_hasCqsPart) { if (m_msgType == TcrMessage::LOCAL_INVALIDATE) { - readIntPart(input, &m_msgTypeForCq); + readIntPart(*input, &m_msgTypeForCq); } else { m_msgTypeForCq = static_cast<uint32_t>(m_msgType); } // LOGINFO("got cq local local_invalidate/local_destroy read // m_hasCqsPart"); - readCqsPart(input); + readCqsPart(*input); } // read eventid part - readEventIdPart(input, false); + readEventIdPart(*input, false); break; } @@ -1250,79 +1245,80 @@ void TcrMessage::handleByteArrayResponse(const char* bytearray, int32_t len, case TcrMessage::LOCAL_CREATE: case TcrMessage::LOCAL_UPDATE: { int32_t regionLen; - input.readInt(&regionLen); + input->readInt(&regionLen); int8_t isObj; - input.read(&isObj); + 
input->read(&isObj); char* regname = nullptr; regname = new char[regionLen + 1]; DeleteArray<char> delRegName(regname); - input.readBytesOnly(reinterpret_cast<int8_t*>(regname), regionLen); + input->readBytesOnly(reinterpret_cast<int8_t*>(regname), regionLen); regname[regionLen] = '\0'; m_regionName = regname; - readKeyPart(input); + readKeyPart(*input); // Read delta flag bool isDelta = false; - readBooleanPartAsObject(input, &isDelta); + readBooleanPartAsObject(*input, &isDelta); if (isDelta) { - input.readInt(&m_deltaBytesLen); + input->readInt(&m_deltaBytesLen); int8_t isObj; - input.read(&isObj); + input->read(&isObj); m_deltaBytes = new uint8_t[m_deltaBytesLen]; - input.readBytesOnly(m_deltaBytes, m_deltaBytesLen); - m_delta = new DataInput(m_deltaBytes, m_deltaBytesLen); + input->readBytesOnly(m_deltaBytes, m_deltaBytesLen); + m_delta = m_tcdm->getConnectionManager().getCacheImpl()->getCache()->createDataInput( + m_deltaBytes, m_deltaBytesLen); } else { - readObjectPart(input); + readObjectPart(*input); } // skip callbackarg part - // skipParts(input, 1); - readCallbackObjectPart(input); - readVersionTag(input, endpointMemId); - readBooleanPartAsObject(input, &m_isInterestListPassed); - readBooleanPartAsObject(input, &m_hasCqsPart); + // skipParts(*input, 1); + readCallbackObjectPart(*input); + readVersionTag(*input, endpointMemId, memberListForVersionStamp); + readBooleanPartAsObject(*input, &m_isInterestListPassed); + readBooleanPartAsObject(*input, &m_hasCqsPart); if (m_hasCqsPart) { // LOGINFO("got cq local_create/local_create"); - readCqsPart(input); + readCqsPart(*input); m_msgTypeForCq = static_cast<uint32_t>(m_msgType); } // read eventid part - readEventIdPart(input, false); + readEventIdPart(*input, false); GF_SAFE_DELETE_ARRAY(regname); // COVERITY ---> 30299 Resource leak break; } case TcrMessage::CLIENT_MARKER: { // dont skip (non-existent) callbackarg part, just read eventid part - readEventIdPart(input, false); + readEventIdPart(*input, false); 
break; } case TcrMessage::LOCAL_DESTROY_REGION: case TcrMessage::CLEAR_REGION: { int32_t regionLen; - input.readInt(&regionLen); + input->readInt(&regionLen); int8_t isObj; - input.read(&isObj); + input->read(&isObj); char* regname = nullptr; regname = new char[regionLen + 1]; DeleteArray<char> delRegName(regname); - input.readBytesOnly(reinterpret_cast<int8_t*>(regname), regionLen); + input->readBytesOnly(reinterpret_cast<int8_t*>(regname), regionLen); regname[regionLen] = '\0'; m_regionName = regname; // skip callbackarg part - // skipParts(input, 1); - readCallbackObjectPart(input); - readBooleanPartAsObject(input, &m_hasCqsPart); + // skipParts(*input, 1); + readCallbackObjectPart(*input); + readBooleanPartAsObject(*input, &m_hasCqsPart); if (m_hasCqsPart) { // LOGINFO("got cq region_destroy read m_hasCqsPart"); - readCqsPart(input); + readCqsPart(*input); } // read eventid part - readEventIdPart(input, false); + readEventIdPart(*input, false); break; } @@ -1334,27 +1330,27 @@ void TcrMessage::handleByteArrayResponse(const char* bytearray, int32_t len, m_metadata = new std::vector<std::vector<BucketServerLocationPtr> >(); for (int32_t i = 0; i < numparts; i++) { int32_t bits32; - input.readInt(&bits32); // partlen; + input->readInt(&bits32); // partlen; int8_t bits8; - input.read(&bits8); // isObj; - input.read(&bits8); // cacheable vector typeid + input->read(&bits8); // isObj; + input->read(&bits8); // cacheable vector typeid LOGDEBUG("Expected typeID %d, got %d", GeodeTypeIds::CacheableArrayList, bits8); - input.readArrayLen(&bits32); // array length + input->readArrayLen(&bits32); // array length LOGDEBUG("Array length = %d ", bits32); if (bits32 > 0) { std::vector<BucketServerLocationPtr> bucketServerLocations; for (int32_t index = 0; index < bits32; index++) { int8_t header; - input.read(&header); // ignore DS typeid - input.read(&header); // ignore CLASS typeid - input.read(&header); // ignore string typeid + input->read(&header); // ignore DS typeid + 
input->read(&header); // ignore CLASS typeid + input->read(&header); // ignore string typeid uint16_t classLen; - input.readInt(&classLen); // Read classLen - input.advanceCursor(classLen); + input->readInt(&classLen); // Read classLen + input->advanceCursor(classLen); auto location = std::make_shared<BucketServerLocation>(); - location->fromData(input); + location->fromData(*input); LOGFINE("location contains %d\t%s\t%d\t%d\t%s", location->getBucketId(), location->getServerName().c_str(), location->getPort(), location->getVersion(), @@ -1375,43 +1371,43 @@ void TcrMessage::handleByteArrayResponse(const char* bytearray, int32_t len, case TcrMessage::RESPONSE_CLIENT_PARTITION_ATTRIBUTES: { int32_t bits32; - input.readInt(&bits32); // partlen; + input->readInt(&bits32); // partlen; int8_t bits8; - input.read(&bits8); // isObj; + input->read(&bits8); // isObj; - m_bucketCount = input.readNativeInt32(); // PART1 = bucketCount + m_bucketCount = input->readNativeInt32(); // PART1 = bucketCount - input.readInt(&bits32); // partlen; - input.read(&bits8); // isObj; + input->readInt(&bits32); // partlen; + input->read(&bits8); // isObj; if (bits32 > 0) { - input.readNativeString(m_colocatedWith); // PART2 = colocatedwith + input->readNativeString(m_colocatedWith); // PART2 = colocatedwith } if (numparts == 4) { - input.readInt(&bits32); // partlen; - input.read(&bits8); // isObj; + input->readInt(&bits32); // partlen; + input->read(&bits8); // isObj; if (bits32 > 0) { - input.readNativeString( + input->readNativeString( m_partitionResolverName); // PART3 = partitionresolvername } - input.readInt(&bits32); // partlen; - input.read(&bits8); // isObj; - input.read(&bits8); // cacheable CacheableHashSet typeid + input->readInt(&bits32); // partlen; + input->read(&bits8); // isObj; + input->read(&bits8); // cacheable CacheableHashSet typeid - input.readArrayLen(&bits32); // array length + input->readArrayLen(&bits32); // array length if (bits32 > 0) { m_fpaSet = new 
std::vector<FixedPartitionAttributesImplPtr>(); for (int32_t index = 0; index < bits32; index++) { int8_t header; - input.read(&header); // ignore DS typeid - input.read(&header); // ignore CLASS typeid - input.read(&header); // ignore string typeid + input->read(&header); // ignore DS typeid + input->read(&header); // ignore CLASS typeid + input->read(&header); // ignore string typeid uint16_t classLen; - input.readInt(&classLen); // Read classLen - input.advanceCursor(classLen); + input->readInt(&classLen); // Read classLen + input->advanceCursor(classLen); auto fpa = std::make_shared<FixedPartitionAttributesImpl>(); - fpa->fromData(input); // PART4 = set of FixedAttributes. + fpa->fromData(*input); // PART4 = set of FixedAttributes. LOGDEBUG("fpa contains %d\t%s\t%d\t%d", fpa->getNumBuckets(), fpa->getPartitionName().c_str(), fpa->isPrimary(), fpa->getStartingBucketID()); @@ -1424,38 +1420,38 @@ void TcrMessage::handleByteArrayResponse(const char* bytearray, int32_t len, case TcrMessage::TOMBSTONE_OPERATION: { uint32_t tombstoneOpType; int32_t regionLen; - input.readInt(&regionLen); + input->readInt(&regionLen); int8_t isObj; - input.read(&isObj); + input->read(&isObj); char* regname = nullptr; regname = new char[regionLen + 1]; DeleteArray<char> delRegName(regname); - input.readBytesOnly(reinterpret_cast<int8_t*>(regname), regionLen); + input->readBytesOnly(reinterpret_cast<int8_t*>(regname), regionLen); regname[regionLen] = '\0'; m_regionName = regname; - readIntPart(input, &tombstoneOpType); // partlen; + readIntPart(*input, &tombstoneOpType); // partlen; int32_t len; - input.readInt(&len); - input.read(&isObj); + input->readInt(&len); + input->read(&isObj); if (tombstoneOpType == 0) { if (m_tombstoneVersions == nullptr) { m_tombstoneVersions = CacheableHashMap::create(); } - readHashMapForGCVersions(input, m_tombstoneVersions); + readHashMapForGCVersions(*input, m_tombstoneVersions); } else if (tombstoneOpType == 1) { if (m_tombstoneKeys == nullptr) { 
m_tombstoneKeys = CacheableHashSet::create(); } - // input.readObject(m_tombstoneKeys); - readHashSetForGCVersions(input, m_tombstoneKeys); + // input->readObject(m_tombstoneKeys); + readHashSetForGCVersions(*input, m_tombstoneKeys); } else { LOGERROR("Failed to read the tombstone versions"); break; } // readEventId Part - readEventIdPart(input, false); + readEventIdPart(*input, false); break; } case TcrMessage::GET_CLIENT_PARTITION_ATTRIBUTES_ERROR: { @@ -1483,12 +1479,14 @@ void TcrMessage::handleByteArrayResponse(const char* bytearray, int32_t len, throw MessageException("handleByteArrayResponse: unknown message type"); } LOGDEBUG("handleByteArrayResponse earlyack = %d ", earlyack); - if (earlyack & 0x2) readSecureObjectPart(input); + if (earlyack & 0x2) readSecureObjectPart(*input); } TcrMessageDestroyRegion::TcrMessageDestroyRegion( - const Region* region, const UserDataPtr& aCallbackArgument, - int messageResponsetimeout, ThinClientBaseDM* connectionDM) { + std::unique_ptr<DataOutput> dataOutput, const Region* region, + const UserDataPtr& aCallbackArgument, int messageResponsetimeout, + ThinClientBaseDM* connectionDM) { + m_request = std::move(dataOutput); m_msgType = TcrMessage::DESTROY_REGION; m_tcdm = connectionDM; m_regionName = @@ -1519,8 +1517,10 @@ TcrMessageDestroyRegion::TcrMessageDestroyRegion( } TcrMessageClearRegion::TcrMessageClearRegion( - const Region* region, const UserDataPtr& aCallbackArgument, - int messageResponsetimeout, ThinClientBaseDM* connectionDM) { + std::unique_ptr<DataOutput> dataOutput, const Region* region, + const UserDataPtr& aCallbackArgument, int messageResponsetimeout, + ThinClientBaseDM* connectionDM) { + m_request = std::move(dataOutput); m_msgType = TcrMessage::CLEAR_REGION; m_tcdm = connectionDM; m_regionName = @@ -1553,10 +1553,11 @@ TcrMessageClearRegion::TcrMessageClearRegion( writeMessageLength(); } -TcrMessageQuery::TcrMessageQuery(const std::string& regionName, 
+TcrMessageQuery::TcrMessageQuery(std::unique_ptr<DataOutput> dataOutput, + const std::string& regionName, int messageResponsetimeout, ThinClientBaseDM* connectionDM) { - m_request = new DataOutput; + m_request = std::move(dataOutput); m_msgType = TcrMessage::QUERY; m_tcdm = connectionDM; m_regionName = regionName; // this is querystri; @@ -1577,9 +1578,11 @@ TcrMessageQuery::TcrMessageQuery(const std::string& regionName, writeMessageLength(); } -TcrMessageStopCQ::TcrMessageStopCQ(const std::string& regionName, +TcrMessageStopCQ::TcrMessageStopCQ(std::unique_ptr<DataOutput> dataOutput, + const std::string& regionName, int messageResponsetimeout, ThinClientBaseDM* connectionDM) { + m_request = std::move(dataOutput); m_msgType = TcrMessage::STOPCQ_MSG_TYPE; m_tcdm = connectionDM; m_regionName = regionName; // this is querystring @@ -1603,9 +1606,11 @@ TcrMessageStopCQ::TcrMessageStopCQ(const std::string& regionName, writeMessageLength(); } -TcrMessageCloseCQ::TcrMessageCloseCQ(const std::string& regionName, +TcrMessageCloseCQ::TcrMessageCloseCQ(std::unique_ptr<DataOutput> dataOutput, + const std::string& regionName, int messageResponsetimeout, ThinClientBaseDM* connectionDM) { + m_request = std::move(dataOutput); m_msgType = TcrMessage::CLOSECQ_MSG_TYPE; m_tcdm = connectionDM; m_regionName = regionName; // this is querystring @@ -1627,9 +1632,10 @@ TcrMessageCloseCQ::TcrMessageCloseCQ(const std::string& regionName, } TcrMessageQueryWithParameters::TcrMessageQueryWithParameters( - const std::string& regionName, const UserDataPtr& aCallbackArgument, - CacheableVectorPtr paramList, int messageResponsetimeout, - ThinClientBaseDM* connectionDM) { + std::unique_ptr<DataOutput> dataOutput, const std::string& regionName, + const UserDataPtr& aCallbackArgument, CacheableVectorPtr paramList, + int messageResponsetimeout, ThinClientBaseDM* connectionDM) { + m_request = std::move(dataOutput); m_msgType = TcrMessage::QUERY_WITH_PARAMETERS; m_tcdm = connectionDM; m_regionName = 
regionName; @@ -1664,9 +1670,10 @@ TcrMessageQueryWithParameters::TcrMessageQueryWithParameters( } TcrMessageContainsKey::TcrMessageContainsKey( - const Region* region, const CacheableKeyPtr& key, - const UserDataPtr& aCallbackArgument, bool isContainsKey, - ThinClientBaseDM* connectionDM) { + std::unique_ptr<DataOutput> dataOutput, const Region* region, + const CacheableKeyPtr& key, const UserDataPtr& aCallbackArgument, + bool isContainsKey, ThinClientBaseDM* connectionDM) { + m_request = std::move(dataOutput); m_msgType = TcrMessage::CONTAINS_KEY; m_tcdm = connectionDM; m_regionName = @@ -1682,7 +1689,6 @@ TcrMessageContainsKey::TcrMessageContainsKey( numOfParts++; if (key == nullptr) { - delete m_request; throw IllegalArgumentException( "key passed to the constructor can't be nullptr"); } @@ -1699,7 +1705,8 @@ TcrMessageContainsKey::TcrMessageContainsKey( } TcrMessageGetDurableCqs::TcrMessageGetDurableCqs( - ThinClientBaseDM* connectionDM) { + std::unique_ptr<DataOutput> dataOutput, ThinClientBaseDM* connectionDM) { + m_request = std::move(dataOutput); m_msgType = TcrMessage::GETDURABLECQS_MSG_TYPE; m_tcdm = connectionDM; m_timeout = DEFAULT_TIMEOUT_SECONDS; @@ -1711,10 +1718,12 @@ TcrMessageGetDurableCqs::TcrMessageGetDurableCqs( writeMessageLength(); } -TcrMessageRequest::TcrMessageRequest(const Region* region, +TcrMessageRequest::TcrMessageRequest(std::unique_ptr<DataOutput> dataOutput, + const Region* region, const CacheableKeyPtr& key, const UserDataPtr& aCallbackArgument, ThinClientBaseDM* connectionDM) { + m_request = std::move(dataOutput); m_msgType = TcrMessage::REQUEST; m_tcdm = connectionDM; m_key = key; @@ -1731,7 +1740,6 @@ TcrMessageRequest::TcrMessageRequest(const Region* region, numOfParts++; if (key == nullptr) { - delete m_request; throw IllegalArgumentException( "key passed to the constructor can't be nullptr"); } @@ -1748,10 +1756,11 @@ TcrMessageRequest::TcrMessageRequest(const Region* region, writeMessageLength(); } 
-TcrMessageInvalidate::TcrMessageInvalidate(const Region* region, - const CacheableKeyPtr& key, - const UserDataPtr& aCallbackArgument, - ThinClientBaseDM* connectionDM) { +TcrMessageInvalidate::TcrMessageInvalidate( + std::unique_ptr<DataOutput> dataOutput, const Region* region, + const CacheableKeyPtr& key, const UserDataPtr& aCallbackArgument, + ThinClientBaseDM* connectionDM) { + m_request = std::move(dataOutput); m_msgType = TcrMessage::INVALIDATE; m_tcdm = connectionDM; m_key = key; @@ -1768,7 +1777,6 @@ TcrMessageInvalidate::TcrMessageInvalidate(const Region* region, numOfParts++; if (key == nullptr) { - delete m_request; throw IllegalArgumentException( "key passed to the constructor can't be nullptr"); } @@ -1785,11 +1793,13 @@ TcrMessageInvalidate::TcrMessageInvalidate(const Region* region, writeMessageLength(); } -TcrMessageDestroy::TcrMessageDestroy(const Region* region, +TcrMessageDestroy::TcrMessageDestroy(std::unique_ptr<DataOutput> dataOutput, + const Region* region, const CacheableKeyPtr& key, const CacheablePtr& value, const UserDataPtr& aCallbackArgument, ThinClientBaseDM* connectionDM) { + m_request = std::move(dataOutput); m_msgType = TcrMessage::DESTROY; m_tcdm = connectionDM; m_key = key; @@ -1805,7 +1815,6 @@ TcrMessageDestroy::TcrMessageDestroy(const Region* region, numOfParts++; if (key == nullptr) { - delete m_request; throw IllegalArgumentException( "key passed to the constructor can't be nullptr"); } @@ -1839,12 +1848,14 @@ TcrMessageDestroy::TcrMessageDestroy(const Region* region, } } -TcrMessagePut::TcrMessagePut(const Region* region, const CacheableKeyPtr& key, +TcrMessagePut::TcrMessagePut(std::unique_ptr<DataOutput> dataOutput, + const Region* region, const CacheableKeyPtr& key, const CacheablePtr& value, const UserDataPtr& aCallbackArgument, bool isDelta, ThinClientBaseDM* connectionDM, bool isMetaRegion, bool fullValueAfterDeltaFail, const char* regionName) { + m_request = std::move(dataOutput); // m_securityHeaderLength = 0; 
m_isMetaRegion = isMetaRegion; m_msgType = TcrMessage::PUT; @@ -1864,7 +1875,6 @@ TcrMessagePut::TcrMessagePut(const Region* region, const CacheableKeyPtr& key, numOfParts++; if (key == nullptr) { - delete m_request; throw IllegalArgumentException( "key passed to the constructor can't be nullptr"); } @@ -1893,10 +1903,11 @@ TcrMessageReply::TcrMessageReply(bool decodeAll, if (connectionDM != nullptr) isSecurityOn = connectionDM->isSecurityOn(); } -TcrMessagePing::TcrMessagePing(bool decodeAll) { +TcrMessagePing::TcrMessagePing(std::unique_ptr<DataOutput> dataOutput, + bool decodeAll) { m_msgType = TcrMessage::PING; m_decodeAll = decodeAll; - + m_request = std::move(dataOutput); m_request->writeInt(m_msgType); m_request->writeInt( (int32_t)0); // 17 is fixed message len ... PING only has a header. @@ -1911,10 +1922,11 @@ TcrMessagePing::TcrMessagePing(bool decodeAll) { m_txId = 0; } -TcrMessageCloseConnection::TcrMessageCloseConnection(bool decodeAll) { +TcrMessageCloseConnection::TcrMessageCloseConnection( + std::unique_ptr<DataOutput> dataOutput, bool decodeAll) { m_msgType = TcrMessage::CLOSE_CONNECTION; m_decodeAll = decodeAll; - + m_request = std::move(dataOutput); m_request->writeInt(m_msgType); m_request->writeInt((int32_t)6); m_request->writeInt((int32_t)1); // Number of parts. @@ -1929,15 +1941,19 @@ TcrMessageCloseConnection::TcrMessageCloseConnection(bool decodeAll) { m_request->write(static_cast<int8_t>(0)); // keepalive is '0'. 
} -TcrMessageClientMarker::TcrMessageClientMarker(bool decodeAll) { +TcrMessageClientMarker::TcrMessageClientMarker( + std::unique_ptr<DataOutput> dataOutput, bool decodeAll) { + m_request = std::move(dataOutput); m_msgType = TcrMessage::CLIENT_MARKER; m_decodeAll = decodeAll; } TcrMessageRegisterInterestList::TcrMessageRegisterInterestList( - const Region* region, const VectorOfCacheableKey& keys, bool isDurable, - bool isCachingEnabled, bool receiveValues, - InterestResultPolicy interestPolicy, ThinClientBaseDM* connectionDM) { + std::unique_ptr<DataOutput> dataOutput, const Region* region, + const VectorOfCacheableKey& keys, bool isDurable, bool isCachingEnabled, + bool receiveValues, InterestResultPolicy interestPolicy, + ThinClientBaseDM* connectionDM) { + m_request = std::move(dataOutput); m_msgType = TcrMessage::REGISTER_INTEREST_LIST; m_tcdm = connectionDM; m_keyList = &keys; @@ -1964,7 +1980,6 @@ TcrMessageRegisterInterestList::TcrMessageRegisterInterestList( for (uint32_t i = 0; i < numInItrestList; i++) { if (keys[i] == nullptr) { - delete m_request; throw IllegalArgumentException( "keys in the interest list cannot be nullptr"); } @@ -1987,9 +2002,11 @@ TcrMessageRegisterInterestList::TcrMessageRegisterInterestList( } TcrMessageUnregisterInterestList::TcrMessageUnregisterInterestList( - const Region* region, const VectorOfCacheableKey& keys, bool isDurable, - bool isCachingEnabled, bool receiveValues, - InterestResultPolicy interestPolicy, ThinClientBaseDM* connectionDM) { + std::unique_ptr<DataOutput> dataOutput, const Region* region, + const VectorOfCacheableKey& keys, bool isDurable, bool isCachingEnabled, + bool receiveValues, InterestResultPolicy interestPolicy, + ThinClientBaseDM* connectionDM) { + m_request = std::move(dataOutput); m_msgType = TcrMessage::UNREGISTER_INTEREST_LIST; m_tcdm = connectionDM; m_keyList = &keys; @@ -2014,7 +2031,6 @@ TcrMessageUnregisterInterestList::TcrMessageUnregisterInterestList( for (uint32_t i = 0; i < 
numInItrestList; i++) { if (keys[i] == nullptr) { - delete m_request; throw IllegalArgumentException( "keys in the interest list cannot be nullptr"); } @@ -2026,9 +2042,11 @@ TcrMessageUnregisterInterestList::TcrMessageUnregisterInterestList( } TcrMessageCreateRegion::TcrMessageCreateRegion( - const std::string& str1, const std::string& str2, - InterestResultPolicy interestPolicy, bool isDurable, bool isCachingEnabled, - bool receiveValues, ThinClientBaseDM* connectionDM) { + std::unique_ptr<DataOutput> dataOutput, const std::string& str1, + const std::string& str2, InterestResultPolicy interestPolicy, + bool isDurable, bool isCachingEnabled, bool receiveValues, + ThinClientBaseDM* connectionDM) { + m_request = std::move(dataOutput); m_msgType = TcrMessage::CREATE_REGION; m_tcdm = connectionDM; m_isDurable = isDurable; @@ -2043,9 +2061,11 @@ TcrMessageCreateRegion::TcrMessageCreateRegion( } TcrMessageRegisterInterest::TcrMessageRegisterInterest( - const std::string& str1, const std::string& str2, - InterestResultPolicy interestPolicy, bool isDurable, bool isCachingEnabled, - bool receiveValues, ThinClientBaseDM* connectionDM) { + std::unique_ptr<DataOutput> dataOutput, const std::string& str1, + const std::string& str2, InterestResultPolicy interestPolicy, + bool isDurable, bool isCachingEnabled, bool receiveValues, + ThinClientBaseDM* connectionDM) { + m_request = std::move(dataOutput); m_msgType = TcrMessage::REGISTER_INTEREST; m_tcdm = connectionDM; m_isDurable = isDurable; @@ -2078,9 +2098,11 @@ TcrMessageRegisterInterest::TcrMessageRegisterInterest( } TcrMessageUnregisterInterest::TcrMessageUnregisterInterest( - const std::string& str1, const std::string& str2, - InterestResultPolicy interestPolicy, bool isDurable, bool isCachingEnabled, - bool receiveValues, ThinClientBaseDM* connectionDM) { + std::unique_ptr<DataOutput> dataOutput, const std::string& str1, + const std::string& str2, InterestResultPolicy interestPolicy, + bool isDurable, bool 
isCachingEnabled, bool receiveValues, + ThinClientBaseDM* connectionDM) { + m_request = std::move(dataOutput); m_msgType = TcrMessage::UNREGISTER_INTEREST; m_tcdm = connectionDM; m_isDurable = isDurable; @@ -2100,8 +2122,9 @@ TcrMessageUnregisterInterest::TcrMessageUnregisterInterest( m_interestPolicy = interestPolicy.ordinal; } -TcrMessageTxSynchronization::TcrMessageTxSynchronization(int ordinal, int txid, - int status) { +TcrMessageTxSynchronization::TcrMessageTxSynchronization( + std::unique_ptr<DataOutput> dataOutput, int ordinal, int txid, int status) { + m_request = std::move(dataOutput); m_msgType = TcrMessage::TX_SYNCHRONIZATION; writeHeader(m_msgType, ordinal == 1 ? 3 : 2); @@ -2114,7 +2137,9 @@ TcrMessageTxSynchronization::TcrMessageTxSynchronization(int ordinal, int txid, writeMessageLength(); } -TcrMessageClientReady::TcrMessageClientReady() { +TcrMessageClientReady::TcrMessageClientReady( + std::unique_ptr<DataOutput> dataOutput) { + m_request = std::move(dataOutput); m_msgType = TcrMessage::CLIENT_READY; writeHeader(m_msgType, 1); @@ -2123,8 +2148,9 @@ TcrMessageClientReady::TcrMessageClientReady() { writeMessageLength(); } -TcrMessageCommit::TcrMessageCommit() { +TcrMessageCommit::TcrMessageCommit(std::unique_ptr<DataOutput> dataOutput) { m_msgType = TcrMessage::COMMIT; + m_request = std::move(dataOutput); writeHeader(m_msgType, 1); // the server expects at least 1 part, so writing a dummy @@ -2132,8 +2158,9 @@ TcrMessageCommit::TcrMessageCommit() { writeMessageLength(); } -TcrMessageRollback::TcrMessageRollback() { +TcrMessageRollback::TcrMessageRollback(std::unique_ptr<DataOutput> dataOutput) { m_msgType = TcrMessage::ROLLBACK; + m_request = std::move(dataOutput); writeHeader(m_msgType, 1); // the server expects at least 1 part, so writing a dummy @@ -2141,8 +2168,10 @@ TcrMessageRollback::TcrMessageRollback() { writeMessageLength(); } -TcrMessageTxFailover::TcrMessageTxFailover() { +TcrMessageTxFailover::TcrMessageTxFailover( + 
std::unique_ptr<DataOutput> dataOutput) { m_msgType = TcrMessage::TX_FAILOVER; + m_request = std::move(dataOutput); writeHeader(m_msgType, 1); // the server expects at least 1 part, so writing a dummy @@ -2151,8 +2180,10 @@ TcrMessageTxFailover::TcrMessageTxFailover() { } // constructor for MAKE_PRIMARY message. -TcrMessageMakePrimary::TcrMessageMakePrimary(bool processedMarker) { +TcrMessageMakePrimary::TcrMessageMakePrimary( + std::unique_ptr<DataOutput> dataOutput, bool processedMarker) { m_msgType = TcrMessage::MAKE_PRIMARY; + m_request = std::move(dataOutput); writeHeader(m_msgType, 1); writeBytePart(processedMarker ? 1 : 0); // boolean processedMarker @@ -2161,8 +2192,10 @@ TcrMessageMakePrimary::TcrMessageMakePrimary(bool processedMarker) { // constructor for PERIODIC_ACK of notified eventids TcrMessagePeriodicAck::TcrMessagePeriodicAck( + std::unique_ptr<DataOutput> dataOutput, const EventIdMapEntryList& entries) { m_msgType = TcrMessage::PERIODIC_ACK; + m_request = std::move(dataOutput); uint32_t numParts = static_cast<uint32_t>(entries.size()); GF_D_ASSERT(numParts > 0); @@ -2178,7 +2211,8 @@ TcrMessagePeriodicAck::TcrMessagePeriodicAck( writeMessageLength(); } -TcrMessagePutAll::TcrMessagePutAll(const Region* region, +TcrMessagePutAll::TcrMessagePutAll(std::unique_ptr<DataOutput> dataOutput, + const Region* region, const HashMapOfCacheable& map, int messageResponsetimeout, ThinClientBaseDM* connectionDM, @@ -2187,6 +2221,7 @@ TcrMessagePutAll::TcrMessagePutAll(const Region* region, m_regionName = region->getFullPath(); m_region = region; m_messageResponseTimeout = messageResponsetimeout; + m_request = std::move(dataOutput); // TODO check the number of parts in this constructor. doubt because in PUT // value can be nullptr also. 
@@ -2245,7 +2280,8 @@ TcrMessagePutAll::TcrMessagePutAll(const Region* region, writeMessageLength(); } -TcrMessageRemoveAll::TcrMessageRemoveAll(const Region* region, +TcrMessageRemoveAll::TcrMessageRemoveAll(std::unique_ptr<DataOutput> dataOutput, + const Region* region, const VectorOfCacheableKey& keys, const UserDataPtr& aCallbackArgument, ThinClientBaseDM* connectionDM) { @@ -2253,6 +2289,7 @@ TcrMessageRemoveAll::TcrMessageRemoveAll(const Region* region, m_tcdm = connectionDM; m_regionName = region->getFullPath(); m_region = region; + m_request = std::move(dataOutput); // TODO check the number of parts in this constructor. doubt because in PUT // value can be nullptr also. @@ -2288,15 +2325,17 @@ TcrMessageRemoveAll::TcrMessageRemoveAll(const Region* region, } TcrMessageUpdateClientNotification::TcrMessageUpdateClientNotification( - int32_t port) { + std::unique_ptr<DataOutput> dataOutput, int32_t port) { m_msgType = TcrMessage::UPDATE_CLIENT_NOTIFICATION; + m_request = std::move(dataOutput); writeHeader(m_msgType, 1); writeIntPart(port); writeMessageLength(); } -TcrMessageGetAll::TcrMessageGetAll(const Region* region, +TcrMessageGetAll::TcrMessageGetAll(std::unique_ptr<DataOutput> dataOutput, + const Region* region, const VectorOfCacheableKey* keys, ThinClientBaseDM* connectionDM, const UserDataPtr& aCallbackArgument) { @@ -2306,6 +2345,7 @@ TcrMessageGetAll::TcrMessageGetAll(const Region* region, m_callbackArgument = aCallbackArgument; m_regionName = region->getFullPath(); m_region = region; + m_request = std::move(dataOutput); /*CacheableObjectArrayPtr keyArr = nullptr; if (keys != nullptr) { @@ -2348,10 +2388,13 @@ void TcrMessage::InitializeGetallMsg(const UserDataPtr& aCallbackArgument) { writeMessageLength(); } -TcrMessageExecuteCq::TcrMessageExecuteCq(const std::string& str1, +TcrMessageExecuteCq::TcrMessageExecuteCq(std::unique_ptr<DataOutput> dataOutput, + const std::string& str1, const std::string& str2, int state, bool isDurable, ThinClientBaseDM* 
connectionDM) { + m_request = std::move(dataOutput); + m_msgType = TcrMessage::EXECUTECQ_MSG_TYPE; m_tcdm = connectionDM; m_isDurable = isDurable; @@ -2374,8 +2417,11 @@ TcrMessageExecuteCq::TcrMessageExecuteCq(const std::string& str1, } TcrMessageExecuteCqWithIr::TcrMessageExecuteCqWithIr( - const std::string& str1, const std::string& str2, int state, bool isDurable, + std::unique_ptr<DataOutput> dataOutput, const std::string& str1, + const std::string& str2, int state, bool isDurable, ThinClientBaseDM* connectionDM) { + m_request = std::move(dataOutput); + m_msgType = TcrMessage::EXECUTECQ_WITH_IR_MSG_TYPE; m_tcdm = connectionDM; m_isDurable = isDurable; @@ -2398,33 +2444,31 @@ TcrMessageExecuteCqWithIr::TcrMessageExecuteCqWithIr( } TcrMessageExecuteFunction::TcrMessageExecuteFunction( - const std::string& funcName, const CacheablePtr& args, uint8_t getResult, - ThinClientBaseDM* connectionDM, int32_t timeout) { + std::unique_ptr<DataOutput> dataOutput, const std::string& funcName, + const CacheablePtr& args, uint8_t getResult, ThinClientBaseDM* connectionDM, + int32_t timeout) { + m_request = std::move(dataOutput); + m_msgType = TcrMessage::EXECUTE_FUNCTION; m_tcdm = connectionDM; m_hasResult = getResult; uint32_t numOfParts = 3; writeHeader(m_msgType, numOfParts); - // writeBytePart(getResult ? 
1 : 0); - // if gfcpp property unit set then timeout will be in millisecond - // otherwise it will be in second - if ((DistributedSystem::getSystemProperties() != nullptr) && - (DistributedSystem::getSystemProperties()->readTimeoutUnitInMillis())) { - writeByteAndTimeOutPart(getResult, timeout); - } else { - writeByteAndTimeOutPart(getResult, (timeout * 1000)); - } + writeByteAndTimeOutPart(getResult, timeout); writeRegionPart(funcName); // function name string writeObjectPart(args); writeMessageLength(); } TcrMessageExecuteRegionFunction::TcrMessageExecuteRegionFunction( - const std::string& funcName, const Region* region, const CacheablePtr& args, + std::unique_ptr<DataOutput> dataOutput, const std::string& funcName, + const Region* region, const CacheablePtr& args, CacheableVectorPtr routingObj, uint8_t getResult, CacheableHashSetPtr failedNodes, int32_t timeout, ThinClientBaseDM* connectionDM, int8_t reExecute) { + m_request = std::move(dataOutput); + m_msgType = TcrMessage::EXECUTE_REGION_FUNCTION; m_tcdm = connectionDM; m_regionName = @@ -2445,15 +2489,7 @@ TcrMessageExecuteRegionFunction::TcrMessageExecuteRegionFunction( numOfParts++; } writeHeader(m_msgType, numOfParts); - - // if gfcpp property unit set then timeout will be in millisecond - // otherwise it will be in second - if ((DistributedSystem::getSystemProperties() != nullptr) && - (DistributedSystem::getSystemProperties()->readTimeoutUnitInMillis())) { - writeByteAndTimeOutPart(getResult, timeout); - } else { - writeByteAndTimeOutPart(getResult, (timeout * 1000)); - } + writeByteAndTimeOutPart(getResult, timeout); writeRegionPart(m_regionName); writeRegionPart(funcName); // function name string writeObjectPart(args); @@ -2466,7 +2502,7 @@ TcrMessageExecuteRegionFunction::TcrMessageExecuteRegionFunction( writeObjectPart(value); } } else { - writeIntPart(0); + writeIntPart(0); } if (failedNodes) { writeIntPart(static_cast<int32_t>(failedNodes->size())); @@ -2479,10 +2515,13 @@ 
TcrMessageExecuteRegionFunction::TcrMessageExecuteRegionFunction( TcrMessageExecuteRegionFunctionSingleHop:: TcrMessageExecuteRegionFunctionSingleHop( - const std::string& funcName, const Region* region, - const CacheablePtr& args, CacheableHashSetPtr routingObj, - uint8_t getResult, CacheableHashSetPtr failedNodes, bool allBuckets, - int32_t timeout, ThinClientBaseDM* connectionDM) { + std::unique_ptr<DataOutput> dataOutput, const std::string& funcName, + const Region* region, const CacheablePtr& args, + CacheableHashSetPtr routingObj, uint8_t getResult, + CacheableHashSetPtr failedNodes, bool allBuckets, int32_t timeout, + ThinClientBaseDM* connectionDM) { + m_request = std::move(dataOutput); + m_msgType = TcrMessage::EXECUTE_REGION_FUNCTION_SINGLE_HOP; m_tcdm = connectionDM; m_regionName = @@ -2498,15 +2537,7 @@ TcrMessageExecuteRegionFunctionSingleHop:: numOfParts++; } writeHeader(m_msgType, numOfParts); - - // if gfcpp property unit set then timeout will be in millisecond - // otherwise it will be in second - if ((DistributedSystem::getSystemProperties() != nullptr) && - (DistributedSystem::getSystemProperties()->readTimeoutUnitInMillis())) { - writeByteAndTimeOutPart(getResult, timeout); - } else { - writeByteAndTimeOutPart(getResult, (timeout * 1000)); - } + writeByteAndTimeOutPart(getResult, timeout); writeRegionPart(m_regionName); writeRegionPart(funcName); // function name string writeObjectPart(args); @@ -2541,7 +2572,9 @@ TcrMessageExecuteRegionFunctionSingleHop:: } TcrMessageGetClientPartitionAttributes::TcrMessageGetClientPartitionAttributes( - const char* regionName) { + std::unique_ptr<DataOutput> dataOutput, const char* regionName) { + m_request = std::move(dataOutput); + m_msgType = TcrMessage::GET_CLIENT_PARTITION_ATTRIBUTES; writeHeader(m_msgType, 1); writeRegionPart(regionName); @@ -2549,14 +2582,19 @@ TcrMessageGetClientPartitionAttributes::TcrMessageGetClientPartitionAttributes( } TcrMessageGetClientPrMetadata::TcrMessageGetClientPrMetadata( 
- const char* regionName) { + std::unique_ptr<DataOutput> dataOutput, const char* regionName) { + m_request = std::move(dataOutput); + m_msgType = TcrMessage::GET_CLIENT_PR_METADATA; writeHeader(m_msgType, 1); writeRegionPart(regionName); writeMessageLength(); } -TcrMessageSize::TcrMessageSize(const char* regionName) { +TcrMessageSize::TcrMessageSize(std::unique_ptr<DataOutput> dataOutput, + const char* regionName) { + m_request = std::move(dataOutput); + m_msgType = TcrMessage::SIZE; writeHeader(m_msgType, 1); writeRegionPart(regionName); @@ -2564,7 +2602,10 @@ TcrMessageSize::TcrMessageSize(const char* regionName) { } TcrMessageUserCredential::TcrMessageUserCredential( - PropertiesPtr creds, ThinClientBaseDM* connectionDM) { + std::unique_ptr<DataOutput> dataOutput, PropertiesPtr creds, + ThinClientBaseDM* connectionDM) { + m_request = std::move(dataOutput); + m_msgType = TcrMessage::USER_CREDENTIAL_MESSAGE; m_tcdm = connectionDM; @@ -2584,7 +2625,10 @@ TcrMessageUserCredential::TcrMessageUserCredential( } TcrMessageRemoveUserAuth::TcrMessageRemoveUserAuth( - bool keepAlive, ThinClientBaseDM* connectionDM) { + std::unique_ptr<DataOutput> dataOutput, bool keepAlive, + ThinClientBaseDM* connectionDM) { + m_request = std::move(dataOutput); + m_msgType = TcrMessage::REMOVE_USER_AUTH; m_tcdm = connectionDM; LOGDEBUG("Tcrmessage sending REMOVE_USER_AUTH message to server"); @@ -2605,12 +2649,12 @@ void TcrMessage::createUserCredentialMessage(TcrConnection* conn) { m_isSecurityHeaderAdded = false; writeHeader(m_msgType, 1); - DataOutput dOut; + auto dOut = m_tcdm->getConnectionManager().getCacheImpl()->getCache()->createDataOutput(); - if (m_creds != nullptr) m_creds->toData(dOut); + if (m_creds != nullptr) m_creds->toData(*dOut); CacheableBytesPtr credBytes = - CacheableBytes::create(dOut.getBuffer(), dOut.getBufferLength()); + CacheableBytes::create(dOut->getBuffer(), dOut->getBufferLength()); CacheableBytesPtr encryptBytes = conn->encryptBytes(credBytes); 
writeObjectPart(encryptBytes); @@ -2633,13 +2677,13 @@ void TcrMessage::addSecurityPart(int64_t connectionId, int64_t unique_id, } m_isSecurityHeaderAdded = true; LOGDEBUG("addSecurityPart( , ) "); - DataOutput dOutput; + auto dOutput = m_tcdm->getConnectionManager().getCacheImpl()->getCache()->createDataOutput(); - dOutput.writeInt(connectionId); - dOutput.writeInt(unique_id); + dOutput->writeInt(connectionId); + dOutput->writeInt(unique_id); CacheableBytesPtr bytes = - CacheableBytes::create(dOutput.getBuffer(), dOutput.getBufferLength()); + CacheableBytes::create(dOutput->getBuffer(), dOutput->getBufferLength()); CacheableBytesPtr encryptBytes = conn->encryptBytes(bytes); @@ -2663,12 +2707,12 @@ void TcrMessage::addSecurityPart(int64_t connectionId, TcrConnection* conn) { } m_isSecurityHeaderAdded = true; LOGDEBUG("TcrMessage::addSecurityPart only connid"); - DataOutput dOutput; + auto dOutput = m_tcdm->getConnectionManager().getCacheImpl()->getCache()->createDataOutput(); - dOutput.writeInt(connectionId); + dOutput->writeInt(connectionId); CacheableBytesPtr bytes = - CacheableBytes::create(dOutput.getBuffer(), dOutput.getBufferLength()); + CacheableBytes::create(dOutput->getBuffer(), dOutput->getBufferLength()); CacheableBytesPtr encryptBytes = conn->encryptBytes(bytes); @@ -2681,7 +2725,9 @@ void TcrMessage::addSecurityPart(int64_t connectionId, TcrConnection* conn) { ->asChar()); } -TcrMessageRequestEventValue::TcrMessageRequestEventValue(EventIdPtr eventId) { +TcrMessageRequestEventValue::TcrMessageRequestEventValue( + std::unique_ptr<DataOutput> dataOutput, EventIdPtr eventId) { + m_request = std::move(dataOutput); m_msgType = TcrMessage::REQUEST_EVENT_VALUE; uint32_t numOfParts = 1; @@ -2691,8 +2737,9 @@ TcrMessageRequestEventValue::TcrMessageRequestEventValue(EventIdPtr eventId) { } TcrMessageGetPdxIdForType::TcrMessageGetPdxIdForType( - const CacheablePtr& pdxType, ThinClientBaseDM* connectionDM, - int32_t pdxTypeId) { + std::unique_ptr<DataOutput> 
dataOutput, const CacheablePtr& pdxType, + ThinClientBaseDM* connectionDM, int32_t pdxTypeId) { + m_request = std::move(dataOutput); m_msgType = TcrMessage::GET_PDX_ID_FOR_TYPE; m_tcdm = connectionDM; @@ -2706,9 +2753,10 @@ TcrMessageGetPdxIdForType::TcrMessageGetPdxIdForType( ->asChar()); } -TcrMessageAddPdxType::TcrMessageAddPdxType(const CacheablePtr& pdxType, - ThinClientBaseDM* connectionDM, - int32_t pdxTypeId) { +TcrMessageAddPdxType::TcrMessageAddPdxType( + std::unique_ptr<DataOutput> dataOutput, const CacheablePtr& pdxType, + ThinClientBaseDM* connectionDM, int32_t pdxTypeId) { + m_request = std::move(dataOutput); m_msgType = TcrMessage::ADD_PDX_TYPE; m_tcdm = connectionDM; @@ -2724,8 +2772,9 @@ TcrMessageAddPdxType::TcrMessageAddPdxType(const CacheablePtr& pdxType, } TcrMessageGetPdxIdForEnum::TcrMessageGetPdxIdForEnum( - const CacheablePtr& pdxType, ThinClientBaseDM* connectionDM, - int32_t pdxTypeId) { + std::unique_ptr<DataOutput> dataOutput, const CacheablePtr& pdxType, + ThinClientBaseDM* connectionDM, int32_t pdxTypeId) { + m_request = std::move(dataOutput); m_msgType = TcrMessage::GET_PDX_ID_FOR_ENUM; m_tcdm = connectionDM; @@ -2739,9 +2788,10 @@ TcrMessageGetPdxIdForEnum::TcrMessageGetPdxIdForEnum( ->asChar()); } -TcrMessageAddPdxEnum::TcrMessageAddPdxEnum(const CacheablePtr& pdxType, - ThinClientBaseDM* connectionDM, - int32_t pdxTypeId) { +TcrMessageAddPdxEnum::TcrMessageAddPdxEnum( + std::unique_ptr<DataOutput> dataOutput, const CacheablePtr& pdxType, + ThinClientBaseDM* connectionDM, int32_t pdxTypeId) { + m_request = std::move(dataOutput); m_msgType = TcrMessage::ADD_PDX_ENUM; m_tcdm = connectionDM; @@ -2757,7 +2807,9 @@ TcrMessageAddPdxEnum::TcrMessageAddPdxEnum(const CacheablePtr& pdxType, } TcrMessageGetPdxTypeById::TcrMessageGetPdxTypeById( - int32_t typeId, ThinClientBaseDM* connectionDM) { + std::unique_ptr<DataOutput> dataOutput, int32_t typeId, + ThinClientBaseDM* connectionDM) { + m_request = std::move(dataOutput); m_msgType = 
TcrMessage::GET_PDX_TYPE_BY_ID; m_tcdm = connectionDM; @@ -2774,7 +2826,9 @@ TcrMessageGetPdxTypeById::TcrMessageGetPdxTypeById( } TcrMessageGetPdxEnumById::TcrMessageGetPdxEnumById( - int32_t typeId, ThinClientBaseDM* connectionDM) { + std::unique_ptr<DataOutput> dataOutput, int32_t typeId, + ThinClientBaseDM* connectionDM) { + m_request = std::move(dataOutput); m_msgType = TcrMessage::GET_PDX_ENUM_BY_ID; m_tcdm = connectionDM; @@ -2791,7 +2845,9 @@ TcrMessageGetPdxEnumById::TcrMessageGetPdxEnumById( } TcrMessageGetFunctionAttributes::TcrMessageGetFunctionAttributes( - const std::string& funcName, ThinClientBaseDM* connectionDM) { + std::unique_ptr<DataOutput> dataOutput, const std::string& funcName, + ThinClientBaseDM* connectionDM) { + m_request = std::move(dataOutput); m_msgType = TcrMessage::GET_FUNCTION_ATTRIBUTES; m_tcdm = connectionDM; @@ -2801,8 +2857,10 @@ TcrMessageGetFunctionAttributes::TcrMessageGetFunctionAttributes( writeMessageLength(); } -TcrMessageKeySet::TcrMessageKeySet(const std::string& funcName, +TcrMessageKeySet::TcrMessageKeySet(std::unique_ptr<DataOutput> dataOutput, + const std::string& funcName, ThinClientBaseDM* connectionDM) { + m_request = std::move(dataOutput); m_msgType = TcrMessage::KEY_SET; m_tcdm = connectionDM; @@ -2812,17 +2870,21 @@ TcrMessageKeySet::TcrMessageKeySet(const std::string& funcName, writeMessageLength(); } -void TcrMessage::setData(const char* bytearray, int32_t len, uint16_t memId) { +void TcrMessage::setData(const char* bytearray, int32_t len, uint16_t memId, + const SerializationRegistry& serializationRegistry, + MemberListForVersionStamp& memberListForVersionStamp) { + if (m_request == nullptr) { + m_request = m_tcdm->getConnectionManager().getCacheImpl()->getCache()->createDataOutput(); + } if (bytearray) { DeleteArray<const char> delByteArr(bytearray); - handleByteArrayResponse(bytearray, len, memId); + handleByteArrayResponse(bytearray, len, memId, serializationRegistry, + memberListForVersionStamp); } } 
TcrMessage::~TcrMessage() { - GF_SAFE_DELETE(m_request); GF_SAFE_DELETE(m_cqs); - GF_SAFE_DELETE(m_delta); /* adongre * CID 29167: Non-array delete for scalars (DELETE_ARRAY) * Coverity - II