commit ac9d49b8727b953c12a76e3645fe71a9ec3aab75
Author: Serene H <g...@keroserene.net>
Date:   Mon Aug 1 12:17:28 2016 -0700

    ensure closing stale remotes from the client side
---
 client/snowflake.go |  3 +-
 client/webrtc.go    | 81 ++++++++++++++++++++++++++++++++++-------------------
 2 files changed, 54 insertions(+), 30 deletions(-)

diff --git a/client/snowflake.go b/client/snowflake.go
index 8dfa390..75e999f 100644
--- a/client/snowflake.go
+++ b/client/snowflake.go
@@ -20,6 +20,7 @@ import (
 const (
        ReconnectTimeout         = 10
        DefaultSnowflakeCapacity = 1
+       SnowflakeTimeout         = 30
 )
 
 // When a connection handler starts, +1 is written to this channel; when it
@@ -81,7 +82,7 @@ func handler(socks SocksConnector, snowflakes SnowflakeCollector) error {
                return errors.New("handler: Received invalid Snowflake")
        }
        defer socks.Close()
-       defer snowflake.Reset()
+       defer snowflake.Close()
        log.Println("---- Handler: snowflake assigned ----")
        err := socks.Grant(&net.TCPAddr{IP: net.IPv4zero, Port: 0})
        if err != nil {
diff --git a/client/webrtc.go b/client/webrtc.go
index 1f7ac00..0492466 100644
--- a/client/webrtc.go
+++ b/client/webrtc.go
@@ -29,6 +29,7 @@ type WebRTCPeer struct {
        errorChannel  chan error
        recvPipe      *io.PipeReader
        writePipe     *io.PipeWriter
+       lastReceive   time.Time
        buffer        bytes.Buffer
        reset         chan struct{}
 
@@ -37,6 +38,28 @@ type WebRTCPeer struct {
        BytesLogger
 }
 
+// Construct a WebRTC PeerConnection.
+func NewWebRTCPeer(config *webrtc.Configuration,
+       broker *BrokerChannel) *WebRTCPeer {
+       connection := new(WebRTCPeer)
+       connection.id = "snowflake-" + uniuri.New()
+       connection.config = config
+       connection.broker = broker
+       connection.offerChannel = make(chan *webrtc.SessionDescription, 1)
+       connection.answerChannel = make(chan *webrtc.SessionDescription, 1)
+       // Error channel is mostly for reporting during the initial SDP offer
+       // creation & local description setting, which happens asynchronously.
+       connection.errorChannel = make(chan error, 1)
+       connection.reset = make(chan struct{}, 1)
+
+       // Override with something that's not NullLogger to have real logging.
+       connection.BytesLogger = &BytesNullLogger{}
+
+       // Pipes remain the same even when DataChannel gets switched.
+       connection.recvPipe, connection.writePipe = io.Pipe()
+       return connection
+}
+
 // Read bytes from local SOCKS.
 // As part of |io.ReadWriter|
 func (c *WebRTCPeer) Read(b []byte) (int, error) {
@@ -47,6 +70,7 @@ func (c *WebRTCPeer) Read(b []byte) (int, error) {
 // As part of |io.ReadWriter|
 func (c *WebRTCPeer) Write(b []byte) (int, error) {
        c.BytesLogger.AddOutbound(len(b))
+       // TODO: Buffering could be improved / separated out of WebRTCPeer.
        if nil == c.transport {
                log.Printf("Buffered %d bytes --> WebRTC", len(b))
                c.buffer.Write(b)
@@ -61,45 +85,42 @@ func (c *WebRTCPeer) Close() error {
        if c.closed { // Skip if already closed.
                return nil
        }
-       log.Printf("WebRTC: Closing")
-       c.cleanup()
        // Mark for deletion.
        c.closed = true
+       c.cleanup()
+       c.Reset()
+       log.Printf("WebRTC: Closing")
        return nil
 }
 
 // As part of |Resetter|
 func (c *WebRTCPeer) Reset() {
-       c.Close()
-       go func() {
-               c.reset <- struct{}{}
-               log.Println("WebRTC resetting...")
-       }()
+       if nil == c.reset {
+               return
+       }
+       c.reset <- struct{}{}
 }
 
 // As part of |Resetter|
 func (c *WebRTCPeer) WaitForReset() { <-c.reset }
 
-// Construct a WebRTC PeerConnection.
-func NewWebRTCPeer(config *webrtc.Configuration,
-       broker *BrokerChannel) *WebRTCPeer {
-       connection := new(WebRTCPeer)
-       connection.id = "snowflake-" + uniuri.New()
-       connection.config = config
-       connection.broker = broker
-       connection.offerChannel = make(chan *webrtc.SessionDescription, 1)
-       connection.answerChannel = make(chan *webrtc.SessionDescription, 1)
-       // Error channel is mostly for reporting during the initial SDP offer
-       // creation & local description setting, which happens asynchronously.
-       connection.errorChannel = make(chan error, 1)
-       connection.reset = make(chan struct{}, 1)
-
-       // Override with something that's not NullLogger to have real logging.
-       connection.BytesLogger = &BytesNullLogger{}
-
-       // Pipes remain the same even when DataChannel gets switched.
-       connection.recvPipe, connection.writePipe = io.Pipe()
-       return connection
+// Prevent long-lived broken remotes.
+// Should also update the DataChannel in underlying go-webrtc's to make Closes
+// more immediate / responsive.
+func (c *WebRTCPeer) checkForStaleness() {
+       c.lastReceive = time.Now()
+       for {
+               if c.closed {
+                       return
+               }
+               if time.Since(c.lastReceive).Seconds() > SnowflakeTimeout {
+                       log.Println("WebRTC: No messages received for", SnowflakeTimeout,
+                               "seconds -- closing stale connection.")
+                       c.Close()
+                       return
+               }
+               <-time.After(time.Second)
+       }
 }
 
 // As part of |Connector| interface.
@@ -119,6 +140,7 @@ func (c *WebRTCPeer) Connect() error {
        if err != nil {
                return err
        }
+       go c.checkForStaleness()
        return nil
 }
 
@@ -208,7 +230,7 @@ func (c *WebRTCPeer) establishDataChannel() error {
                // Disable the DataChannel as a write destination.
                log.Println("WebRTC: DataChannel.OnClose [remotely]")
                c.transport = nil
-               c.Reset()
+               c.Close()
        }
        dc.OnMessage = func(msg []byte) {
                if len(msg) <= 0 {
@@ -225,6 +247,7 @@ func (c *WebRTCPeer) establishDataChannel() error {
                        log.Println("Error: short write")
                        panic("short write")
                }
+               c.lastReceive = time.Now()
        }
        log.Println("WebRTC: DataChannel created.")
        return nil
@@ -257,7 +280,7 @@ func (c *WebRTCPeer) exchangeSDP() error {
                }
        case err := <-c.errorChannel:
                log.Println("Failed to prepare offer", err)
-               c.Reset()
+               c.Close()
                return err
        }
        // Keep trying the same offer until a valid answer arrives.

_______________________________________________
tor-commits mailing list
tor-commits@lists.torproject.org
https://lists.torproject.org/cgi-bin/mailman/listinfo/tor-commits

Reply via email to