Hi Zoran,

Please see my comments inline.

BR,

Hung Nguyen - DEK Technologies


--------------------------------------------------------------------------------
From: Zoran Milinkovic [email protected]
Sent: Wednesday, March 23, 2016 11:51PM
To: Neelakanta Reddy
     [email protected]
Cc: Opensaf-devel
     [email protected]
Subject: [devel] [PATCH 2 of 3] imm: add support to IMM service for 
transactional safe read [#48]


  osaf/services/saf/immsv/README             |   44 ++
  osaf/services/saf/immsv/immnd/ImmModel.cc  |  608 +++++++++++++++++++++++++++-
  osaf/services/saf/immsv/immnd/ImmModel.hh  |    7 +
  osaf/services/saf/immsv/immnd/immnd_evt.c  |  169 +++++++-
  osaf/services/saf/immsv/immnd/immnd_init.h |    3 +
  5 files changed, 788 insertions(+), 43 deletions(-)


The patch contains code for IMM service part of transactional safe read

diff --git a/osaf/services/saf/immsv/README b/osaf/services/saf/immsv/README
--- a/osaf/services/saf/immsv/README
+++ b/osaf/services/saf/immsv/README
@@ -2704,6 +2704,50 @@ Bit 5 controls OpenSAF4.5 protocols allo
  Bit 6 controls OpenSAF4.6 protocols allowed or not (normally on/1).
  Bit 7 controls OpenSAF4.7 protocols allowed or not (normally on/1).
  
+
+Safe-read (transactional read) (5.0)
+=============================================
+http://sourceforge.net/p/opensaf/tickets/48/
+
+Adds support for an OM CCB client to read-access a config object 
transactionally.
+The API works exactly the same way as saImmOmAccessorGet, except that
+
+   a) The API takes a SaImmCcbHandleT instead of a SaImmAccessorHandleT
+
+   b) The values returned for the objects config attributes are from a version
+      of the object consistent with the CCB/transaction. This means either the
+      latest applied version or a newer version created in the same CCB but
+      not yet applied.
+
+   c) Access to an object that has been deleted in the same CCB but not applied
+      is rejected with ERR_NOT_EXIST.
+
+   d) Access to an object that has been created in the same CCB but not applied
+      is allowed, providing the latest version of the config attributes in 
that CCB.
+
+   e) Safe read is not allowed using a runtime object as target.
+
+Runtime attributes residing in a config object are handled exactly the same as 
for
+saImmOmAccessorGet. The reason a safe-read call is not allowed on a runtime 
*object*
+is that a runtime object *only* contains runtime attributes. Performing a 
safe-read
+on a runtime object makes no sense.
+
+
+saImmOmCcbObjectRead(SaImmCcbHandleT ccbHandle, /* in */
+                     SaConstStringT objectName, /* in */
+                     const SaImmAttrNameT *attributeNames, /* in */
+                     SaImmAttrValuesT_2 ***attributes); /* out */
+
+
+Return Values :   SA_AIS_ERR_BUSY - The object targeted by the request is 
already
+                  the target of conflicting/exclusive operation in another CCB.
+                  The conflicting operation would be a modify or delete.
+
+                  SA_AIS_ERR_INVALID_PARAM - The objectName parameter 
identifies a
+                  runtime object.
+
+                  Returncodes otherwise identical to saImmOmAccessorGet.
+
  ----------------------------------------
  DEPENDENCIES
  ============
diff --git a/osaf/services/saf/immsv/immnd/ImmModel.cc 
b/osaf/services/saf/immsv/immnd/ImmModel.cc
--- a/osaf/services/saf/immsv/immnd/ImmModel.cc
+++ b/osaf/services/saf/immsv/immnd/ImmModel.cc
@@ -36,8 +36,8 @@
  #define SEARCH_TIMEOUT_SEC 600 /* Search timeout */
  
  // Same strings exists in immnd_evt.c
