Github user fdaniel7 commented on a diff in the pull request:

    https://github.com/apache/incubator-geode/pull/245#discussion_r84839367
  
    --- Diff: geode-client-native/src/cppcache/impl/ClientMetadataService.cpp 
---
    @@ -1,925 +1 @@
    -/*=========================================================================
    - * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
    - * This product is protected by U.S. and international copyright
    - * and intellectual property laws. Pivotal products are covered by
    - * one or more patents listed at http://www.pivotal.io/patents.
    - *=========================================================================
    - */
    -#include "TcrMessage.hpp"
    -#include "ClientMetadataService.hpp"
    -#include "ThinClientPoolDM.hpp"
    -#include "../FixedPartitionResolver.hpp"
    -#include <iterator>
    -#include <cstdlib>
    -#include <climits>
    -
    -namespace gemfire {
    -  const char * ClientMetadataService::NC_CMDSvcThread = "NC CMDSvcThread";
    -  ClientMetadataService::~ClientMetadataService ()
    -  {
    -    delete m_regionQueue;
    -    if(m_bucketWaitTimeout > 0)
    -    {
    -      try
    -      {
    -        std::map<std::string, PRbuckets*>::iterator  bi;
    -        for (bi = m_bucketStatus.begin(); bi != m_bucketStatus.end(); ++bi)
    -        {
    -          delete bi->second;
    -        }
    -
    -      }catch(...)
    -      {
    -        LOGINFO("Exception in ClientMetadataService destructor");
    -      }
    -    }
    -  }
    -
    -  ClientMetadataService::ClientMetadataService(PoolPtr pool)
    -    /* adongre
    -    * CID 28928: Uninitialized scalar field (UNINIT_CTOR)
    -    */
    -    : m_run(false)
    -
    -  {
    -    m_regionQueue = new Queue<std::string>(false);
    -    m_pool = pool;
    -    m_bucketWaitTimeout = 
DistributedSystem::getSystemProperties()->bucketWaitTimeout();
    -  }
    -
    -  int ClientMetadataService::svc() {
    -    DistributedSystemImpl::setThreadName(NC_CMDSvcThread);
    -    LOGINFO("ClientMetadataService started for pool %s", 
m_pool->getName()); 
    -    while (m_run) {
    -      m_regionQueueSema.acquire( );
    -      ThinClientPoolDM* tcrdm = dynamic_cast< ThinClientPoolDM* >( 
m_pool.ptr( ) ); 
    -      CacheImpl * cache = tcrdm->getConnectionManager().getCacheImpl();
    -      while(true)
    -      {
    -        std::string * regionFullPath = m_regionQueue->get();
    -
    -        if(regionFullPath != NULL && regionFullPath->c_str() != NULL )
    -        {
    -          while(true) 
    -          {
    -            if(m_regionQueue->size() > 0) 
    -            {
    -              std::string * nextRegionFullPath = m_regionQueue->get();
    -              if(nextRegionFullPath != NULL && nextRegionFullPath->c_str() 
!= NULL && regionFullPath->compare(nextRegionFullPath->c_str()) == 0)
    -              {
    -                delete nextRegionFullPath;//we are going for same
    -              }
    -              else
    -              {
    -                //different region; put it back
    -                m_regionQueue->put(nextRegionFullPath);
    -                break;
    -              }
    -            } else
    -              break;
    -          }
    -        }
    -
    -        if (!cache->isCacheDestroyPending() && regionFullPath != NULL && 
regionFullPath->c_str() != NULL) {
    -          getClientPRMetadata(regionFullPath->c_str());
    -          delete regionFullPath;
    -          regionFullPath = NULL;
    -        }
    -        else
    -        {
    -          delete regionFullPath;
    -          regionFullPath = NULL;
    -          break;
    -        }
    -      }
    -      //while(m_regionQueueSema.tryacquire( ) != -1); // release all
    -    }
    -    LOGINFO("ClientMetadataService stopped for pool %s", 
m_pool->getName());
    -    return 0;
    -  }
    -
    -  void  ClientMetadataService::getClientPRMetadata(const char * 
regionFullPath) {
    -    if (regionFullPath == NULL) return;
    -    ThinClientPoolDM* tcrdm = dynamic_cast< ThinClientPoolDM* >( 
m_pool.ptr( ) );
    -    if(tcrdm == NULL)
    -    {
    -      throw IllegalArgumentException("ClientMetaData: pool cast to 
ThinClientPoolDM failed");
    -    }
    -    // That means metadata for the region not found, So only for the first 
time for a particular region use GetClientPartitionAttributesOp
    -    // TcrMessage to fetch the metadata and put it into map for later 
use.send this message to server and get metadata from server.    
    -    TcrMessage reply;
    -    std::string path(regionFullPath);
    -    ClientMetadataPtr cptr = NULLPTR;
    -   {
    -     ReadGuard guard( m_regionMetadataLock );
    -    RegionMetadataMapType::iterator itr = m_regionMetaDataMap.find(path);
    -    if (itr != m_regionMetaDataMap.end()) {
    -      cptr = itr->second;
    -    }
    -     //cptr = m_regionMetaDataMap[path];
    -   }
    -    ClientMetadataPtr newCptr = NULLPTR;
    -
    -    {
    -      //ACE_Guard< ACE_Recursive_Thread_Mutex > guard( 
m_regionMetadataLock );  
    -
    -
    -      if (cptr == NULLPTR) {
    -        TcrMessage request(TcrMessage::GET_CLIENT_PARTITION_ATTRIBUTES, 
regionFullPath);
    -        GfErrType err = tcrdm->sendSyncRequest(request, reply);        
    -        if (err == GF_NOERR && reply.getMessageType() == 
TcrMessage::RESPONSE_CLIENT_PARTITION_ATTRIBUTES) {
    -          cptr = new ClientMetadata(reply.getNumBuckets(), 
reply.getColocatedWith(), tcrdm, reply.getFpaSet());
    -          if(m_bucketWaitTimeout > 0 && reply.getNumBuckets() > 0)
    -          {
    -            WriteGuard guard( m_PRbucketStatusLock );
    -            m_bucketStatus[regionFullPath] = new 
PRbuckets(reply.getNumBuckets());
    -          }
    -          LOGDEBUG("ClientMetadata buckets %d ", reply.getNumBuckets());
    -          if (cptr != NULLPTR) {
    -            //m_regionMetaDataMap[regionFullPath] = cptr;    
    -          }
    -        }        
    -      }
    -    }
    -    if (cptr == NULLPTR) {
    -      return;
    -    }
    -    CacheableStringPtr colocatedWith;
    -    if (cptr != NULLPTR) {
    -      colocatedWith = cptr->getColocatedWith();
    -    }
    -    if (colocatedWith == NULLPTR) {     
    -      newCptr = SendClientPRMetadata(regionFullPath,cptr);
    -      //now we will get new instance so assign it again
    -      if(newCptr != NULLPTR) 
    -      {
    -        cptr->setPreviousone(NULLPTR);
    -        newCptr->setPreviousone(cptr);
    -        WriteGuard guard( m_regionMetadataLock );
    -        m_regionMetaDataMap[path] = newCptr;
    -           LOGINFO("Updated client meta data");
    -      }
    -    }
    -    else {
    -      {
    -        //ACE_Guard< ACE_Recursive_Thread_Mutex > guard( 
m_regionMetadataLock );
    -        //m_regionMetaDataMap[colocatedWith->asChar()] = cptr;
    -      }      
    -      newCptr = SendClientPRMetadata(colocatedWith->asChar(),cptr);        
 
    -
    -      if(newCptr != NULLPTR) 
    -      {
    -        cptr->setPreviousone(NULLPTR);
    -        newCptr->setPreviousone(cptr);
    -        //now we will get new instance so assign it again
    -        WriteGuard guard( m_regionMetadataLock );
    -        m_regionMetaDataMap[colocatedWith->asChar()] = newCptr;
    -        m_regionMetaDataMap[path] = newCptr;
    -           LOGINFO("Updated client meta data");
    -      }
    -    }    
    -  }
    -
    -  ClientMetadataPtr ClientMetadataService::SendClientPRMetadata(const 
char* regionPath, ClientMetadataPtr cptr) {
    -    ThinClientPoolDM* tcrdm = dynamic_cast< ThinClientPoolDM* >( 
m_pool.ptr( ) );
    -    if(tcrdm == NULL)
    -    {
    -      throw IllegalArgumentException("ClientMetaData: pool cast to 
ThinClientPoolDM failed");
    -    }
    -    TcrMessage request(TcrMessage::GET_CLIENT_PR_METADATA, regionPath);
    -    TcrMessage reply;
    -    // send this message to server and get metadata from server.
    -    LOGFINE("Now sending GET_CLIENT_PR_METADATA for getting from server: 
%s",regionPath);
    -    RegionPtr region = NULLPTR;    
    -    GfErrType err = tcrdm->sendSyncRequest(request, reply);    
    -    if (err == GF_NOERR && reply.getMessageType() == 
TcrMessage::RESPONSE_CLIENT_PR_METADATA) {
    -      tcrdm->getConnectionManager().getCacheImpl()->getRegion(regionPath, 
region);      
    -      if (region != NULLPTR) {       
    -        LocalRegion* lregion = dynamic_cast<LocalRegion*> (region.ptr());  
        
    -        lregion->getRegionStats()->incMetaDataRefreshCount();        
    -      }      
    -      std::vector<BucketServerLocationsType>* metadata = 
reply.getMetadata();
    -      if(metadata == NULL)
    -        return NULLPTR;
    -      if ( metadata->empty()) {
    -        delete metadata;
    -        return NULLPTR;
    -      }
    -      ClientMetadata* newCptr = new ClientMetadata(*(cptr.ptr()));
    -      for (std::vector<BucketServerLocationsType>::iterator iter = 
metadata->begin();
    -        iter != metadata->end(); ++iter) {      
    -          if (!(*iter).empty()) {
    -            
newCptr->updateBucketServerLocations((*iter).at(0)->getBucketId(), (*iter));    
               
    -          }
    -      }
    -      delete metadata;
    -      ClientMetadataPtr newCMDPtr(newCptr);
    -      return newCMDPtr;
    -    }
    -    return NULLPTR;
    -  }
    -
    -  void ClientMetadataService::getBucketServerLocation(const RegionPtr& 
region, const CacheableKeyPtr& key,
    -    const CacheablePtr& value,const UserDataPtr& aCallbackArgument, bool 
isPrimary, BucketServerLocationPtr& serverLocation,  int8_t& version)
    -  {
    -    //ACE_Guard< ACE_Recursive_Thread_Mutex > guard( m_regionMetadataLock 
);    
    -    if (region != NULLPTR) {
    -     ReadGuard guard( m_regionMetadataLock );
    -     LOGDEBUG("ClientMetadataService::getBucketServerLocation 
m_regionMetaDataMap size is %d",m_regionMetaDataMap.size());   
    -    std::string path(region->getFullPath());
    -    ClientMetadataPtr cptr = NULLPTR;
    -    RegionMetadataMapType::iterator itr = m_regionMetaDataMap.find(path);
    -    if (itr != m_regionMetaDataMap.end()) {
    -      cptr = itr->second;
    -    }
    -      //ClientMetadataPtr cptr = m_regionMetaDataMap[path];
    -      if (cptr == NULLPTR) {
    -        //serverLocation = BucketServerLocation();
    -        return;
    -      }
    -      CacheableKeyPtr resolvekey;
    -      const PartitionResolverPtr& resolver = 
region->getAttributes()->getPartitionResolver();
    -      
    -      EntryEvent event(region, key, value, NULLPTR, aCallbackArgument, 
false);
    -      int bucketId = 0;
    -      if (resolver == NULLPTR) {
    -        resolvekey = key;
    -      }
    -      else {
    -        resolvekey = resolver->getRoutingObject(event);
    -        if (resolvekey == NULLPTR) {
    -          throw IllegalStateException("The RoutingObject returned by 
PartitionResolver is null.");
    -        }
    -      }
    -      CacheableHashSetPtr& partNames = cptr->getFixedPartitionNames();
    -      if (partNames != NULLPTR && partNames->size() > 0) {        
    -        FixedPartitionResolverPtr fpResolver( 
dynamic_cast<FixedPartitionResolver*> (resolver.ptr() ));
    -        if(fpResolver != NULLPTR) {       
    -          const char* partition = fpResolver->getPartitionName(event, 
partNames);        
    -          if ( partition == NULL ) {
    -            throw IllegalStateException("partition name returned by 
Partition resolver is null.");
    -          }
    -          else {
    -            bucketId = cptr->assignFixedBucketId(partition, resolvekey);
    -            if ( bucketId == -1) {
    -              return;
    -            }          
    -          }
    -        }
    -      }      
    -      else {
    -        if (cptr->getTotalNumBuckets() > 0 ) {          
    -          bucketId = std::abs((int)resolvekey->hashcode() % 
cptr->getTotalNumBuckets());          
    -        }
    -      }
    -      cptr->getServerLocation(bucketId, isPrimary, serverLocation, 
version);
    -    }    
    -  }
    -
    -  void 
ClientMetadataService::removeBucketServerLocation(BucketServerLocation 
serverLocation)
    -  {
    -    ReadGuard guard( m_regionMetadataLock );
    -    for(RegionMetadataMapType::iterator regionMetadataIter= 
m_regionMetaDataMap.begin();
    -      regionMetadataIter!=m_regionMetaDataMap.end();
    -      regionMetadataIter++) {
    -        ClientMetadataPtr cptr = (*regionMetadataIter).second;
    -        if (cptr != NULLPTR) {
    -          //Yogesh has commented out this as it was causing a SIGV
    -          //cptr->removeBucketServerLocation(serverLocation);
    -        }
    -    }
    -  }
    -
    -  ClientMetadataPtr ClientMetadataService::getClientMetadata(const char* 
regionFullPath)
    -  {
    -    ReadGuard guard( m_regionMetadataLock );
    -    RegionMetadataMapType::iterator regionMetadataIter = 
m_regionMetaDataMap.find(regionFullPath);
    -    if (regionMetadataIter != m_regionMetaDataMap.end()) {
    -      return (*regionMetadataIter).second;
    -    }
    -    return NULLPTR;
    -  }
    -
    - /*const  PartitionResolverPtr& ClientMetadataService::getResolver(const 
RegionPtr& region, const CacheableKeyPtr& key,
    -    const UserDataPtr& aCallbackArgument){
    -      //const char * regionFullPath = region->getFullPath();
    -      //if (regionFullPath != NULL) {
    -        //const RegionAttributesPtr& rAttrsPtr = region->getAttributes();
    -        return region->getAttributes()->getPartitionResolver();
    -      //}
    -   }*/   
    -
    -  /*BucketServerLocation 
ClientMetadataService::getServerLocation(ClientMetadataPtr cptr, int bucketId, 
bool tryPrimary)
    -  {
    -  LOGFINE("Inside getServerLocation");
    -  if (cptr == NULLPTR) {
    -  LOGDEBUG("MetaData does not exist");
    -  return BucketServerLocation();
    -  }
    -  LOGFINE("Ending getServerLocation");
    -  return cptr->getServerLocation(bucketId, tryPrimary);
    -  }*/
    -
    -  void ClientMetadataService::populateDummyServers(const char * 
regionName, ClientMetadataPtr cptr)
    -  {
    -    WriteGuard guard( m_regionMetadataLock );
    -    m_regionMetaDataMap[regionName] = cptr;
    -  }
    -
    -  void ClientMetadataService::enqueueForMetadataRefresh(
    -    const char* regionFullPath, int8 serverGroupFlag) {
    -      ThinClientPoolDM* tcrdm = dynamic_cast< ThinClientPoolDM* >( 
m_pool.ptr( ) );
    -      if(tcrdm == NULL)
    -      {
    -        throw IllegalArgumentException("ClientMetaData: pool cast to 
ThinClientPoolDM failed");
    -      }
    -      RegionPtr region;
    -      
tcrdm->getConnectionManager().getCacheImpl()->getRegion(regionFullPath, 
region);      
    -      LocalRegion* lregion = dynamic_cast<LocalRegion*> (region.ptr());    
      
    -      lregion->getRegionStats()->incNonSingleHopCount();//we are here 
means nonSinglehop
    -
    -      std::string serverGroup = tcrdm->getServerGroup();    
    -      if (serverGroup.length() != 0 ) {    
    -        CacheImpl::setServerGroupFlag(serverGroupFlag);
    -        if (serverGroupFlag == 2 ) {
    -          LOGFINER("Network hop but, from within same server-group, so no 
metadata fetch from the server");
    -          return;
    -        }
    -      }
    -
    -      if (region != NULLPTR) {       
    -        ThinClientRegion* tcrRegion = dynamic_cast<ThinClientRegion*> 
(region.ptr());
    -        {
    -          TryWriteGuard guardRegionMetaDataRefresh( 
tcrRegion->getMataDataMutex(), tcrRegion->getMetaDataRefreshed());
    -          if ( tcrRegion->getMetaDataRefreshed() ) {
    -            return;
    -          }
    -          LOGFINE("Network hop so fetching single hop metadata from the 
server");
    -          CacheImpl::setNetworkHopFlag(true);
    -          tcrRegion->setMetaDataRefreshed(true);            
    -          std::string* tempRegionPath = new std::string(regionFullPath);
    -          m_regionQueue->put(tempRegionPath);
    -          m_regionQueueSema.release( );
    -        }
    -      }   
    -  }
    -
    -  HashMapT<BucketServerLocationPtr, VectorOfCacheableKeyPtr>* 
ClientMetadataService::getServerToFilterMap(
    -    const VectorOfCacheableKey* keys, const RegionPtr& region, bool 
isPrimary) {
    -      //const char* regionFullPath = region->getFullPath();
    -      ClientMetadataPtr cptr = NULLPTR;
    -      {   
    -        ReadGuard guard( m_regionMetadataLock );
    -        RegionMetadataMapType::iterator cptrIter = 
m_regionMetaDataMap.find(region->getFullPath());
    -
    -        if(cptrIter != m_regionMetaDataMap.end())
    -        {
    -          cptr = cptrIter->second;
    -        }
    -
    -        if (cptr == NULLPTR || keys == NULL) {
    -          // enqueueForMetadataRefresh(region->getFullPath());
    -          return NULL;
    -          //               //serverLocation = BucketServerLocation();
    -          //               return;
    -        }
    -      }
    -      //int totalNumberOfBuckets = cptr->getTotalNumBuckets();
    -      HashMapT<BucketServerLocationPtr, VectorOfCacheableKeyPtr>* result = 
new HashMapT<BucketServerLocationPtr, VectorOfCacheableKeyPtr>();
    -      VectorOfCacheableKeyPtr keysWhichLeft(new VectorOfCacheableKey());
    -
    -      std::map<int, BucketServerLocationPtr> buckets;
    -
    -      for (VectorOfCacheableKey::Iterator iter = keys->begin(); iter != 
keys->end(); iter++) {
    -        CacheableKeyPtr key = *iter;
    -        LOGDEBUG("cmds = %s", key->toString()->toString());
    -        PartitionResolverPtr resolver = 
region->getAttributes()->getPartitionResolver();
    -        CacheableKeyPtr resolveKey;
    -
    -        if (resolver == NULLPTR) {
    -          // client has not registered PartitionResolver
    -          // Assuming even PR at server side is not using PartitionResolver
    -          resolveKey = key;
    -        } else {
    -          EntryEvent event(region, key, NULLPTR, NULLPTR, NULLPTR, false);
    -          resolveKey = resolver->getRoutingObject(event);
    -        }
    -
    -        int bucketId = std::abs((int)resolveKey->hashcode() % 
cptr->getTotalNumBuckets());
    -        VectorOfCacheableKeyPtr keyList = NULLPTR;
    -        std::map<int, BucketServerLocationPtr>::iterator bucketsIter =
    -          buckets.find(bucketId);
    -
    -        if (bucketsIter == buckets.end()) {
    -          int8 version = -1;
    -          //BucketServerLocationPtr serverLocation(new 
BucketServerLocation());
    -          BucketServerLocationPtr serverLocation = NULLPTR;
    -          cptr->getServerLocation(bucketId, isPrimary, 
serverLocation,version);
    -          if(serverLocation == NULLPTR)
    -          {//HItesh:if server not returns all buckets, need to confiem 
with PR team about this why??
    -            keysWhichLeft->push_back(key);
    -            continue;
    -          }
    -          else if(!serverLocation->isValid())
    -          {
    -            keysWhichLeft->push_back(key);
    -            continue;
    -          }
    -          //if(serverLocation == NULLPTR)
    -          //continue;//hitesh need to fix
    -          buckets[bucketId] = serverLocation;
    -          HashMapT<BucketServerLocationPtr, 
VectorOfCacheableKeyPtr>::Iterator itrRes = result->find(serverLocation);
    -          //keyList = (*result)[serverLocation];
    -
    -          if(itrRes == result->end())
    -          {
    -            keyList = new VectorOfCacheableKey();
    -            result->insert(serverLocation, keyList);
    -          }
    -          else
    -            keyList = itrRes.second();
    -          LOGDEBUG("new keylist buckets =%d res = %d", buckets.size(), 
result->size() );
    -        } else {
    -          keyList = (*result)[bucketsIter->second];
    -        }
    -
    -        keyList->push_back(key);
    -      }
    -
    -      if(keysWhichLeft->size() > 0 && result->size() > 0)
    -      {//add left keys in result
    -        int keyLefts = keysWhichLeft->size();
    -        int totalServers = result->size();
    -        int perServer = keyLefts/totalServers + 1;
    -
    -        int keyIdx = 0;
    -        for(HashMapT<BucketServerLocationPtr, 
VectorOfCacheableKeyPtr>::Iterator locationIter = result->begin();
    -          locationIter != result->end(); locationIter++)
    -        {
    -          VectorOfCacheableKeyPtr keys = locationIter.second();
    -          for(int i = 0; i < perServer; i++)
    -          {
    -            if(keyIdx < keyLefts)
    -            {
    -              keys->push_back(keysWhichLeft->at(keyIdx++));
    -            }
    -            else
    -              break;
    -          }
    -          if(keyIdx >= keyLefts)
    -            break;//done
    -        }
    -      }
    -      else if(result->size() == 0)
    -      {//not be able to map any key
    -        return NULL;//it will force all keys to send to one server
    -      }
    -
    -      return result;
    -  }
    -
    -
    -  void ClientMetadataService::markPrimaryBucketForTimeout(const RegionPtr& 
region, const CacheableKeyPtr& key,
    -    const CacheablePtr& value,const UserDataPtr& aCallbackArgument, bool 
isPrimary, BucketServerLocationPtr& serverLocation,  int8_t& version)
    -  {
    -    if(m_bucketWaitTimeout == 0)
    -      return;
    -
    -    WriteGuard guard( m_PRbucketStatusLock ); 
    -
    -    getBucketServerLocation(region, key, value, aCallbackArgument, 
false/*look for secondary host*/, serverLocation, version);
    -
    -    if(serverLocation != NULLPTR && serverLocation->isValid())
    -    {
    -      LOGDEBUG("Server host and port are %s:%d", 
serverLocation->getServerName().c_str(), serverLocation->getPort());
    -      int32_t bId = serverLocation->getBucketId();
    -
    -      std::map<std::string, PRbuckets*>::iterator  bs = 
m_bucketStatus.find(region->getFullPath());
    -
    -      if(bs != m_bucketStatus.end())
    -      {
    -        bs->second->setBucketTimeout(bId);
    -        LOGDEBUG("marking bucket %d as timeout ", bId); 
    -      }
    -    }  
    -  }
    -
    -  HashMapT<CacheableInt32Ptr, CacheableHashSetPtr>* 
ClientMetadataService::groupByBucketOnClientSide(
    -    const RegionPtr& region, CacheableVectorPtr* keySet, 
ClientMetadataPtr& metadata) {
    -
    -      HashMapT<CacheableInt32Ptr, CacheableHashSetPtr>* bucketToKeysMap = 
new HashMapT<CacheableInt32Ptr, CacheableHashSetPtr>();
    -      for (CacheableVector::Iterator itr = (*keySet)->begin(); itr != 
(*keySet)->end(); ++itr) {
    -        CacheableKeyPtr key = dynCast<CacheableKeyPtr> (*itr);
    -        PartitionResolverPtr resolver = 
region->getAttributes()->getPartitionResolver();
    -        CacheableKeyPtr resolvekey;
    -        EntryEvent event(region, key, NULLPTR, NULLPTR, NULLPTR, false);
    -        int bucketId = -1;
    -        if (resolver == NULLPTR) {
    -          resolvekey = key;
    -        }
    -        else {
    -          resolvekey = resolver->getRoutingObject(event);
    -          if (resolvekey == NULLPTR) {
    -            throw IllegalStateException("The RoutingObject returned by 
PartitionResolver is null.");
    -          }
    -        }
    -        CacheableHashSetPtr partitionNames = 
metadata->getFixedPartitionNames();
    -        if (partitionNames != NULLPTR && partitionNames->size() > 0) {
    -          FixedPartitionResolverPtr fpResolver( 
dynamic_cast<FixedPartitionResolver*> (resolver.ptr() ));
    -          if(fpResolver != NULLPTR) {
    -            const char* partition = fpResolver->getPartitionName(event, 
partitionNames);
    -            if ( partition == NULL ) {
    -              throw IllegalStateException("partition name returned by 
Partition resolver is null.");
    -            }
    -            else {
    -              bucketId = metadata->assignFixedBucketId(partition, 
resolvekey);
    -              if ( bucketId == -1) {
    -                this->enqueueForMetadataRefresh(region->getFullPath(), 0);
    -              }
    -            }
    -          }
    -        }
    -        else {
    -          if (metadata->getTotalNumBuckets() > 0 ) {
    -            bucketId = std::abs((int)resolvekey->hashcode() % 
metadata->getTotalNumBuckets());
    -          }
    -        }
    -        HashMapT<CacheableInt32Ptr, CacheableHashSetPtr>::Iterator iter =  
bucketToKeysMap->find(CacheableInt32::create(bucketId));
    -        CacheableHashSetPtr bucketKeys;
    -        if (iter == bucketToKeysMap->end()) {
    -          bucketKeys = CacheableHashSet::create();
    -          bucketToKeysMap->insert(CacheableInt32::create(bucketId), 
bucketKeys);
    -        }
    -        else {
    -          bucketKeys = iter.second();
    -        }
    -        bucketKeys->insert(key);
    -      }
    -      return bucketToKeysMap;
    -  }
    -
    -HashMapT<BucketServerLocationPtr, CacheableHashSetPtr>* 
ClientMetadataService::getServerToFilterMapFESHOP(
    -  CacheableVectorPtr* routingKeys, const RegionPtr& region, bool 
isPrimary) {
    -
    -    ClientMetadataPtr cptr = getClientMetadata(region->getFullPath());
    -
    -    if (cptr == NULLPTR /*|| cptr->adviseRandomServerLocation() == 
NULLPTR*/) {
    -      enqueueForMetadataRefresh(region->getFullPath(), 0);
    -      return NULL;
    -    }
    -
    -    if (routingKeys == NULL) {
    -      return NULL;
    -    }
    -
    -    HashMapT<CacheableInt32Ptr, CacheableHashSetPtr>* bucketToKeysMap = 
groupByBucketOnClientSide(region, routingKeys, cptr);
    -    CacheableHashSetPtr bucketSet = CacheableHashSet::create();
    -    for (HashMapT<CacheableInt32Ptr, CacheableHashSetPtr>::Iterator iter = 
bucketToKeysMap->begin(); iter != bucketToKeysMap->end(); ++iter) {
    -      bucketSet->insert(iter.first());
    -    }
    -    LOGDEBUG("ClientMetadataService::getServerToFilterMapFESHOP: bucketSet 
size = %d ", bucketSet->size());
    -
    -    HashMapT<BucketServerLocationPtr, CacheableHashSetPtr>* 
serverToBuckets = groupByServerToBuckets(cptr, bucketSet, isPrimary);
    -
    -    if (serverToBuckets == NULL) {
    -      delete bucketToKeysMap;
    -      return NULL;
    -    }
    -
    -    HashMapT<BucketServerLocationPtr, CacheableHashSetPtr>* 
serverToKeysMap = new HashMapT<BucketServerLocationPtr, CacheableHashSetPtr>();
    -
    -    for (HashMapT<BucketServerLocationPtr, CacheableHashSetPtr>::Iterator 
itrRes = serverToBuckets->begin(); itrRes != serverToBuckets->end(); ++itrRes) {
    -      BucketServerLocationPtr serverLocation = itrRes.first();
    -      CacheableHashSetPtr buckets = itrRes.second();
    -      for(CacheableHashSet::Iterator bucket = buckets->begin(); bucket != 
buckets->end(); ++bucket) {
    -        HashMapT<BucketServerLocationPtr, CacheableHashSetPtr>::Iterator 
iter = serverToKeysMap->find(serverLocation);
    -        CacheableHashSetPtr keys;
    -        if (iter == serverToKeysMap->end()) {
    -          keys = CacheableHashSet::create();
    -        }
    -        else {
    -          keys = iter.second();
    -        }
    -        HashMapT<CacheableInt32Ptr, CacheableHashSetPtr>::Iterator 
bucketToKeysiter = bucketToKeysMap->find(*bucket);
    -        if (bucketToKeysiter !=  bucketToKeysMap->end()) {
    -          CacheableHashSetPtr bkeys = bucketToKeysiter.second();
    -          for (CacheableHashSet::Iterator itr = bkeys->begin(); itr != 
bkeys->end(); ++itr) {
    -            keys->insert(*itr);
    -          }
    -        }
    -        serverToKeysMap->insert(serverLocation, keys);
    -      }
    -    }
    -    delete bucketToKeysMap;
    -    delete serverToBuckets;
    -    return serverToKeysMap;
    -}
    -
    -BucketServerLocationPtr ClientMetadataService::findNextServer
    -  (HashMapT<BucketServerLocationPtr, CacheableHashSetPtr>* 
serverToBucketsMap, CacheableHashSetPtr& currentBucketSet) {
    -
    -    BucketServerLocationPtr serverLocation;
    -    int max = -1;
    -    std::vector<BucketServerLocationPtr> nodesOfEqualSize;
    -    for(HashMapT<BucketServerLocationPtr, CacheableHashSetPtr>::Iterator 
itr = serverToBucketsMap->begin(); itr != serverToBucketsMap->end(); ++itr) {
    -      CacheableHashSetPtr buckets = CacheableHashSet::create();
    -      CacheableHashSetPtr sBuckets = itr.second();
    -
    -      for(CacheableHashSet::Iterator sItr = sBuckets->begin(); sItr != 
sBuckets->end(); ++sItr) {
    -        buckets->insert(*sItr);
    -      }
    -
    -      LOGDEBUG("ClientMetadataService::findNextServer 
currentBucketSet->size() = %d  bucketSet->size() = %d ", 
currentBucketSet->size(), buckets->size());
    -
    -      for(CacheableHashSet::Iterator currentBucketSetIter = 
currentBucketSet->begin(); currentBucketSetIter != currentBucketSet->end(); 
++currentBucketSetIter) {        
    -        buckets->erase(*currentBucketSetIter);
    -        LOGDEBUG("ClientMetadataService::findNextServer bucketSet->size() 
= %d ",  buckets->size());        
    -      }
    -
    -      int size = buckets->size();
    -      if (max < size) {
    -        max = size;
    -        serverLocation = itr.first();
    -        nodesOfEqualSize.clear();
    -        nodesOfEqualSize.push_back(serverLocation);
    -      }
    -      else if (max == size){
    -        nodesOfEqualSize.push_back(serverLocation);
    -      }
    -    }
    -
    -    int nodeSize = nodesOfEqualSize.size();
    -    if(nodeSize > 0) {
    -      RandGen randgen;
    -      int random = randgen(nodeSize);
    -      return nodesOfEqualSize.at(random);
    -    }
    -    return NULLPTR;
    -}
    -
    -bool ClientMetadataService::AreBucketSetsEqual(CacheableHashSetPtr& 
currentBucketSet, CacheableHashSetPtr& bucketSet) {
    -
    -  int32_t currentBucketSetSize = currentBucketSet->size();
    -  int32_t bucketSetSetSize = bucketSet->size();
    -
    -  LOGDEBUG("ClientMetadataService::AreBucketSetsEqual currentBucketSetSize 
= %d bucketSetSetSize = %d ", currentBucketSetSize, bucketSetSetSize);
    -
    -  if (currentBucketSetSize != bucketSetSetSize) {
    -    return false;
    -  }
    -
    -  bool found = false;
    -  for(CacheableHashSet::Iterator currentBucketSetIter = 
currentBucketSet->begin(); currentBucketSetIter != currentBucketSet->end(); 
++currentBucketSetIter) {
    -    found=false;
    -    for(CacheableHashSet::Iterator bucketSetIter = bucketSet->begin(); 
bucketSetIter != bucketSet->end(); ++bucketSetIter) {
    -      if(*currentBucketSetIter == *bucketSetIter) {
    -        found = true;
    -        break;
    -      }
    -    }
    -    if (!found) return false;
    -  }
    -  return true;
    -}
    -
    -HashMapT<BucketServerLocationPtr, CacheableHashSetPtr>* 
ClientMetadataService::
    -  pruneNodes(ClientMetadataPtr& metadata, CacheableHashSetPtr&  buckets) {
    -
    -    CacheableHashSetPtr bucketSetWithoutServer = 
CacheableHashSet::create();
    -    HashMapT<BucketServerLocationPtr, CacheableHashSetPtr>* 
serverToBucketsMap
    -      = new HashMapT<BucketServerLocationPtr, CacheableHashSetPtr>();
    -    HashMapT<BucketServerLocationPtr, CacheableHashSetPtr>* 
prunedServerToBucketsMap
    -      = new HashMapT<BucketServerLocationPtr, CacheableHashSetPtr>();
    -
    -    for(CacheableHashSet::Iterator bucketId= buckets->begin(); bucketId != 
buckets->end(); ++bucketId) {
    -      CacheableInt32Ptr bID = *bucketId;
    -      std::vector<BucketServerLocationPtr > locations = 
metadata->adviseServerLocations(bID->value());
    -      if (locations.size() == 0) {
    -        LOGDEBUG("ClientMetadataService::pruneNodes Since no server 
location available for bucketId = %d  putting it into bucketSetWithoutServer ", 
bID->value() );
    -        bucketSetWithoutServer->insert(bID);
    -        continue;
    -      }
    -
    -      for (std::vector<BucketServerLocationPtr >::iterator location= 
locations.begin(); location != locations.end(); ++location) {
    -        HashMapT<BucketServerLocationPtr, CacheableHashSetPtr>::Iterator 
itrRes = serverToBucketsMap->find(*location);
    -        CacheableHashSetPtr bucketSet;
    -        if (itrRes == serverToBucketsMap->end()) {
    -          bucketSet = CacheableHashSet::create();
    -        }
    -        else {
    -          bucketSet = itrRes.second();
    -        }
    -        bucketSet->insert(bID);
    -        serverToBucketsMap->insert(*location, bucketSet);
    -      }
    -    }
    -
    -    HashMapT<BucketServerLocationPtr, CacheableHashSetPtr>::Iterator 
itrRes = serverToBucketsMap->begin();
    -    CacheableHashSetPtr currentBucketSet = CacheableHashSet::create();
    -    BucketServerLocationPtr randomFirstServer;
    -    if(serverToBucketsMap->empty()) {
    -      LOGDEBUG("ClientMetadataService::pruneNodes serverToBucketsMap is 
empty so returning NULL");      
    -      delete prunedServerToBucketsMap;
    -      delete serverToBucketsMap;
    -      return NULL;
    -    }
    -    else {
    -      size_t size = serverToBucketsMap->size();
    -      LOGDEBUG("ClientMetadataService::pruneNodes Total size of 
serverToBucketsMap = %d ", size);
    -      for ( size_t idx = 0; idx < (rand() % size); idx++) {
    -        itrRes++;
    -      }
    -      randomFirstServer = itrRes.first();
    -    }
    -    HashMapT<BucketServerLocationPtr, CacheableHashSetPtr>::Iterator 
itrRes1 = serverToBucketsMap->find(randomFirstServer);
    -    CacheableHashSetPtr bucketSet = itrRes1.second();
    -
    -    for(CacheableHashSet::Iterator bt = bucketSet->begin(); bt != 
bucketSet->end(); ++bt) {
    -      currentBucketSet->insert(*bt);
    -    }
    -    prunedServerToBucketsMap->insert(randomFirstServer, bucketSet);
    -    serverToBucketsMap->erase(randomFirstServer);
    -
    -    while(!AreBucketSetsEqual(currentBucketSet, buckets)) {
    -      BucketServerLocationPtr server = findNextServer(serverToBucketsMap, 
currentBucketSet);
    -      if (server == NULLPTR) {
    -        LOGDEBUG("ClientMetadataService::pruneNodes findNextServer 
returned no server");
    -        break;
    -      }
    -
    -      HashMapT<BucketServerLocationPtr, CacheableHashSetPtr>::Iterator 
itrRes2 = serverToBucketsMap->find(server);
    -      CacheableHashSetPtr bucketSet2 = itrRes2.second();
    -
    -      LOGDEBUG("ClientMetadataService::pruneNodes currentBucketSet->size() 
= %d  bucketSet2->size() = %d ", currentBucketSet->size(), bucketSet2->size());
    -
    -      for(CacheableHashSet::Iterator currentBucketSetIter = 
currentBucketSet->begin(); currentBucketSetIter != currentBucketSet->end(); 
++currentBucketSetIter) {        
    -        bucketSet2->erase(*currentBucketSetIter);
    -        LOGDEBUG("ClientMetadataService::pruneNodes bucketSet2->size() = 
%d ",  bucketSet2->size());        
    -      }
    -
    -      if (bucketSet2->empty()) {
    -        LOGDEBUG("ClientMetadataService::pruneNodes bucketSet2 is empty() 
so removing server from serverToBucketsMap");
    -        serverToBucketsMap->erase(server);
    -        continue;
    -      }
    -
    -      for(CacheableHashSet::Iterator itr = bucketSet2->begin(); itr != 
bucketSet2->end(); ++itr) {
    -        currentBucketSet->insert(*itr);
    -      }
    -
    -      prunedServerToBucketsMap->insert(server, bucketSet2);
    -      serverToBucketsMap->erase(server);
    -    }
    -
    -    HashMapT<BucketServerLocationPtr, CacheableHashSetPtr>::Iterator 
itrRes2 = prunedServerToBucketsMap->begin();
    -    for(CacheableHashSet::Iterator itr = bucketSetWithoutServer->begin(); 
itr != bucketSetWithoutServer->end(); ++itr) {
    -      CacheableInt32Ptr buckstId = *itr;
    -      itrRes2.second()->insert(buckstId);
    -    }
    -
    -    delete serverToBucketsMap;
    -    return prunedServerToBucketsMap;
    -}
    -
    -HashMapT<BucketServerLocationPtr, CacheableHashSetPtr>* 
ClientMetadataService::
    -  groupByServerToAllBuckets(const RegionPtr& region, bool 
optimizeForWrite) {
    -
    -    ClientMetadataPtr cptr = getClientMetadata(region->getFullPath());
    -    if (cptr == NULLPTR ) {
    -      enqueueForMetadataRefresh(region->getFullPath(), false);
    -      return NULL;
    -    }
    -    int totalBuckets = cptr->getTotalNumBuckets();
    -    CacheableHashSetPtr bucketSet = CacheableHashSet::create();
    -    for(int i=0; i<totalBuckets; i++) {
    -      bucketSet->insert(CacheableInt32::create(i));
    -    }
    -    return groupByServerToBuckets(cptr, bucketSet, optimizeForWrite);
    -}
    -
    -HashMapT<BucketServerLocationPtr, CacheableHashSetPtr>* 
ClientMetadataService::
    -  groupByServerToBuckets(ClientMetadataPtr& metadata, CacheableHashSetPtr& 
bucketSet, bool optimizeForWrite) {
    -    if (optimizeForWrite) {
    -      HashMapT<BucketServerLocationPtr, CacheableHashSetPtr>* 
serverToBucketsMap
    -        = new HashMapT<BucketServerLocationPtr, CacheableHashSetPtr>();
    -      CacheableHashSetPtr bucketsWithoutServer = 
CacheableHashSet::create();
    -      for(CacheableHashSet::Iterator itr = bucketSet->begin(); itr != 
bucketSet->end(); ++itr ) {
    -        CacheableInt32Ptr bucketId = *itr;
    -        BucketServerLocationPtr serverLocation = 
metadata->advisePrimaryServerLocation(bucketId->value());
    -        if (serverLocation == NULLPTR) {
    -          bucketsWithoutServer->insert(bucketId);
    -          continue;
    -        }
    -        else if(!serverLocation->isValid()) {
    -          bucketsWithoutServer->insert(bucketId);
    -          continue;
    -        }
    -        HashMapT<BucketServerLocationPtr, CacheableHashSetPtr>::Iterator 
itrRes = serverToBucketsMap->find(serverLocation);
    -        CacheableHashSetPtr buckets;
    -        if(itrRes == serverToBucketsMap->end()) {
    -          buckets = CacheableHashSet::create();
    -          serverToBucketsMap->insert(serverLocation, buckets);
    -        }
    -        else {
    -          buckets = itrRes.second();
    -        }
    -        buckets->insert(bucketId);
    -      }
    -
    -      if (!serverToBucketsMap->empty()) {
    -        HashMapT<BucketServerLocationPtr, CacheableHashSetPtr>::Iterator 
itrRes = serverToBucketsMap->begin();
    -        for(CacheableHashSet::Iterator itr = 
bucketsWithoutServer->begin(); itr != bucketsWithoutServer->end(); ++itr ) {
    -          itrRes.second()->insert(*itr);
    -          LOGDEBUG("ClientMetadataService::groupByServerToBuckets 
inserting bucketsWithoutServer");
    -        }
    -      }
    -      return serverToBucketsMap;
    -    }
    -    else {
    -      return pruneNodes(metadata, bucketSet);
    -    }
    -}
    -
    -void 
ClientMetadataService::markPrimaryBucketForTimeoutButLookSecondaryBucket(const 
RegionPtr& region, const CacheableKeyPtr& key,
    -    const CacheablePtr& value,const UserDataPtr& aCallbackArgument, bool 
isPrimary, BucketServerLocationPtr& serverLocation,  int8_t& version)
    -{
    -  if(m_bucketWaitTimeout == 0)
    -    return;
    -
    -  WriteGuard guard( m_PRbucketStatusLock ); 
    -
    -  std::map<std::string, PRbuckets*>::iterator  bs = 
m_bucketStatus.find(region->getFullPath());
    -
    -  PRbuckets* prBuckets = NULL;
    -  if(bs != m_bucketStatus.end())
    -  {
    -    prBuckets = bs->second;    
    -  }
    -
    -  if(prBuckets == NULL)
    -    return;
    -
    -  getBucketServerLocation(region, key, value, aCallbackArgument, true, 
serverLocation, version);
    -
    -
    -  ClientMetadataPtr cptr = NULLPTR;
    -  {   
    -   ReadGuard guard( m_regionMetadataLock );
    -   RegionMetadataMapType::iterator cptrIter = 
m_regionMetaDataMap.find(region->getFullPath());
    -
    -   if(cptrIter != m_regionMetaDataMap.end())
    -   {
    -     cptr = cptrIter->second;
    -   }
    -
    -   if (cptr == NULLPTR ) {
    -     return ;
    -   }
    -  }
    -
    -  LOGFINE("Setting in markPrimaryBucketForTimeoutButLookSecondaryBucket");
    -
    -  int32_t totalBuckets = cptr->getTotalNumBuckets();
    -
    -  for(int32_t i =0; i< totalBuckets; i++)
    -  {
    -    int8_t version;
    -    BucketServerLocationPtr bsl;
    -    cptr->getServerLocation(i, false, bsl, version);
    -
    -    if(bsl == serverLocation)
    -    {
    -      prBuckets->setBucketTimeout(i);
    -      LOGFINE("markPrimaryBucketForTimeoutButLookSecondaryBucket::setting 
bucket timeout...");
    -    }
    -  }
    -
    -}
    -
    -bool ClientMetadataService::isBucketMarkedForTimeout(const char * 
regionFullPath, int32_t bucketid)
    -{
    -  if(m_bucketWaitTimeout == 0)
    -    return false;
    -
    -  ReadGuard guard( m_PRbucketStatusLock );
    -  
    -  std::map<std::string, PRbuckets*>::iterator  bs = 
m_bucketStatus.find(regionFullPath);
    -
    -  if(bs != m_bucketStatus.end())
    -  {
    -    bool m = bs->second->isBucketTimedOut(bucketid, m_bucketWaitTimeout);
    -    if(m == true) {
    -      ThinClientPoolDM* tcrdm = dynamic_cast< ThinClientPoolDM* >( 
m_pool.ptr( ) ); 
    -      CacheImpl * cache = tcrdm->getConnectionManager().getCacheImpl();
    -      cache->setBlackListBucketTimeouts();
    -    }
    -    LOGFINE("isBucketMarkedForTimeout:: for bucket %d returning = %d", 
bucketid, m); 
    -    
    -    return m;
    -  }
    -
    -  return false;
    -}
    -
    -}
    -
    
+/*========================================================================= * 
Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved. * This 
product is protected by U.S. and international copyright * and intellectual 
property laws. Pivotal products are covered by * one or more patents listed at 
http://www.pivotal.io/patents. 
*========================================================================= 
*/#include "TcrMessage.hpp"#include "ClientMetadataService.hpp"#include 
"ThinClientPoolDM.hpp"#include "../FixedPartitionResolver.hpp"#include 
<iterator>#include <cstdlib>#include <climits>namespace gemfire {  const char * 
ClientMetadataService::NC_CMDSvcThread = "NC CMDSvcThread";  
ClientMetadataService::~ClientMetadataService ()  {    delete m_regionQueue;    
if(m_bucketWaitTimeout > 0)    {      try      {        std::map<std::string, 
PRbuckets*>::iterator  bi;        for (bi = m_bucketStatus.begin(); bi != 
m_bucketStatus.end(); ++
 bi)        {          delete bi->second;        }      }catch(...)      {      
  LOGINFO("Exception in ClientMetadataService destructor");      }    }  }  
ClientMetadataService::ClientMetadataService(PoolPtr pool)    /* adongre    * 
CID 28928: Uninitialized scalar field (UNINIT_CTOR)    */    : m_run(false)  {  
  m_regionQueue = new Queue<std::string>(false);    m_pool = pool;    
m_bucketWaitTimeout = 
DistributedSystem::getSystemProperties()->bucketWaitTimeout();  }  int 
ClientMetadataService::svc() {    
DistributedSystemImpl::setThreadName(NC_CMDSvcThread);    
LOGINFO("ClientMetadataService started for pool %s", m_pool->getName());     
while (m_run) {      m_regionQueueSema.acquire( );      ThinClientPoolDM* tcrdm 
= dynamic_cast< ThinClientPoolDM* >( m_pool.ptr( ) );       CacheImpl * cache = 
tcrdm->getConnectionManager().getCacheImpl();      while(true)      {        
std::string * regionFullPath = m_regionQueue->get
 ();        if(regionFullPath != NULL && regionFullPath->c_str() != NULL )      
  {          while(true)           {            if(m_regionQueue->size() > 0)   
          {              std::string * nextRegionFullPath = 
m_regionQueue->get();              if(nextRegionFullPath != NULL && 
nextRegionFullPath->c_str() != NULL && 
regionFullPath->compare(nextRegionFullPath->c_str()) == 0)              {       
         delete nextRegionFullPath;//we are going for same              }       
       else              {                //different region; put it back       
         m_regionQueue->put(nextRegionFullPath);                break;          
    }            } else              break;          }        }        if 
(!cache->isCacheDestroyPending() && regionFullPath != NULL && 
regionFullPath->c_str() != NULL) {          
getClientPRMetadata(regionFullPath->c_str());          delete regionFullPath;   
       regionFullPath = NULL;      
   }        else        {          delete regionFullPath;          
regionFullPath = NULL;          break;        }      }      
//while(m_regionQueueSema.tryacquire( ) != -1); // release all    }    
LOGINFO("ClientMetadataService stopped for pool %s", m_pool->getName());    
return 0;  }  void  ClientMetadataService::getClientPRMetadata(const char * 
regionFullPath) {    if (regionFullPath == NULL) return;    ThinClientPoolDM* 
tcrdm = dynamic_cast< ThinClientPoolDM* >( m_pool.ptr( ) );    if(tcrdm == 
NULL)    {      throw IllegalArgumentException("ClientMetaData: pool cast to 
ThinClientPoolDM failed");    }    // That means metadata for the region not 
found, So only for the first time for a particular region use 
GetClientPartitionAttributesOp    // TcrMessage to fetch the metadata and put 
it into map for later use.send this message to server and get metadata from 
server.        TcrMessage reply;    std::string path(regionFullPath);    Clie
 ntMetadataPtr cptr = NULLPTR;  {         ReadGuard guard( m_regionMetadataLock 
);    RegionMetadataMapType::iterator itr = m_regionMetaDataMap.find(path);    
if (itr != m_regionMetaDataMap.end()) {      cptr = itr->second;    }      
//cptr = m_regionMetaDataMap[path];   }    ClientMetadataPtr newCptr = NULLPTR; 
     if (cptr == NULLPTR) {        TcrMessage 
request(TcrMessage::GET_CLIENT_PARTITION_ATTRIBUTES, regionFullPath);        
GfErrType err = tcrdm->sendSyncRequest(request, reply);                if (err 
== GF_NOERR && reply.getMessageType() == 
TcrMessage::RESPONSE_CLIENT_PARTITION_ATTRIBUTES) {          cptr = new 
ClientMetadata(reply.getNumBuckets(), reply.getColocatedWith(), tcrdm, 
reply.getFpaSet());          if(m_bucketWaitTimeout > 0 && 
reply.getNumBuckets() > 0)          {            WriteGuard guard( 
m_PRbucketStatusLock );            m_bucketStatus[regionFullPath] = new 
PRbuckets(reply.getNumBuckets());          }          LOGDEBUG("ClientMetadata b
 uckets %d ", reply.getNumBuckets());        }              }    if (cptr == 
NULLPTR) {      return;    }    CacheableStringPtr colocatedWith;    
colocatedWith = cptr->getColocatedWith();    if (colocatedWith == NULLPTR) {    
       newCptr = SendClientPRMetadata(regionFullPath,cptr);      //now we will 
get new instance so assign it again      if(newCptr != NULLPTR)       {        
cptr->setPreviousone(NULLPTR);        newCptr->setPreviousone(cptr);        
WriteGuard guard( m_regionMetadataLock );        m_regionMetaDataMap[path] = 
newCptr;               LOGFINE("Updated client meta data");      }    }    else 
{      newCptr = SendClientPRMetadata(colocatedWith->asChar(),cptr);            
   if(newCptr != NULLPTR)       {        cptr->setPreviousone(NULLPTR);        
newCptr->setPreviousone(cptr);        //now we will get new instance so assign 
it again        WriteGuard guard( m_regionMetadataLock );        
m_regionMetaDataMap[colocatedWith->
 asChar()] = newCptr;        m_regionMetaDataMap[path] = newCptr;               
LOGFINE("Updated client meta data");      }    }      }  ClientMetadataPtr 
ClientMetadataService::SendClientPRMetadata(const char* regionPath, 
ClientMetadataPtr cptr) {    ThinClientPoolDM* tcrdm = dynamic_cast< 
ThinClientPoolDM* >( m_pool.ptr( ) );    if(tcrdm == NULL)    {      throw 
IllegalArgumentException("ClientMetaData: pool cast to ThinClientPoolDM 
failed");    }    TcrMessage request(TcrMessage::GET_CLIENT_PR_METADATA, 
regionPath);    TcrMessage reply;    // send this message to server and get 
metadata from server.    LOGFINE("Now sending GET_CLIENT_PR_METADATA for 
getting from server: %s",regionPath);    RegionPtr region = NULLPTR;        
GfErrType err = tcrdm->sendSyncRequest(request, reply);        if (err == 
GF_NOERR && reply.getMessageType() == TcrMessage::RESPONSE_CLIENT_PR_METADATA) 
{      tcrdm->getConnectionManager().getCacheImpl()->getRegion(regionPath, 
region)
 ;            if (region != NULLPTR) {               LocalRegion* lregion = 
dynamic_cast<LocalRegion*> (region.ptr());                  
lregion->getRegionStats()->incMetaDataRefreshCount();              }            
std::vector<BucketServerLocationsType>* metadata = reply.getMetadata();      
if(metadata == NULL)        return NULLPTR;      if ( metadata->empty()) {      
  delete metadata;        return NULLPTR;      }      ClientMetadata* newCptr = 
new ClientMetadata(*(cptr.ptr()));      for 
(std::vector<BucketServerLocationsType>::iterator iter = metadata->begin();     
   iter != metadata->end(); ++iter) {                if (!(*iter).empty()) {    
        newCptr->updateBucketServerLocations((*iter).at(0)->getBucketId(), 
(*iter));                             }      }      delete metadata;      
ClientMetadataPtr newCMDPtr(newCptr);      return newCMDPtr;    }    return 
NULLPTR;  }  void ClientMetadataService::getBucketServerLocation
 (const RegionPtr& region, const CacheableKeyPtr& key,    const CacheablePtr& 
value,const UserDataPtr& aCallbackArgument, bool isPrimary, 
BucketServerLocationPtr& serverLocation,  int8_t& version)  {    //ACE_Guard< 
ACE_Recursive_Thread_Mutex > guard( m_regionMetadataLock );        if (region 
!= NULLPTR) {       ReadGuard guard( m_regionMetadataLock );        
LOGDEBUG("ClientMetadataService::getBucketServerLocation m_regionMetaDataMap 
size is %d",m_regionMetaDataMap.size());       std::string 
path(region->getFullPath());    ClientMetadataPtr cptr = NULLPTR;    
RegionMetadataMapType::iterator itr = m_regionMetaDataMap.find(path);    if 
(itr != m_regionMetaDataMap.end()) {      cptr = itr->second;    }      
//ClientMetadataPtr cptr = m_regionMetaDataMap[path];      if (cptr == NULLPTR) 
{        //serverLocation = BucketServerLocation();        return;      }      
CacheableKeyPtr resolvekey;      const PartitionResolverPtr& resolver = 
region->getAttributes()->getParti
 tionResolver();            EntryEvent event(region, key, value, NULLPTR, 
aCallbackArgument, false);      int bucketId = 0;      if (resolver == NULLPTR) 
{        resolvekey = key;      }      else {        resolvekey = 
resolver->getRoutingObject(event);        if (resolvekey == NULLPTR) {          
throw IllegalStateException("The RoutingObject returned by PartitionResolver is 
null.");        }      }      CacheableHashSetPtr& partNames = 
cptr->getFixedPartitionNames();      if (partNames != NULLPTR && 
partNames->size() > 0) {                FixedPartitionResolverPtr fpResolver( 
dynamic_cast<FixedPartitionResolver*> (resolver.ptr() ));        if(fpResolver 
!= NULLPTR) {                 const char* partition = 
fpResolver->getPartitionName(event, partNames);                  if ( partition 
== NULL ) {            throw IllegalStateException("partition name returned by 
Partition resolver is null.");          }          else {            bucketI
 d = cptr->assignFixedBucketId(partition, resolvekey);            if ( bucketId 
== -1) {              return;            }                    }        }      } 
           else {        if (cptr->getTotalNumBuckets() > 0 ) {                 
   bucketId = std::abs((int)resolvekey->hashcode() % 
cptr->getTotalNumBuckets());                  }      }      
cptr->getServerLocation(bucketId, isPrimary, serverLocation, version);    }     
 }  void ClientMetadataService::removeBucketServerLocation(BucketServerLocation 
serverLocation)  {    ReadGuard guard( m_regionMetadataLock );    
for(RegionMetadataMapType::iterator regionMetadataIter= 
m_regionMetaDataMap.begin();      
regionMetadataIter!=m_regionMetaDataMap.end();      regionMetadataIter++) {     
   ClientMetadataPtr cptr = (*regionMetadataIter).second;        if (cptr != 
NULLPTR) {          //Yogesh has commented out this as it was causing a SIGV    
      //cptr->removeBucketServerLocation(s
 erverLocation);        }    }  }  ClientMetadataPtr 
ClientMetadataService::getClientMetadata(const char* regionFullPath)  {    
ReadGuard guard( m_regionMetadataLock );    RegionMetadataMapType::iterator 
regionMetadataIter = m_regionMetaDataMap.find(regionFullPath);    if 
(regionMetadataIter != m_regionMetaDataMap.end()) {      return 
(*regionMetadataIter).second;    }    return NULLPTR;  } /*const  
PartitionResolverPtr& ClientMetadataService::getResolver(const RegionPtr& 
region, const CacheableKeyPtr& key,    const UserDataPtr& aCallbackArgument){   
   //const char * regionFullPath = region->getFullPath();      //if 
(regionFullPath != NULL) {        //const RegionAttributesPtr& rAttrsPtr = 
region->getAttributes();        return 
region->getAttributes()->getPartitionResolver();      //}   }*/     
/*BucketServerLocation 
ClientMetadataService::getServerLocation(ClientMetadataPtr cptr, int bucketId, 
bool tryPrimary)  {  LOGFINE("Inside 
 getServerLocation");  if (cptr == NULLPTR) {  LOGDEBUG("MetaData does not 
exist");  return BucketServerLocation();  }  LOGFINE("Ending 
getServerLocation");  return cptr->getServerLocation(bucketId, tryPrimary);  
}*/  void ClientMetadataService::populateDummyServers(const char * regionName, 
ClientMetadataPtr cptr)  {    WriteGuard guard( m_regionMetadataLock );    
m_regionMetaDataMap[regionName] = cptr;  }  void 
ClientMetadataService::enqueueForMetadataRefresh(    const char* 
regionFullPath, int8 serverGroupFlag) {      ThinClientPoolDM* tcrdm = 
dynamic_cast< ThinClientPoolDM* >( m_pool.ptr( ) );      if(tcrdm == NULL)      
{        throw IllegalArgumentException("ClientMetaData: pool cast to 
ThinClientPoolDM failed");      }      RegionPtr region;      
tcrdm->getConnectionManager().getCacheImpl()->getRegion(regionFullPath, 
region);            LocalRegion* lregion = dynamic_cast<LocalRegion*> 
(region.ptr());                lregion->ge
 tRegionStats()->incNonSingleHopCount();//we are here means nonSinglehop      
std::string serverGroup = tcrdm->getServerGroup();          if 
(serverGroup.length() != 0 ) {            
CacheImpl::setServerGroupFlag(serverGroupFlag);        if (serverGroupFlag == 2 
) {          LOGFINER("Network hop but, from within same server-group, so no 
metadata fetch from the server");          return;        }      }      if 
(region != NULLPTR) {               ThinClientRegion* tcrRegion = 
dynamic_cast<ThinClientRegion*> (region.ptr());        {          TryWriteGuard 
guardRegionMetaDataRefresh( tcrRegion->getMataDataMutex(), 
tcrRegion->getMetaDataRefreshed());          if ( 
tcrRegion->getMetaDataRefreshed() ) {            return;          }          
LOGFINE("Network hop so fetching single hop metadata from the server");         
 CacheImpl::setNetworkHopFlag(true);          
tcrRegion->setMetaDataRefreshed(true);                      std::string* 
tempRegion
 Path = new std::string(regionFullPath);          
m_regionQueue->put(tempRegionPath);          m_regionQueueSema.release( );      
  }      }     }  HashMapT<BucketServerLocationPtr, VectorOfCacheableKeyPtr>* 
ClientMetadataService::getServerToFilterMap(    const VectorOfCacheableKey* 
keys, const RegionPtr& region, bool isPrimary) {      //const char* 
regionFullPath = region->getFullPath();      ClientMetadataPtr cptr = NULLPTR;  
    {           ReadGuard guard( m_regionMetadataLock );        
RegionMetadataMapType::iterator cptrIter = 
m_regionMetaDataMap.find(region->getFullPath());        if(cptrIter != 
m_regionMetaDataMap.end())        {          cptr = cptrIter->second;        }  
      if (cptr == NULLPTR || keys == NULL) {          // 
enqueueForMetadataRefresh(region->getFullPath());          return NULL;         
 //          //serverLocation = BucketServerLocation();          //          
return;        }      }      //int totalNumberOfBuckets 
 = cptr->getTotalNumBuckets();      HashMapT<BucketServerLocationPtr, 
VectorOfCacheableKeyPtr>* result = new HashMapT<BucketServerLocationPtr, 
VectorOfCacheableKeyPtr>();      VectorOfCacheableKeyPtr keysWhichLeft(new 
VectorOfCacheableKey());      std::map<int, BucketServerLocationPtr> buckets;   
   for (VectorOfCacheableKey::Iterator iter = keys->begin(); iter != 
keys->end(); iter++) {        CacheableKeyPtr key = *iter;        
LOGDEBUG("cmds = %s", key->toString()->toString());        PartitionResolverPtr 
resolver = region->getAttributes()->getPartitionResolver();        
CacheableKeyPtr resolveKey;        if (resolver == NULLPTR) {          // 
client has not registered PartitionResolver          // Assuming even PR at 
server side is not using PartitionResolver          resolveKey = key;        } 
else {          EntryEvent event(region, key, NULLPTR, NULLPTR, NULLPTR, 
false);          resolveKey = resolver->getRoutingObject(event);        } 
        int bucketId = std::abs((int)resolveKey->hashcode() % 
cptr->getTotalNumBuckets());        VectorOfCacheableKeyPtr keyList = NULLPTR;  
      std::map<int, BucketServerLocationPtr>::iterator bucketsIter =          
buckets.find(bucketId);        if (bucketsIter == buckets.end()) {          
int8 version = -1;          //BucketServerLocationPtr serverLocation(new 
BucketServerLocation());          BucketServerLocationPtr serverLocation = 
NULLPTR;          cptr->getServerLocation(bucketId, isPrimary, 
serverLocation,version);          if(serverLocation == NULLPTR)          
{//HItesh:if server not returns all buckets, need to confiem with PR team about 
this why??            keysWhichLeft->push_back(key);            continue;       
   }          else if(!serverLocation->isValid())          {            
keysWhichLeft->push_back(key);            continue;          }          
//if(serverLocation == NULLPTR)          //continue;//hitesh need to fix
           buckets[bucketId] = serverLocation;          
HashMapT<BucketServerLocationPtr, VectorOfCacheableKeyPtr>::Iterator itrRes = 
result->find(serverLocation);          //keyList = (*result)[serverLocation];   
       if(itrRes == result->end())          {            keyList = new 
VectorOfCacheableKey();            result->insert(serverLocation, keyList);     
     }          else            keyList = itrRes.second();          
LOGDEBUG("new keylist buckets =%d res = %d", buckets.size(), result->size() );  
      } else {          keyList = (*result)[bucketsIter->second];        }      
  keyList->push_back(key);      }      if(keysWhichLeft->size() > 0 && 
result->size() > 0)      {//add left keys in result        int keyLefts = 
keysWhichLeft->size();        int totalServers = result->size();        int 
perServer = keyLefts/totalServers + 1;        int keyIdx = 0;        
for(HashMapT<BucketServerLocationPtr, VectorOfCacheableKeyPtr
 >::Iterator locationIter = result->begin();          locationIter != 
 >result->end(); locationIter++)        {          VectorOfCacheableKeyPtr keys 
 >= locationIter.second();          for(int i = 0; i < perServer; i++)          
 >{            if(keyIdx < keyLefts)            {              
 >keys->push_back(keysWhichLeft->at(keyIdx++));            }            else    
 >          break;          }          if(keyIdx >= keyLefts)            
 >break;//done        }      }      else if(result->size() == 0)      {//not be 
 >able to map any key        return NULL;//it will force all keys to send to 
 >one server      }      return result;  }  void 
 >ClientMetadataService::markPrimaryBucketForTimeout(const RegionPtr& region, 
 >const CacheableKeyPtr& key,    const CacheablePtr& value,const UserDataPtr& 
 >aCallbackArgument, bool isPrimary, BucketServerLocationPtr& serverLocation,  
 >int8_t& version)  {    if(m_bucketWaitTimeout == 0)      return;    W
 riteGuard guard( m_PRbucketStatusLock );     getBucketServerLocation(region, 
key, value, aCallbackArgument, false/*look for secondary host*/, 
serverLocation, version);    if(serverLocation != NULLPTR && 
serverLocation->isValid())    {      LOGDEBUG("Server host and port are %s:%d", 
serverLocation->getServerName().c_str(), serverLocation->getPort());      
int32_t bId = serverLocation->getBucketId();      std::map<std::string, 
PRbuckets*>::iterator  bs = m_bucketStatus.find(region->getFullPath());      
if(bs != m_bucketStatus.end())      {        bs->second->setBucketTimeout(bId); 
       LOGDEBUG("marking bucket %d as timeout ", bId);       }    }    }  
HashMapT<CacheableInt32Ptr, CacheableHashSetPtr>* 
ClientMetadataService::groupByBucketOnClientSide(    const RegionPtr& region, 
CacheableVectorPtr* keySet, ClientMetadataPtr& metadata) {      
HashMapT<CacheableInt32Ptr, CacheableHashSetPtr>* bucketToKeysMap = new 
HashMapT<CacheableInt32Ptr, Ca
 cheableHashSetPtr>();      for (CacheableVector::Iterator itr = 
(*keySet)->begin(); itr != (*keySet)->end(); ++itr) {        CacheableKeyPtr 
key = dynCast<CacheableKeyPtr> (*itr);        PartitionResolverPtr resolver = 
region->getAttributes()->getPartitionResolver();        CacheableKeyPtr 
resolvekey;        EntryEvent event(region, key, NULLPTR, NULLPTR, NULLPTR, 
false);        int bucketId = -1;        if (resolver == NULLPTR) {          
resolvekey = key;        }        else {          resolvekey = 
resolver->getRoutingObject(event);          if (resolvekey == NULLPTR) {        
    throw IllegalStateException("The RoutingObject returned by 
PartitionResolver is null.");          }        }        CacheableHashSetPtr 
partitionNames = metadata->getFixedPartitionNames();        if (partitionNames 
!= NULLPTR && partitionNames->size() > 0) {          FixedPartitionResolverPtr 
fpResolver( dynamic_cast<FixedPartitionResolver*> (resolver.ptr() ));     
      if(fpResolver != NULLPTR) {            const char* partition = 
fpResolver->getPartitionName(event, partitionNames);            if ( partition 
== NULL ) {              throw IllegalStateException("partition name returned 
by Partition resolver is null.");            }            else {              
bucketId = metadata->assignFixedBucketId(partition, resolvekey);              
if ( bucketId == -1) {                
this->enqueueForMetadataRefresh(region->getFullPath(), 0);              }       
     }          }        }        else {          if 
(metadata->getTotalNumBuckets() > 0 ) {            bucketId = 
std::abs((int)resolvekey->hashcode() % metadata->getTotalNumBuckets());         
 }        }        HashMapT<CacheableInt32Ptr, CacheableHashSetPtr>::Iterator 
iter =  bucketToKeysMap->find(CacheableInt32::create(bucketId));        
CacheableHashSetPtr bucketKeys;        if (iter == bucketToKeysMap->end()) {    
      bucketKeys = CacheableHas
 hSet::create();          
bucketToKeysMap->insert(CacheableInt32::create(bucketId), bucketKeys);        } 
       else {          bucketKeys = iter.second();        }        
bucketKeys->insert(key);      }      return bucketToKeysMap;  
}HashMapT<BucketServerLocationPtr, CacheableHashSetPtr>* 
ClientMetadataService::getServerToFilterMapFESHOP(  CacheableVectorPtr* 
routingKeys, const RegionPtr& region, bool isPrimary) {    ClientMetadataPtr 
cptr = getClientMetadata(region->getFullPath());    if (cptr == NULLPTR /*|| 
cptr->adviseRandomServerLocation() == NULLPTR*/) {      
enqueueForMetadataRefresh(region->getFullPath(), 0);      return NULL;    }    
if (routingKeys == NULL) {      return NULL;    }    
HashMapT<CacheableInt32Ptr, CacheableHashSetPtr>* bucketToKeysMap = 
groupByBucketOnClientSide(region, routingKeys, cptr);    CacheableHashSetPtr 
bucketSet = CacheableHashSet::create();    for (HashMapT<CacheableInt32Ptr, 
CacheableHashSetP
 tr>::Iterator iter = bucketToKeysMap->begin(); iter != bucketToKeysMap->end(); 
++iter) {      bucketSet->insert(iter.first());    }    
LOGDEBUG("ClientMetadataService::getServerToFilterMapFESHOP: bucketSet size = 
%d ", bucketSet->size());    HashMapT<BucketServerLocationPtr, 
CacheableHashSetPtr>* serverToBuckets = groupByServerToBuckets(cptr, bucketSet, 
isPrimary);    if (serverToBuckets == NULL) {      delete bucketToKeysMap;      
return NULL;    }    HashMapT<BucketServerLocationPtr, CacheableHashSetPtr>* 
serverToKeysMap = new HashMapT<BucketServerLocationPtr, CacheableHashSetPtr>(); 
   for (HashMapT<BucketServerLocationPtr, CacheableHashSetPtr>::Iterator itrRes 
= serverToBuckets->begin(); itrRes != serverToBuckets->end(); ++itrRes) {      
BucketServerLocationPtr serverLocation = itrRes.first();      
CacheableHashSetPtr buckets = itrRes.second();      
for(CacheableHashSet::Iterator bucket = buckets->begin(); bucket != 
buckets->end(); ++bucket) {
         HashMapT<BucketServerLocationPtr, CacheableHashSetPtr>::Iterator iter 
= serverToKeysMap->find(serverLocation);        CacheableHashSetPtr keys;       
 if (iter == serverToKeysMap->end()) {          keys = 
CacheableHashSet::create();        }        else {          keys = 
iter.second();        }        HashMapT<CacheableInt32Ptr, 
CacheableHashSetPtr>::Iterator bucketToKeysiter = 
bucketToKeysMap->find(*bucket);        if (bucketToKeysiter !=  
bucketToKeysMap->end()) {          CacheableHashSetPtr bkeys = 
bucketToKeysiter.second();          for (CacheableHashSet::Iterator itr = 
bkeys->begin(); itr != bkeys->end(); ++itr) {            keys->insert(*itr);    
      }        }        serverToKeysMap->insert(serverLocation, keys);      }   
 }    delete bucketToKeysMap;    delete serverToBuckets;    return 
serverToKeysMap;}BucketServerLocationPtr ClientMetadataService::findNextServer  
(HashMapT<BucketServerLocationPtr, CacheableHashSetP
 tr>* serverToBucketsMap, CacheableHashSetPtr& currentBucketSet) {    
BucketServerLocationPtr serverLocation;    int max = -1;    
std::vector<BucketServerLocationPtr> nodesOfEqualSize;    
for(HashMapT<BucketServerLocationPtr, CacheableHashSetPtr>::Iterator itr = 
serverToBucketsMap->begin(); itr != serverToBucketsMap->end(); ++itr) {      
CacheableHashSetPtr buckets = CacheableHashSet::create();      
CacheableHashSetPtr sBuckets = itr.second();      
for(CacheableHashSet::Iterator sItr = sBuckets->begin(); sItr != 
sBuckets->end(); ++sItr) {        buckets->insert(*sItr);      }      
LOGDEBUG("ClientMetadataService::findNextServer currentBucketSet->size() = %d  
bucketSet->size() = %d ", currentBucketSet->size(), buckets->size());      
for(CacheableHashSet::Iterator currentBucketSetIter = 
currentBucketSet->begin(); currentBucketSetIter != currentBucketSet->end(); 
++currentBucketSetIter) {                buckets->erase(*currentBucketSetIter); 
       LOGDE
 BUG("ClientMetadataService::findNextServer bucketSet->size() = %d ",  
buckets->size());              }      int size = buckets->size();      if (max 
< size) {        max = size;        serverLocation = itr.first();        
nodesOfEqualSize.clear();        nodesOfEqualSize.push_back(serverLocation);    
  }      else if (max == size){        
nodesOfEqualSize.push_back(serverLocation);      }    }    int nodeSize = 
nodesOfEqualSize.size();    if(nodeSize > 0) {      RandGen randgen;      int 
random = randgen(nodeSize);      return nodesOfEqualSize.at(random);    }    
return NULLPTR;}bool 
ClientMetadataService::AreBucketSetsEqual(CacheableHashSetPtr& 
currentBucketSet, CacheableHashSetPtr& bucketSet) {  int32_t 
currentBucketSetSize = currentBucketSet->size();  int32_t bucketSetSetSize = 
bucketSet->size();  LOGDEBUG("ClientMetadataService::AreBucketSetsEqual 
currentBucketSetSize = %d bucketSetSetSize = %d ", currentBucketSetSize, bu
 cketSetSetSize);  if (currentBucketSetSize != bucketSetS
    --- End diff --
    
    Hi, whitespaces removed


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

Reply via email to