This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 4d788d9  [Issue #153] Fix handler memory leak (#154)
4d788d9 is described below

commit 4d788d9935edbd5ac0aa3c87344e5e4208b9ed6e
Author: Rui Fu <freez...@users.noreply.github.com>
AuthorDate: Fri Jan 17 05:43:05 2020 +0800

    [Issue #153] Fix handler memory leak (#154)
    
    * fix producer handler memory leak
    
    * consumer removed from client handler when closed
    
    * golanglint
    
    * add tests & fix deadlock when handler close
    
    * make handlers delete in closeOnce to prevent unnecessary calls
    
    * goimports
---
 pulsar/consumer_impl.go                 |  3 +++
 pulsar/internal/client_handlers.go      | 15 +++++++++++++--
 pulsar/internal/client_handlers_test.go | 32 +++++++++++++++++++++++++++++++-
 pulsar/producer_impl.go                 |  5 ++++-
 4 files changed, 51 insertions(+), 4 deletions(-)

diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go
index 3bad95a..ede02ef 100644
--- a/pulsar/consumer_impl.go
+++ b/pulsar/consumer_impl.go
@@ -41,6 +41,7 @@ type acker interface {
 }
 
 type consumer struct {
+       client    *client
        options   ConsumerOptions
        consumers []*partitionConsumer
 
@@ -114,6 +115,7 @@ func newConsumer(client *client, options ConsumerOptions) (Consumer, error) {
 func internalTopicSubscribe(client *client, options ConsumerOptions, topic string,
        messageCh chan ConsumerMessage) (*consumer, error) {
        consumer := &consumer{
+               client:    client,
                options:   options,
                messageCh: messageCh,
                closeCh:   make(chan struct{}),
@@ -316,6 +318,7 @@ func (c *consumer) Close() {
                }
                wg.Wait()
                close(c.closeCh)
+               c.client.handlers.Del(c)
        })
 }
 
diff --git a/pulsar/internal/client_handlers.go b/pulsar/internal/client_handlers.go
index 1ecfdd9..39b9bb7 100644
--- a/pulsar/internal/client_handlers.go
+++ b/pulsar/internal/client_handlers.go
@@ -36,6 +36,13 @@ func (h *ClientHandlers) Add(c Closable) {
        defer h.l.Unlock()
        h.handlers[c] = true
 }
+
+func (h *ClientHandlers) Del(c Closable) {
+       h.l.Lock()
+       defer h.l.Unlock()
+       delete(h.handlers, c)
+}
+
 func (h *ClientHandlers) Val(c Closable) bool {
        h.l.RLock()
        defer h.l.RUnlock()
@@ -44,9 +51,13 @@ func (h *ClientHandlers) Val(c Closable) bool {
 
 func (h *ClientHandlers) Close() {
        h.l.Lock()
-       defer h.l.Unlock()
-
+       handlers := make([]Closable, 0, len(h.handlers))
        for handler := range h.handlers {
+               handlers = append(handlers, handler)
+       }
+       h.l.Unlock()
+
+       for _, handler := range handlers {
                handler.Close()
        }
 }
diff --git a/pulsar/internal/client_handlers_test.go b/pulsar/internal/client_handlers_test.go
index baaf899..09dcf44 100644
--- a/pulsar/internal/client_handlers_test.go
+++ b/pulsar/internal/client_handlers_test.go
@@ -28,7 +28,7 @@ func TestClientHandlers(t *testing.T) {
        assert.NotNil(t, h.l)
        assert.Equal(t, h.handlers, map[Closable]bool{})
 
-       closable := &testClosable{false}
+       closable := &testClosable{h: &h, closed: false}
        h.Add(closable)
        assert.True(t, h.Val(closable))
 
@@ -37,10 +37,40 @@ func TestClientHandlers(t *testing.T) {
        assert.True(t, closable.closed)
 }
 
+func TestClientHandlers_Del(t *testing.T) {
+       h := NewClientHandlers()
+       assert.NotNil(t, h.l)
+       assert.Equal(t, h.handlers, map[Closable]bool{})
+
+       closable1 := &testClosable{h: &h, closed: false}
+       h.Add(closable1)
+
+       closable2 := &testClosable{h: &h, closed: false}
+       h.Add(closable2)
+
+       assert.Len(t, h.handlers, 2)
+       assert.True(t, h.Val(closable1))
+       assert.True(t, h.Val(closable2))
+
+       closable1.Close()
+       assert.False(t, h.Val(closable1))
+       assert.True(t, h.Val(closable2))
+       assert.Len(t, h.handlers, 1)
+
+       h.Close()
+       t.Log("closable1 is: closed ", closable1.closed)
+       t.Log("closable2 is: closed ", closable2.closed)
+       assert.True(t, closable1.closed)
+       assert.True(t, closable2.closed)
+       assert.Len(t, h.handlers, 0)
+}
+
 type testClosable struct {
+       h      *ClientHandlers
        closed bool
 }
 
 func (t *testClosable) Close() {
        t.closed = true
+       t.h.Del(t)
 }
diff --git a/pulsar/producer_impl.go b/pulsar/producer_impl.go
index 101929d..83a6f75 100644
--- a/pulsar/producer_impl.go
+++ b/pulsar/producer_impl.go
@@ -24,6 +24,7 @@ import (
 )
 
 type producer struct {
+       client        *client
        topic         string
        producers     []Producer
        messageRouter func(*ProducerMessage, TopicMetadata) int
@@ -46,7 +47,8 @@ func newProducer(client *client, options *ProducerOptions) (*producer, error) {
        }
 
        p := &producer{
-               topic: options.Topic,
+               topic:  options.Topic,
+               client: client,
        }
 
        if options.MessageRouter == nil {
@@ -160,4 +162,5 @@ func (p *producer) Close() {
        for _, pp := range p.producers {
                pp.Close()
        }
+       p.client.handlers.Del(p)
 }

Reply via email to