Repository: incubator-mynewt-newtmgr Updated Branches: refs/heads/master 5fce9f8d2 -> b6629b106
nmxact - Ensure all fsm goroutines done Project: http://git-wip-us.apache.org/repos/asf/incubator-mynewt-newtmgr/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-mynewt-newtmgr/commit/b6629b10 Tree: http://git-wip-us.apache.org/repos/asf/incubator-mynewt-newtmgr/tree/b6629b10 Diff: http://git-wip-us.apache.org/repos/asf/incubator-mynewt-newtmgr/diff/b6629b10 Branch: refs/heads/master Commit: b6629b106f4a1b74b13a44c8d93dbcc670523730 Parents: 5fce9f8 Author: Christopher Collins <ccoll...@apache.org> Authored: Tue Apr 11 16:35:58 2017 -0700 Committer: Christopher Collins <ccoll...@apache.org> Committed: Tue Apr 11 16:36:24 2017 -0700 ---------------------------------------------------------------------- nmxact/example/ble_dual/ble_dual.go | 72 ++++++++++++++++---------------- nmxact/nmble/ble_fsm.go | 17 +++++++- nmxact/nmxutil/nmxutil.go | 61 ++++++++++++--------------- 3 files changed, 79 insertions(+), 71 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-mynewt-newtmgr/blob/b6629b10/nmxact/example/ble_dual/ble_dual.go ---------------------------------------------------------------------- diff --git a/nmxact/example/ble_dual/ble_dual.go b/nmxact/example/ble_dual/ble_dual.go index 958aaa1..e4615fe 100644 --- a/nmxact/example/ble_dual/ble_dual.go +++ b/nmxact/example/ble_dual/ble_dual.go @@ -26,15 +26,18 @@ import ( "sync" "syscall" + log "github.com/Sirupsen/logrus" + "mynewt.apache.org/newt/util" "mynewt.apache.org/newtmgr/nmxact/bledefs" "mynewt.apache.org/newtmgr/nmxact/nmble" + "mynewt.apache.org/newtmgr/nmxact/nmxutil" "mynewt.apache.org/newtmgr/nmxact/sesn" "mynewt.apache.org/newtmgr/nmxact/xact" "mynewt.apache.org/newtmgr/nmxact/xport" ) -func configExitHandler(x xport.Xport, s sesn.Sesn) { +func configExitHandler(x xport.Xport) { onExit := func() { x.Stop() } @@ -95,6 +98,8 @@ func sendOne(s sesn.Sesn) { } func main() { + nmxutil.SetLogLevel(log.InfoLevel) + // Initialize the BLE transport. params := nmble.NewXportCfg() params.SockPath = "/tmp/blehostd-uds" @@ -116,48 +121,45 @@ func main() { } defer x.Stop() - // Prepare a BLE session: - // * Plain NMP (not tunnelled over OIC). - // * We use a random address. - // * Peer has name "nimble-bleprph". - sc1 := sesn.NewSesnCfg() - sc1.MgmtProto = sesn.MGMT_PROTO_NMP - sc1.Ble.OwnAddrType = bledefs.BLE_ADDR_TYPE_RANDOM - sc1.Ble.PeerSpec = sesn.BlePeerSpecName("ccollins") + configExitHandler(x) - s1, err := x.BuildSesn(sc1) - if err != nil { - fmt.Fprintf(os.Stderr, "error creating BLE session: %s1\n", err.Error()) - os.Exit(1) + peerNames := []string{ + "ccollins", + "ccollins2", + "ccollins3", } - sc2 := sesn.NewSesnCfg() - sc2.MgmtProto = sesn.MGMT_PROTO_NMP - sc2.Ble.OwnAddrType = bledefs.BLE_ADDR_TYPE_RANDOM - sc2.Ble.PeerSpec = sesn.BlePeerSpecName("ccollins2") + sesns := []sesn.Sesn{} + for _, n := range peerNames { + // Prepare a BLE session: + // * Plain NMP (not tunnelled over OIC). + // * We use a random address. + // * Peer has name "nimble-bleprph". + sc := sesn.NewSesnCfg() + sc.MgmtProto = sesn.MGMT_PROTO_NMP + sc.Ble.OwnAddrType = bledefs.BLE_ADDR_TYPE_RANDOM + sc.Ble.PeerSpec = sesn.BlePeerSpecName(n) + + s, err := x.BuildSesn(sc) + if err != nil { + fmt.Fprintf(os.Stderr, "error creating BLE session: %s\n", + err.Error()) + os.Exit(1) + } - s2, err := x.BuildSesn(sc2) - if err != nil { - fmt.Fprintf(os.Stderr, "error creating BLE session: %s2\n", err.Error()) - os.Exit(1) + sesns = append(sesns, s) } - configExitHandler(x, s1) - var wg sync.WaitGroup - wg.Add(1) - go func() { - for { - sendOne(s1) - } - }() - wg.Add(1) - go func() { - for { - sendOne(s2) - } - }() + for _, s := range sesns { + wg.Add(1) + go func(x sesn.Sesn) { + for { + sendOne(x) + } + }(s) + } wg.Wait() } http://git-wip-us.apache.org/repos/asf/incubator-mynewt-newtmgr/blob/b6629b10/nmxact/nmble/ble_fsm.go ---------------------------------------------------------------------- diff --git a/nmxact/nmble/ble_fsm.go b/nmxact/nmble/ble_fsm.go index ef5e69d..9a01a70 100644 --- a/nmxact/nmble/ble_fsm.go +++ b/nmxact/nmble/ble_fsm.go @@ -49,6 +49,11 @@ const ( FSM_DISCONNECT_TYPE_REQUESTED ) +type BleFsmListener struct { + bl *BleListener + abortChan chan struct{} +} + type BleRxNmpFn func(data []byte) type BleDisconnectFn func(dt BleFsmDisconnectType, peer BleDev, err error) @@ -78,6 +83,7 @@ type BleFsm struct { state BleSesnState errFunnel nmxutil.ErrFunnel id uint32 + wg sync.WaitGroup // Protects all accesses to the FSM state variable. stateMtx sync.Mutex @@ -145,6 +151,7 @@ func (bf *BleFsm) addBleListener(name string, base BleMsgBase) ( return nil, err } + bf.wg.Add(1) bf.bls[bl] = struct{}{} return bl, nil } @@ -178,6 +185,8 @@ func (bf *BleFsm) removeBleListener(name string, base BleMsgBase) { bl := bf.params.Bx.Bd.RemoveListener(base) delete(bf.bls, bl) + + bf.wg.Done() } func (bf *BleFsm) removeBleBaseListener(name string, base BleMsgBase) { @@ -266,7 +275,10 @@ func (bf *BleFsm) processErr(err error) { bf.peerDev = nil - go bf.params.DisconnectCb(dt, peer, err) + bf.wg.Wait() + + bf.errFunnel.Reset() + bf.params.DisconnectCb(dt, peer, err) } func (bf *BleFsm) connectListen(seq BleSeq) error { @@ -766,9 +778,10 @@ func (bf *BleFsm) executeState() (bool, error) { } func (bf *BleFsm) startOnce() (bool, error) { - bf.errFunnel.BlockUntilReset() + bf.errFunnel.Start() if !bf.IsClosed() { + bf.errFunnel.Reset() return false, nmxutil.NewSesnAlreadyOpenError(fmt.Sprintf( "Attempt to open an already-open BLE session (state=%d)", bf.getState())) http://git-wip-us.apache.org/repos/asf/incubator-mynewt-newtmgr/blob/b6629b10/nmxact/nmxutil/nmxutil.go ---------------------------------------------------------------------- diff --git a/nmxact/nmxutil/nmxutil.go b/nmxact/nmxutil/nmxutil.go index 4b232e9..2f967a0 100644 --- a/nmxact/nmxutil/nmxutil.go +++ b/nmxact/nmxutil/nmxutil.go @@ -51,17 +51,6 @@ func NewSingleResource() SingleResource { } } -func (s *SingleResource) removeWaiter(waiter chan error) { - s.mtx.Lock() - defer s.mtx.Unlock() - - for i, w := range s.waitQueue { - if w == waiter { - s.waitQueue = append(s.waitQueue[:i], s.waitQueue[i+1:]...) - } - } -} - func (s *SingleResource) Acquire() error { s.mtx.Lock() @@ -78,7 +67,6 @@ func (s *SingleResource) Acquire() error { err := <-w if err != nil { - s.removeWaiter(w) return err } @@ -89,6 +77,7 @@ func (s *SingleResource) Release() { s.mtx.Lock() if !s.acquired { + panic("SingleResource release without acquire") s.mtx.Unlock() return } @@ -114,6 +103,7 @@ func (s *SingleResource) Abort(err error) { for _, w := range s.waitQueue { w <- err } + s.waitQueue = [](chan error){} } type ErrLessFn func(a error, b error) bool @@ -130,6 +120,16 @@ type ErrFunnel struct { resetMtx sync.Mutex curErr error errTimer *time.Timer + started bool +} + +func (f *ErrFunnel) Start() { + f.resetMtx.Lock() + + f.mtx.Lock() + defer f.mtx.Unlock() + + f.started = true } func (f *ErrFunnel) Insert(err error) { @@ -140,10 +140,11 @@ func (f *ErrFunnel) Insert(err error) { f.mtx.Lock() defer f.mtx.Unlock() - if f.curErr == nil { - // Subsequent use attempts will block until the funnel is inactive. - f.resetMtx.Lock() + if !f.started { + panic("ErrFunnel insert without start") + } + if f.curErr == nil { f.curErr = err f.errTimer = time.AfterFunc(f.AccumDelay, func() { f.timerExp() @@ -159,35 +160,27 @@ func (f *ErrFunnel) Insert(err error) { } } -func (f *ErrFunnel) resetNoLock() { - if f.curErr != nil { - f.curErr = nil - f.errTimer.Stop() - f.resetMtx.Unlock() - } -} - func (f *ErrFunnel) Reset() { f.mtx.Lock() defer f.mtx.Unlock() - f.resetNoLock() -} - -func (f *ErrFunnel) BlockUntilReset() { - f.resetMtx.Lock() - f.resetMtx.Unlock() + if f.started { + f.started = false + f.curErr = nil + f.errTimer.Stop() + f.resetMtx.Unlock() + } } func (f *ErrFunnel) timerExp() { f.mtx.Lock() - defer f.mtx.Unlock() + err := f.curErr + f.curErr = nil + f.mtx.Unlock() - if f.curErr == nil { + if err == nil { panic("ErrFunnel timer expired but no error") } - f.ProcCb(f.curErr) - - f.resetNoLock() + f.ProcCb(err) }