This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git

commit 050d47dcb1e3138d093941f9a00641c7caf7c3fd
Author: Matteo Merli <mme...@apache.org>
AuthorDate: Thu Sep 6 07:40:24 2018 -0700

    [go] Ensure producer/consumer/reader keep a ref of client instance so it won't be finalized (#2527)
---
 pulsar-client-go/pulsar/c_consumer.go |  3 ++-
 pulsar-client-go/pulsar/c_producer.go | 12 +++++++-----
 pulsar-client-go/pulsar/c_reader.go   |  3 ++-
 3 files changed, 11 insertions(+), 7 deletions(-)

diff --git a/pulsar-client-go/pulsar/c_consumer.go b/pulsar-client-go/pulsar/c_consumer.go
index 1b41a71..c78a58e 100644
--- a/pulsar-client-go/pulsar/c_consumer.go
+++ b/pulsar-client-go/pulsar/c_consumer.go
@@ -32,6 +32,7 @@ import (
 )
 
 type consumer struct {
+       client         *client
        ptr            *C.pulsar_consumer_t
        defaultChannel chan ConsumerMessage
 }
@@ -76,7 +77,7 @@ func subscribeAsync(client *client, options ConsumerOptions, callback func(Consu
 
        conf := C.pulsar_consumer_configuration_create()
 
-       consumer := &consumer{}
+       consumer := &consumer{client: client}
 
        if options.MessageChannel == nil {
                // If there is no message listener, set a default channel so that we can have receive to
diff --git a/pulsar-client-go/pulsar/c_producer.go b/pulsar-client-go/pulsar/c_producer.go
index 284315d..620b64d 100644
--- a/pulsar-client-go/pulsar/c_producer.go
+++ b/pulsar-client-go/pulsar/c_producer.go
@@ -24,13 +24,14 @@ package pulsar
 */
 import "C"
 import (
+       "context"
        "runtime"
-       "unsafe"
        "time"
-       "context"
+       "unsafe"
 )
 
 type createProducerCtx struct {
+       client   *client
        callback func(producer Producer, err error)
        conf     *C.pulsar_producer_configuration_t
 }
@@ -44,7 +45,7 @@ func pulsarCreateProducerCallbackProxy(res C.pulsar_result, ptr *C.pulsar_produc
        if res != C.pulsar_result_Ok {
                producerCtx.callback(nil, newError(res, "Failed to create Producer"))
        } else {
-               p := &producer{ptr: ptr}
+               p := &producer{client: producerCtx.client, ptr: ptr}
                runtime.SetFinalizer(p, producerFinalizer)
                producerCtx.callback(p, nil)
        }
@@ -140,7 +141,7 @@ func createProducerAsync(client *client, options ProducerOptions, callback func(
        defer C.free(unsafe.Pointer(topicName))
 
        C._pulsar_client_create_producer_async(client.ptr, topicName, conf,
-               savePointer(createProducerCtx{callback, conf}))
+               savePointer(createProducerCtx{client,callback, conf}))
 }
 
 type topicMetadata struct {
@@ -161,7 +162,8 @@ func pulsarRouterCallbackProxy(msg *C.pulsar_message_t, metadata *C.pulsar_topic
 /// Producer
 
 type producer struct {
-       ptr *C.pulsar_producer_t
+       client *client
+       ptr    *C.pulsar_producer_t
 }
 
 func producerFinalizer(p *producer) {
diff --git a/pulsar-client-go/pulsar/c_reader.go b/pulsar-client-go/pulsar/c_reader.go
index 04bb5cf..7336c1a 100644
--- a/pulsar-client-go/pulsar/c_reader.go
+++ b/pulsar-client-go/pulsar/c_reader.go
@@ -31,6 +31,7 @@ import (
 )
 
 type reader struct {
+       client         *client
        ptr            *C.pulsar_reader_t
        defaultChannel chan ReaderMessage
 }
@@ -73,7 +74,7 @@ func createReaderAsync(client *client, options ReaderOptions, callback func(Read
                return
        }
 
-       reader := &reader{}
+       reader := &reader{client: client}
 
        if options.MessageChannel == nil {
                // If there is no message listener, set a default channel so that we can have receive to

Reply via email to