Repository: incubator-mynewt-newtmgr
Updated Branches:
  refs/heads/master 5fce9f8d2 -> b6629b106


nmxact - Ensure all fsm goroutines done


Project: http://git-wip-us.apache.org/repos/asf/incubator-mynewt-newtmgr/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-mynewt-newtmgr/commit/b6629b10
Tree: 
http://git-wip-us.apache.org/repos/asf/incubator-mynewt-newtmgr/tree/b6629b10
Diff: 
http://git-wip-us.apache.org/repos/asf/incubator-mynewt-newtmgr/diff/b6629b10

Branch: refs/heads/master
Commit: b6629b106f4a1b74b13a44c8d93dbcc670523730
Parents: 5fce9f8
Author: Christopher Collins <ccoll...@apache.org>
Authored: Tue Apr 11 16:35:58 2017 -0700
Committer: Christopher Collins <ccoll...@apache.org>
Committed: Tue Apr 11 16:36:24 2017 -0700

----------------------------------------------------------------------
 nmxact/example/ble_dual/ble_dual.go | 72 ++++++++++++++++----------------
 nmxact/nmble/ble_fsm.go             | 17 +++++++-
 nmxact/nmxutil/nmxutil.go           | 61 ++++++++++++---------------
 3 files changed, 79 insertions(+), 71 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-mynewt-newtmgr/blob/b6629b10/nmxact/example/ble_dual/ble_dual.go
----------------------------------------------------------------------
diff --git a/nmxact/example/ble_dual/ble_dual.go 
b/nmxact/example/ble_dual/ble_dual.go
index 958aaa1..e4615fe 100644
--- a/nmxact/example/ble_dual/ble_dual.go
+++ b/nmxact/example/ble_dual/ble_dual.go
@@ -26,15 +26,18 @@ import (
        "sync"
        "syscall"
 
+       log "github.com/Sirupsen/logrus"
+
        "mynewt.apache.org/newt/util"
        "mynewt.apache.org/newtmgr/nmxact/bledefs"
        "mynewt.apache.org/newtmgr/nmxact/nmble"
+       "mynewt.apache.org/newtmgr/nmxact/nmxutil"
        "mynewt.apache.org/newtmgr/nmxact/sesn"
        "mynewt.apache.org/newtmgr/nmxact/xact"
        "mynewt.apache.org/newtmgr/nmxact/xport"
 )
 
