Repository: qpid-proton Updated Branches: refs/heads/master e769f784e -> 4a9f3b986
PROTON-1953: [go] occasional client/server hang with a high volume of messages. The hang was due to a race/deadlock between the read/write goroutines and the engine's main goroutine. Simplified: - start read/write goroutines only as needed - handle read/write completion via Inject(), with no special channels Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/486fbaf0 Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/486fbaf0 Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/486fbaf0 Branch: refs/heads/master Commit: 486fbaf034f2d89688bb678914d798a1a1595cc5 Parents: e769f78 Author: Alan Conway <acon...@redhat.com> Authored: Wed Oct 10 16:09:33 2018 -0400 Committer: Alan Conway <acon...@redhat.com> Committed: Thu Oct 11 11:16:37 2018 -0400 ---------------------------------------------------------------------- go/src/qpid.apache.org/proton/engine.go | 173 ++++++++++----------------- 1 file changed, 64 insertions(+), 109 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/486fbaf0/go/src/qpid.apache.org/proton/engine.go ---------------------------------------------------------------------- diff --git a/go/src/qpid.apache.org/proton/engine.go b/go/src/qpid.apache.org/proton/engine.go index d28a09f..af26a5f 100644 --- a/go/src/qpid.apache.org/proton/engine.go +++ b/go/src/qpid.apache.org/proton/engine.go @@ -103,15 +103,16 @@ type Engine struct { err ErrorHolder inject chan func() - conn net.Conn - connection Connection - transport Transport - collector *C.pn_collector_t - handlers []EventHandler // Handlers for proton events. - running chan struct{} // This channel will be closed when the goroutines are done. - closeOnce sync.Once - timer *time.Timer - traceEvent bool + conn net.Conn + connection Connection + transport Transport + collector *C.pn_collector_t + handlers []EventHandler // Handlers for proton events. 
+ running chan struct{} // This channel will be closed when the goroutines are done. + closeOnce sync.Once + timer *time.Timer + traceEvent bool + reading, writing bool } const bufferSize = 4096 @@ -255,6 +256,12 @@ func (eng *Engine) Disconnect(err error) { <-eng.running } +// For debugging purposes: like Transport.Log() but takes a format string +// and works even if the transport has been freed. +func (eng *Engine) log(format string, args ...interface{}) { + fmt.Fprintf(os.Stderr, "[%p]: %v", eng.transport, fmt.Sprintf(format, args...)) +} + // Let proton run timed activity and set up the next tick func (eng *Engine) tick() { now := time.Now() @@ -281,16 +288,50 @@ func (eng *Engine) dispatch() bool { return !eng.transport.Closed() || C.pn_collector_peek(eng.collector) != nil } -func (eng *Engine) writeBuffer() []byte { - size := eng.Transport().Pending() // Evaluate before Head(), may change buffer. - start := eng.Transport().Head() - return cByteSlice(start, size) +func (eng *Engine) write() { + if !eng.writing { + size := eng.Transport().Pending() // Evaluate before Head(), may change buffer. 
+ start := eng.Transport().Head() + if size > 0 { + eng.writing = true + go func() { // Blocking Write() in separate goroutineb + n, err := eng.conn.Write(cByteSlice(start, size)) + eng.Inject(func() { // Inject results of Write back to engine goroutine + eng.writing = false + if n > 0 { + eng.transport.Pop(uint(n)) + } + if err != nil { + eng.Transport().Condition().SetError(err) + eng.Transport().CloseHead() + } + }) + }() + } + } } -func (eng *Engine) readBuffer() []byte { - size := eng.Transport().Capacity() - start := eng.Transport().Tail() - return cByteSlice(start, size) +func (eng *Engine) read() { + if !eng.reading { + size := eng.Transport().Capacity() + start := eng.Transport().Tail() + if size > 0 { + eng.reading = true + go func() { // Blocking Read in separate goroutine + n, err := eng.conn.Read(cByteSlice(start, size)) + eng.Inject(func() { + eng.reading = false + if n > 0 { + eng.Transport().Process(uint(n)) + } + if err != nil { + eng.Transport().Condition().SetError(err) + eng.Transport().CloseTail() + } + }) + }() + } + } } func (eng *Engine) free() { @@ -317,106 +358,20 @@ func (eng *Engine) Run() error { defer eng.free() eng.transport.Bind(eng.connection) eng.tick() // Start ticking if needed - - // Channels for read and write buffers going in and out of the read/write goroutines. - // The channels are unbuffered: we want to exchange buffers in sequence. 
- readsIn, writesIn := make(chan []byte), make(chan []byte) - readsOut, writesOut := make(chan []byte), make(chan []byte) - - wait := sync.WaitGroup{} - wait.Add(2) // Read and write goroutines - - go func() { // Read goroutine - defer wait.Done() - for { - rbuf, ok := <-readsIn - if !ok { - return - } - n, err := eng.conn.Read(rbuf) - if n > 0 { - readsOut <- rbuf[:n] - } else if err != nil { - _ = eng.Inject(func() { - eng.Transport().Condition().SetError(err) - eng.Transport().CloseTail() - }) - return - } - } - }() - - go func() { // Write goroutine - defer wait.Done() - for { - wbuf, ok := <-writesIn - if !ok { - return - } - n, err := eng.conn.Write(wbuf) - if n > 0 { - writesOut <- wbuf[:n] - } else if err != nil { - _ = eng.Inject(func() { - eng.Transport().Condition().SetError(err) - eng.Transport().CloseHead() - }) - return - } - } - }() - for eng.dispatch() { - readBuf := eng.readBuffer() - writeBuf := eng.writeBuffer() - // Note that getting the buffers can generate events (eg. SASL events) that - // might close the transport. Check if we are already finished before - // blocking for IO. - if !eng.dispatch() { - break - } - - // sendReads/sendWrites are nil (not sendable in select) unless we have a - // buffer to read/write - var sendReads, sendWrites chan []byte - if readBuf != nil { - sendReads = readsIn - } - if writeBuf != nil { - sendWrites = writesIn - } - - // Send buffers to the read/write goroutines if we have them. 
- // Get buffers from the read/write goroutines and process them - // Check for injected functions + // Initiate read/write if needed + eng.read() + eng.write() select { - - case sendReads <- readBuf: - - case sendWrites <- writeBuf: - - case buf := <-readsOut: - eng.transport.Process(uint(len(buf))) - - case buf := <-writesOut: - eng.transport.Pop(uint(len(buf))) - - case f, ok := <-eng.inject: // Function injected from another goroutine - if ok { - f() - } - + case f := <-eng.inject: // User or IO action + f() case <-eng.timer.C: eng.tick() } } - eng.err.Set(EndpointError(eng.Connection())) eng.err.Set(eng.Transport().Condition().Error()) - close(readsIn) - close(writesIn) close(eng.running) // Signal goroutines have exited and Error is set, disable Inject() - _ = eng.conn.Close() // Close conn, force read/write goroutines to exit (they will Inject) - wait.Wait() // Wait for goroutines + _ = eng.conn.Close() // Close conn, force read goroutine to exit (Inject will fail) return eng.err.Get() } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org