Hi,

Since the protocol50 was not implemented when Anders finished the code, the 
comment also contained the check for protocol50.
I didn't have enough time for testing the commented code, so I only uncommented 
the check for protocol50, and left the rest of the code in the comment.

I'm going to check both cases.... the existing one, and the code within the 
comment.

There is one issue that I was unsure before sending the code to the review, and 
you can find it in the inline comment.
It would be good if you can review it.


-----Original Message-----
From: [email protected] [mailto:[email protected]] 
Sent: Tuesday, March 29, 2016 11:09 AM
To: Hung Duc Nguyen; [email protected]; Zoran Milinkovic
Cc: [email protected]
Subject: Re: [devel] [PATCH 2 of 3] imm: add support to IMM service for 
transactional safe read [#48]

Hi Hung,

Your comments seem ok to me.

I am a bit unsure about the commented out code.
I added a comment there below.




>----Ursprungligt meddelande----
>Från : [email protected]
>Datum : 2016-03-29 - 10:39 (CEST)
>Till : [email protected], [email protected]
>Kopia : [email protected]
>Ämne : Re: [devel] [PATCH 2 of 3] imm: add support to IMM service for 
>transactional safe read [#48]
>
>Hi Zoran,
>
>Please see my comments inline.
>
>BR,
>
>Hung Nguyen - DEK Technologies
>
>
>--------------------------------------------------------------------------------
>From: Zoran Milinkovic [email protected]
>Sent: Wednesday, March 23, 2016 11:51PM
>To: Neelakanta Reddy
>     [email protected]
>Cc: Opensaf-devel
>     [email protected]
>Subject: [devel] [PATCH 2 of 3] imm: add support to IMM service for 
>transactional safe read [#48]
>
>
>  osaf/services/saf/immsv/README             |   44 ++
>  osaf/services/saf/immsv/immnd/ImmModel.cc  |  608 
> +++++++++++++++++++++++++++-
>  osaf/services/saf/immsv/immnd/ImmModel.hh  |    7 +
>  osaf/services/saf/immsv/immnd/immnd_evt.c  |  169 +++++++-
>  osaf/services/saf/immsv/immnd/immnd_init.h |    3 +
>  5 files changed, 788 insertions(+), 43 deletions(-)
>
>
>The patch contains code for IMM service part of transactional safe read
>
>diff --git a/osaf/services/saf/immsv/README b/osaf/services/saf/immsv/README
>--- a/osaf/services/saf/immsv/README
>+++ b/osaf/services/saf/immsv/README
>@@ -2704,6 +2704,50 @@ Bit 5 controls OpenSAF4.5 protocols allo
>  Bit 6 controls OpenSAF4.6 protocols allowed or not (normally on/1).
>  Bit 7 controls OpenSAF4.7 protocols allowed or not (normally on/1).
>  
>+
>+Safe-read (transactional read) (5.0)
>+=============================================
>+http://sourceforge.net/p/opensaf/tickets/48/
>+
>+Adds support for an OM CCB client to read-access a config object 
>transactionally.
>+The API works exactly the same way as saImmOmAccessorGet, except that
>+
>+   a) The API takes a SaImmCcbHandleT instead of a SaImmAccessorHandleT
>+
>+   b) The values returned for the objects config attributes are from a version
>+      of the object consistent with the CCB/transaction. This means either the
>+      latest applied version or a newer version created in the same CCB but
>+      not yet applied.
>+
>+   c) Access to an object that has been deleted in the same CCB but not 
>applied
>+      is rejected with ERR_NOT_EXIST.
>+
>+   d) Access to an object that has been created in the same CCB but not 
>applied
>+      is allowed, providing the latest version of the config attributes in 
>that CCB.
>+
>+   e) Safe read is not allowed using a runtime object as target.
>+
>+Runtime attributes residing in a config object are handled exactly the same 
>as for
>+saImmOmAccessorGet. The reason a safe-read call is not allowed on a runtime 
>*object*
>+is that a runtime object *only* contains runtime attributes. Performing a 
>safe-read
>+on a runtime object makes no sense.
>+
>+
>+saImmOmCcbObjectRead(SaImmCcbHandleT ccbHandle, /* in */
>+                     SaConstStringT objectName, /* in */
>+                     const SaImmAttrNameT *attributeNames, /* in */
>+                     SaImmAttrValuesT_2 ***attributes); /* out */
>+
>+
>+Return Values :   SA_AIS_ERR_BUSY - The object targeted by the request is 
>already
>+                  the target of conflicting/exclusive operation in another 
>CCB.
>+                  The conflicting operation would be a modify or delete.
>+
>+                  SA_AIS_ERR_INVALID_PARAM - The objectName parameter 
>identifies a
>+                  runtime object.
>+
>+                  Returncodes otherwise identical to saImmOmAccessorGet.
>+
>  ----------------------------------------
>  DEPENDENCIES
>  ============
>diff --git a/osaf/services/saf/immsv/immnd/ImmModel.cc 
>b/osaf/services/saf/immsv/immnd/ImmModel.cc
>--- a/osaf/services/saf/immsv/immnd/ImmModel.cc
>+++ b/osaf/services/saf/immsv/immnd/ImmModel.cc
>@@ -36,8 +36,8 @@
>  #define SEARCH_TIMEOUT_SEC 600 /* Search timeout */
>  
>  // Same strings exists in immnd_evt.c
>-#define IMM_VALIDATION_ABORT  "IMM: Validation abort: "
>-#define IMM_RESOURCE_ABORT            "IMM: Resource abort: "
>+#define IMM_VALIDATION_ABORT  "IMM: Validation abort: "
>+#define IMM_RESOURCE_ABORT    "IMM: Resource abort: "
>  
>  struct ContinuationInfo2
>  {
>@@ -203,6 +203,16 @@ typedef SaUint32T ImmObjectFlags;
>  // c) modifies a NO_DANGLING attribute,
>  // d) deletes an object that is referred to by some NO_DANGLING attribute.
>  
>+#define IMM_SHARED_READ_LOCK 0x00000100
>+//If shared read lock is on, it signifies that one or more ccbs have locked 
>the
>+//object for safe-read. This prevents the object from being modified or 
>deleted
>+//by any other ccb until safe-readers have released the read lock.
>+//If only one single ccb has a read lock on an object, i.e. the lock is 
>currently
>+//not actually shared, then that ccb may upgrade the read-lock to an 
>exclusive lock
>+//for delete or modify.
>+//See also the data structure sObjectReadLockCount.
>+
>+
>  
>  struct ObjectInfo
>  {
>@@ -217,7 +227,8 @@ struct ObjectInfo
>      void             getAdminOwnerName(std::string *str) const;
>      
>      ImmAttrValue*    mAdminOwnerAttrVal; //Pointer INTO mAttrValueMap
>-    SaUint32T        mCcbId;
>+    SaUint32T        mCcbId;   //Zero => may be read-locked 
>see:IMM_SHARED_READ_LOCK
>+                               //Nonzero => may be exclusive lock if id is 
>active ccb
>      ImmAttrValueMap  mAttrValueMap; //<-Each ImmAttrValue needs explicit 
> delete
>      ClassInfo*       mClassInfo;    //<-Points INTO ClassMap. Not own copy!
>      ImplementerInfo* mImplementer;  //<-Points INTO ImplementerVector
>@@ -332,6 +343,8 @@ struct AugCcbParent
>      SaUint32T         mAugmentAdmo; /* Aug admo with ROF==true for aug 
> ccbCreates #2428 */
>  };
>  
>+static ObjectShortCountMap sObjectReadLockCount; //Needs to be declared above 
>CcbInfo
>+
>  struct CcbInfo
>  {
>      CcbInfo(): mId(0), mAdminOwnerId(0), mCcbFlags(0), mOriginatingConn(0),
>@@ -340,6 +353,11 @@ struct CcbInfo
>                 mErrorStrings(NULL), mAugCcbParent(NULL) {}
>      bool isOk() {return mVeto == SA_AIS_OK;}
>      bool isActive() {return (mState < IMM_CCB_COMMITTED);}
>+    void addObjReadLock(ObjectInfo* obj, std::string& objName);
>+    void removeObjReadLock(ObjectInfo* obj, bool removeObject = true);
>+    void removeAllObjReadLocks(); // Removes all locks held by *this* ccb.
>+    bool isReadLockedByThisCcb(ObjectInfo* obj);
>+
>      SaUint32T         mId;
>      SaUint32T         mAdminOwnerId;
>      SaUint32T         mCcbFlags;
>@@ -357,9 +375,97 @@ struct CcbInfo
>      ImplementerSet    mLocalAppliers;
>      ImmsvAttrNameList* mErrorStrings;/*Error strings generated by current op 
> */
>      AugCcbParent*     mAugCcbParent;
>+    ObjectSet         mSafeReadSet;
>  };
>  typedef std::vector<CcbInfo*> CcbVector;
>  
>+void CcbInfo::addObjReadLock(ObjectInfo* obj, std::string& objName)
>+{
>+    ObjectShortCountMap::iterator oscm;
>+    if(obj->mObjFlags & IMM_SHARED_READ_LOCK) {
>+        TRACE_7("Object '%s' already locked for safe-read by some ccb(s)", 
>objName.c_str());
>+        //Read-lock is a shared lock, join the group of lockers.
>+    } else {
>+        osafassert(!(obj->mObjFlags & IMM_CREATE_LOCK ));
>+        osafassert(!(obj->mObjFlags & IMM_DELETE_LOCK ));
>+        /* IMM_RT_UPDATE_LOCK does not conflict with safe read because it 
>deals with
>+           PRTO/PRTA updates, i.e. runtime data and not config data.
>+        */
>+
>+        osafassert(obj->mCcbId == 0);
>+        /* A non-zero mCcbId for an object indicates exclusive lock 
>(create/delete/modify)
>+           if and only if the identified ccb is still *active*. A zero mCcbId 
>for an object
>+           *guarantees* that it is not exclusive locked.
>+           This function *requires* that the invoker has done the pre-check 
>of no interference
>+           of a proposed shared reader with any pre-existing exclusive op and 
>zeroed the ccbId
>+           if and only if an existing ccbId was no longer identifying an 
>active ccb.
>+
>+           There is no explicit ccb-modify lock since it can be inferred by 
>(a) the ccb-id
>+           of an object identifying an active CCB and (b) neither the create 
>or modify
>+           flag being set on the object.
>+        */
>+
>+        obj->mObjFlags |= IMM_SHARED_READ_LOCK;
>+    }
>+
>+    this->mSafeReadSet.insert(obj);  /* Ccb keeps track of its safe-readers. 
>*/
>+
>+    oscm = sObjectReadLockCount.find(obj);
>+    if(oscm == sObjectReadLockCount.end()) {
>+        sObjectReadLockCount[obj] = 1;
>+    } else {
>+        // TODO: Should check that we dont increment beyond maxshort.
>+        (oscm->second)++;
>+        LOG_IN("Incremented read-lock count for %s to %u", objName.c_str(), 
>oscm->second);
>+    }
>+
>+}
>+
>+void CcbInfo::removeAllObjReadLocks()
>+{
>+    /* Removes all the read-locks set by this ccb.
>+       Must be executed with ccb-commit or ccb-abort.
>+    */
>+    ObjectSet::iterator osi;
>+
>+    for(osi = this->mSafeReadSet.begin(); osi != this->mSafeReadSet.end(); 
>++osi)
>+    {
>+        removeObjReadLock(*osi, false);
>+    }
>+    this->mSafeReadSet.clear();
>+}
>+
>+void CcbInfo::removeObjReadLock(ObjectInfo* obj, bool removeObject /* = 
>true*/)
>+{
>+    ObjectShortCountMap::iterator oscm = sObjectReadLockCount.find(obj);
>+    osafassert(oscm != sObjectReadLockCount.end());
>+    osafassert(oscm->second > 0);
>+    (oscm->second)--;
>+    TRACE("CcbInfo::removeObjReadLock decremented safe read count for %p to 
>%u",
>+        obj, oscm->second);
>+    if(oscm->second == 0) {
>+        sObjectReadLockCount.erase(obj);
>+        TRACE("Object %p no longer has any read locks", obj);
>+        obj->mObjFlags &= ~IMM_SHARED_READ_LOCK;
>+    } else {
>+        TRACE("Object %p still has read lock count of %u", obj, oscm->second);
>+    }
>+
>+    if(removeObject) {
>+        this->mSafeReadSet.erase(obj);
>+    }
>+}
>+
>+bool CcbInfo::isReadLockedByThisCcb(ObjectInfo* obj)
>+{
>+    TRACE_ENTER();
>+    ObjectSet::iterator osi = this->mSafeReadSet.find(obj);
>+    bool result = (osi != this->mSafeReadSet.end());
>+    TRACE_LEAVE();
>+    return result;
>+}
>+
>+
>  struct AdminOwnerInfo
>  {
>      AdminOwnerInfo(): mId(0), mConn(0), mNodeId(0), 
> mReleaseOnFinalize(false),
>@@ -454,7 +560,7 @@ static const std::string saImmOiTimeout(
>  
>  static SaImmRepositoryInitModeT immInitMode = SA_IMM_INIT_FROM_FILE;
>  
>-static SaUint32T ccbIdLongDnGuard  = 0; /* Disallow long DN additions if 
>longDnsAllowed is being changed in ccb*/
>+static SaUint32T sCcbIdLongDnGuard  = 0; /* Disallow long DN additions if 
>longDnsAllowed is being changed in ccb*/
>  static bool      sIsLongDnLoaded   = false; /* track long DNs before 
> opensafImm=opensafImm,safApp=safImmService is created */
>  static bool      sAbortNonCriticalCcbs = false; /* Set to true at coord by 
> the special imm admin-op to abort ccbs #1107 */
>  
>@@ -1355,7 +1461,7 @@ immModel_adminOperationInvoke(IMMND_CB *
>          implNodeId, pbeExpected, displayRes, cb->mIsCoord);
>  
>      if(sAbortNonCriticalCcbs && !wasAbortNonCritical) {
>-        LOG_IN("ABT cb->mForceClean set to true");
>+        TRACE("cb->mForceClean set to true");
>          cb->mForceClean = true;
>      }
>      return err;
>@@ -1476,6 +1582,18 @@ immModel_ccbAugmentAdmo(IMMND_CB *cb, Sa
>  }
>  
>  SaAisErrorT
>+immModel_objectIsLockedByCcb(IMMND_CB *cb, struct ImmsvOmSearchInit* req)
>+{
>+   return ImmModel::instance(&cb->immModel)->objectIsLockedByCcb(req);
>+}
>+
>+SaAisErrorT
>+immModel_ccbReadLockObject(IMMND_CB *cb, struct ImmsvOmSearchInit* req)
>+{
>+   return ImmModel::instance(&cb->immModel)->ccbReadLockObject(req);
>+}
>+
>+SaAisErrorT
>  immModel_searchInitialize(IMMND_CB *cb, struct ImmsvOmSearchInit* req,
>      void** searchOp, SaBoolT isSync, SaBoolT isAccessor)
>  {
>@@ -2892,7 +3010,7 @@ ImmModel::getLongDnsAllowed(ObjectInfo*
>  {
>      TRACE_ENTER();
>      bool longDnsAllowed = false;
>-    if(ccbIdLongDnGuard)  {
>+    if(sCcbIdLongDnGuard)  {
>          /* A ccb is currently mutating longDnsAllowed */
>          return false;
>      }
>@@ -5856,8 +5974,9 @@ ImmModel::ccbCommit(SaUint32T ccbId, Con
>      }//for
>      
>      ccb->mMutations.clear();
>-
>-    if(ccbIdLongDnGuard == ccbId) {ccbIdLongDnGuard= 0;}
>+    ccb->removeAllObjReadLocks();
>+
>+    if(sCcbIdLongDnGuard == ccbId) {sCcbIdLongDnGuard = 0;}
>      
>      //If there are implementers involved then send the final apply callback
>      //to them and remove the implementers from the Ccb.
>@@ -6069,7 +6188,7 @@ ImmModel::ccbAbort(SaUint32T ccbId, Conn
>          }
>      }
>  
>-    if(ccbIdLongDnGuard == ccbId) {ccbIdLongDnGuard= 0;}
>+    if(sCcbIdLongDnGuard == ccbId) {sCcbIdLongDnGuard = 0;}
>  
>      return true;
>  }
>@@ -6259,6 +6378,7 @@ ImmModel::ccbTerminate(SaUint32T ccbId)
>          createsAbortedInCcb.clear();
>  
>          ccb->mMutations.clear();
>+        ccb->removeAllObjReadLocks();
>          if(ccb->mImplementers.size()) {
>              LOG_WA("Ccb destroyed without notifying some implementers from 
> IMMND.");
>              CcbImplementerMap::iterator ix;
>@@ -6357,14 +6477,20 @@ ImmModel::ccbAugmentInit(immsv_oi_ccb_up
>              case IMM_CCB_READY:
>              case IMM_CCB_COMMITTED:
>              case IMM_CCB_ABORTED:
>-            case IMM_CCB_PREPARE:
>-            case IMM_CCB_VALIDATING:
>              case IMM_CCB_VALIDATED:
>              case IMM_CCB_PBE_ABORT:
>              case IMM_CCB_CRITICAL:
>-                LOG_WA("Ccb Augment attempted in wrong CCB state");
>+                LOG_WA("Ccb Augment attempted in wrong CCB state %u", 
>ccb->mState);
>                  err = SA_AIS_ERR_BAD_OPERATION;
>-            goto done;
>+                goto done;
>+
>+            case IMM_CCB_VALIDATING:
>+                LOG_IN("Augment CCB in state VALIDATING");
>+                goto augment_for_safe_read;
>+
>+            case IMM_CCB_PREPARE:
>+                LOG_IN("Augment CCB in state PREPARE %u",  IMM_CCB_PREPARE);
>+                goto augment_for_safe_read;
>  
>              case IMM_CCB_CREATE_OP:
>                  TRACE("Augment CCB in state CREATE_OP");
>@@ -6378,7 +6504,6 @@ ImmModel::ccbAugmentInit(immsv_oi_ccb_up
>  
>              case IMM_CCB_MODIFY_OP:
>                  TRACE("Augment CCB in state MODIFY_OP");
>-
>                  omuti =  ccb->mMutations.find(objectName);
>                  if(omuti != ccb->mMutations.end()){
>                      obj = omuti->second->mAfterImage;
>@@ -6421,9 +6546,11 @@ ImmModel::ccbAugmentInit(immsv_oi_ccb_up
>      osafassert(omuti->second->mContinuationId == rsp->inv);
>      osafassert(omuti->second->mWaitForImplAck);
>  
>+ augment_for_safe_read:
>+
>      TRACE("obj:%p", obj);
>-    TRACE("obj->mImplementer:%p", obj->mImplementer);
>-    osafassert(obj && obj->mImplementer);
>+    //TRACE("obj->mImplementer:%p", obj->mImplementer);
>+    //osafassert(obj && obj->mImplementer);
>  
>  
>      if(ccb->mVeto != SA_AIS_OK) {
>@@ -6466,12 +6593,15 @@ ImmModel::ccbAugmentInit(immsv_oi_ccb_up
>      ccb->mAugCcbParent->mState = ccb->mState;
>      ccb->mAugCcbParent->mErrorStrings = ccb->mErrorStrings;
>      ccb->mAugCcbParent->mContinuationId = rsp->inv;
>-    ccb->mAugCcbParent->mImplId = obj->mImplementer->mId;
>+    ccb->mAugCcbParent->mImplId = (obj)?obj->mImplementer->mId:0;
>      ccb->mAugCcbParent->mAugmentAdmo = 0;
>  
>      ccb->mOriginatingConn = originatingConn;
>      ccb->mOriginatingNode = originatingNode;
>-    ccb->mState = IMM_CCB_READY;
>+    if(ccb->mState < IMM_CCB_VALIDATING) {
>+        ccb->mState = IMM_CCB_READY;
>+    }
>+
>      ccb->mWaitStartTime = 0;
>      ccb->mErrorStrings = NULL;
>  
>@@ -8491,6 +8621,7 @@ ImmModel::ccbObjectModify(const ImmsvOmC
>      ObjectNameSet afimPreOpNDRefs;  // Set of NO_DANGLING references from 
> after image before CCB operation
>      bool hasNoDanglingRefs = false;
>      bool modifiedImmMngt = false;  /* true => modification of the SAF 
> immManagement object. */
>+    bool upgradeReadLock = false;
>      
>      if(sz >= SA_MAX_UNEXTENDED_NAME_LENGTH) {
>          if(longDnsPermitted) {
>@@ -8577,7 +8708,7 @@ ImmModel::ccbObjectModify(const ImmsvOmC
>          err = SA_AIS_ERR_BAD_OPERATION;
>          goto ccbObjectModifyExit;
>      }
>-
>+
>      ccbIdOfObj = object->mCcbId;
>      if(ccbIdOfObj != ccbId) {
>          i1 = std::find_if(sCcbVector.begin(), sCcbVector.end(),
>@@ -8588,6 +8719,25 @@ ImmModel::ccbObjectModify(const ImmsvOmC
>              err = SA_AIS_ERR_BUSY;
>              goto ccbObjectModifyExit;
>          }
>+
>+        if(ccbIdOfObj == 0) {
>+            ObjectShortCountMap::iterator oscm  = 
>sObjectReadLockCount.find(object);
>+            if(oscm != sObjectReadLockCount.end())
>+            {
>+                if((oscm->second > 1) || 
>!(ccb->isReadLockedByThisCcb(object)))
>+                {
>+                    TRACE("ERR_BUSY: object '%s' is safe-read-locked by %u 
>ccb(s)",
>+                        objectName.c_str(), oscm->second);
>+                    err = SA_AIS_ERR_BUSY;
>+                    goto ccbObjectModifyExit;
>+                } else {
>+                    osafassert((oscm->second == 1) && 
>(ccb->isReadLockedByThisCcb(object)));
>+                    TRACE("CcbModify: Object '%s' safe-read-locked only by 
>this ccb (%u) => "
>+                         "upgrade to exclusive lock is possible", 
>objectName.c_str(), ccbId);
>+                    upgradeReadLock = true;
>+                }
>+            }
>+        }
>      }
>  
>      if(object->mObjFlags & IMM_RT_UPDATE_LOCK) {
>@@ -9208,12 +9358,12 @@ ImmModel::ccbObjectModify(const ImmsvOmC
>  
>              /* Check if *this* ccb is attempting to alter longDnsAllowed.*/
>              if(longDnsPermitted != longDnsAllowedAfter) {
>-                if(ccbIdLongDnGuard) {
>+                if(sCcbIdLongDnGuard) {
>                      /* This case should never happen since it is guarded by 
> regular ccb handling. */
>                      setCcbErrorString(ccb, "IMM: ERR_BUSY: Other Ccb (%u) 
> already using %s",
>-                        ccbIdLongDnGuard, immLongDnsAllowed.c_str());
>+                        sCcbIdLongDnGuard, immLongDnsAllowed.c_str());
>                      LOG_IN("IMM: ERR_BUSY: Other Ccb (%u) already using %s",
>-                        ccbIdLongDnGuard, immLongDnsAllowed.c_str());
>+                        sCcbIdLongDnGuard, immLongDnsAllowed.c_str());
>                      err = SA_AIS_ERR_BUSY;
>                  } else {
>                      if(!longDnsAllowedAfter) {
>@@ -9281,7 +9431,7 @@ ImmModel::ccbObjectModify(const ImmsvOmC
>                          }
>                      } /* End of LONG DN check */
>  
>-                    ccbIdLongDnGuard = ccbId;
>+                    sCcbIdLongDnGuard = ccbId;
>                  }
>              }
>          }
>@@ -9368,7 +9518,7 @@ ImmModel::ccbObjectModify(const ImmsvOmC
>      if(err == SA_AIS_OK) {
>          object->mCcbId = ccbId; //Overwrite any old obsolete ccb id.
>          osafassert(oMut);
>-        if(!chainedOp) {
>+        if(!chainedOp) {
>              ccb->mMutations[objectName] = oMut;
>              if(adminOwner->mReleaseOnFinalize) {
>                  //NOTE: this may not be good enough. For a modify the
>@@ -9377,6 +9527,9 @@ ImmModel::ccbObjectModify(const ImmsvOmC
>              }
>          }
>          ccb->mOpCount++;
>+        if(upgradeReadLock) {
>+            ccb->removeObjReadLock(object);
>+        }
>      } else {
>          //err != SA_AIS_OK
>          if(ccb->mState == IMM_CCB_MODIFY_OP) {ccb->mState = IMM_CCB_READY;}
>@@ -9500,7 +9653,7 @@ ImmModel::ccbObjectDelete(const ImmsvOmC
>          setCcbErrorString(ccb, IMM_RESOURCE_ABORT "CCB is not in an expected 
> state");
>          goto ccbObjectDeleteExit;
>      }
>-
>+
>      if(reqConn && (ccb->mOriginatingConn != reqConn)) {
>          LOG_NO("ERR_BAD_HANDLE: Missmatch on connection for ccb id %u", 
> ccbId);
>          err = SA_AIS_ERR_BAD_HANDLE;
>@@ -9650,6 +9803,23 @@ ImmModel::deleteObject(ObjectMap::iterat
>                  ccb->mId, ccbIdOfObj);
>              return SA_AIS_ERR_BUSY;
>          }
>+
>+        if(ccbIdOfObj == 0) {
>+            ObjectShortCountMap::iterator oscm = 
>sObjectReadLockCount.find(oi->second);
>+            if(oscm != sObjectReadLockCount.end()) {
>+                if((oscm->second > 1) || 
>!(ccb->isReadLockedByThisCcb(oi->second))) {
>+                    TRACE("ERR_BUSY: object '%s' is read-locked by %u other 
>ccb(s)",
>+                        oi->first.c_str(), oscm->second);
>+                    osafassert(!doIt);
>+                    return SA_AIS_ERR_BUSY;
>+                } else if (doIt) {
>+                    osafassert((oscm->second == 1) && 
>(ccb->isReadLockedByThisCcb(oi->second)));
>+                    TRACE("CcbDelete: Object '%s' safe-read-locked only by 
>this ccb (%u) => "
>+                         "upgrade to exclusive lock is possible", 
>oi->first.c_str(), ccb->mId);
>+                    ccb->removeObjReadLock(oi->second);

[Zoran] removeObjReadLock() can be called before the exit of the function, if 
all checks are successful. Similar like it's done in ccbObjModify.

Thanks,
Zoran 

>+                }
>+            }
>+        }
>      }
>      
>      if(oi->second->mClassInfo->mCategory != SA_IMM_CLASS_CONFIG) {
>@@ -10400,6 +10570,20 @@ ImmModel::ccbCompletedContinuation(immsv
>      }
>      ccb = *i1;
>  
>+    if(ccb->mAugCcbParent &&
>+      (ccb->mAugCcbParent->mContinuationId == rsp->inv)) {
>+        ccb->mOriginatingConn = ccb->mAugCcbParent->mOriginatingConn;
>+        ccb->mOriginatingNode = ccb->mAugCcbParent->mOriginatingNode;
>+        ccb->mState = ccb->mAugCcbParent->mState;
>+        ccb->mErrorStrings = ccb->mAugCcbParent->mErrorStrings;
>+
>+        ccb->mAugCcbParent->mErrorStrings = NULL;
>+        ccb->mAugCcbParent->mContinuationId = 0;
>+        free(ccb->mAugCcbParent);
>+        ccb->mAugCcbParent = NULL;
>+    }
>+
>+
>      if(ccb->mState == IMM_CCB_VALIDATED) {
>          LOG_IN("GOING FROM IMM_CCB_VALIDATED to IMM_CCB_PREPARE Ccb:%u", 
> ccbId);
>          ccb->mState = IMM_CCB_PREPARE;
>@@ -10700,6 +10884,252 @@ ImmModel::ccbObjModifyContinuation(SaUin
>      TRACE_LEAVE();
>  }
>  
>+SaAisErrorT
>+ImmModel::ccbReadLockObject(const ImmsvOmSearchInit* req)
>+{
>+    TRACE_ENTER();
>+    SaAisErrorT err = SA_AIS_OK;
>+    SaUint32T ccbId = req->ccbId;
>+    size_t sz = strnlen((char *) req->rootName.buf, 
>(size_t)req->rootName.size);
>+    std::string objectName((const char*)req->rootName.buf, sz);
>+    ObjectMap::iterator i;
>+    ObjectInfo* obj = NULL;
>+    SaUint32T ccbIdOfObj=0;
>+    CcbVector::iterator i1;
>+    CcbInfo* ccb = 0;
>+
>+    if (objectName.empty()) {
>+        LOG_NO("ERR_INVALID_PARAM: Empty DN is not allowed");
>+        err = SA_AIS_ERR_INVALID_PARAM;
>+        goto done;
>+    }
>+
>+    // Validate object name
>+    if(! (nameCheck(objectName)||nameToInternal(objectName)) ) {
>+        LOG_NO("ERR_INVALID_PARAM: Not a proper object name");
>+        err = SA_AIS_ERR_INVALID_PARAM;
>+        goto done;
>+    }
>+
>+    i = sObjectMap.find(objectName);
>+    if (i == sObjectMap.end()) {
>+        //TRACE_7("ERR_NOT_EXIST: Object '%s' does not exist", 
>objectName.c_str());
>+        err = SA_AIS_ERR_NOT_EXIST;
>+        goto done;
>+    }
>+
>+    obj = i->second;
>+
>+    osafassert(obj->mClassInfo);
>+
>+    if(obj->mClassInfo->mCategory != SA_IMM_CLASS_CONFIG) {
>+        LOG_NO("ERR_INVALID_PARAM: object '%s' is not a configuration object",
>+            objectName.c_str());
>+        err = SA_AIS_ERR_INVALID_PARAM;
>+        goto done;
>+    }
>+
>+    i1 = std::find_if(sCcbVector.begin(), sCcbVector.end(), CcbIdIs(ccbId));
>+    if (i1 == sCcbVector.end()) {
>+        LOG_NO("ERR_BAD_HANDLE: ccb id %u does not exist", ccbId);
>+        err = SA_AIS_ERR_BAD_HANDLE;
>+        goto done;
>+    }
>+    ccb = *i1;
>+
>+    if(!ccb->isOk()) {
>+        LOG_IN("ERR_FAILED_OPERATION: ccb %u is in an error state "
>+            "rejecting ccbObjectRead operation ", ccbId);
>+        /* !ccb->isOk(), error string with abort reason must already be set */
>+        err = SA_AIS_ERR_FAILED_OPERATION;
>+        goto done;
>+    }
>+
>+    if(ccb->mState > IMM_CCB_READY) {
>+        if(ccb->mAugCcbParent == NULL) {
>+            LOG_WA("ERR_BAD_OPERATION: ccb %u is not in an expected state: %u 
>"
>+                "rejecting ccbObjectRead operation ", ccbId, ccb->mState);
>+            err = SA_AIS_ERR_BAD_OPERATION;
>+            goto done;
>+        } else {
>+            if(ccb->mState < IMM_CCB_CRITICAL) {
>+                LOG_IN("SafeRead inside augmentation allowed");
>+            } else {
>+                LOG_WA("ERR_BAD_OPERATION: ccb %u is not in an expected 
>state: %u "
>+                    "rejecting ccbObjectRead operation ", ccbId, ccb->mState);
>+                err = SA_AIS_ERR_BAD_OPERATION;
>+                goto done;
>+            }
>+        }
>+    }
>+
>+    ccbIdOfObj = obj->mCcbId;
>+    if(ccbIdOfObj != ccbId) {
>+       i1 = std::find_if(sCcbVector.begin(), sCcbVector.end(),
>+            CcbIdIs(ccbIdOfObj));
>+        if ((i1 != sCcbVector.end()) && ((*i1)->isActive())) {
>+            TRACE_7("ERR_BUSY: ccb id %u differs from active ccb id on object 
>%u",
>+                ccbId, ccbIdOfObj);
>+            err = SA_AIS_ERR_BUSY;
>+            goto done;
>+        }
>+        /* Set ccb-id to zero to signify no exclusive lock set.
>+           CcbId of 0 does not mean that a shared read lock must be set for 
>the object.
>+           It only means that zero or more shared read locks *could* be set 
>for this object.
>+         */
>+        obj->mCcbId = 0;
>+    } else {
>+        TRACE_7("Safe-read: Object '%s' is currently *exclusive* locked by 
>*same* ccb %u",
>+            objectName.c_str(), ccbId);
>+        /* Read-lock succeeds as a no-op.
>+           Actual access will fail if it is delete locked in same ccb.
>+           We could fail here for the delete locked case, but it is actually 
>not the
>+           requested locking for the ccb that fails, it is the 
>safe-read-access done later.
>+           The safe read of the object will fail with ERR_NOT_EXIST because 
>the object is
>+           scheduled for delete in the *same* ccb as the reader. If the 
>object was locked
>+           for delete in a different ccb then the safe-reader in this ccb 
>would get ERR_BUSY.
>+        */
>+        osafassert(err == SA_AIS_OK);
>+        goto done;
>+    }
>+
>+    if(ccb->isReadLockedByThisCcb(obj)) {
>+        TRACE("Object %s is already safe-read-locked by this ccb %u",
>+            objectName.c_str(), ccbId);
>+    } else {
>+        ccb->addObjReadLock(obj, objectName);
>+    }
>
>*[Hung] ccb->isReadLockedByThisCcb(obj) should always return false 
>here.****We only enter immModel_ccbReadLockObject() when 
>immModel_objectIsLockedByCcb() returns ERR_INTERRUPT.****So the object 
>must not be locked by the ccb.****I think we should put an assert 
>here.****osafassert(!ccb->isReadLockedByThisCcb(obj));****ccb->addObjReadLock(obj,
> 
>objectName);*
>
>
>
>+
>+ done:
>+    TRACE_LEAVE();
>+    return err;
>+}
>+
>+SaAisErrorT
>+ImmModel::objectIsLockedByCcb(const ImmsvOmSearchInit* req)
>+{
>+    // ##Redo this whole method to simply check ccb and ccb mutation.?
>+    TRACE_ENTER();
>+    SaAisErrorT err = SA_AIS_OK;
>+    SaUint32T ccbId = req->ccbId;
>+    size_t sz = strnlen((char *) req->rootName.buf, 
>(size_t)req->rootName.size);
>+    std::string objectName((const char*)req->rootName.buf, sz);
>+    ObjectMap::iterator i;
>+    ObjectInfo* obj = NULL;
>+    SaUint32T ccbIdOfObj=0;
>+    CcbVector::iterator i1;
>+    CcbInfo* ccb = 0;
>+
>+    if (objectName.empty()) {
>+        LOG_NO("ERR_INVALID_PARAM: Empty DN is not allowed");
>+        err = SA_AIS_ERR_INVALID_PARAM;
>+        goto done;
>+    }
>+
>+    // Validate object name
>+    if(! (nameCheck(objectName)||nameToInternal(objectName)) ) {
>+        LOG_NO("ERR_INVALID_PARAM: Not a proper object name");
>+        err = SA_AIS_ERR_INVALID_PARAM;
>+        goto done;
>+    }
>+
>+    i = sObjectMap.find(objectName);
>+    if (i == sObjectMap.end()) {
>+        //TRACE_7("ERR_NOT_EXIST: Object '%s' does not exist", 
>objectName.c_str());
>+        err = SA_AIS_ERR_NOT_EXIST;
>+        goto done;
>+    }
>+
>+    obj = i->second;
>+    osafassert(obj->mClassInfo);
>+
>+    if(obj->mClassInfo->mCategory != SA_IMM_CLASS_CONFIG) {
>+        LOG_NO("ERR_INVALID_PARAM: object '%s' is not a configuration object",
>+            objectName.c_str());
>+        err = SA_AIS_ERR_INVALID_PARAM;
>+        goto done;
>+    }
>+
>+    i1 = std::find_if(sCcbVector.begin(), sCcbVector.end(), CcbIdIs(ccbId));
>+    if (i1 == sCcbVector.end()) {
>+        LOG_NO("ERR_BAD_HANDLE: ccb id %u does not exist", ccbId);
>+        err = SA_AIS_ERR_BAD_HANDLE;
>+        goto done;
>+    }
>+    ccb = *i1;
>+
>+    if(!ccb->isOk()) {
>+        LOG_IN("ERR_FAILED_OPERATION: ccb %u is in an error state "
>+            "rejecting ccbObjectRead operation ", ccbId);
>+        /* !ccb->isOk(), error string with abort reason must already be set */
>+        err = SA_AIS_ERR_FAILED_OPERATION;
>+        goto done;
>+    }
>+
>+    if(ccb->mState > IMM_CCB_READY) {
>+        if(ccb->mAugCcbParent == NULL) {
>+            LOG_WA("ERR_BAD_OPERATION: ccb %u is not in an expected state: %u 
>"
>+                "rejecting ccbObjectRead operation ", ccbId, ccb->mState);
>+            err = SA_AIS_ERR_BAD_OPERATION;
>+            goto done;
>+        } else {
>+            if(ccb->mState < IMM_CCB_CRITICAL) {
>+                LOG_IN("SafeRead inside augmentation allowed");
>+            } else {
>+                LOG_WA("ERR_BAD_OPERATION: ccb %u is not in an expected 
>state: %u "
>+                    "rejecting ccbObjectRead operation ", ccbId, ccb->mState);
>+                err = SA_AIS_ERR_BAD_OPERATION;
>+                goto done;
>+            }
>+        }
>+    }
>+
>+    ccbIdOfObj = obj->mCcbId;
>+    if(ccbIdOfObj == ccbId) {
>+        TRACE_7("Object '%s' is currently *exclusive* locked by same ccb %u",
>+            objectName.c_str(), ccbId);
>+        osafassert(err == SA_AIS_OK);
>+        goto done;
>+    } else if(ccb->isReadLockedByThisCcb(obj)) {
>+        TRACE_7("Object '%s' is currently *shared* read locked by same ccb 
>%u",
>+            objectName.c_str(), ccbId);
>+        osafassert(err == SA_AIS_OK);
>+        goto done;
>+    } else if(ccbIdOfObj) {
>+        //CcbId is not zero.
>+        TRACE_7("Object '%s' is currently NOT locked by same ccb %u",
>+            objectName.c_str(), ccbId);
>+        i1 = std::find_if(sCcbVector.begin(), sCcbVector.end(),
>+            CcbIdIs(ccbIdOfObj));
>+        if ((i1 != sCcbVector.end()) && ((*i1)->isActive())) {
>+            TRACE_7("ERR_BUSY: Safe-read: ccb id %u differs from active ccb 
>id on object %u",
>+                ccbId, ccbIdOfObj);
>+            err = SA_AIS_ERR_BUSY;
>+            goto done;
>+        }
>+        /* clear the obsolete ccb-id */
>+        ccbIdOfObj = 0;
>+    }
>+
>+    if(ccb->mAugCcbParent) {
>+        LOG_IN("This is a lock check in an augmented CCB");
>+    }
>+
>+    /* Intentionally NO check for admin-owner. SafeRead allows read sharing 
>with
>+       other CCBs, => no admin-owner exclusivity.
>+    */
>+
>+    if(ccbIdOfObj == 0) {
>+        TRACE_7("Object '%s' is currently not locked by any ccb", 
>objectName.c_str());
>
>*[Hung] This trace is misleading. We also enter here when the object is 
>read-locked by other ccb.****It should be.****TRACE_7("Object '%s' is 
>currently not locked by this ccb %u", objectName.c_str(), ccbId);*
>
>+        err = SA_AIS_ERR_INTERRUPT;
>+        goto done;
>+    }
>+
>+ done:
>+    TRACE_LEAVE();
>+    return err;
>+}
>+
>  
>  SaAisErrorT
>  ImmModel::accessorGet(const ImmsvOmSearchInit* req, ImmSearchOp& op)
>@@ -10724,6 +11154,14 @@ ImmModel::accessorGet(const ImmsvOmSearc
>      SaImmSearchOptionsT notAllowedOptions = 0LL;
>      bool nonExtendedNameCheck = req->searchParam.present > 
> ImmOmSearchParameter_PR_oneAttrParam;
>      bool checkAttribute = false;
>+    bool isSafeRead = (req->ccbId != 0);
>+    CcbInfo* ccb = NULL;
>+    CcbVector::iterator i1;
>+
>+
>+    if(isSafeRead) {
>+        TRACE_7("SafeRead (accessorGet) on Object: '%s'", objectName.c_str());
>+    }
>  
>      if(nonExtendedNameCheck) {
>          op.setNonExtendedName();
>@@ -10753,13 +11191,100 @@ ImmModel::accessorGet(const ImmsvOmSearc
>          TRACE_7("ERR_NOT_EXIST: Object '%s' does not exist", 
> objectName.c_str());
>          err = SA_AIS_ERR_NOT_EXIST;
>          goto accessorExit;
>-    } else if(i->second->mObjFlags & IMM_CREATE_LOCK) {
>-        TRACE_7("ERR_NOT_EXIST: Object '%s' is being created, but ccb "
>-                "or PRTO PBE, not yet applied", objectName.c_str());
>-        err = SA_AIS_ERR_NOT_EXIST;
>-        goto accessorExit;
>-    }
>-
>+    }
>+    if(i->second->mObjFlags & IMM_CREATE_LOCK) {
>+        /* Both regular accessor-get and safe-read affected by create lock.*/
>+        if(isSafeRead) {
>+            if(req->ccbId != i->second->mCcbId) {
>+                TRACE_7("ERR_NOT_EXIST: Object '%s' is being created but not 
>in this ccb.",
>+                    objectName.c_str());
>+                err = SA_AIS_ERR_NOT_EXIST;
>+                goto accessorExit;
>+            }
>+            TRACE("req->ccbId == i->second->mCcbId  i.e. same ccb ok for 
>safe-read");
>+            obj = i->second;
>+        } else { /* Regular accessor-get */
>+            TRACE_7("ERR_NOT_EXIST: Object '%s' is being created, but ccb "
>+                    "or PRTO PBE, not yet applied", objectName.c_str());
>+            err = SA_AIS_ERR_NOT_EXIST;
>+            goto accessorExit;
>+        }
>+    } else if(isSafeRead) {
>+        /* Interference with unapplied create taken care of above in 
>if-branch for
>+           both safeRead and regular accessorGet.
>+
>+           Regular accessor-get is not affected by open delete, modify, or 
>safe-reads.
>+           It simply accesses the current committed version.
>+
>+           Safe-read *is* affected by unapplied modify or delete, hence this 
>else branch.
>+
>+           Mdify in same ccb => safe-read must read the after-image-version 
>in the same ccb.
>+           Delete in same ccb => the object becomes inaccesible 
>(ERR_NOT_EXIST) for safe-read.
>+           Mofify or delete by *other* ccb => safe-read should have been 
>rejected (ERR_BUSY)
>+           in objectIsLockedByCcb().
>+
>+           Safe-read is of course not affected by other safe-read operations 
>on this object
>+           in same or other ccbs. Latest commited version is read.
>+        */
>+
>+        i1 = std::find_if(sCcbVector.begin(), sCcbVector.end(), 
>CcbIdIs(req->ccbId));
>+        osafassert(i1 != sCcbVector.end()); /*Already checked in 
>objectIsLockedByCcb. */
>+        ccb = *i1;
>+        if((i->second->mCcbId == 0) && 
>!(ccb->isReadLockedByThisCcb(i->second))) {
>+            LOG_WA("Safe read on object %s NOT locked by ccb (%u). Should not 
>be caught here",
>+                objectName.c_str(), req->ccbId);
>+            /* If we get here this may be seen as a bug to be be corrected. 
>Should perhaps
>+               assert here, but be lenient for now and just report 
>"INTERRUPT" which should
>+               result in an new attempt to lock the object for safe-read.
>+               The safe-read lock should have been obtained before reacing 
>accessor-get.
>+             */
>+            err = SA_AIS_ERR_INTERRUPT;  /* Try to lock the object for safe 
>read.*/
>+            goto accessorExit;
>+        }
>+
>+        if(i->second->mCcbId != req->ccbId) {
>+            if(i->second->mCcbId == 0) {
>+                TRACE_7("Object %s locked for shared read in ccb (%u) and 
>possibly other ccbs",
>+                    objectName.c_str(), req->ccbId);
>+                osafassert(i->second->mObjFlags & IMM_SHARED_READ_LOCK);
>+                obj = i->second; /* Use latest committed version. */
>+            } else {
>+                LOG_WA("Safe read on object %s exclusive locked by other ccb 
>(%u). "
>+                    "Should not be caught here", objectName.c_str(), 
>i->second->mCcbId);
>+                /* If we get here this may be seen as a bug to be be 
>corrected. Should perhaps
>+                   assert here, but be lenient for now and just report "BUSY".
>+                */
>+                err = SA_AIS_ERR_BUSY;
>+                goto accessorExit;
>+            }
>+        } else {/* i->second->mCcbId == req->ccbId */
>+            osafassert(i->second->mCcbId != 0);
>+            TRACE_7("Safe read on object *exclusive* locked by *this* ccb 
>%u", req->ccbId);
>+
>+            /* This can not be a safe-read. */
>+            osafassert(!(i->second->mObjFlags & IMM_SHARED_READ_LOCK));
>+
>+            /* Prior op can not be a create. Determine if it is delete or 
>modify */
>+
>+            if(i->second->mObjFlags & IMM_DELETE_LOCK) {
>+                LOG_IN("ERR_NOT_EXIST: Object '%s' is being deleted in same 
>ccb %u",
>+                    objectName.c_str(), req->ccbId);
>+                err = SA_AIS_ERR_NOT_EXIST;
>+                goto accessorExit;
>+            }
>+
>+            /* Prior op must be a modify.in same ccb. Use latest afim for 
>safe-read. */
>+            TRACE_7("Safe read of Object '%s' that was modified in same ccb 
>%u",
>+                objectName.c_str(), req->ccbId);
>+            /* Find mutation */
>+            ObjectMutationMap::iterator omuti = 
>ccb->mMutations.find(objectName);
>+            osafassert(omuti != ccb->mMutations.end());
>+            ObjectMutation* oMut = omuti->second;
>+            osafassert(oMut->mOpType == IMM_MODIFY);
>+            obj = oMut->mAfterImage;
>+        }
>+    } //if(safeRead)
>+
>      // Validate scope
>      if (scope != SA_IMM_ONE) {
>          LOG_NO("ERR_INVALID_PARAM: invalid search scope");
>@@ -10784,9 +11309,15 @@ ImmModel::accessorGet(const ImmsvOmSearc
>  
>      //TODO: Reverse the order of matching attribute names.
>      //The class should be searched first since we must return error
>-    //if the attribute is not defined in the class of the object.
>-
>-    obj = i->second;
>+    //if the attribute is not defined in the class of the object
>+
>+    //SafeRead works exactly like accessor get with respect to runtime 
>attributes.
>+
>+    if(!obj) {
>+        osafassert(!isSafeRead); //SafeRead checks bypassed somehow above.
>+        obj = i->second;
>+    }
>+
>      if(obj->mObjFlags & IMM_DN_INTERNAL_REP) {
>          nameToExternal(objectName);
>      }
>@@ -11093,7 +11624,12 @@ ImmModel::searchInitialize(ImmsvOmSearch
>          goto searchInitializeExit;
>      }
>      
>-
>+    if(req->ccbId) {
>+        LOG_WA("SafeRead ended up in ImmModel::searchInitialize ccb:%u", 
>req->ccbId);
>+        err = SA_AIS_ERR_LIBRARY;
>+        goto searchInitializeExit;
>+    }
>+
>      // Validate root name
>      if(! (nameCheck(rootName)||nameToInternal(rootName)) ) {
>          LOG_NO("ERR_INVALID_PARAM: Not a proper root name");
>diff --git a/osaf/services/saf/immsv/immnd/ImmModel.hh 
>b/osaf/services/saf/immsv/immnd/ImmModel.hh
>--- a/osaf/services/saf/immsv/immnd/ImmModel.hh
>+++ b/osaf/services/saf/immsv/immnd/ImmModel.hh
>@@ -79,6 +79,9 @@ typedef std::vector<SaInvocationT> Invoc
>  /* Maps an object pointer, to a set of object pointers.*/
>  typedef std::multimap<ObjectInfo*, ObjectInfo*> ObjectMMap;
>  
>+/**/
>+typedef std::map<ObjectInfo*, uint16_t> ObjectShortCountMap;
>+
>  /* Object mutation */
>  struct ObjectMutation;
>  typedef std::map<std::string, ObjectMutation*> ObjectMutationMap;
>@@ -382,6 +385,10 @@ public:
>      SaAisErrorT         nextSyncResult(ImmsvOmRspSearchNext** rsp,
>                                         ImmSearchOp& op);
>      
>+    SaAisErrorT         objectIsLockedByCcb(const ImmsvOmSearchInit* req);
>+
>+    SaAisErrorT         ccbReadLockObject(const ImmsvOmSearchInit* req);
>+
>      SaAisErrorT         accessorGet(
>                                      const ImmsvOmSearchInit* req,
>                                      ImmSearchOp& op);
>diff --git a/osaf/services/saf/immsv/immnd/immnd_evt.c 
>b/osaf/services/saf/immsv/immnd/immnd_evt.c
>--- a/osaf/services/saf/immsv/immnd/immnd_evt.c
>+++ b/osaf/services/saf/immsv/immnd/immnd_evt.c
>@@ -217,10 +217,10 @@ static uint32_t immnd_evt_proc_search_fi
>  
>  static uint32_t immnd_evt_proc_accessor_get(IMMND_CB *cb, IMMND_EVT *evt, 
> IMMSV_SEND_INFO *sinfo);
>  
>+static uint32_t immnd_evt_proc_safe_read(IMMND_CB *cb, IMMND_EVT *evt, 
>IMMSV_SEND_INFO *sinfo);
>+
>  static uint32_t immnd_evt_proc_mds_evt(IMMND_CB *cb, IMMND_EVT *evt);
>  
>-/*static uint32_t immnd_evt_immd_new_active(IMMND_CB *cb);*/
>-
>  static void immnd_evt_ccb_abort(IMMND_CB *cb, SaUint32T ccbId, SaUint32T 
> **clientArr,
>               SaUint32T * clArrSize, SaUint32T *nodeId);
>  
>@@ -358,7 +358,8 @@ uint32_t immnd_evt_destroy(IMMSV_EVT *ev
>       } else if (evt->info.immnd.type == IMMND_EVT_ND2ND_SEARCH_REMOTE_RSP) {
>               freeSearchNext(&evt->info.immnd.info.rspSrchRmte.runtimeAttrs, 
> false);
>       } else if ((evt->info.immnd.type == IMMND_EVT_A2ND_SEARCHINIT) ||
>-              (evt->info.immnd.type == IMMND_EVT_A2ND_ACCESSOR_GET)) {
>+              (evt->info.immnd.type == IMMND_EVT_A2ND_ACCESSOR_GET) ||
>+              (evt->info.immnd.type == IMMND_EVT_A2ND_OBJ_SAFE_READ)) {
>               free(evt->info.immnd.info.searchInit.rootName.buf);
>               evt->info.immnd.info.searchInit.rootName.buf = NULL;
>               evt->info.immnd.info.searchInit.rootName.size = 0;
>@@ -606,6 +607,10 @@ void immnd_process_evt(void)
>               rc = immnd_evt_proc_search_next(cb, &evt->info.immnd, 
> &evt->sinfo);
>               break;
>  
>+      case IMMND_EVT_A2ND_OBJ_SAFE_READ:
>+              rc = immnd_evt_proc_safe_read(cb, &evt->info.immnd, 
>&evt->sinfo);
>+              break;
>+
>       case IMMND_EVT_A2ND_ACCESSOR_GET:
>               rc = immnd_evt_proc_accessor_get(cb, &evt->info.immnd, 
> &evt->sinfo);
>               break;
>@@ -1795,6 +1800,80 @@ static uint32_t immnd_evt_proc_search_fi
>  }
>  
>  /****************************************************************************
>+ * Name          : immnd_evt_proc_safe_read
>+ *
>+ * Description   : Function to process the saImmOmCcbObjectRead call.
>+ *                 This call can in some cases be resolved with success 
>locally.
>+ *                 This will be the case if the object to be accessed is 
>already
>+ *                 locked previously by this CCB.
>+ *
>+ *
>+ * Arguments     : IMMND_CB *cb - IMMND CB pointer
>+ *                 IMMND_EVT *evt - Received Event structure
>+ *                 IMMSV_SEND_INFO *sinfo - sender info
>+ 
>*****************************************************************************/
>+static uint32_t immnd_evt_proc_safe_read(IMMND_CB *cb, IMMND_EVT *evt, 
>IMMSV_SEND_INFO *sinfo)
>+{
>+      IMMSV_EVT send_evt;
>+      uint32_t rc = NCSCC_RC_SUCCESS;
>+      SaAisErrorT err = SA_AIS_OK;
>+      TRACE_ENTER();
>+      memset(&send_evt, 0, sizeof(IMMSV_EVT));
>+
>+      if(evt->info.searchInit.ccbId == 0) {
>+              err = SA_AIS_ERR_LIBRARY;
>+              LOG_WA("ERR_LIBRARY: Received zero ccb-id for safe-read");
>+              goto error;
>+      }
>+
>+      /* Dive into ImmModel to do locking checks.
>+         If already locked in this ccb continue with accessor.
>+         If locked by other ccb then reject with ERR_BUSY
>+         If not locked, then generate fevs message for locking AND read.
>+       */
>+
>+      err = immModel_objectIsLockedByCcb(cb, &(evt->info.searchInit));
>+
>+      switch (err) {
>+              case SA_AIS_OK:
>+                      TRACE("Safe read: Object is locked by this CCB(%u). Go 
>ahead and safe-read",
>+                              evt->info.searchInit.ccbId);
>+                      /* Invoke accessor_get which will send the reply. */
>+                      break;
>+
>+              case SA_AIS_ERR_BUSY:
>+                      TRACE("Object is locked by some other CCB than this 
>CCB(%u). Reply with BUSY",
>+                              evt->info.searchInit.ccbId);
>+                      goto error;
>+
>+              case SA_AIS_ERR_INTERRUPT:
>+                      TRACE("Object not locked. Ccb (%u) needs to lock it 
>over fevs. Reply with INTERRUPT",
>+                              evt->info.searchInit.ccbId);
>+                      /* Should result in fevs message to read-lock object. */
>+                      goto error;
>+
>+              default:
>+                      TRACE("Unusual error from immModel_isLockedByMe: %u", 
>err);
>
>*[Hung] The function is immModel_objectIsLockedByCcb, not 
>immModel_isLockedByMe.*
>
>+                      goto error;
>+      }
>+
>+      rc = immnd_evt_proc_accessor_get(cb, evt, sinfo);
>+      goto done;
>+
>+ error:
>+      send_evt.type = IMMSV_EVT_TYPE_IMMA;
>+      send_evt.info.imma.type = IMMA_EVT_ND2A_IMM_ERROR;
>+      send_evt.info.imma.info.errRsp.error = err;
>+      send_evt.info.imma.info.errRsp.errStrings = NULL;
>+
>+      rc = immnd_mds_send_rsp(cb, sinfo, &send_evt);
>+
>+ done:
>+      TRACE_LEAVE();
>+      return rc;
>+}
>+
>+/****************************************************************************
>   * Name          : immnd_evt_proc_accessor_get
>   *
>   * Description   : Function to process the saImmOmAccessorGet call.
>@@ -1822,7 +1901,13 @@ static uint32_t immnd_evt_proc_accessor_
>       /* Search Init */
>  
>       memset(&send_evt, 0, sizeof(IMMSV_EVT));
>-      TRACE_2("ACCESSOR GET:%s", evt->info.searchInit.rootName.buf);
>+
>+      if(evt->info.searchInit.ccbId != 0) {
>+              TRACE_2("SAFE READ :%s CcbId: %u", 
>evt->info.searchInit.rootName.buf, evt->info.searchInit.ccbId);
>+      } else {
>+              TRACE_2("ACCESSOR GET:%s", evt->info.searchInit.rootName.buf);
>+      }
>+
>  
>       /*Look up client-node */
>       immnd_client_node_get(cb, evt->info.searchInit.client_hdl, &cl_node);
>@@ -3205,6 +3290,20 @@ static SaAisErrorT immnd_fevs_local_chec
>               }
>               break;
>  
>+      case IMMND_EVT_A2ND_OBJ_SAFE_READ:
>+              TRACE("IMMND_EVT_A2ND_OBJ_SAFE_READ noted in 
>fevs_local_checks");
>+              if(!immModel_protocol50Allowed(cb) || 
>immModel_pbeNotWritable(cb)) {
>+                      error = SA_AIS_ERR_TRY_AGAIN;
>+              }
>+              /*
>+              Change signature of method to take name and ccbID
>+
>+              error = immModel_objectIsLockedByCcb(cb, 
>&(evt->info.searchInit));
>+              if(error == SA_AIS_OK) { error = SA_AIS_ERR_NO_BINDINGS;}
>+              */
>
>*[Hung] The above comment should be removed?*

Yes the comment and the code it contains should be removed, alternatively fixed.

In principle on could catch the bulk of locking conflicts at the local IMMND, 
avoiding load on IMMD/fevs and the other IMMNDs.
Only conflics that arise close in realtime from different nodes would need to 
be caught after going over fevs.
But apparently this requires more work for safe-read.

/AndersBj

>
>+
>+              break;
>+
>       case IMMND_EVT_A2ND_OBJ_CREATE_2:
>          if(!immModel_protocol46Allowed(cb) || immModel_pbeNotWritable(cb)) {
>              error = SA_AIS_ERR_TRY_AGAIN;
>@@ -3548,7 +3647,6 @@ static SaAisErrorT immnd_fevs_local_chec
>               }
>               break;
>  
>-
>       case IMMND_EVT_A2ND_PBE_ADMOP_RSP:
>               if(fevsReq->sender_count != 0x0) {
>                       LOG_WA("ERR_LIBRARY: IMMND_EVT_A2ND_PBE_ADMOP_RSP 
> fevsReq->sender_count != 0x0");
>@@ -3562,7 +3660,7 @@ static SaAisErrorT immnd_fevs_local_chec
>               break;
>  
>       default:
>-              LOG_ER("UNPACK FAILURE, unrecognized message type: %u over 
>FEVS",
>+              LOG_ER("UNPACK FAILURE, unrecognized message type: %u caught in 
>fevs_local_checks",
>                       frwrd_evt.info.immnd.type);
>               error = SA_AIS_ERR_LIBRARY;
>               break;
>@@ -4378,6 +4476,58 @@ static void immnd_evt_sync_fevs_base(IMM
>  
>  }
>  
>+static void immnd_evt_safe_read_lock(IMMND_CB *cb, IMMND_EVT *evt,
>+       SaBoolT originatedAtThisNd, SaImmHandleT clnt_hdl, MDS_DEST reply_dest)
>+{
>+      IMMSV_EVT send_evt;
>+      IMMND_IMM_CLIENT_NODE *cl_node = NULL;
>+      SaAisErrorT err = SA_AIS_OK;
>+      TRACE_ENTER();
>+
>+      err = immModel_objectIsLockedByCcb(cb, &(evt->info.searchInit));
>+
>+      switch (err) {
>+              case SA_AIS_OK:
>+                      TRACE("safe_read_lock: Object is already locked by this 
>CCB(%u)!!",
>+                              evt->info.searchInit.ccbId);
>+                      break;
>+
>+              case SA_AIS_ERR_BUSY:
>+                      TRACE("Object is locked by some other CCB than this 
>CCB(%u). Reply with BUSY",
>+                              evt->info.searchInit.ccbId);
>+                      break;
>+
>+              case SA_AIS_ERR_INTERRUPT:
>+                      TRACE("Object not locked. Ccb (%u) will now lock it",
>+                              evt->info.searchInit.ccbId);
>+                      err = immModel_ccbReadLockObject(cb, 
>&(evt->info.searchInit));
>+                      break;
>+
>+              default:
>+                      TRACE("Unusual error from immModel_isLockedByMe: %u", 
>err);
>
>*[Hung] The function is immModel_objectIsLockedByCcb, not 
>immModel_isLockedByMe.*
>
>+      }
>+
>+      if(originatedAtThisNd) {
>+              immnd_client_node_get(cb, clnt_hdl, &cl_node);
>+              if (cl_node == NULL || cl_node->mIsStale) {
>+                      LOG_WA("IMMND - Client went down so no response");
>+                      return;
>+              }
>+              TRACE_2("Send immediate reply to client");
>+
>+              memset(&send_evt, '\0', sizeof(IMMSV_EVT));
>+              send_evt.type = IMMSV_EVT_TYPE_IMMA;
>+              send_evt.info.imma.info.errRsp.error = err;
>+              send_evt.info.imma.type = IMMA_EVT_ND2A_IMM_ERROR;
>+
>+              if (immnd_mds_send_rsp(cb, &(cl_node->tmpSinfo), &send_evt) != 
>NCSCC_RC_SUCCESS) {
>+                      LOG_WA("immnd_evt_class_delete: SENDRSP FAILED TO 
>SEND");
>+              }
>+      }
>+
>+      TRACE_LEAVE();
>+}
>+
>  
>  /****************************************************************************
>   * Name          : immnd_evt_pbe_admop_rsp
>@@ -5141,7 +5291,7 @@ static void immnd_evt_proc_admop(IMMND_C
>       if (originatedAtThisNd) {
>               immnd_client_node_get(cb, clnt_hdl, &cl_node);
>               if (cl_node == NULL || cl_node->mIsStale) {
>-                      LOG_ER("IMMND - Client went down so no response");
>+                      LOG_WA("IMMND - Client went down so no response");
>                       return;
>               }
>  
>@@ -8293,6 +8443,11 @@ immnd_evt_proc_fevs_dispatch(IMMND_CB *c
>               immnd_evt_sync_fevs_base(cb, &frwrd_evt.info.immnd, 
> originatedAtThisNd, clnt_hdl, reply_dest);
>               break;
>  
>+      case IMMND_EVT_A2ND_OBJ_SAFE_READ:
>+              TRACE("IMMND_EVT_A2ND_OBJ_SAFE_READ Received over fevs");
>+              immnd_evt_safe_read_lock(cb, &frwrd_evt.info.immnd, 
>originatedAtThisNd, clnt_hdl, reply_dest);
>+              break;
>+
>       default:
>               LOG_ER("UNPACK FAILURE, unrecognized message type: %u over 
> FEVS", frwrd_evt.info.immnd.type);
>               break;
>diff --git a/osaf/services/saf/immsv/immnd/immnd_init.h 
>b/osaf/services/saf/immsv/immnd/immnd_init.h
>--- a/osaf/services/saf/immsv/immnd/immnd_init.h
>+++ b/osaf/services/saf/immsv/immnd/immnd_init.h
>@@ -202,6 +202,9 @@ extern "C" {
>  
>       SaAisErrorT immModel_searchInitialize(IMMND_CB *cb, struct 
> ImmsvOmSearchInit *req, void **searchOp, SaBoolT isSync, SaBoolT isAccessor);
>  
>+      SaAisErrorT immModel_objectIsLockedByCcb(IMMND_CB *cb, struct 
>ImmsvOmSearchInit *req);
>+      SaAisErrorT immModel_ccbReadLockObject(IMMND_CB *cb, struct 
>ImmsvOmSearchInit *req);
>+
>       SaAisErrorT immModel_testTopResult(void *searchOp, SaUint32T 
> *implNodeId, SaBoolT *bRtAttrsToFetch);
>  
>       SaAisErrorT
>
>------------------------------------------------------------------------------
>Transform Data into Opportunity.
>Accelerate data analysis in your applications with
>Intel Data Analytics Acceleration Library.
>Click to learn more.
>http://pubads.g.doubleclick.net/gampad/clk?id=278785351&iu=/4140
>_______________________________________________
>Opensaf-devel mailing list
>[email protected]
>https://lists.sourceforge.net/lists/listinfo/opensaf-devel
>
>
>------------------------------------------------------------------------------
>Transform Data into Opportunity.
>Accelerate data analysis in your applications with
>Intel Data Analytics Acceleration Library.
>Click to learn more.
>http://pubads.g.doubleclick.net/gampad/clk?id=278785471&iu=/4140
>_______________________________________________
>Opensaf-devel mailing list
>[email protected]
>https://lists.sourceforge.net/lists/listinfo/opensaf-devel
>
------------------------------------------------------------------------------
Transform Data into Opportunity.
Accelerate data analysis in your applications with
Intel Data Analytics Acceleration Library.
Click to learn more.
http://pubads.g.doubleclick.net/gampad/clk?id=278785471&iu=/4140
_______________________________________________
Opensaf-devel mailing list
[email protected]
https://lists.sourceforge.net/lists/listinfo/opensaf-devel

Reply via email to