-#define IMM_VALIDATION_ABORT   "IMM: Validation abort: "
-#define IMM_RESOURCE_ABORT             "IMM: Resource abort: "
+#define IMM_VALIDATION_ABORT  "IMM: Validation abort: "
+#define IMM_RESOURCE_ABORT    "IMM: Resource abort: "
  
  struct ContinuationInfo2
  {
@@ -203,6 +203,16 @@ typedef SaUint32T ImmObjectFlags;
  // c) modifies a NO_DANGLING attribute,
  // d) deletes an object that is referred to by some NO_DANGLING attribute.
  
+#define IMM_SHARED_READ_LOCK 0x00000100
+//If shared read lock is on, it signifies that one or more ccbs have locked the
+//object for safe-read. This prevents the object from being modified or deleted
+//by any other ccb until safe-readers have released the read lock.
+//If only one single ccb has a read lock on an object, i.e. the lock is 
currently
+//not actually shared, then that ccb may upgrade the read-lock to an exclusive 
lock
+//for delete or modify.
+//See also the data structure sObjectReadLockCount.
+
+
  
  struct ObjectInfo
  {
@@ -217,7 +227,8 @@ struct ObjectInfo
      void             getAdminOwnerName(std::string *str) const;
      
      ImmAttrValue*    mAdminOwnerAttrVal; //Pointer INTO mAttrValueMap
-    SaUint32T        mCcbId;
+    SaUint32T        mCcbId;   //Zero => may be read-locked 
see:IMM_SHARED_READ_LOCK
+                               //Nonzero => may be exclusive lock if id is 
active ccb
      ImmAttrValueMap  mAttrValueMap; //<-Each ImmAttrValue needs explicit 
delete
      ClassInfo*       mClassInfo;    //<-Points INTO ClassMap. Not own copy!
      ImplementerInfo* mImplementer;  //<-Points INTO ImplementerVector
@@ -332,6 +343,8 @@ struct AugCcbParent
      SaUint32T         mAugmentAdmo; /* Aug admo with ROF==true for aug 
ccbCreates #2428 */
  };
  
+static ObjectShortCountMap sObjectReadLockCount; //Needs to be declared above 
CcbInfo
+
  struct CcbInfo
  {
      CcbInfo(): mId(0), mAdminOwnerId(0), mCcbFlags(0), mOriginatingConn(0),
@@ -340,6 +353,11 @@ struct CcbInfo
                 mErrorStrings(NULL), mAugCcbParent(NULL) {}
      bool isOk() {return mVeto == SA_AIS_OK;}
      bool isActive() {return (mState < IMM_CCB_COMMITTED);}
+    void addObjReadLock(ObjectInfo* obj, std::string& objName);
+    void removeObjReadLock(ObjectInfo* obj, bool removeObject = true);
+    void removeAllObjReadLocks(); // Removes all locks held by *this* ccb.
+    bool isReadLockedByThisCcb(ObjectInfo* obj);
+
      SaUint32T         mId;
      SaUint32T         mAdminOwnerId;
      SaUint32T         mCcbFlags;
@@ -357,9 +375,97 @@ struct CcbInfo
      ImplementerSet    mLocalAppliers;
      ImmsvAttrNameList* mErrorStrings;/*Error strings generated by current op 
*/
      AugCcbParent*     mAugCcbParent;
+    ObjectSet         mSafeReadSet;
  };
  typedef std::vector<CcbInfo*> CcbVector;
  
+void CcbInfo::addObjReadLock(ObjectInfo* obj, std::string& objName)
+{
+    ObjectShortCountMap::iterator oscm;
+    if(obj->mObjFlags & IMM_SHARED_READ_LOCK) {
+        TRACE_7("Object '%s' already locked for safe-read by some ccb(s)", 
objName.c_str());
+        //Read-lock is a shared lock, join the group of lockers.
+    } else {
+        osafassert(!(obj->mObjFlags & IMM_CREATE_LOCK ));
+        osafassert(!(obj->mObjFlags & IMM_DELETE_LOCK ));
+        /* IMM_RT_UPDATE_LOCK does not conflict with safe read because it 
deals with
+           PRTO/PRTA updates, i.e. runtime data and not config data.
+        */
+
+        osafassert(obj->mCcbId == 0);
+        /* A non-zero mCcbId for an object indicates exclusive lock 
(create/delete/modify)
+           if and only if the identified ccb is still *active*. A zero mCcbId 
for an object
+           *guarantees* that it is not exclusive locked.
+           This function *requires* that the invoker has done the pre-check of 
no interference
+           of a proposed shared reader with any pre-existing exclusive op and 
zeroed the ccbId
+           if and only if an existing ccbId was no longer identifying an 
active ccb.
+
+           There is no explicit ccb-modify lock since it can be inferred by 
(a) the ccb-id
+           of an object identifying an active CCB and (b) neither the create 
or modify
+           flag being set on the object.
+        */
+
+        obj->mObjFlags |= IMM_SHARED_READ_LOCK;
+    }
+
+    this->mSafeReadSet.insert(obj);  /* Ccb keeps track of its safe-readers. */
+
+    oscm = sObjectReadLockCount.find(obj);
+    if(oscm == sObjectReadLockCount.end()) {
+        sObjectReadLockCount[obj] = 1;
+    } else {
+        // TODO: Should check that we dont increment beyond maxshort.
+        (oscm->second)++;
+        LOG_IN("Incremented read-lock count for %s to %u", objName.c_str(), 
oscm->second);
+    }
+
+}
+
+void CcbInfo::removeAllObjReadLocks()
+{
+    /* Removes all the read-locks set by this ccb.
+       Must be executed with ccb-commit or ccb-abort.
+    */
+    ObjectSet::iterator osi;
+
+    for(osi = this->mSafeReadSet.begin(); osi != this->mSafeReadSet.end(); 
++osi)
+    {
+        removeObjReadLock(*osi, false);
+    }
+    this->mSafeReadSet.clear();
+}
+
+void CcbInfo::removeObjReadLock(ObjectInfo* obj, bool removeObject /* = true*/)
+{
+    ObjectShortCountMap::iterator oscm = sObjectReadLockCount.find(obj);
+    osafassert(oscm != sObjectReadLockCount.end());
+    osafassert(oscm->second > 0);
+    (oscm->second)--;
+    TRACE("CcbInfo::removeObjReadLock decremented safe read count for %p to 
%u",
+        obj, oscm->second);
+    if(oscm->second == 0) {
+        sObjectReadLockCount.erase(obj);
+        TRACE("Object %p no longer has any read locks", obj);
+        obj->mObjFlags &= ~IMM_SHARED_READ_LOCK;
+    } else {
+        TRACE("Object %p still has read lock count of %u", obj, oscm->second);
+    }
+
+    if(removeObject) {
+        this->mSafeReadSet.erase(obj);
+    }
+}
+
+bool CcbInfo::isReadLockedByThisCcb(ObjectInfo* obj)
+{
+    TRACE_ENTER();
+    ObjectSet::iterator osi = this->mSafeReadSet.find(obj);
+    bool result = (osi != this->mSafeReadSet.end());
+    TRACE_LEAVE();
+    return result;
+}
+
+
  struct AdminOwnerInfo
  {
      AdminOwnerInfo(): mId(0), mConn(0), mNodeId(0), mReleaseOnFinalize(false),
@@ -454,7 +560,7 @@ static const std::string saImmOiTimeout(
  
  static SaImmRepositoryInitModeT immInitMode = SA_IMM_INIT_FROM_FILE;
  
-static SaUint32T ccbIdLongDnGuard  = 0; /* Disallow long DN additions if 
longDnsAllowed is being changed in ccb*/
+static SaUint32T sCcbIdLongDnGuard  = 0; /* Disallow long DN additions if 
longDnsAllowed is being changed in ccb*/
  static bool      sIsLongDnLoaded   = false; /* track long DNs before 
opensafImm=opensafImm,safApp=safImmService is created */
  static bool      sAbortNonCriticalCcbs = false; /* Set to true at coord by 
the special imm admin-op to abort ccbs #1107 */
  
@@ -1355,7 +1461,7 @@ immModel_adminOperationInvoke(IMMND_CB *
          implNodeId, pbeExpected, displayRes, cb->mIsCoord);
  
      if(sAbortNonCriticalCcbs && !wasAbortNonCritical) {
-        LOG_IN("ABT cb->mForceClean set to true");
+        TRACE("cb->mForceClean set to true");
          cb->mForceClean = true;
      }
      return err;
@@ -1476,6 +1582,18 @@ immModel_ccbAugmentAdmo(IMMND_CB *cb, Sa
  }
  
  SaAisErrorT
+immModel_objectIsLockedByCcb(IMMND_CB *cb, struct ImmsvOmSearchInit* req)
+{
+   return ImmModel::instance(&cb->immModel)->objectIsLockedByCcb(req);
+}
+
+SaAisErrorT
+immModel_ccbReadLockObject(IMMND_CB *cb, struct ImmsvOmSearchInit* req)
+{
+   return ImmModel::instance(&cb->immModel)->ccbReadLockObject(req);
+}
+
+SaAisErrorT
  immModel_searchInitialize(IMMND_CB *cb, struct ImmsvOmSearchInit* req,
      void** searchOp, SaBoolT isSync, SaBoolT isAccessor)
  {
@@ -2892,7 +3010,7 @@ ImmModel::getLongDnsAllowed(ObjectInfo*
  {
      TRACE_ENTER();
      bool longDnsAllowed = false;
-    if(ccbIdLongDnGuard)  {
+    if(sCcbIdLongDnGuard)  {
          /* A ccb is currently mutating longDnsAllowed */
          return false;
      }
@@ -5856,8 +5974,9 @@ ImmModel::ccbCommit(SaUint32T ccbId, Con
      }//for
      
      ccb->mMutations.clear();
-
-    if(ccbIdLongDnGuard == ccbId) {ccbIdLongDnGuard= 0;}
+    ccb->removeAllObjReadLocks();
+
+    if(sCcbIdLongDnGuard == ccbId) {sCcbIdLongDnGuard = 0;}
      
      //If there are implementers involved then send the final apply callback
      //to them and remove the implementers from the Ccb.
@@ -6069,7 +6188,7 @@ ImmModel::ccbAbort(SaUint32T ccbId, Conn
          }
      }
  
-    if(ccbIdLongDnGuard == ccbId) {ccbIdLongDnGuard= 0;}
+    if(sCcbIdLongDnGuard == ccbId) {sCcbIdLongDnGuard = 0;}
  
      return true;
  }
@@ -6259,6 +6378,7 @@ ImmModel::ccbTerminate(SaUint32T ccbId)
          createsAbortedInCcb.clear();
  
          ccb->mMutations.clear();
+        ccb->removeAllObjReadLocks();
          if(ccb->mImplementers.size()) {
              LOG_WA("Ccb destroyed without notifying some implementers from 
IMMND.");
              CcbImplementerMap::iterator ix;
@@ -6357,14 +6477,20 @@ ImmModel::ccbAugmentInit(immsv_oi_ccb_up
              case IMM_CCB_READY:
              case IMM_CCB_COMMITTED:
              case IMM_CCB_ABORTED:
-            case IMM_CCB_PREPARE:
-            case IMM_CCB_VALIDATING:
              case IMM_CCB_VALIDATED:
              case IMM_CCB_PBE_ABORT:
              case IMM_CCB_CRITICAL:
-                LOG_WA("Ccb Augment attempted in wrong CCB state");
+                LOG_WA("Ccb Augment attempted in wrong CCB state %u", 
ccb->mState);
                  err = SA_AIS_ERR_BAD_OPERATION;
-            goto done;
+                goto done;
+
+            case IMM_CCB_VALIDATING:
+                LOG_IN("Augment CCB in state VALIDATING");
+                goto augment_for_safe_read;
+
+            case IMM_CCB_PREPARE:
+                LOG_IN("Augment CCB in state PREPARE %u",  IMM_CCB_PREPARE);
+                goto augment_for_safe_read;
  
              case IMM_CCB_CREATE_OP:
                  TRACE("Augment CCB in state CREATE_OP");
@@ -6378,7 +6504,6 @@ ImmModel::ccbAugmentInit(immsv_oi_ccb_up
  
              case IMM_CCB_MODIFY_OP:
                  TRACE("Augment CCB in state MODIFY_OP");
-
                  omuti =  ccb->mMutations.find(objectName);
                  if(omuti != ccb->mMutations.end()){
                      obj = omuti->second->mAfterImage;
@@ -6421,9 +6546,11 @@ ImmModel::ccbAugmentInit(immsv_oi_ccb_up
      osafassert(omuti->second->mContinuationId == rsp->inv);
      osafassert(omuti->second->mWaitForImplAck);
  
+ augment_for_safe_read:
+
      TRACE("obj:%p", obj);
-    TRACE("obj->mImplementer:%p", obj->mImplementer);
-    osafassert(obj && obj->mImplementer);
+    //TRACE("obj->mImplementer:%p", obj->mImplementer);
+    //osafassert(obj && obj->mImplementer);
  
  
      if(ccb->mVeto != SA_AIS_OK) {
@@ -6466,12 +6593,15 @@ ImmModel::ccbAugmentInit(immsv_oi_ccb_up
      ccb->mAugCcbParent->mState = ccb->mState;
      ccb->mAugCcbParent->mErrorStrings = ccb->mErrorStrings;
      ccb->mAugCcbParent->mContinuationId = rsp->inv;
-    ccb->mAugCcbParent->mImplId = obj->mImplementer->mId;
+    ccb->mAugCcbParent->mImplId = (obj)?obj->mImplementer->mId:0;
      ccb->mAugCcbParent->mAugmentAdmo = 0;
  
      ccb->mOriginatingConn = originatingConn;
      ccb->mOriginatingNode = originatingNode;
-    ccb->mState = IMM_CCB_READY;
+    if(ccb->mState < IMM_CCB_VALIDATING) {
+        ccb->mState = IMM_CCB_READY;
+    }
+
      ccb->mWaitStartTime = 0;
      ccb->mErrorStrings = NULL;
  
@@ -8491,6 +8621,7 @@ ImmModel::ccbObjectModify(const ImmsvOmC
      ObjectNameSet afimPreOpNDRefs;  // Set of NO_DANGLING references from 
after image before CCB operation
      bool hasNoDanglingRefs = false;
      bool modifiedImmMngt = false;  /* true => modification of the SAF 
immManagement object. */
+    bool upgradeReadLock = false;
      
      if(sz >= SA_MAX_UNEXTENDED_NAME_LENGTH) {
          if(longDnsPermitted) {
@@ -8577,7 +8708,7 @@ ImmModel::ccbObjectModify(const ImmsvOmC
          err = SA_AIS_ERR_BAD_OPERATION;
          goto ccbObjectModifyExit;
      }
-
+
      ccbIdOfObj = object->mCcbId;
      if(ccbIdOfObj != ccbId) {
          i1 = std::find_if(sCcbVector.begin(), sCcbVector.end(),
@@ -8588,6 +8719,25 @@ ImmModel::ccbObjectModify(const ImmsvOmC
              err = SA_AIS_ERR_BUSY;
              goto ccbObjectModifyExit;
          }
+
+        if(ccbIdOfObj == 0) {
+            ObjectShortCountMap::iterator oscm  = 
sObjectReadLockCount.find(object);
+            if(oscm != sObjectReadLockCount.end())
+            {
+                if((oscm->second > 1) || !(ccb->isReadLockedByThisCcb(object)))
+                {
+                    TRACE("ERR_BUSY: object '%s' is safe-read-locked by %u 
ccb(s)",
+                        objectName.c_str(), oscm->second);
+                    err = SA_AIS_ERR_BUSY;
+                    goto ccbObjectModifyExit;
+                } else {
+                    osafassert((oscm->second == 1) && 
(ccb->isReadLockedByThisCcb(object)));
+                    TRACE("CcbModify: Object '%s' safe-read-locked only by 
this ccb (%u) => "
+                         "upgrade to exclusive lock is possible", 
objectName.c_str(), ccbId);
+                    upgradeReadLock = true;
+                }
+            }
+        }
      }
  
      if(object->mObjFlags & IMM_RT_UPDATE_LOCK) {
@@ -9208,12 +9358,12 @@ ImmModel::ccbObjectModify(const ImmsvOmC
  
              /* Check if *this* ccb is attempting to alter longDnsAllowed.*/
              if(longDnsPermitted != longDnsAllowedAfter) {
-                if(ccbIdLongDnGuard) {
+                if(sCcbIdLongDnGuard) {
                      /* This case should never happen since it is guarded by 
regular ccb handling. */
                      setCcbErrorString(ccb, "IMM: ERR_BUSY: Other Ccb (%u) 
already using %s",
-                        ccbIdLongDnGuard, immLongDnsAllowed.c_str());
+                        sCcbIdLongDnGuard, immLongDnsAllowed.c_str());
                      LOG_IN("IMM: ERR_BUSY: Other Ccb (%u) already using %s",
-                        ccbIdLongDnGuard, immLongDnsAllowed.c_str());
+                        sCcbIdLongDnGuard, immLongDnsAllowed.c_str());
                      err = SA_AIS_ERR_BUSY;
                  } else {
                      if(!longDnsAllowedAfter) {
@@ -9281,7 +9431,7 @@ ImmModel::ccbObjectModify(const ImmsvOmC
                          }
                      } /* End of LONG DN check */
  
-                    ccbIdLongDnGuard = ccbId;
+                    sCcbIdLongDnGuard = ccbId;
                  }
              }
          }
@@ -9368,7 +9518,7 @@ ImmModel::ccbObjectModify(const ImmsvOmC
      if(err == SA_AIS_OK) {
          object->mCcbId = ccbId; //Overwrite any old obsolete ccb id.
          osafassert(oMut);
-        if(!chainedOp) {
+        if(!chainedOp) {
              ccb->mMutations[objectName] = oMut;
              if(adminOwner->mReleaseOnFinalize) {
                  //NOTE: this may not be good enough. For a modify the
@@ -9377,6 +9527,9 @@ ImmModel::ccbObjectModify(const ImmsvOmC
              }
          }
          ccb->mOpCount++;
+        if(upgradeReadLock) {
+            ccb->removeObjReadLock(object);
+        }
      } else {
          //err != SA_AIS_OK
          if(ccb->mState == IMM_CCB_MODIFY_OP) {ccb->mState = IMM_CCB_READY;}
@@ -9500,7 +9653,7 @@ ImmModel::ccbObjectDelete(const ImmsvOmC
          setCcbErrorString(ccb, IMM_RESOURCE_ABORT "CCB is not in an expected 
state");
          goto ccbObjectDeleteExit;
      }
-
+
      if(reqConn && (ccb->mOriginatingConn != reqConn)) {
          LOG_NO("ERR_BAD_HANDLE: Missmatch on connection for ccb id %u", 
ccbId);
          err = SA_AIS_ERR_BAD_HANDLE;
@@ -9650,6 +9803,23 @@ ImmModel::deleteObject(ObjectMap::iterat
                  ccb->mId, ccbIdOfObj);
              return SA_AIS_ERR_BUSY;
          }
+
+        if(ccbIdOfObj == 0) {
+            ObjectShortCountMap::iterator oscm = 
sObjectReadLockCount.find(oi->second);
+            if(oscm != sObjectReadLockCount.end()) {
+                if((oscm->second > 1) || 
!(ccb->isReadLockedByThisCcb(oi->second))) {
+                    TRACE("ERR_BUSY: object '%s' is read-locked by %u other 
ccb(s)",
+                        oi->first.c_str(), oscm->second);
+                    osafassert(!doIt);
+                    return SA_AIS_ERR_BUSY;
+                } else if (doIt) {
+                    osafassert((oscm->second == 1) && 
(ccb->isReadLockedByThisCcb(oi->second)));
+                    TRACE("CcbDelete: Object '%s' safe-read-locked only by 
this ccb (%u) => "
+                         "upgrade to exclusive lock is possible", 
oi->first.c_str(), ccb->mId);
+                    ccb->removeObjReadLock(oi->second);
+                }
+            }
+        }
      }
      
      if(oi->second->mClassInfo->mCategory != SA_IMM_CLASS_CONFIG) {
@@ -10400,6 +10570,20 @@ ImmModel::ccbCompletedContinuation(immsv
      }
      ccb = *i1;
  
+    if(ccb->mAugCcbParent &&
+      (ccb->mAugCcbParent->mContinuationId == rsp->inv)) {
+        ccb->mOriginatingConn = ccb->mAugCcbParent->mOriginatingConn;
+        ccb->mOriginatingNode = ccb->mAugCcbParent->mOriginatingNode;
+        ccb->mState = ccb->mAugCcbParent->mState;
+        ccb->mErrorStrings = ccb->mAugCcbParent->mErrorStrings;
+
+        ccb->mAugCcbParent->mErrorStrings = NULL;
+        ccb->mAugCcbParent->mContinuationId = 0;
+        free(ccb->mAugCcbParent);
+        ccb->mAugCcbParent = NULL;
+    }
+
+
      if(ccb->mState == IMM_CCB_VALIDATED) {
          LOG_IN("GOING FROM IMM_CCB_VALIDATED to IMM_CCB_PREPARE Ccb:%u", 
ccbId);
          ccb->mState = IMM_CCB_PREPARE;
@@ -10700,6 +10884,252 @@ ImmModel::ccbObjModifyContinuation(SaUin
      TRACE_LEAVE();
  }
  
+SaAisErrorT
+ImmModel::ccbReadLockObject(const ImmsvOmSearchInit* req)
+{
+    TRACE_ENTER();
+    SaAisErrorT err = SA_AIS_OK;
+    SaUint32T ccbId = req->ccbId;
+    size_t sz = strnlen((char *) req->rootName.buf, 
(size_t)req->rootName.size);
+    std::string objectName((const char*)req->rootName.buf, sz);
+    ObjectMap::iterator i;
+    ObjectInfo* obj = NULL;
+    SaUint32T ccbIdOfObj=0;
+    CcbVector::iterator i1;
+    CcbInfo* ccb = 0;
+
+    if (objectName.empty()) {
+        LOG_NO("ERR_INVALID_PARAM: Empty DN is not allowed");
+        err = SA_AIS_ERR_INVALID_PARAM;
+        goto done;
+    }
+
+    // Validate object name
+    if(! (nameCheck(objectName)||nameToInternal(objectName)) ) {
+        LOG_NO("ERR_INVALID_PARAM: Not a proper object name");
+        err = SA_AIS_ERR_INVALID_PARAM;
+        goto done;
+    }
+
+    i = sObjectMap.find(objectName);
+    if (i == sObjectMap.end()) {
+        //TRACE_7("ERR_NOT_EXIST: Object '%s' does not exist", 
objectName.c_str());
+        err = SA_AIS_ERR_NOT_EXIST;
+        goto done;
+    }
+
+    obj = i->second;
+
+    osafassert(obj->mClassInfo);
+
+    if(obj->mClassInfo->mCategory != SA_IMM_CLASS_CONFIG) {
+        LOG_NO("ERR_INVALID_PARAM: object '%s' is not a configuration object",
+            objectName.c_str());
+        err = SA_AIS_ERR_INVALID_PARAM;
+        goto done;
+    }
+
+    i1 = std::find_if(sCcbVector.begin(), sCcbVector.end(), CcbIdIs(ccbId));
+    if (i1 == sCcbVector.end()) {
+        LOG_NO("ERR_BAD_HANDLE: ccb id %u does not exist", ccbId);
+        err = SA_AIS_ERR_BAD_HANDLE;
+        goto done;
+    }
+    ccb = *i1;
+
+    if(!ccb->isOk()) {
+        LOG_IN("ERR_FAILED_OPERATION: ccb %u is in an error state "
+            "rejecting ccbObjectRead operation ", ccbId);
+        /* !ccb->isOk(), error string with abort reason must already be set */
+        err = SA_AIS_ERR_FAILED_OPERATION;
+        goto done;
+    }
+
+    if(ccb->mState > IMM_CCB_READY) {
+        if(ccb->mAugCcbParent == NULL) {
+            LOG_WA("ERR_BAD_OPERATION: ccb %u is not in an expected state: %u "
+                "rejecting ccbObjectRead operation ", ccbId, ccb->mState);
+            err = SA_AIS_ERR_BAD_OPERATION;
+            goto done;
+        } else {
+            if(ccb->mState < IMM_CCB_CRITICAL) {
+                LOG_IN("SafeRead inside augmentation allowed");
+            } else {
+                LOG_WA("ERR_BAD_OPERATION: ccb %u is not in an expected state: 
%u "
+                    "rejecting ccbObjectRead operation ", ccbId, ccb->mState);
+                err = SA_AIS_ERR_BAD_OPERATION;
+                goto done;
+            }
+        }
+    }
+
+    ccbIdOfObj = obj->mCcbId;
+    if(ccbIdOfObj != ccbId) {
+       i1 = std::find_if(sCcbVector.begin(), sCcbVector.end(),
+            CcbIdIs(ccbIdOfObj));
+        if ((i1 != sCcbVector.end()) && ((*i1)->isActive())) {
+            TRACE_7("ERR_BUSY: ccb id %u differs from active ccb id on object 
%u",
+                ccbId, ccbIdOfObj);
+            err = SA_AIS_ERR_BUSY;
+            goto done;
+        }
+        /* Set ccb-id to zero to signify no exclusive lock set.
+           CcbId of 0 does not mean that a shared read lock must be set for 
the object.
+           It only means that zero or more shared read locks *could* be set 
for this object.
+         */
+        obj->mCcbId = 0;
+    } else {
+        TRACE_7("Safe-read: Object '%s' is currently *exclusive* locked by 
*same* ccb %u",
+            objectName.c_str(), ccbId);
+        /* Read-lock succeeds as a no-op.
+           Actual access will fail if it is delete locked in same ccb.
+           We could fail here for the delete locked case, but it is actually 
not the
+           requested locking for the ccb that fails, it is the 
safe-read-access done later.
+           The safe read of the object will fail with ERR_NOT_EXIST because 
the object is
+           scheduled for delete in the *same* ccb as the reader. If the object 
was locked
+           for delete in a different ccb then the safe-reader in this ccb 
would get ERR_BUSY.
+        */
+        osafassert(err == SA_AIS_OK);
+        goto done;
+    }
+
+    if(ccb->isReadLockedByThisCcb(obj)) {
+        TRACE("Object %s is already safe-read-locked by this ccb %u",
+            objectName.c_str(), ccbId);
+    } else {
+        ccb->addObjReadLock(obj, objectName);
+    }

[Hung] ccb->isReadLockedByThisCcb(obj) should always return false here.
We only enter immModel_ccbReadLockObject() when
immModel_objectIsLockedByCcb() returns ERR_INTERRUPT.
So the object must not be locked by the ccb.
I think we should put an assert here:

    osafassert(!ccb->isReadLockedByThisCcb(obj));
    ccb->addObjReadLock(obj, objectName);



+
+ done:
+    TRACE_LEAVE();
+    return err;
+}
+
+SaAisErrorT
+ImmModel::objectIsLockedByCcb(const ImmsvOmSearchInit* req)
+{
+    // ##Redo this whole method to simply check ccb and ccb mutation.?
+    TRACE_ENTER();
+    SaAisErrorT err = SA_AIS_OK;
+    SaUint32T ccbId = req->ccbId;
+    size_t sz = strnlen((char *) req->rootName.buf, 
(size_t)req->rootName.size);
+    std::string objectName((const char*)req->rootName.buf, sz);
+    ObjectMap::iterator i;
+    ObjectInfo* obj = NULL;
+    SaUint32T ccbIdOfObj=0;
+    CcbVector::iterator i1;
+    CcbInfo* ccb = 0;
+
+    if (objectName.empty()) {
+        LOG_NO("ERR_INVALID_PARAM: Empty DN is not allowed");
+        err = SA_AIS_ERR_INVALID_PARAM;
+        goto done;
+    }
+
+    // Validate object name
+    if(! (nameCheck(objectName)||nameToInternal(objectName)) ) {
+        LOG_NO("ERR_INVALID_PARAM: Not a proper object name");
+        err = SA_AIS_ERR_INVALID_PARAM;
+        goto done;
+    }
+
+    i = sObjectMap.find(objectName);
+    if (i == sObjectMap.end()) {
+        //TRACE_7("ERR_NOT_EXIST: Object '%s' does not exist", 
objectName.c_str());
+        err = SA_AIS_ERR_NOT_EXIST;
+        goto done;
+    }
+
+    obj = i->second;
+    osafassert(obj->mClassInfo);
+
+    if(obj->mClassInfo->mCategory != SA_IMM_CLASS_CONFIG) {
+        LOG_NO("ERR_INVALID_PARAM: object '%s' is not a configuration object",
+            objectName.c_str());
+        err = SA_AIS_ERR_INVALID_PARAM;
+        goto done;
+    }
+
+    i1 = std::find_if(sCcbVector.begin(), sCcbVector.end(), CcbIdIs(ccbId));
+    if (i1 == sCcbVector.end()) {
+        LOG_NO("ERR_BAD_HANDLE: ccb id %u does not exist", ccbId);
+        err = SA_AIS_ERR_BAD_HANDLE;
+        goto done;
+    }
+    ccb = *i1;
+
+    if(!ccb->isOk()) {
+        LOG_IN("ERR_FAILED_OPERATION: ccb %u is in an error state "
+            "rejecting ccbObjectRead operation ", ccbId);
+        /* !ccb->isOk(), error string with abort reason must already be set */
+        err = SA_AIS_ERR_FAILED_OPERATION;
+        goto done;
+    }
+
+    if(ccb->mState > IMM_CCB_READY) {
+        if(ccb->mAugCcbParent == NULL) {
+            LOG_WA("ERR_BAD_OPERATION: ccb %u is not in an expected state: %u "
+                "rejecting ccbObjectRead operation ", ccbId, ccb->mState);
+            err = SA_AIS_ERR_BAD_OPERATION;
+            goto done;
+        } else {
+            if(ccb->mState < IMM_CCB_CRITICAL) {
+                LOG_IN("SafeRead inside augmentation allowed");
+            } else {
+                LOG_WA("ERR_BAD_OPERATION: ccb %u is not in an expected state: 
%u "
+                    "rejecting ccbObjectRead operation ", ccbId, ccb->mState);
+                err = SA_AIS_ERR_BAD_OPERATION;
+                goto done;
+            }
+        }
+    }
+
+    ccbIdOfObj = obj->mCcbId;
+    if(ccbIdOfObj == ccbId) {
+        TRACE_7("Object '%s' is currently *exclusive* locked by same ccb %u",
+            objectName.c_str(), ccbId);
+        osafassert(err == SA_AIS_OK);
+        goto done;
+    } else if(ccb->isReadLockedByThisCcb(obj)) {
+        TRACE_7("Object '%s' is currently *shared* read locked by same ccb %u",
+            objectName.c_str(), ccbId);
+        osafassert(err == SA_AIS_OK);
+        goto done;
+    } else if(ccbIdOfObj) {
+        //CcbId is not zero.
+        TRACE_7("Object '%s' is currently NOT locked by same ccb %u",
+            objectName.c_str(), ccbId);
+        i1 = std::find_if(sCcbVector.begin(), sCcbVector.end(),
+            CcbIdIs(ccbIdOfObj));
+        if ((i1 != sCcbVector.end()) && ((*i1)->isActive())) {
+            TRACE_7("ERR_BUSY: Safe-read: ccb id %u differs from active ccb id 
on object %u",
+                ccbId, ccbIdOfObj);
+            err = SA_AIS_ERR_BUSY;
+            goto done;
+        }
+        /* clear the obsolete ccb-id */
+        ccbIdOfObj = 0;
+    }
+
+    if(ccb->mAugCcbParent) {
+        LOG_IN("This is a lock check in an augmented CCB");
+    }
+
+    /* Intentionally NO check for admin-owner. SafeRead allows read sharing 
with
+       other CCBs, => no admin-owner exclusivity.
+    */
+
+    if(ccbIdOfObj == 0) {
+        TRACE_7("Object '%s' is currently not locked by any ccb", 
objectName.c_str());

[Hung] This trace is misleading. We also enter here when the object is
read-locked by another ccb.
It should be:

    TRACE_7("Object '%s' is currently not locked by this ccb %u", objectName.c_str(), ccbId);

+        err = SA_AIS_ERR_INTERRUPT;
+        goto done;
+    }
+
+ done:
+    TRACE_LEAVE();
+    return err;
+}
+
  
  SaAisErrorT
  ImmModel::accessorGet(const ImmsvOmSearchInit* req, ImmSearchOp& op)
@@ -10724,6 +11154,14 @@ ImmModel::accessorGet(const ImmsvOmSearc
      SaImmSearchOptionsT notAllowedOptions = 0LL;
      bool nonExtendedNameCheck = req->searchParam.present > 
ImmOmSearchParameter_PR_oneAttrParam;
      bool checkAttribute = false;
+    bool isSafeRead = (req->ccbId != 0);
+    CcbInfo* ccb = NULL;
+    CcbVector::iterator i1;
+
+
+    if(isSafeRead) {
+        TRACE_7("SafeRead (accessorGet) on Object: '%s'", objectName.c_str());
+    }
  
      if(nonExtendedNameCheck) {
          op.setNonExtendedName();
@@ -10753,13 +11191,100 @@ ImmModel::accessorGet(const ImmsvOmSearc
          TRACE_7("ERR_NOT_EXIST: Object '%s' does not exist", 
objectName.c_str());
          err = SA_AIS_ERR_NOT_EXIST;
          goto accessorExit;
-    } else if(i->second->mObjFlags & IMM_CREATE_LOCK) {
-        TRACE_7("ERR_NOT_EXIST: Object '%s' is being created, but ccb "
-                "or PRTO PBE, not yet applied", objectName.c_str());
-        err = SA_AIS_ERR_NOT_EXIST;
-        goto accessorExit;
-    }
-
+    }
+    if(i->second->mObjFlags & IMM_CREATE_LOCK) {
+        /* Both regular accessor-get and safe-read affected by create lock.*/
+        if(isSafeRead) {
+            if(req->ccbId != i->second->mCcbId) {
+                TRACE_7("ERR_NOT_EXIST: Object '%s' is being created but not 
in this ccb.",
+                    objectName.c_str());
+                err = SA_AIS_ERR_NOT_EXIST;
+                goto accessorExit;
+            }
+            TRACE("req->ccbId == i->second->mCcbId  i.e. same ccb ok for 
safe-read");
+            obj = i->second;
+        } else { /* Regular accessor-get */
+            TRACE_7("ERR_NOT_EXIST: Object '%s' is being created, but ccb "
+                    "or PRTO PBE, not yet applied", objectName.c_str());
+            err = SA_AIS_ERR_NOT_EXIST;
+            goto accessorExit;
+        }
+    } else if(isSafeRead) {
+        /* Interference with unapplied create taken care of above in if-branch 
for
+           both safeRead and regular accessorGet.
+
+           Regular accessor-get is not affected by open delete, modify, or 
safe-reads.
+           It simply accesses the current committed version.
+
+           Safe-read *is* affected by unapplied modify or delete, hence this 
else branch.
+
+           Mdify in same ccb => safe-read must read the after-image-version in 
the same ccb.
+           Delete in same ccb => the object becomes inaccesible 
(ERR_NOT_EXIST) for safe-read.
+           Mofify or delete by *other* ccb => safe-read should have been 
rejected (ERR_BUSY)
+           in objectIsLockedByCcb().
+
+           Safe-read is of course not affected by other safe-read operations 
on this object
+           in same or other ccbs. Latest commited version is read.
+        */
+
+        i1 = std::find_if(sCcbVector.begin(), sCcbVector.end(), 
CcbIdIs(req->ccbId));
+        osafassert(i1 != sCcbVector.end()); /*Already checked in 
objectIsLockedByCcb. */
+        ccb = *i1;
+        if((i->second->mCcbId == 0) && 
!(ccb->isReadLockedByThisCcb(i->second))) {
+            LOG_WA("Safe read on object %s NOT locked by ccb (%u). Should not 
be caught here",
+                objectName.c_str(), req->ccbId);
+            /* If we get here this may be seen as a bug to be be corrected. 
Should perhaps
+               assert here, but be lenient for now and just report "INTERRUPT" 
which should
+               result in an new attempt to lock the object for safe-read.
+               The safe-read lock should have been obtained before reacing 
accessor-get.
+             */
+            err = SA_AIS_ERR_INTERRUPT;  /* Try to lock the object for safe 
read.*/
+            goto accessorExit;
+        }
+
+        if(i->second->mCcbId != req->ccbId) {
+            if(i->second->mCcbId == 0) {
+                TRACE_7("Object %s locked for shared read in ccb (%u) and 
possibly other ccbs",
+                    objectName.c_str(), req->ccbId);
+                osafassert(i->second->mObjFlags & IMM_SHARED_READ_LOCK);
+                obj = i->second; /* Use latest committed version. */
+            } else {
+                LOG_WA("Safe read on object %s exclusive locked by other ccb 
(%u). "
+                    "Should not be caught here", objectName.c_str(), 
i->second->mCcbId);
+                /* If we get here this may be seen as a bug to be be 
corrected. Should perhaps
+                   assert here, but be lenient for now and just report "BUSY".
+                */
+                err = SA_AIS_ERR_BUSY;
+                goto accessorExit;
+            }
+        } else {/* i->second->mCcbId == req->ccbId */
+            osafassert(i->second->mCcbId != 0);
+            TRACE_7("Safe read on object *exclusive* locked by *this* ccb %u", 
req->ccbId);
+
+            /* This can not be a safe-read. */
+            osafassert(!(i->second->mObjFlags & IMM_SHARED_READ_LOCK));
+
+            /* Prior op can not be a create. Determine if it is delete or 
modify */
+
+            if(i->second->mObjFlags & IMM_DELETE_LOCK) {
+                LOG_IN("ERR_NOT_EXIST: Object '%s' is being deleted in same 
ccb %u",
+                    objectName.c_str(), req->ccbId);
+                err = SA_AIS_ERR_NOT_EXIST;
+                goto accessorExit;
+            }
+
+            /* Prior op must be a modify.in same ccb. Use latest afim for 
safe-read. */
+            TRACE_7("Safe read of Object '%s' that was modified in same ccb 
%u",
+                objectName.c_str(), req->ccbId);
+            /* Find mutation */
+            ObjectMutationMap::iterator omuti = 
ccb->mMutations.find(objectName);
+            osafassert(omuti != ccb->mMutations.end());
+            ObjectMutation* oMut = omuti->second;
+            osafassert(oMut->mOpType == IMM_MODIFY);
+            obj = oMut->mAfterImage;
+        }
+    } //if(safeRead)
+
      // Validate scope
      if (scope != SA_IMM_ONE) {
          LOG_NO("ERR_INVALID_PARAM: invalid search scope");
@@ -10784,9 +11309,15 @@ ImmModel::accessorGet(const ImmsvOmSearc
  
      //TODO: Reverse the order of matching attribute names.
      //The class should be searched first since we must return error
-    //if the attribute is not defined in the class of the object.
-
-    obj = i->second;
+    //if the attribute is not defined in the class of the object
+
+    //SafeRead works exactly like accessor get with respect to runtime 
attributes.
+
+    if(!obj) {
+        osafassert(!isSafeRead); //SafeRead checks bypassed somehow above.
+        obj = i->second;
+    }
+
      if(obj->mObjFlags & IMM_DN_INTERNAL_REP) {
          nameToExternal(objectName);
      }
@@ -11093,7 +11624,12 @@ ImmModel::searchInitialize(ImmsvOmSearch
          goto searchInitializeExit;
      }
      
-
+    if(req->ccbId) {
+        LOG_WA("SafeRead ended up in ImmModel::searchInitialize ccb:%u", 
req->ccbId);
+        err = SA_AIS_ERR_LIBRARY;
+        goto searchInitializeExit;
+    }
+
      // Validate root name
      if(! (nameCheck(rootName)||nameToInternal(rootName)) ) {
          LOG_NO("ERR_INVALID_PARAM: Not a proper root name");
diff --git a/osaf/services/saf/immsv/immnd/ImmModel.hh 
b/osaf/services/saf/immsv/immnd/ImmModel.hh
--- a/osaf/services/saf/immsv/immnd/ImmModel.hh
+++ b/osaf/services/saf/immsv/immnd/ImmModel.hh
@@ -79,6 +79,9 @@ typedef std::vector<SaInvocationT> Invoc
  /* Maps an object pointer, to a set of object pointers.*/
  typedef std::multimap<ObjectInfo*, ObjectInfo*> ObjectMMap;
  
+/**/
+typedef std::map<ObjectInfo*, uint16_t> ObjectShortCountMap;
+
  /* Object mutation */
  struct ObjectMutation;
  typedef std::map<std::string, ObjectMutation*> ObjectMutationMap;
@@ -382,6 +385,10 @@ public:
      SaAisErrorT         nextSyncResult(ImmsvOmRspSearchNext** rsp,
                                         ImmSearchOp& op);
      
+    SaAisErrorT         objectIsLockedByCcb(const ImmsvOmSearchInit* req);
+
+    SaAisErrorT         ccbReadLockObject(const ImmsvOmSearchInit* req);
+
      SaAisErrorT         accessorGet(
                                      const ImmsvOmSearchInit* req,
                                      ImmSearchOp& op);
diff --git a/osaf/services/saf/immsv/immnd/immnd_evt.c 
b/osaf/services/saf/immsv/immnd/immnd_evt.c
--- a/osaf/services/saf/immsv/immnd/immnd_evt.c
+++ b/osaf/services/saf/immsv/immnd/immnd_evt.c
@@ -217,10 +217,10 @@ static uint32_t immnd_evt_proc_search_fi
  
  static uint32_t immnd_evt_proc_accessor_get(IMMND_CB *cb, IMMND_EVT *evt, 
IMMSV_SEND_INFO *sinfo);
  
+static uint32_t immnd_evt_proc_safe_read(IMMND_CB *cb, IMMND_EVT *evt, 
IMMSV_SEND_INFO *sinfo);
+
  static uint32_t immnd_evt_proc_mds_evt(IMMND_CB *cb, IMMND_EVT *evt);
  
-/*static uint32_t immnd_evt_immd_new_active(IMMND_CB *cb);*/
-
  static void immnd_evt_ccb_abort(IMMND_CB *cb, SaUint32T ccbId, SaUint32T 
**clientArr,
                SaUint32T * clArrSize, SaUint32T *nodeId);
  
@@ -358,7 +358,8 @@ uint32_t immnd_evt_destroy(IMMSV_EVT *ev
        } else if (evt->info.immnd.type == IMMND_EVT_ND2ND_SEARCH_REMOTE_RSP) {
                freeSearchNext(&evt->info.immnd.info.rspSrchRmte.runtimeAttrs, 
false);
        } else if ((evt->info.immnd.type == IMMND_EVT_A2ND_SEARCHINIT) ||
-               (evt->info.immnd.type == IMMND_EVT_A2ND_ACCESSOR_GET)) {
+               (evt->info.immnd.type == IMMND_EVT_A2ND_ACCESSOR_GET) ||
+               (evt->info.immnd.type == IMMND_EVT_A2ND_OBJ_SAFE_READ)) {
                free(evt->info.immnd.info.searchInit.rootName.buf);
                evt->info.immnd.info.searchInit.rootName.buf = NULL;
                evt->info.immnd.info.searchInit.rootName.size = 0;
@@ -606,6 +607,10 @@ void immnd_process_evt(void)
                rc = immnd_evt_proc_search_next(cb, &evt->info.immnd, 
&evt->sinfo);
                break;
  
+       case IMMND_EVT_A2ND_OBJ_SAFE_READ:
+               rc = immnd_evt_proc_safe_read(cb, &evt->info.immnd, 
&evt->sinfo);
+               break;
+
        case IMMND_EVT_A2ND_ACCESSOR_GET:
                rc = immnd_evt_proc_accessor_get(cb, &evt->info.immnd, 
&evt->sinfo);
                break;
@@ -1795,6 +1800,80 @@ static uint32_t immnd_evt_proc_search_fi
  }
  
  /****************************************************************************
+ * Name          : immnd_evt_proc_safe_read
+ *
+ * Description   : Function to process the saImmOmCcbObjectRead call.
+ *                 This call can in some cases be resolved with success 
locally.
+ *                 This will be the case if the object to be accessed is 
already
+ *                 locked previously by this CCB.
+ *
+ *
+ * Arguments     : IMMND_CB *cb - IMMND CB pointer
+ *                 IMMND_EVT *evt - Received Event structure
+ *                 IMMSV_SEND_INFO *sinfo - sender info
+ *****************************************************************************/
+static uint32_t immnd_evt_proc_safe_read(IMMND_CB *cb, IMMND_EVT *evt, 
IMMSV_SEND_INFO *sinfo)
+{
+       IMMSV_EVT send_evt;
+       uint32_t rc = NCSCC_RC_SUCCESS;
+       SaAisErrorT err = SA_AIS_OK;
+       TRACE_ENTER();
+       memset(&send_evt, 0, sizeof(IMMSV_EVT));
+
+       if(evt->info.searchInit.ccbId == 0) {
+               err = SA_AIS_ERR_LIBRARY;
+               LOG_WA("ERR_LIBRARY: Received zero ccb-id for safe-read");
+               goto error;
+       }
+
+       /* Dive into ImmModel to do locking checks.
+          If already locked in this ccb continue with accessor.
+          If locked by other ccb then reject with ERR_BUSY
+          If not locked, then generate fevs message for locking AND read.
+        */
+
+       err = immModel_objectIsLockedByCcb(cb, &(evt->info.searchInit));
+
+       switch (err) {
+               case SA_AIS_OK:
+                       TRACE("Safe read: Object is locked by this CCB(%u). Go 
ahead and safe-read",
+                               evt->info.searchInit.ccbId);
+                       /* Invoke accessor_get which will send the reply. */
+                       break;
+
+               case SA_AIS_ERR_BUSY:
+                       TRACE("Object is locked by some other CCB than this 
CCB(%u). Reply with BUSY",
+                               evt->info.searchInit.ccbId);
+                       goto error;
+
+               case SA_AIS_ERR_INTERRUPT:
+                       TRACE("Object not locked. Ccb (%u) needs to lock it 
over fevs. Reply with INTERRUPT",
+                               evt->info.searchInit.ccbId);
+                       /* Should result in fevs message to read-lock object. */
+                       goto error;
+
+               default:
+                       TRACE("Unusual error from immModel_isLockedByMe: %u", 
err);

*[Hung] The trace message names the wrong function: it should be
immModel_objectIsLockedByCcb, not immModel_isLockedByMe.*

+                       goto error;
+       }
+
+       rc = immnd_evt_proc_accessor_get(cb, evt, sinfo);
+       goto done;
+
+ error:
+       send_evt.type = IMMSV_EVT_TYPE_IMMA;
+       send_evt.info.imma.type = IMMA_EVT_ND2A_IMM_ERROR;
+       send_evt.info.imma.info.errRsp.error = err;
+       send_evt.info.imma.info.errRsp.errStrings = NULL;
+
+       rc = immnd_mds_send_rsp(cb, sinfo, &send_evt);
+
+ done:
+       TRACE_LEAVE();
+       return rc;
+}
+
+/****************************************************************************
   * Name          : immnd_evt_proc_accessor_get
   *
   * Description   : Function to process the saImmOmAccessorGet call.
@@ -1822,7 +1901,13 @@ static uint32_t immnd_evt_proc_accessor_
        /* Search Init */
  
        memset(&send_evt, 0, sizeof(IMMSV_EVT));
-       TRACE_2("ACCESSOR GET:%s", evt->info.searchInit.rootName.buf);
+
+       if(evt->info.searchInit.ccbId != 0) {
+               TRACE_2("SAFE READ :%s CcbId: %u", 
evt->info.searchInit.rootName.buf, evt->info.searchInit.ccbId);
+       } else {
+               TRACE_2("ACCESSOR GET:%s", evt->info.searchInit.rootName.buf);
+       }
+
  
        /*Look up client-node */
        immnd_client_node_get(cb, evt->info.searchInit.client_hdl, &cl_node);
@@ -3205,6 +3290,20 @@ static SaAisErrorT immnd_fevs_local_chec
                }
                break;
  
+       case IMMND_EVT_A2ND_OBJ_SAFE_READ:
+               TRACE("IMMND_EVT_A2ND_OBJ_SAFE_READ noted in 
fevs_local_checks");
+               if(!immModel_protocol50Allowed(cb) || 
immModel_pbeNotWritable(cb)) {
+                       error = SA_AIS_ERR_TRY_AGAIN;
+               }
+               /*
+               Change signature of method to take name and ccbID
+
+               error = immModel_objectIsLockedByCcb(cb, 
&(evt->info.searchInit));
+               if(error == SA_AIS_OK) { error = SA_AIS_ERR_NO_BINDINGS;}
+               */

*[Hung] Should the commented-out block above be removed?*

+
+               break;
+
        case IMMND_EVT_A2ND_OBJ_CREATE_2:
          if(!immModel_protocol46Allowed(cb) || immModel_pbeNotWritable(cb)) {
              error = SA_AIS_ERR_TRY_AGAIN;
@@ -3548,7 +3647,6 @@ static SaAisErrorT immnd_fevs_local_chec
                }
                break;
  
-
        case IMMND_EVT_A2ND_PBE_ADMOP_RSP:
                if(fevsReq->sender_count != 0x0) {
                        LOG_WA("ERR_LIBRARY: IMMND_EVT_A2ND_PBE_ADMOP_RSP 
fevsReq->sender_count != 0x0");
@@ -3562,7 +3660,7 @@ static SaAisErrorT immnd_fevs_local_chec
                break;
  
        default:
-               LOG_ER("UNPACK FAILURE, unrecognized message type: %u over 
FEVS",
+               LOG_ER("UNPACK FAILURE, unrecognized message type: %u caught in 
fevs_local_checks",
                        frwrd_evt.info.immnd.type);
                error = SA_AIS_ERR_LIBRARY;
                break;
@@ -4378,6 +4476,58 @@ static void immnd_evt_sync_fevs_base(IMM
  
  }
  
+static void immnd_evt_safe_read_lock(IMMND_CB *cb, IMMND_EVT *evt,
+       SaBoolT originatedAtThisNd, SaImmHandleT clnt_hdl, MDS_DEST reply_dest)
+{
+       IMMSV_EVT send_evt;
+       IMMND_IMM_CLIENT_NODE *cl_node = NULL;
+       SaAisErrorT err = SA_AIS_OK;
+       TRACE_ENTER();
+
+       err = immModel_objectIsLockedByCcb(cb, &(evt->info.searchInit));
+
+       switch (err) {
+               case SA_AIS_OK:
+                       TRACE("safe_read_lock: Object is already locked by this 
CCB(%u)!!",
+                               evt->info.searchInit.ccbId);
+                       break;
+
+               case SA_AIS_ERR_BUSY:
+                       TRACE("Object is locked by some other CCB than this 
CCB(%u). Reply with BUSY",
+                               evt->info.searchInit.ccbId);
+                       break;
+
+               case SA_AIS_ERR_INTERRUPT:
+                       TRACE("Object not locked. Ccb (%u) will now lock it",
+                               evt->info.searchInit.ccbId);
+                       err = immModel_ccbReadLockObject(cb, 
&(evt->info.searchInit));
+                       break;
+
+               default:
+                       TRACE("Unusual error from immModel_isLockedByMe: %u", 
err);

*[Hung] The trace message names the wrong function: it should be
immModel_objectIsLockedByCcb, not immModel_isLockedByMe.*

+       }
+
+       if(originatedAtThisNd) {
+               immnd_client_node_get(cb, clnt_hdl, &cl_node);
+               if (cl_node == NULL || cl_node->mIsStale) {
+                       LOG_WA("IMMND - Client went down so no response");
+                       return;
+               }
+               TRACE_2("Send immediate reply to client");
+
+               memset(&send_evt, '\0', sizeof(IMMSV_EVT));
+               send_evt.type = IMMSV_EVT_TYPE_IMMA;
+               send_evt.info.imma.info.errRsp.error = err;
+               send_evt.info.imma.type = IMMA_EVT_ND2A_IMM_ERROR;
+
+               if (immnd_mds_send_rsp(cb, &(cl_node->tmpSinfo), &send_evt) != 
NCSCC_RC_SUCCESS) {
+                       LOG_WA("immnd_evt_class_delete: SENDRSP FAILED TO 
SEND");
+               }
+       }
+
+       TRACE_LEAVE();
+}
+
  
  /****************************************************************************
   * Name          : immnd_evt_pbe_admop_rsp
@@ -5141,7 +5291,7 @@ static void immnd_evt_proc_admop(IMMND_C
        if (originatedAtThisNd) {
                immnd_client_node_get(cb, clnt_hdl, &cl_node);
                if (cl_node == NULL || cl_node->mIsStale) {
-                       LOG_ER("IMMND - Client went down so no response");
+                       LOG_WA("IMMND - Client went down so no response");
                        return;
                }
  
@@ -8293,6 +8443,11 @@ immnd_evt_proc_fevs_dispatch(IMMND_CB *c
                immnd_evt_sync_fevs_base(cb, &frwrd_evt.info.immnd, 
originatedAtThisNd, clnt_hdl, reply_dest);
                break;
  
+       case IMMND_EVT_A2ND_OBJ_SAFE_READ:
+               TRACE("IMMND_EVT_A2ND_OBJ_SAFE_READ Received over fevs");
+               immnd_evt_safe_read_lock(cb, &frwrd_evt.info.immnd, 
originatedAtThisNd, clnt_hdl, reply_dest);
+               break;
+
        default:
                LOG_ER("UNPACK FAILURE, unrecognized message type: %u over 
FEVS", frwrd_evt.info.immnd.type);
                break;
diff --git a/osaf/services/saf/immsv/immnd/immnd_init.h 
b/osaf/services/saf/immsv/immnd/immnd_init.h
--- a/osaf/services/saf/immsv/immnd/immnd_init.h
+++ b/osaf/services/saf/immsv/immnd/immnd_init.h
@@ -202,6 +202,9 @@ extern "C" {
  
        SaAisErrorT immModel_searchInitialize(IMMND_CB *cb, struct 
ImmsvOmSearchInit *req, void **searchOp, SaBoolT isSync, SaBoolT isAccessor);
  
+       SaAisErrorT immModel_objectIsLockedByCcb(IMMND_CB *cb, struct 
ImmsvOmSearchInit *req);
+       SaAisErrorT immModel_ccbReadLockObject(IMMND_CB *cb, struct 
ImmsvOmSearchInit *req);
+
        SaAisErrorT immModel_testTopResult(void *searchOp, SaUint32T 
*implNodeId, SaBoolT *bRtAttrsToFetch);
  
        SaAisErrorT

------------------------------------------------------------------------------
Transform Data into Opportunity.
Accelerate data analysis in your applications with
Intel Data Analytics Acceleration Library.
Click to learn more.
http://pubads.g.doubleclick.net/gampad/clk?id=278785351&iu=/4140
_______________________________________________
Opensaf-devel mailing list
[email protected]
https://lists.sourceforge.net/lists/listinfo/opensaf-devel


------------------------------------------------------------------------------
Transform Data into Opportunity.
Accelerate data analysis in your applications with
Intel Data Analytics Acceleration Library.
Click to learn more.
http://pubads.g.doubleclick.net/gampad/clk?id=278785471&iu=/4140
_______________________________________________
Opensaf-devel mailing list
[email protected]
https://lists.sourceforge.net/lists/listinfo/opensaf-devel

Reply via email to