This is an automated email from the ASF dual-hosted git repository. mmerli pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
commit 050d47dcb1e3138d093941f9a00641c7caf7c3fd
Author: Matteo Merli <mme...@apache.org>
AuthorDate: Thu Sep 6 07:40:24 2018 -0700

    [go] Ensure producer/consumer/reader keep a ref of client instance so it won't be finalized (#2527)
---
 pulsar-client-go/pulsar/c_consumer.go |  3 ++-
 pulsar-client-go/pulsar/c_producer.go | 12 +++++++-----
 pulsar-client-go/pulsar/c_reader.go   |  3 ++-
 3 files changed, 11 insertions(+), 7 deletions(-)

diff --git a/pulsar-client-go/pulsar/c_consumer.go b/pulsar-client-go/pulsar/c_consumer.go
index 1b41a71..c78a58e 100644
--- a/pulsar-client-go/pulsar/c_consumer.go
+++ b/pulsar-client-go/pulsar/c_consumer.go
@@ -32,6 +32,7 @@ import (
 )
 
 type consumer struct {
+	client         *client
 	ptr            *C.pulsar_consumer_t
 	defaultChannel chan ConsumerMessage
 }
@@ -76,7 +77,7 @@ func subscribeAsync(client *client, options ConsumerOptions, callback func(Consu
 
 	conf := C.pulsar_consumer_configuration_create()
 
-	consumer := &consumer{}
+	consumer := &consumer{client: client}
 
 	if options.MessageChannel == nil {
 		// If there is no message listener, set a default channel so that we can have receive to
diff --git a/pulsar-client-go/pulsar/c_producer.go b/pulsar-client-go/pulsar/c_producer.go
index 284315d..620b64d 100644
--- a/pulsar-client-go/pulsar/c_producer.go
+++ b/pulsar-client-go/pulsar/c_producer.go
@@ -24,13 +24,14 @@ package pulsar
 */
 import "C"
 import (
+	"context"
 	"runtime"
-	"unsafe"
 	"time"
-	"context"
+	"unsafe"
 )
 
 type createProducerCtx struct {
+	client   *client
 	callback func(producer Producer, err error)
 	conf     *C.pulsar_producer_configuration_t
 }
@@ -44,7 +45,7 @@ func pulsarCreateProducerCallbackProxy(res C.pulsar_result, ptr *C.pulsar_produc
 	if res != C.pulsar_result_Ok {
 		producerCtx.callback(nil, newError(res, "Failed to create Producer"))
 	} else {
-		p := &producer{ptr: ptr}
+		p := &producer{client: producerCtx.client, ptr: ptr}
 		runtime.SetFinalizer(p, producerFinalizer)
 		producerCtx.callback(p, nil)
 	}
@@ -140,7 +141,7 @@ func createProducerAsync(client *client, options ProducerOptions, callback func(
 	defer C.free(unsafe.Pointer(topicName))
 
 	C._pulsar_client_create_producer_async(client.ptr, topicName, conf,
-		savePointer(createProducerCtx{callback, conf}))
+		savePointer(createProducerCtx{client,callback, conf}))
 }
 
 type topicMetadata struct {
@@ -161,7 +162,8 @@ func pulsarRouterCallbackProxy(msg *C.pulsar_message_t, metadata *C.pulsar_topic
 /// Producer
 
 type producer struct {
-	ptr *C.pulsar_producer_t
+	client *client
+	ptr    *C.pulsar_producer_t
 }
 
 func producerFinalizer(p *producer) {
diff --git a/pulsar-client-go/pulsar/c_reader.go b/pulsar-client-go/pulsar/c_reader.go
index 04bb5cf..7336c1a 100644
--- a/pulsar-client-go/pulsar/c_reader.go
+++ b/pulsar-client-go/pulsar/c_reader.go
@@ -31,6 +31,7 @@ import (
 )
 
 type reader struct {
+	client         *client
 	ptr            *C.pulsar_reader_t
 	defaultChannel chan ReaderMessage
 }
@@ -73,7 +74,7 @@ func createReaderAsync(client *client, options ReaderOptions, callback func(Read
 		return
 	}
 
-	reader := &reader{}
+	reader := &reader{client: client}
 
 	if options.MessageChannel == nil {
 		// If there is no message listener, set a default channel so that we can have receive to