commit 0052c0e10cfd8e270a57d85711064e8d9e064bf5
Author: Cecylia Bocovich <coh...@torproject.org>
Date:   Tue Jun 16 17:49:39 2020 -0400

    Add a new heap at the broker for restricted flakes
    
    Now when proxies poll, they provide their NAT type to the broker. This
    introduces a new snowflake heap of just restricted snowflakes that the
    broker can pull from if the client has a known, unrestricted NAT. All
    other clients will pull from the existing heap, which holds snowflakes
    with unrestricted or unknown NAT topologies.
---
 broker/broker.go                | 67 ++++++++++++++++++++++++++++++-----------
 broker/snowflake-broker_test.go | 14 ++++-----
 2 files changed, 57 insertions(+), 24 deletions(-)

diff --git a/broker/broker.go b/broker/broker.go
index 2d3cd4b..9297980 100644
--- a/broker/broker.go
+++ b/broker/broker.go
@@ -31,12 +31,18 @@ const (
        ClientTimeout = 10
        ProxyTimeout  = 10
        readLimit     = 100000 //Maximum number of bytes to be read from an 
HTTP request
+
+       NATUnknown      = "unknown"
+       NATRestricted   = "restricted"
+       NATUnrestricted = "unrestricted"
 )
 
 type BrokerContext struct {
-       snowflakes *SnowflakeHeap
-       // Map keeping track of snowflakeIDs required to match SDP answers from
-       // the second http POST.
+       snowflakes           *SnowflakeHeap
+       restrictedSnowflakes *SnowflakeHeap
+       // Maps keeping track of snowflakeIDs required to match SDP answers from
+       // the second http POST. Restricted snowflakes can only be matched up with
+       // clients behind an unrestricted NAT.
        idToSnowflake map[string]*Snowflake
        // Synchronization for the snowflake map and heap
        snowflakeLock sync.Mutex
@@ -47,6 +53,8 @@ type BrokerContext struct {
 func NewBrokerContext(metricsLogger *log.Logger) *BrokerContext {
        snowflakes := new(SnowflakeHeap)
        heap.Init(snowflakes)
+       rSnowflakes := new(SnowflakeHeap)
+       heap.Init(rSnowflakes)
        metrics, err := NewMetrics(metricsLogger)
 
        if err != nil {
@@ -58,10 +66,11 @@ func NewBrokerContext(metricsLogger *log.Logger) 
*BrokerContext {
        }
 
        return &BrokerContext{
-               snowflakes:    snowflakes,
-               idToSnowflake: make(map[string]*Snowflake),
-               proxyPolls:    make(chan *ProxyPoll),
-               metrics:       metrics,
+               snowflakes:           snowflakes,
+               restrictedSnowflakes: rSnowflakes,
+               idToSnowflake:        make(map[string]*Snowflake),
+               proxyPolls:           make(chan *ProxyPoll),
+               metrics:              metrics,
        }
 }
 
@@ -79,7 +88,7 @@ type MetricsHandler struct {
 
 func (sh SnowflakeHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
        w.Header().Set("Access-Control-Allow-Origin", "*")
-       w.Header().Set("Access-Control-Allow-Headers", "Origin, X-Session-ID")
+       w.Header().Set("Access-Control-Allow-Headers", "Origin, X-Session-ID, 
Snowflake-NAT-Type")
        // Return early if it's CORS preflight.
        if "OPTIONS" == r.Method {
                return
@@ -101,15 +110,17 @@ func (mh MetricsHandler) ServeHTTP(w http.ResponseWriter, 
r *http.Request) {
 type ProxyPoll struct {
        id           string
        proxyType    string
+       natType      string
        offerChannel chan []byte
 }
 
 // Registers a Snowflake and waits for some Client to send an offer,
 // as part of the polling logic of the proxy handler.
-func (ctx *BrokerContext) RequestOffer(id string, proxyType string) []byte {
+func (ctx *BrokerContext) RequestOffer(id string, proxyType string, natType 
string) []byte {
        request := new(ProxyPoll)
        request.id = id
        request.proxyType = proxyType
+       request.natType = natType
        request.offerChannel = make(chan []byte)
        ctx.proxyPolls <- request
        // Block until an offer is available, or timeout which sends a nil offer.
@@ -122,7 +133,7 @@ func (ctx *BrokerContext) RequestOffer(id string, proxyType 
string) []byte {
 // client offer or nil on timeout / none are available.
 func (ctx *BrokerContext) Broker() {
        for request := range ctx.proxyPolls {
-               snowflake := ctx.AddSnowflake(request.id, request.proxyType)
+               snowflake := ctx.AddSnowflake(request.id, request.proxyType, 
request.natType)
                // Wait for a client to avail an offer to the snowflake.
                go func(request *ProxyPoll) {
                        select {
@@ -133,7 +144,11 @@ func (ctx *BrokerContext) Broker() {
                                ctx.snowflakeLock.Lock()
                                defer ctx.snowflakeLock.Unlock()
                                if snowflake.index != -1 {
-                                       heap.Remove(ctx.snowflakes, 
snowflake.index)
+                                       if request.natType == NATRestricted {
+                                               
heap.Remove(ctx.restrictedSnowflakes, snowflake.index)
+                                       } else {
+                                               heap.Remove(ctx.snowflakes, 
snowflake.index)
+                                       }
                                        delete(ctx.idToSnowflake, snowflake.id)
                                        close(request.offerChannel)
                                }
@@ -145,7 +160,7 @@ func (ctx *BrokerContext) Broker() {
 // Create and add a Snowflake to the heap.
 // Required to keep track of proxies between providing them
 // with an offer and awaiting their second POST with an answer.
-func (ctx *BrokerContext) AddSnowflake(id string, proxyType string) *Snowflake 
{
+func (ctx *BrokerContext) AddSnowflake(id string, proxyType string, natType 
string) *Snowflake {
        snowflake := new(Snowflake)
        snowflake.id = id
        snowflake.clients = 0
@@ -153,7 +168,11 @@ func (ctx *BrokerContext) AddSnowflake(id string, 
proxyType string) *Snowflake {
        snowflake.offerChannel = make(chan []byte)
        snowflake.answerChannel = make(chan []byte)
        ctx.snowflakeLock.Lock()
-       heap.Push(ctx.snowflakes, snowflake)
+       if natType == NATRestricted {
+               heap.Push(ctx.restrictedSnowflakes, snowflake)
+       } else {
+               heap.Push(ctx.snowflakes, snowflake)
+       }
        ctx.snowflakeLock.Unlock()
        ctx.idToSnowflake[id] = snowflake
        return snowflake
@@ -170,7 +189,7 @@ func proxyPolls(ctx *BrokerContext, w http.ResponseWriter, 
r *http.Request) {
                return
        }
 
-       sid, proxyType, _, err := messages.DecodePollRequest(body)
+       sid, proxyType, natType, err := messages.DecodePollRequest(body)
        if err != nil {
                w.WriteHeader(http.StatusBadRequest)
                return
@@ -187,7 +206,7 @@ func proxyPolls(ctx *BrokerContext, w http.ResponseWriter, 
r *http.Request) {
        }
 
        // Wait for a client to avail an offer to the snowflake, or timeout if nil.
-       offer := ctx.RequestOffer(sid, proxyType)
+       offer := ctx.RequestOffer(sid, proxyType, natType)
        var b []byte
        if nil == offer {
                ctx.metrics.lock.Lock()
@@ -226,9 +245,23 @@ func clientOffers(ctx *BrokerContext, w 
http.ResponseWriter, r *http.Request) {
                w.WriteHeader(http.StatusBadRequest)
                return
        }
+
+       natType := r.Header.Get("Snowflake-NAT-Type")
+       if natType == "" {
+               natType = NATUnknown
+       }
+
+       // Only hand out known restricted snowflakes to unrestricted clients
+       var snowflakeHeap *SnowflakeHeap
+       if natType == NATUnrestricted {
+               snowflakeHeap = ctx.restrictedSnowflakes
+       } else {
+               snowflakeHeap = ctx.snowflakes
+       }
+
        // Immediately fail if there are no snowflakes available.
        ctx.snowflakeLock.Lock()
-       numSnowflakes := ctx.snowflakes.Len()
+       numSnowflakes := snowflakeHeap.Len()
        ctx.snowflakeLock.Unlock()
        if numSnowflakes <= 0 {
                ctx.metrics.lock.Lock()
@@ -240,7 +273,7 @@ func clientOffers(ctx *BrokerContext, w 
http.ResponseWriter, r *http.Request) {
        // Otherwise, find the most available snowflake proxy, and pass the offer to it.
        // Delete must be deferred in order to correctly process answer request later.
        ctx.snowflakeLock.Lock()
-       snowflake := heap.Pop(ctx.snowflakes).(*Snowflake)
+       snowflake := heap.Pop(snowflakeHeap).(*Snowflake)
        ctx.snowflakeLock.Unlock()
        snowflake.offerChannel <- offer
 
diff --git a/broker/snowflake-broker_test.go b/broker/snowflake-broker_test.go
index 18b83dd..91383a1 100644
--- a/broker/snowflake-broker_test.go
+++ b/broker/snowflake-broker_test.go
@@ -29,7 +29,7 @@ func TestBroker(t *testing.T) {
                Convey("Adds Snowflake", func() {
                        So(ctx.snowflakes.Len(), ShouldEqual, 0)
                        So(len(ctx.idToSnowflake), ShouldEqual, 0)
-                       ctx.AddSnowflake("foo", "")
+                       ctx.AddSnowflake("foo", "", NATUnknown)
                        So(ctx.snowflakes.Len(), ShouldEqual, 1)
                        So(len(ctx.idToSnowflake), ShouldEqual, 1)
                })
@@ -55,7 +55,7 @@ func TestBroker(t *testing.T) {
                Convey("Request an offer from the Snowflake Heap", func() {
                        done := make(chan []byte)
                        go func() {
-                               offer := ctx.RequestOffer("test", "")
+                               offer := ctx.RequestOffer("test", "", 
NATUnknown)
                                done <- offer
                        }()
                        request := <-ctx.proxyPolls
@@ -79,7 +79,7 @@ func TestBroker(t *testing.T) {
                        Convey("with a proxy answer if available.", func() {
                                done := make(chan bool)
                                // Prepare a fake proxy to respond with.
-                               snowflake := ctx.AddSnowflake("fake", "")
+                               snowflake := ctx.AddSnowflake("fake", "", 
NATUnknown)
                                go func() {
                                        clientOffers(ctx, w, r)
                                        done <- true
@@ -97,7 +97,7 @@ func TestBroker(t *testing.T) {
                                        return
                                }
                                done := make(chan bool)
-                               snowflake := ctx.AddSnowflake("fake", "")
+                               snowflake := ctx.AddSnowflake("fake", "", 
NATUnknown)
                                go func() {
                                        clientOffers(ctx, w, r)
                                        // Takes a few seconds here...
@@ -147,7 +147,7 @@ func TestBroker(t *testing.T) {
                })
 
                Convey("Responds to proxy answers...", func() {
-                       s := ctx.AddSnowflake("test", "")
+                       s := ctx.AddSnowflake("test", "", NATUnknown)
                        w := httptest.NewRecorder()
                        data := 
bytes.NewReader([]byte(`{"Version":"1.0","Sid":"test","Answer":"test"}`))
 
@@ -260,7 +260,7 @@ func TestBroker(t *testing.T) {
                        // Manually do the Broker goroutine action here for full control.
                        p := <-ctx.proxyPolls
                        So(p.id, ShouldEqual, "ymbcCMto7KHNGYlp")
-                       s := ctx.AddSnowflake(p.id, "")
+                       s := ctx.AddSnowflake(p.id, "", NATUnknown)
                        go func() {
                                offer := <-s.offerChannel
                                p.offerChannel <- offer
@@ -537,7 +537,7 @@ func TestMetrics(t *testing.T) {
                        So(err, ShouldBeNil)
 
                        // Prepare a fake proxy to respond with.
-                       snowflake := ctx.AddSnowflake("fake", "")
+                       snowflake := ctx.AddSnowflake("fake", "", NATUnknown)
                        go func() {
                                clientOffers(ctx, w, r)
                                done <- true



_______________________________________________
tor-commits mailing list
tor-commits@lists.torproject.org
https://lists.torproject.org/cgi-bin/mailman/listinfo/tor-commits

Reply via email to