This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
     new 4d788d9 [Issue #153] Fix handler memory leak (#154)
4d788d9 is described below

commit 4d788d9935edbd5ac0aa3c87344e5e4208b9ed6e
Author: Rui Fu <freez...@users.noreply.github.com>
AuthorDate: Fri Jan 17 05:43:05 2020 +0800

    [Issue #153] Fix handler memory leak (#154)

    * fix producer handler memory leak

    * consumer removed from client handler when closed

    * golanglint

    * add tests & fix deadlock when handler close

    * make handlers delete in closeOnce to prevent unnecessary calls

    * goimports
---
 pulsar/consumer_impl.go                 |  3 +++
 pulsar/internal/client_handlers.go      | 15 +++++++++++++--
 pulsar/internal/client_handlers_test.go | 32 +++++++++++++++++++++++++++++++-
 pulsar/producer_impl.go                 |  5 ++++-
 4 files changed, 51 insertions(+), 4 deletions(-)

diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go
index 3bad95a..ede02ef 100644
--- a/pulsar/consumer_impl.go
+++ b/pulsar/consumer_impl.go
@@ -41,6 +41,7 @@ type acker interface {
 }
 
 type consumer struct {
+	client  *client
 	options ConsumerOptions
 
 	consumers []*partitionConsumer
@@ -114,6 +115,7 @@ func newConsumer(client *client, options ConsumerOptions) (Consumer, error) {
 func internalTopicSubscribe(client *client, options ConsumerOptions, topic string,
 	messageCh chan ConsumerMessage) (*consumer, error) {
 	consumer := &consumer{
+		client:    client,
 		options:   options,
 		messageCh: messageCh,
 		closeCh:   make(chan struct{}),
@@ -316,6 +318,7 @@ func (c *consumer) Close() {
 		}
 		wg.Wait()
 		close(c.closeCh)
+		c.client.handlers.Del(c)
 	})
 }
 
diff --git a/pulsar/internal/client_handlers.go b/pulsar/internal/client_handlers.go
index 1ecfdd9..39b9bb7 100644
--- a/pulsar/internal/client_handlers.go
+++ b/pulsar/internal/client_handlers.go
@@ -36,6 +36,13 @@ func (h *ClientHandlers) Add(c Closable) {
 	defer h.l.Unlock()
 	h.handlers[c] = true
 }
+
+func (h *ClientHandlers) Del(c Closable) {
+	h.l.Lock()
+	defer h.l.Unlock()
+	delete(h.handlers, c)
+}
+
 func (h *ClientHandlers) Val(c Closable) bool {
 	h.l.RLock()
 	defer h.l.RUnlock()
@@ -44,9 +51,13 @@ func (h *ClientHandlers) Val(c Closable) bool {
 
 func (h *ClientHandlers) Close() {
 	h.l.Lock()
-	defer h.l.Unlock()
-
+	handlers := make([]Closable, 0, len(h.handlers))
 	for handler := range h.handlers {
+		handlers = append(handlers, handler)
+	}
+	h.l.Unlock()
+
+	for _, handler := range handlers {
 		handler.Close()
 	}
 }
diff --git a/pulsar/internal/client_handlers_test.go b/pulsar/internal/client_handlers_test.go
index baaf899..09dcf44 100644
--- a/pulsar/internal/client_handlers_test.go
+++ b/pulsar/internal/client_handlers_test.go
@@ -28,7 +28,7 @@ func TestClientHandlers(t *testing.T) {
 	assert.NotNil(t, h.l)
 	assert.Equal(t, h.handlers, map[Closable]bool{})
 
-	closable := &testClosable{false}
+	closable := &testClosable{h: &h, closed: false}
 	h.Add(closable)
 	assert.True(t, h.Val(closable))
 
@@ -37,10 +37,40 @@ func TestClientHandlers(t *testing.T) {
 	assert.True(t, closable.closed)
 }
 
+func TestClientHandlers_Del(t *testing.T) {
+	h := NewClientHandlers()
+	assert.NotNil(t, h.l)
+	assert.Equal(t, h.handlers, map[Closable]bool{})
+
+	closable1 := &testClosable{h: &h, closed: false}
+	h.Add(closable1)
+
+	closable2 := &testClosable{h: &h, closed: false}
+	h.Add(closable2)
+
+	assert.Len(t, h.handlers, 2)
+	assert.True(t, h.Val(closable1))
+	assert.True(t, h.Val(closable2))
+
+	closable1.Close()
+	assert.False(t, h.Val(closable1))
+	assert.True(t, h.Val(closable2))
+	assert.Len(t, h.handlers, 1)
+
+	h.Close()
+	t.Log("closable1 is: closed ", closable1.closed)
+	t.Log("closable2 is: closed ", closable2.closed)
+	assert.True(t, closable1.closed)
+	assert.True(t, closable2.closed)
+	assert.Len(t, h.handlers, 0)
+}
+
 type testClosable struct {
+	h      *ClientHandlers
 	closed bool
 }
 
 func (t *testClosable) Close() {
 	t.closed = true
+	t.h.Del(t)
 }
diff --git a/pulsar/producer_impl.go b/pulsar/producer_impl.go
index 101929d..83a6f75 100644
--- a/pulsar/producer_impl.go
+++ b/pulsar/producer_impl.go
@@ -24,6 +24,7 @@ import (
 )
 
 type producer struct {
+	client        *client
 	topic         string
 	producers     []Producer
 	messageRouter func(*ProducerMessage, TopicMetadata) int
@@ -46,7 +47,8 @@ func newProducer(client *client, options *ProducerOptions) (*producer, error) {
 	}
 
 	p := &producer{
-		topic: options.Topic,
+		topic:  options.Topic,
+		client: client,
 	}
 
 	if options.MessageRouter == nil {
@@ -160,4 +162,5 @@ func (p *producer) Close() {
 	for _, pp := range p.producers {
 		pp.Close()
 	}
+	p.client.handlers.Del(p)
 }