-func configExitHandler(x xport.Xport, s sesn.Sesn) {
+func configExitHandler(x xport.Xport) {
        onExit := func() {
                x.Stop()
        }
@@ -95,6 +98,8 @@ func sendOne(s sesn.Sesn) {
 }
 
 func main() {
+       nmxutil.SetLogLevel(log.InfoLevel)
+
        // Initialize the BLE transport.
        params := nmble.NewXportCfg()
        params.SockPath = "/tmp/blehostd-uds"
@@ -116,48 +121,45 @@ func main() {
        }
        defer x.Stop()
 
-       // Prepare a BLE session:
-       //     * Plain NMP (not tunnelled over OIC).
-       //     * We use a random address.
-       //     * Peer has name "nimble-bleprph".
-       sc1 := sesn.NewSesnCfg()
-       sc1.MgmtProto = sesn.MGMT_PROTO_NMP
-       sc1.Ble.OwnAddrType = bledefs.BLE_ADDR_TYPE_RANDOM
-       sc1.Ble.PeerSpec = sesn.BlePeerSpecName("ccollins")
+       configExitHandler(x)
 
-       s1, err := x.BuildSesn(sc1)
-       if err != nil {
-               fmt.Fprintf(os.Stderr, "error creating BLE session: %s1\n", 
err.Error())
-               os.Exit(1)
+       peerNames := []string{
+               "ccollins",
+               "ccollins2",
+               "ccollins3",
        }
 
-       sc2 := sesn.NewSesnCfg()
-       sc2.MgmtProto = sesn.MGMT_PROTO_NMP
-       sc2.Ble.OwnAddrType = bledefs.BLE_ADDR_TYPE_RANDOM
-       sc2.Ble.PeerSpec = sesn.BlePeerSpecName("ccollins2")
+       sesns := []sesn.Sesn{}
+       for _, n := range peerNames {
+               // Prepare a BLE session:
+               //     * Plain NMP (not tunnelled over OIC).
+               //     * We use a random address.
+               //     * Peer has name "nimble-bleprph".
+               sc := sesn.NewSesnCfg()
+               sc.MgmtProto = sesn.MGMT_PROTO_NMP
+               sc.Ble.OwnAddrType = bledefs.BLE_ADDR_TYPE_RANDOM
+               sc.Ble.PeerSpec = sesn.BlePeerSpecName(n)
+
+               s, err := x.BuildSesn(sc)
+               if err != nil {
+                       fmt.Fprintf(os.Stderr, "error creating BLE session: 
%s\n",
+                               err.Error())
+                       os.Exit(1)
+               }
 
-       s2, err := x.BuildSesn(sc2)
-       if err != nil {
-               fmt.Fprintf(os.Stderr, "error creating BLE session: %s2\n", 
err.Error())
-               os.Exit(1)
+               sesns = append(sesns, s)
        }
 
-       configExitHandler(x, s1)
-
        var wg sync.WaitGroup
-       wg.Add(1)
-       go func() {
-               for {
-                       sendOne(s1)
-               }
-       }()
-       wg.Add(1)
 
-       go func() {
-               for {
-                       sendOne(s2)
-               }
-       }()
+       for _, s := range sesns {
+               wg.Add(1)
+               go func(x sesn.Sesn) {
+                       for {
+                               sendOne(x)
+                       }
+               }(s)
+       }
 
        wg.Wait()
 }

http://git-wip-us.apache.org/repos/asf/incubator-mynewt-newtmgr/blob/b6629b10/nmxact/nmble/ble_fsm.go
----------------------------------------------------------------------
diff --git a/nmxact/nmble/ble_fsm.go b/nmxact/nmble/ble_fsm.go
index ef5e69d..9a01a70 100644
--- a/nmxact/nmble/ble_fsm.go
+++ b/nmxact/nmble/ble_fsm.go
@@ -49,6 +49,11 @@ const (
        FSM_DISCONNECT_TYPE_REQUESTED
 )
 
+type BleFsmListener struct {
+       bl        *BleListener
+       abortChan chan struct{}
+}
+
 type BleRxNmpFn func(data []byte)
 type BleDisconnectFn func(dt BleFsmDisconnectType, peer BleDev, err error)
 
@@ -78,6 +83,7 @@ type BleFsm struct {
        state      BleSesnState
        errFunnel  nmxutil.ErrFunnel
        id         uint32
+       wg         sync.WaitGroup
 
        // Protects all accesses to the FSM state variable.
        stateMtx sync.Mutex
@@ -145,6 +151,7 @@ func (bf *BleFsm) addBleListener(name string, base 
BleMsgBase) (
                return nil, err
        }
 
+       bf.wg.Add(1)
        bf.bls[bl] = struct{}{}
        return bl, nil
 }
@@ -178,6 +185,8 @@ func (bf *BleFsm) removeBleListener(name string, base 
BleMsgBase) {
 
        bl := bf.params.Bx.Bd.RemoveListener(base)
        delete(bf.bls, bl)
+
+       bf.wg.Done()
 }
 
 func (bf *BleFsm) removeBleBaseListener(name string, base BleMsgBase) {
@@ -266,7 +275,10 @@ func (bf *BleFsm) processErr(err error) {
 
        bf.peerDev = nil
 
-       go bf.params.DisconnectCb(dt, peer, err)
+       bf.wg.Wait()
+
+       bf.errFunnel.Reset()
+       bf.params.DisconnectCb(dt, peer, err)
 }
 
 func (bf *BleFsm) connectListen(seq BleSeq) error {
@@ -766,9 +778,10 @@ func (bf *BleFsm) executeState() (bool, error) {
 }
 
 func (bf *BleFsm) startOnce() (bool, error) {
-       bf.errFunnel.BlockUntilReset()
+       bf.errFunnel.Start()
 
        if !bf.IsClosed() {
+               bf.errFunnel.Reset()
                return false, nmxutil.NewSesnAlreadyOpenError(fmt.Sprintf(
                        "Attempt to open an already-open BLE session 
(state=%d)",
                        bf.getState()))

http://git-wip-us.apache.org/repos/asf/incubator-mynewt-newtmgr/blob/b6629b10/nmxact/nmxutil/nmxutil.go
----------------------------------------------------------------------
diff --git a/nmxact/nmxutil/nmxutil.go b/nmxact/nmxutil/nmxutil.go
index 4b232e9..2f967a0 100644
--- a/nmxact/nmxutil/nmxutil.go
+++ b/nmxact/nmxutil/nmxutil.go
@@ -51,17 +51,6 @@ func NewSingleResource() SingleResource {
        }
 }
 
-func (s *SingleResource) removeWaiter(waiter chan error) {
-       s.mtx.Lock()
-       defer s.mtx.Unlock()
-
-       for i, w := range s.waitQueue {
-               if w == waiter {
-                       s.waitQueue = append(s.waitQueue[:i], 
s.waitQueue[i+1:]...)
-               }
-       }
-}
-
 func (s *SingleResource) Acquire() error {
        s.mtx.Lock()
 
@@ -78,7 +67,6 @@ func (s *SingleResource) Acquire() error {
 
        err := <-w
        if err != nil {
-               s.removeWaiter(w)
                return err
        }
 
@@ -89,6 +77,7 @@ func (s *SingleResource) Release() {
        s.mtx.Lock()
 
        if !s.acquired {
+               panic("SingleResource release without acquire")
                s.mtx.Unlock()
                return
        }
@@ -114,6 +103,7 @@ func (s *SingleResource) Abort(err error) {
        for _, w := range s.waitQueue {
                w <- err
        }
+       s.waitQueue = [](chan error){}
 }
 
 type ErrLessFn func(a error, b error) bool
@@ -130,6 +120,16 @@ type ErrFunnel struct {
        resetMtx sync.Mutex
        curErr   error
        errTimer *time.Timer
+       started  bool
+}
+
+func (f *ErrFunnel) Start() {
+       f.resetMtx.Lock()
+
+       f.mtx.Lock()
+       defer f.mtx.Unlock()
+
+       f.started = true
 }
 
 func (f *ErrFunnel) Insert(err error) {
@@ -140,10 +140,11 @@ func (f *ErrFunnel) Insert(err error) {
        f.mtx.Lock()
        defer f.mtx.Unlock()
 
-       if f.curErr == nil {
-               // Subsequent use attempts will block until the funnel is 
inactive.
-               f.resetMtx.Lock()
+       if !f.started {
+               panic("ErrFunnel insert without start")
+       }
 
+       if f.curErr == nil {
                f.curErr = err
                f.errTimer = time.AfterFunc(f.AccumDelay, func() {
                        f.timerExp()
@@ -159,35 +160,27 @@ func (f *ErrFunnel) Insert(err error) {
        }
 }
 
-func (f *ErrFunnel) resetNoLock() {
-       if f.curErr != nil {
-               f.curErr = nil
-               f.errTimer.Stop()
-               f.resetMtx.Unlock()
-       }
-}
-
 func (f *ErrFunnel) Reset() {
        f.mtx.Lock()
        defer f.mtx.Unlock()
 
-       f.resetNoLock()
-}
-
-func (f *ErrFunnel) BlockUntilReset() {
-       f.resetMtx.Lock()
-       f.resetMtx.Unlock()
+       if f.started {
+               f.started = false
+               f.curErr = nil
+               f.errTimer.Stop()
+               f.resetMtx.Unlock()
+       }
 }
 
 func (f *ErrFunnel) timerExp() {
        f.mtx.Lock()
-       defer f.mtx.Unlock()
+       err := f.curErr
+       f.curErr = nil
+       f.mtx.Unlock()
 
-       if f.curErr == nil {
+       if err == nil {
                panic("ErrFunnel timer expired but no error")
        }
 
-       f.ProcCb(f.curErr)
-
-       f.resetNoLock()
+       f.ProcCb(err)
 }

Reply via email to