This is an automated email from the ASF dual-hosted git repository.

huzongtang pushed a commit to branch native
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git


The following commit(s) were added to refs/heads/native by this push:
     new a1b4211  add ctx to rmqClient & remoteClient. resolve #176 (#177)
a1b4211 is described below

commit a1b42115ec7856d66f3563bbae36512d8a1cecbc
Author: xujianhai666 <[email protected]>
AuthorDate: Mon Aug 26 10:51:54 2019 +0800

    add ctx to rmqClient & remoteClient. resolve #176 (#177)
---
 consumer/consumer.go                  | 12 ++---
 consumer/offset_store.go              |  5 +-
 consumer/offset_store_test.go         |  2 +-
 consumer/push_consumer.go             |  2 +-
 internal/client.go                    | 24 ++++-----
 internal/mock_client.go               | 91 ++++++++---------------------------
 internal/remote/future.go             | 11 +++--
 internal/remote/mock_remote_client.go | 25 +++++-----
 internal/remote/remote_client.go      | 27 ++++++-----
 internal/remote/remote_client_test.go | 21 ++++----
 internal/route.go                     |  3 +-
 internal/route_test.go                |  5 +-
 internal/trace.go                     |  2 +-
 producer/producer.go                  | 10 ++--
 producer/producer_test.go             |  8 +--
 15 files changed, 102 insertions(+), 146 deletions(-)

diff --git a/consumer/consumer.go b/consumer/consumer.go
index 9bed213..b3454a6 100644
--- a/consumer/consumer.go
+++ b/consumer/consumer.go
@@ -546,7 +546,7 @@ func (dc *defaultConsumer) unlockAll(oneway bool) {
 func (dc *defaultConsumer) doLock(addr string, body *lockBatchRequestBody) 
[]primitive.MessageQueue {
        data, _ := json.Marshal(body)
        request := remote.NewRemotingCommand(internal.ReqLockBatchMQ, nil, data)
-       response, err := dc.client.InvokeSync(addr, request, 1*time.Second)
+       response, err := dc.client.InvokeSync(context.Background(), addr, 
request, 1*time.Second)
        if err != nil {
                rlog.Errorf("lock mq to broker: %s error %s", addr, err.Error())
                return nil
@@ -566,12 +566,12 @@ func (dc *defaultConsumer) doUnlock(addr string, body 
*lockBatchRequestBody, one
        data, _ := json.Marshal(body)
        request := remote.NewRemotingCommand(internal.ReqUnlockBatchMQ, nil, 
data)
        if oneway {
-               err := dc.client.InvokeOneWay(addr, request, 3*time.Second)
+               err := dc.client.InvokeOneWay(context.Background(), addr, 
request, 3*time.Second)
                if err != nil {
                        rlog.Errorf("lock mq to broker with oneway: %s error 
%s", addr, err.Error())
                }
        } else {
-               response, err := dc.client.InvokeSync(addr, request, 
1*time.Second)
+               response, err := dc.client.InvokeSync(context.Background(), 
addr, request, 1*time.Second)
                if err != nil {
                        rlog.Errorf("lock mq to broker: %s error %s", addr, 
err.Error())
                }
@@ -832,7 +832,7 @@ func (dc *defaultConsumer) findConsumerList(topic string) 
[]string {
                        ConsumerGroup: dc.consumerGroup,
                }
                cmd := 
remote.NewRemotingCommand(internal.ReqGetConsumerListByGroup, req, nil)
-               res, err := dc.client.InvokeSync(brokerAddr, cmd, 
3*time.Second) // TODO 超时机制有问题
+               res, err := dc.client.InvokeSync(context.Background(), 
brokerAddr, cmd, 3*time.Second) // TODO 超时机制有问题
                if err != nil {
                        rlog.Errorf("get consumer list of [%s] from %s error: 
%s", dc.consumerGroup, brokerAddr, err.Error())
                        return nil
@@ -869,7 +869,7 @@ func (dc *defaultConsumer) queryMaxOffset(mq 
*primitive.MessageQueue) (int64, er
        }
 
        cmd := remote.NewRemotingCommand(internal.ReqGetMaxOffset, request, nil)
-       response, err := dc.client.InvokeSync(brokerAddr, cmd, 3*time.Second)
+       response, err := dc.client.InvokeSync(context.Background(), brokerAddr, 
cmd, 3*time.Second)
        if err != nil {
                return -1, err
        }
@@ -899,7 +899,7 @@ func (dc *defaultConsumer) searchOffsetByTimestamp(mq 
*primitive.MessageQueue, t
        }
 
        cmd := remote.NewRemotingCommand(internal.ReqSearchOffsetByTimestamp, 
request, nil)
-       response, err := dc.client.InvokeSync(brokerAddr, cmd, 3*time.Second)
+       response, err := dc.client.InvokeSync(context.Background(), brokerAddr, 
cmd, 3*time.Second)
        if err != nil {
                return -1, err
        }
diff --git a/consumer/offset_store.go b/consumer/offset_store.go
index 4d1b7e9..71d2c87 100644
--- a/consumer/offset_store.go
+++ b/consumer/offset_store.go
@@ -18,6 +18,7 @@ limitations under the License.
 package consumer
 
 import (
+       "context"
        "encoding/json"
        "fmt"
        "os"
@@ -302,7 +303,7 @@ func (r *remoteBrokerOffsetStore) 
fetchConsumeOffsetFromBroker(group string, mq
                QueueId:       mq.QueueId,
        }
        cmd := remote.NewRemotingCommand(internal.ReqQueryConsumerOffset, 
queryOffsetRequest, nil)
-       res, err := r.client.InvokeSync(broker, cmd, 3*time.Second)
+       res, err := r.client.InvokeSync(context.Background(), broker, cmd, 
3*time.Second)
        if err != nil {
                return -1, err
        }
@@ -336,7 +337,7 @@ func (r *remoteBrokerOffsetStore) 
updateConsumeOffsetToBroker(group, topic strin
                CommitOffset:  queue.Offset,
        }
        cmd := remote.NewRemotingCommand(internal.ReqUpdateConsumerOffset, 
updateOffsetRequest, nil)
-       return r.client.InvokeOneWay(broker, cmd, 5*time.Second)
+       return r.client.InvokeOneWay(context.Background(), broker, cmd, 
5*time.Second)
 }
 
 func readFromMemory(table map[string]map[int]*queueOffset, mq 
*primitive.MessageQueue) int64 {
diff --git a/consumer/offset_store_test.go b/consumer/offset_store_test.go
index aa81271..4f6e5ef 100644
--- a/consumer/offset_store_test.go
+++ b/consumer/offset_store_test.go
@@ -279,7 +279,7 @@ func TestRemoteBrokerOffsetStore(t *testing.T) {
                                        "offset": "1",
                                },
                        }
-                       rmqClient.EXPECT().InvokeSync(gomock.Any(), 
gomock.Any(), gomock.Any()).Return(ret, nil)
+                       rmqClient.EXPECT().InvokeSync(gomock.Any(), 
gomock.Any(), gomock.Any(), gomock.Any()).Return(ret, nil)
 
                        remoteStore.persist(queues)
                        offset := remoteStore.read(mq, _ReadFromStore)
diff --git a/consumer/push_consumer.go b/consumer/push_consumer.go
index d5a5ffc..e8aabdc 100644
--- a/consumer/push_consumer.go
+++ b/consumer/push_consumer.go
@@ -515,7 +515,7 @@ func (pc *pushConsumer) sendMessageBack(brokerName string, 
msg *primitive.Messag
        } else {
                brokerAddr = msg.StoreHost
        }
-       _, err := pc.client.InvokeSync(brokerAddr, pc.buildSendBackRequest(msg, 
delayLevel), 3*time.Second)
+       _, err := pc.client.InvokeSync(context.Background(), brokerAddr, 
pc.buildSendBackRequest(msg, delayLevel), 3*time.Second)
        if err != nil {
                return false
        }
diff --git a/internal/client.go b/internal/client.go
index 8822e2d..cb4cfb6 100644
--- a/internal/client.go
+++ b/internal/client.go
@@ -129,11 +129,11 @@ type RMQClient interface {
        ClientID() string
 
        RegisterProducer(group string, producer InnerProducer)
-       InvokeSync(addr string, request *remote.RemotingCommand,
+       InvokeSync(ctx context.Context, addr string, request 
*remote.RemotingCommand,
                timeoutMillis time.Duration) (*remote.RemotingCommand, error)
-       InvokeAsync(addr string, request *remote.RemotingCommand,
+       InvokeAsync(ctx context.Context, addr string, request 
*remote.RemotingCommand,
                timeoutMillis time.Duration, f func(*remote.RemotingCommand, 
error)) error
-       InvokeOneWay(addr string, request *remote.RemotingCommand,
+       InvokeOneWay(ctx context.Context, addr string, request 
*remote.RemotingCommand,
                timeoutMillis time.Duration) error
        CheckClientInBroker()
        SendHeartbeatToAllBrokerWithLock()
@@ -291,31 +291,31 @@ func (c *rmqClient) ClientID() string {
        return id
 }
 
-func (c *rmqClient) InvokeSync(addr string, request *remote.RemotingCommand,
+func (c *rmqClient) InvokeSync(ctx context.Context, addr string, request 
*remote.RemotingCommand,
        timeoutMillis time.Duration) (*remote.RemotingCommand, error) {
        if c.close {
                return nil, ErrServiceState
        }
-       return c.remoteClient.InvokeSync(addr, request, timeoutMillis)
+       return c.remoteClient.InvokeSync(ctx, addr, request, timeoutMillis)
 }
 
-func (c *rmqClient) InvokeAsync(addr string, request *remote.RemotingCommand,
+func (c *rmqClient) InvokeAsync(ctx context.Context, addr string, request 
*remote.RemotingCommand,
        timeoutMillis time.Duration, f func(*remote.RemotingCommand, error)) 
error {
        if c.close {
                return ErrServiceState
        }
-       return c.remoteClient.InvokeAsync(addr, request, timeoutMillis, 
func(future *remote.ResponseFuture) {
+       return c.remoteClient.InvokeAsync(ctx, addr, request, timeoutMillis, 
func(future *remote.ResponseFuture) {
                f(future.ResponseCommand, future.Err)
        })
 
 }
 
-func (c *rmqClient) InvokeOneWay(addr string, request *remote.RemotingCommand,
+func (c *rmqClient) InvokeOneWay(ctx context.Context, addr string, request 
*remote.RemotingCommand,
        timeoutMillis time.Duration) error {
        if c.close {
                return ErrServiceState
        }
-       return c.remoteClient.InvokeOneWay(addr, request, timeoutMillis)
+       return c.remoteClient.InvokeOneWay(ctx, addr, request, timeoutMillis)
 }
 
 func (c *rmqClient) CheckClientInBroker() {
@@ -357,7 +357,7 @@ func (c *rmqClient) SendHeartbeatToAllBrokerWithLock() {
                data := value.(*BrokerData)
                for id, addr := range data.BrokerAddresses {
                        cmd := remote.NewRemotingCommand(ReqHeartBeat, nil, 
hbData.encode())
-                       response, err := c.remoteClient.InvokeSync(addr, cmd, 
3*time.Second)
+                       response, err := 
c.remoteClient.InvokeSync(context.Background(), addr, cmd, 3*time.Second)
                        if err != nil {
                                rlog.Warnf("send heart beat to broker error: 
%s", err.Error())
                                return true
@@ -417,7 +417,7 @@ func (c *rmqClient) SendMessageAsync(ctx context.Context, 
brokerAddrs, brokerNam
 func (c *rmqClient) SendMessageOneWay(ctx context.Context, brokerAddrs string, 
request *SendMessageRequest,
        msgs []*primitive.Message) (*primitive.SendResult, error) {
        cmd := remote.NewRemotingCommand(ReqSendBatchMessage, request, 
encodeMessages(msgs))
-       err := c.remoteClient.InvokeOneWay(brokerAddrs, cmd, 3*time.Second)
+       err := c.remoteClient.InvokeOneWay(ctx, brokerAddrs, cmd, 3*time.Second)
        if err != nil {
                rlog.Warnf("send messages with oneway error: %v", err)
        }
@@ -473,7 +473,7 @@ func (c *rmqClient) ProcessSendResponse(brokerName string, 
cmd *remote.RemotingC
 // PullMessage with sync
 func (c *rmqClient) PullMessage(ctx context.Context, brokerAddrs string, 
request *PullMessageRequest) (*primitive.PullResult, error) {
        cmd := remote.NewRemotingCommand(ReqPullMessage, request, nil)
-       res, err := c.remoteClient.InvokeSync(brokerAddrs, cmd, 10*time.Second)
+       res, err := c.remoteClient.InvokeSync(ctx, brokerAddrs, cmd, 
10*time.Second)
        if err != nil {
                return nil, err
        }
diff --git a/internal/mock_client.go b/internal/mock_client.go
index af9cf2c..a216e93 100644
--- a/internal/mock_client.go
+++ b/internal/mock_client.go
@@ -21,13 +21,14 @@ limitations under the License.
 package internal
 
 import (
-       context "context"
-       reflect "reflect"
-       time "time"
+       "context"
+       "reflect"
+       "time"
 
-       remote "github.com/apache/rocketmq-client-go/internal/remote"
-       primitive "github.com/apache/rocketmq-client-go/primitive"
-       gomock "github.com/golang/mock/gomock"
+       "github.com/golang/mock/gomock"
+
+       "github.com/apache/rocketmq-client-go/internal/remote"
+       "github.com/apache/rocketmq-client-go/primitive"
 )
 
 // MockInnerProducer is a mock of InnerProducer interface
@@ -55,7 +56,6 @@ func (m *MockInnerProducer) EXPECT() 
*MockInnerProducerMockRecorder {
 
 // PublishTopicList mocks base method
 func (m *MockInnerProducer) PublishTopicList() []string {
-       m.ctrl.T.Helper()
        ret := m.ctrl.Call(m, "PublishTopicList")
        ret0, _ := ret[0].([]string)
        return ret0
@@ -63,25 +63,21 @@ func (m *MockInnerProducer) PublishTopicList() []string {
 
 // PublishTopicList indicates an expected call of PublishTopicList
 func (mr *MockInnerProducerMockRecorder) PublishTopicList() *gomock.Call {
-       mr.mock.ctrl.T.Helper()
        return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, 
"PublishTopicList", reflect.TypeOf((*MockInnerProducer)(nil).PublishTopicList))
 }
 
 // UpdateTopicPublishInfo mocks base method
 func (m *MockInnerProducer) UpdateTopicPublishInfo(topic string, info 
*TopicPublishInfo) {
-       m.ctrl.T.Helper()
        m.ctrl.Call(m, "UpdateTopicPublishInfo", topic, info)
 }
 
 // UpdateTopicPublishInfo indicates an expected call of UpdateTopicPublishInfo
 func (mr *MockInnerProducerMockRecorder) UpdateTopicPublishInfo(topic, info 
interface{}) *gomock.Call {
-       mr.mock.ctrl.T.Helper()
        return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, 
"UpdateTopicPublishInfo", 
reflect.TypeOf((*MockInnerProducer)(nil).UpdateTopicPublishInfo), topic, info)
 }
 
 // IsPublishTopicNeedUpdate mocks base method
 func (m *MockInnerProducer) IsPublishTopicNeedUpdate(topic string) bool {
-       m.ctrl.T.Helper()
        ret := m.ctrl.Call(m, "IsPublishTopicNeedUpdate", topic)
        ret0, _ := ret[0].(bool)
        return ret0
@@ -89,13 +85,11 @@ func (m *MockInnerProducer) IsPublishTopicNeedUpdate(topic 
string) bool {
 
 // IsPublishTopicNeedUpdate indicates an expected call of 
IsPublishTopicNeedUpdate
 func (mr *MockInnerProducerMockRecorder) IsPublishTopicNeedUpdate(topic 
interface{}) *gomock.Call {
-       mr.mock.ctrl.T.Helper()
        return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, 
"IsPublishTopicNeedUpdate", 
reflect.TypeOf((*MockInnerProducer)(nil).IsPublishTopicNeedUpdate), topic)
 }
 
 // IsUnitMode mocks base method
 func (m *MockInnerProducer) IsUnitMode() bool {
-       m.ctrl.T.Helper()
        ret := m.ctrl.Call(m, "IsUnitMode")
        ret0, _ := ret[0].(bool)
        return ret0
@@ -103,7 +97,6 @@ func (m *MockInnerProducer) IsUnitMode() bool {
 
 // IsUnitMode indicates an expected call of IsUnitMode
 func (mr *MockInnerProducerMockRecorder) IsUnitMode() *gomock.Call {
-       mr.mock.ctrl.T.Helper()
        return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "IsUnitMode", 
reflect.TypeOf((*MockInnerProducer)(nil).IsUnitMode))
 }
 
@@ -132,7 +125,6 @@ func (m *MockInnerConsumer) EXPECT() 
*MockInnerConsumerMockRecorder {
 
 // PersistConsumerOffset mocks base method
 func (m *MockInnerConsumer) PersistConsumerOffset() error {
-       m.ctrl.T.Helper()
        ret := m.ctrl.Call(m, "PersistConsumerOffset")
        ret0, _ := ret[0].(error)
        return ret0
@@ -140,25 +132,21 @@ func (m *MockInnerConsumer) PersistConsumerOffset() error 
{
 
 // PersistConsumerOffset indicates an expected call of PersistConsumerOffset
 func (mr *MockInnerConsumerMockRecorder) PersistConsumerOffset() *gomock.Call {
-       mr.mock.ctrl.T.Helper()
        return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, 
"PersistConsumerOffset", 
reflect.TypeOf((*MockInnerConsumer)(nil).PersistConsumerOffset))
 }
 
 // UpdateTopicSubscribeInfo mocks base method
 func (m *MockInnerConsumer) UpdateTopicSubscribeInfo(topic string, mqs 
[]*primitive.MessageQueue) {
-       m.ctrl.T.Helper()
        m.ctrl.Call(m, "UpdateTopicSubscribeInfo", topic, mqs)
 }
 
 // UpdateTopicSubscribeInfo indicates an expected call of 
UpdateTopicSubscribeInfo
 func (mr *MockInnerConsumerMockRecorder) UpdateTopicSubscribeInfo(topic, mqs 
interface{}) *gomock.Call {
-       mr.mock.ctrl.T.Helper()
        return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, 
"UpdateTopicSubscribeInfo", 
reflect.TypeOf((*MockInnerConsumer)(nil).UpdateTopicSubscribeInfo), topic, mqs)
 }
 
 // IsSubscribeTopicNeedUpdate mocks base method
 func (m *MockInnerConsumer) IsSubscribeTopicNeedUpdate(topic string) bool {
-       m.ctrl.T.Helper()
        ret := m.ctrl.Call(m, "IsSubscribeTopicNeedUpdate", topic)
        ret0, _ := ret[0].(bool)
        return ret0
@@ -166,13 +154,11 @@ func (m *MockInnerConsumer) 
IsSubscribeTopicNeedUpdate(topic string) bool {
 
 // IsSubscribeTopicNeedUpdate indicates an expected call of 
IsSubscribeTopicNeedUpdate
 func (mr *MockInnerConsumerMockRecorder) IsSubscribeTopicNeedUpdate(topic 
interface{}) *gomock.Call {
-       mr.mock.ctrl.T.Helper()
        return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, 
"IsSubscribeTopicNeedUpdate", 
reflect.TypeOf((*MockInnerConsumer)(nil).IsSubscribeTopicNeedUpdate), topic)
 }
 
 // SubscriptionDataList mocks base method
 func (m *MockInnerConsumer) SubscriptionDataList() []*SubscriptionData {
-       m.ctrl.T.Helper()
        ret := m.ctrl.Call(m, "SubscriptionDataList")
        ret0, _ := ret[0].([]*SubscriptionData)
        return ret0
@@ -180,25 +166,21 @@ func (m *MockInnerConsumer) SubscriptionDataList() 
[]*SubscriptionData {
 
 // SubscriptionDataList indicates an expected call of SubscriptionDataList
 func (mr *MockInnerConsumerMockRecorder) SubscriptionDataList() *gomock.Call {
-       mr.mock.ctrl.T.Helper()
        return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, 
"SubscriptionDataList", 
reflect.TypeOf((*MockInnerConsumer)(nil).SubscriptionDataList))
 }
 
 // Rebalance mocks base method
 func (m *MockInnerConsumer) Rebalance() {
-       m.ctrl.T.Helper()
        m.ctrl.Call(m, "Rebalance")
 }
 
 // Rebalance indicates an expected call of Rebalance
 func (mr *MockInnerConsumerMockRecorder) Rebalance() *gomock.Call {
-       mr.mock.ctrl.T.Helper()
        return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Rebalance", 
reflect.TypeOf((*MockInnerConsumer)(nil).Rebalance))
 }
 
 // IsUnitMode mocks base method
 func (m *MockInnerConsumer) IsUnitMode() bool {
-       m.ctrl.T.Helper()
        ret := m.ctrl.Call(m, "IsUnitMode")
        ret0, _ := ret[0].(bool)
        return ret0
@@ -206,7 +188,6 @@ func (m *MockInnerConsumer) IsUnitMode() bool {
 
 // IsUnitMode indicates an expected call of IsUnitMode
 func (mr *MockInnerConsumerMockRecorder) IsUnitMode() *gomock.Call {
-       mr.mock.ctrl.T.Helper()
        return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "IsUnitMode", 
reflect.TypeOf((*MockInnerConsumer)(nil).IsUnitMode))
 }
 
@@ -235,31 +216,26 @@ func (m *MockRMQClient) EXPECT() 
*MockRMQClientMockRecorder {
 
 // Start mocks base method
 func (m *MockRMQClient) Start() {
-       m.ctrl.T.Helper()
        m.ctrl.Call(m, "Start")
 }
 
 // Start indicates an expected call of Start
 func (mr *MockRMQClientMockRecorder) Start() *gomock.Call {
-       mr.mock.ctrl.T.Helper()
        return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Start", 
reflect.TypeOf((*MockRMQClient)(nil).Start))
 }
 
 // Shutdown mocks base method
 func (m *MockRMQClient) Shutdown() {
-       m.ctrl.T.Helper()
        m.ctrl.Call(m, "Shutdown")
 }
 
 // Shutdown indicates an expected call of Shutdown
 func (mr *MockRMQClientMockRecorder) Shutdown() *gomock.Call {
-       mr.mock.ctrl.T.Helper()
        return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Shutdown", 
reflect.TypeOf((*MockRMQClient)(nil).Shutdown))
 }
 
 // ClientID mocks base method
 func (m *MockRMQClient) ClientID() string {
-       m.ctrl.T.Helper()
        ret := m.ctrl.Call(m, "ClientID")
        ret0, _ := ret[0].(string)
        return ret0
@@ -267,104 +243,88 @@ func (m *MockRMQClient) ClientID() string {
 
 // ClientID indicates an expected call of ClientID
 func (mr *MockRMQClientMockRecorder) ClientID() *gomock.Call {
-       mr.mock.ctrl.T.Helper()
        return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ClientID", 
reflect.TypeOf((*MockRMQClient)(nil).ClientID))
 }
 
 // RegisterProducer mocks base method
 func (m *MockRMQClient) RegisterProducer(group string, producer InnerProducer) 
{
-       m.ctrl.T.Helper()
        m.ctrl.Call(m, "RegisterProducer", group, producer)
 }
 
 // RegisterProducer indicates an expected call of RegisterProducer
 func (mr *MockRMQClientMockRecorder) RegisterProducer(group, producer 
interface{}) *gomock.Call {
-       mr.mock.ctrl.T.Helper()
        return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, 
"RegisterProducer", reflect.TypeOf((*MockRMQClient)(nil).RegisterProducer), 
group, producer)
 }
 
 // InvokeSync mocks base method
-func (m *MockRMQClient) InvokeSync(addr string, request 
*remote.RemotingCommand, timeoutMillis time.Duration) (*remote.RemotingCommand, 
error) {
-       m.ctrl.T.Helper()
-       ret := m.ctrl.Call(m, "InvokeSync", addr, request, timeoutMillis)
+func (m *MockRMQClient) InvokeSync(ctx context.Context, addr string, request 
*remote.RemotingCommand, timeoutMillis time.Duration) (*remote.RemotingCommand, 
error) {
+       ret := m.ctrl.Call(m, "InvokeSync", ctx, addr, request, timeoutMillis)
        ret0, _ := ret[0].(*remote.RemotingCommand)
        ret1, _ := ret[1].(error)
        return ret0, ret1
 }
 
 // InvokeSync indicates an expected call of InvokeSync
-func (mr *MockRMQClientMockRecorder) InvokeSync(addr, request, timeoutMillis 
interface{}) *gomock.Call {
-       mr.mock.ctrl.T.Helper()
-       return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "InvokeSync", 
reflect.TypeOf((*MockRMQClient)(nil).InvokeSync), addr, request, timeoutMillis)
+func (mr *MockRMQClientMockRecorder) InvokeSync(ctx, addr, request, 
timeoutMillis interface{}) *gomock.Call {
+       return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "InvokeSync", 
reflect.TypeOf((*MockRMQClient)(nil).InvokeSync), ctx, addr, request, 
timeoutMillis)
 }
 
 // InvokeAsync mocks base method
-func (m *MockRMQClient) InvokeAsync(addr string, request 
*remote.RemotingCommand, timeoutMillis time.Duration, f 
func(*remote.RemotingCommand, error)) error {
-       m.ctrl.T.Helper()
-       ret := m.ctrl.Call(m, "InvokeAsync", addr, request, timeoutMillis, f)
+func (m *MockRMQClient) InvokeAsync(ctx context.Context, addr string, request 
*remote.RemotingCommand, timeoutMillis time.Duration, f 
func(*remote.RemotingCommand, error)) error {
+       ret := m.ctrl.Call(m, "InvokeAsync", ctx, addr, request, timeoutMillis, 
f)
        ret0, _ := ret[0].(error)
        return ret0
 }
 
 // InvokeAsync indicates an expected call of InvokeAsync
-func (mr *MockRMQClientMockRecorder) InvokeAsync(addr, request, timeoutMillis, 
f interface{}) *gomock.Call {
-       mr.mock.ctrl.T.Helper()
-       return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "InvokeAsync", 
reflect.TypeOf((*MockRMQClient)(nil).InvokeAsync), addr, request, 
timeoutMillis, f)
+func (mr *MockRMQClientMockRecorder) InvokeAsync(ctx, addr, request, 
timeoutMillis, f interface{}) *gomock.Call {
+       return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "InvokeAsync", 
reflect.TypeOf((*MockRMQClient)(nil).InvokeAsync), ctx, addr, request, 
timeoutMillis, f)
 }
 
 // InvokeOneWay mocks base method
-func (m *MockRMQClient) InvokeOneWay(addr string, request 
*remote.RemotingCommand, timeoutMillis time.Duration) error {
-       m.ctrl.T.Helper()
-       ret := m.ctrl.Call(m, "InvokeOneWay", addr, request, timeoutMillis)
+func (m *MockRMQClient) InvokeOneWay(ctx context.Context, addr string, request 
*remote.RemotingCommand, timeoutMillis time.Duration) error {
+       ret := m.ctrl.Call(m, "InvokeOneWay", ctx, addr, request, timeoutMillis)
        ret0, _ := ret[0].(error)
        return ret0
 }
 
 // InvokeOneWay indicates an expected call of InvokeOneWay
-func (mr *MockRMQClientMockRecorder) InvokeOneWay(addr, request, timeoutMillis 
interface{}) *gomock.Call {
-       mr.mock.ctrl.T.Helper()
-       return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "InvokeOneWay", 
reflect.TypeOf((*MockRMQClient)(nil).InvokeOneWay), addr, request, 
timeoutMillis)
+func (mr *MockRMQClientMockRecorder) InvokeOneWay(ctx, addr, request, 
timeoutMillis interface{}) *gomock.Call {
+       return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "InvokeOneWay", 
reflect.TypeOf((*MockRMQClient)(nil).InvokeOneWay), ctx, addr, request, 
timeoutMillis)
 }
 
 // CheckClientInBroker mocks base method
 func (m *MockRMQClient) CheckClientInBroker() {
-       m.ctrl.T.Helper()
        m.ctrl.Call(m, "CheckClientInBroker")
 }
 
 // CheckClientInBroker indicates an expected call of CheckClientInBroker
 func (mr *MockRMQClientMockRecorder) CheckClientInBroker() *gomock.Call {
-       mr.mock.ctrl.T.Helper()
        return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, 
"CheckClientInBroker", 
reflect.TypeOf((*MockRMQClient)(nil).CheckClientInBroker))
 }
 
 // SendHeartbeatToAllBrokerWithLock mocks base method
 func (m *MockRMQClient) SendHeartbeatToAllBrokerWithLock() {
-       m.ctrl.T.Helper()
        m.ctrl.Call(m, "SendHeartbeatToAllBrokerWithLock")
 }
 
 // SendHeartbeatToAllBrokerWithLock indicates an expected call of 
SendHeartbeatToAllBrokerWithLock
 func (mr *MockRMQClientMockRecorder) SendHeartbeatToAllBrokerWithLock() 
*gomock.Call {
-       mr.mock.ctrl.T.Helper()
        return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, 
"SendHeartbeatToAllBrokerWithLock", 
reflect.TypeOf((*MockRMQClient)(nil).SendHeartbeatToAllBrokerWithLock))
 }
 
 // UpdateTopicRouteInfo mocks base method
 func (m *MockRMQClient) UpdateTopicRouteInfo() {
-       m.ctrl.T.Helper()
        m.ctrl.Call(m, "UpdateTopicRouteInfo")
 }
 
 // UpdateTopicRouteInfo indicates an expected call of UpdateTopicRouteInfo
 func (mr *MockRMQClientMockRecorder) UpdateTopicRouteInfo() *gomock.Call {
-       mr.mock.ctrl.T.Helper()
        return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, 
"UpdateTopicRouteInfo", 
reflect.TypeOf((*MockRMQClient)(nil).UpdateTopicRouteInfo))
 }
 
 // ProcessSendResponse mocks base method
 func (m *MockRMQClient) ProcessSendResponse(brokerName string, cmd 
*remote.RemotingCommand, resp *primitive.SendResult, msgs 
...*primitive.Message) error {
-       m.ctrl.T.Helper()
        varargs := []interface{}{brokerName, cmd, resp}
        for _, a := range msgs {
                varargs = append(varargs, a)
@@ -376,14 +336,12 @@ func (m *MockRMQClient) ProcessSendResponse(brokerName 
string, cmd *remote.Remot
 
 // ProcessSendResponse indicates an expected call of ProcessSendResponse
 func (mr *MockRMQClientMockRecorder) ProcessSendResponse(brokerName, cmd, resp 
interface{}, msgs ...interface{}) *gomock.Call {
-       mr.mock.ctrl.T.Helper()
        varargs := append([]interface{}{brokerName, cmd, resp}, msgs...)
        return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, 
"ProcessSendResponse", 
reflect.TypeOf((*MockRMQClient)(nil).ProcessSendResponse), varargs...)
 }
 
 // RegisterConsumer mocks base method
 func (m *MockRMQClient) RegisterConsumer(group string, consumer InnerConsumer) 
error {
-       m.ctrl.T.Helper()
        ret := m.ctrl.Call(m, "RegisterConsumer", group, consumer)
        ret0, _ := ret[0].(error)
        return ret0
@@ -391,25 +349,21 @@ func (m *MockRMQClient) RegisterConsumer(group string, 
consumer InnerConsumer) e
 
 // RegisterConsumer indicates an expected call of RegisterConsumer
 func (mr *MockRMQClientMockRecorder) RegisterConsumer(group, consumer 
interface{}) *gomock.Call {
-       mr.mock.ctrl.T.Helper()
        return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, 
"RegisterConsumer", reflect.TypeOf((*MockRMQClient)(nil).RegisterConsumer), 
group, consumer)
 }
 
 // UnregisterConsumer mocks base method
 func (m *MockRMQClient) UnregisterConsumer(group string) {
-       m.ctrl.T.Helper()
        m.ctrl.Call(m, "UnregisterConsumer", group)
 }
 
 // UnregisterConsumer indicates an expected call of UnregisterConsumer
 func (mr *MockRMQClientMockRecorder) UnregisterConsumer(group interface{}) 
*gomock.Call {
-       mr.mock.ctrl.T.Helper()
        return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, 
"UnregisterConsumer", reflect.TypeOf((*MockRMQClient)(nil).UnregisterConsumer), 
group)
 }
 
 // PullMessage mocks base method
 func (m *MockRMQClient) PullMessage(ctx context.Context, brokerAddrs string, 
request *PullMessageRequest) (*primitive.PullResult, error) {
-       m.ctrl.T.Helper()
        ret := m.ctrl.Call(m, "PullMessage", ctx, brokerAddrs, request)
        ret0, _ := ret[0].(*primitive.PullResult)
        ret1, _ := ret[1].(error)
@@ -418,13 +372,11 @@ func (m *MockRMQClient) PullMessage(ctx context.Context, 
brokerAddrs string, req
 
 // PullMessage indicates an expected call of PullMessage
 func (mr *MockRMQClientMockRecorder) PullMessage(ctx, brokerAddrs, request 
interface{}) *gomock.Call {
-       mr.mock.ctrl.T.Helper()
        return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PullMessage", 
reflect.TypeOf((*MockRMQClient)(nil).PullMessage), ctx, brokerAddrs, request)
 }
 
 // PullMessageAsync mocks base method
 func (m *MockRMQClient) PullMessageAsync(ctx context.Context, brokerAddrs 
string, request *PullMessageRequest, f func(*primitive.PullResult)) error {
-       m.ctrl.T.Helper()
        ret := m.ctrl.Call(m, "PullMessageAsync", ctx, brokerAddrs, request, f)
        ret0, _ := ret[0].(error)
        return ret0
@@ -432,30 +384,25 @@ func (m *MockRMQClient) PullMessageAsync(ctx 
context.Context, brokerAddrs string
 
 // PullMessageAsync indicates an expected call of PullMessageAsync
 func (mr *MockRMQClientMockRecorder) PullMessageAsync(ctx, brokerAddrs, 
request, f interface{}) *gomock.Call {
-       mr.mock.ctrl.T.Helper()
        return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, 
"PullMessageAsync", reflect.TypeOf((*MockRMQClient)(nil).PullMessageAsync), 
ctx, brokerAddrs, request, f)
 }
 
 // RebalanceImmediately mocks base method
 func (m *MockRMQClient) RebalanceImmediately() {
-       m.ctrl.T.Helper()
        m.ctrl.Call(m, "RebalanceImmediately")
 }
 
 // RebalanceImmediately indicates an expected call of RebalanceImmediately
 func (mr *MockRMQClientMockRecorder) RebalanceImmediately() *gomock.Call {
-       mr.mock.ctrl.T.Helper()
        return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, 
"RebalanceImmediately", 
reflect.TypeOf((*MockRMQClient)(nil).RebalanceImmediately))
 }
 
 // UpdatePublishInfo mocks base method
 func (m *MockRMQClient) UpdatePublishInfo(topic string, data *TopicRouteData) {
-       m.ctrl.T.Helper()
        m.ctrl.Call(m, "UpdatePublishInfo", topic, data)
 }
 
 // UpdatePublishInfo indicates an expected call of UpdatePublishInfo
 func (mr *MockRMQClientMockRecorder) UpdatePublishInfo(topic, data 
interface{}) *gomock.Call {
-       mr.mock.ctrl.T.Helper()
        return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, 
"UpdatePublishInfo", reflect.TypeOf((*MockRMQClient)(nil).UpdatePublishInfo), 
topic, data)
 }
diff --git a/internal/remote/future.go b/internal/remote/future.go
index 8690644..4d3008f 100644
--- a/internal/remote/future.go
+++ b/internal/remote/future.go
@@ -18,6 +18,7 @@ limitations under the License.
 package remote
 
 import (
+       "context"
        "sync"
        "time"
 
@@ -35,16 +36,18 @@ type ResponseFuture struct {
        BeginTimestamp  time.Duration
        Done            chan bool
        callbackOnce    sync.Once
+       ctx             context.Context
 }
 
 // NewResponseFuture create ResponseFuture with opaque, timeout and callback
-func NewResponseFuture(opaque int32, timeout time.Duration, callback 
func(*ResponseFuture)) *ResponseFuture {
+func NewResponseFuture(ctx context.Context, opaque int32, timeout 
time.Duration, callback func(*ResponseFuture)) *ResponseFuture {
        return &ResponseFuture{
                Opaque:         opaque,
                Done:           make(chan bool),
                Timeout:        timeout,
                callback:       callback,
                BeginTimestamp: time.Duration(time.Now().Unix()) * time.Second,
+               ctx:            ctx,
        }
 }
 
@@ -66,19 +69,19 @@ func (r *ResponseFuture) waitResponse() (*RemotingCommand, 
error) {
                cmd *RemotingCommand
                err error
        )
-       timer := time.NewTimer(r.Timeout)
+       ctx, cancel := context.WithTimeout(r.ctx, r.Timeout)
+       defer cancel()
        for {
                select {
                case <-r.Done:
                        cmd, err = r.ResponseCommand, r.Err
                        goto done
-               case <-timer.C:
+               case <-ctx.Done():
                        err = utils.ErrRequestTimeout
                        r.Err = err
                        goto done
                }
        }
 done:
-       timer.Stop()
        return cmd, err
 }
diff --git a/internal/remote/mock_remote_client.go 
b/internal/remote/mock_remote_client.go
index 62a91c5..e264ab2 100644
--- a/internal/remote/mock_remote_client.go
+++ b/internal/remote/mock_remote_client.go
@@ -21,6 +21,7 @@
 package remote
 
 import (
+       context "context"
        primitive "github.com/apache/rocketmq-client-go/primitive"
        gomock "github.com/golang/mock/gomock"
        reflect "reflect"
@@ -75,40 +76,40 @@ func (mr *MockRemotingClientMockRecorder) 
RegisterInterceptor(interceptors ...in
 }
 
 // InvokeSync mocks base method
-func (m *MockRemotingClient) InvokeSync(addr string, request *RemotingCommand, 
timeout time.Duration) (*RemotingCommand, error) {
-       ret := m.ctrl.Call(m, "InvokeSync", addr, request, timeout)
+func (m *MockRemotingClient) InvokeSync(ctx context.Context, addr string, 
request *RemotingCommand, timeout time.Duration) (*RemotingCommand, error) {
+       ret := m.ctrl.Call(m, "InvokeSync", ctx, addr, request, timeout)
        ret0, _ := ret[0].(*RemotingCommand)
        ret1, _ := ret[1].(error)
        return ret0, ret1
 }
 
 // InvokeSync indicates an expected call of InvokeSync
-func (mr *MockRemotingClientMockRecorder) InvokeSync(addr, request, timeout 
interface{}) *gomock.Call {
-       return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "InvokeSync", 
reflect.TypeOf((*MockRemotingClient)(nil).InvokeSync), addr, request, timeout)
+func (mr *MockRemotingClientMockRecorder) InvokeSync(ctx, addr, request, 
timeout interface{}) *gomock.Call {
+       return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "InvokeSync", 
reflect.TypeOf((*MockRemotingClient)(nil).InvokeSync), ctx, addr, request, 
timeout)
 }
 
 // InvokeAsync mocks base method
-func (m *MockRemotingClient) InvokeAsync(addr string, request 
*RemotingCommand, timeout time.Duration, callback func(*ResponseFuture)) error {
-       ret := m.ctrl.Call(m, "InvokeAsync", addr, request, timeout, callback)
+func (m *MockRemotingClient) InvokeAsync(ctx context.Context, addr string, 
request *RemotingCommand, timeout time.Duration, callback 
func(*ResponseFuture)) error {
+       ret := m.ctrl.Call(m, "InvokeAsync", ctx, addr, request, timeout, 
callback)
        ret0, _ := ret[0].(error)
        return ret0
 }
 
 // InvokeAsync indicates an expected call of InvokeAsync
-func (mr *MockRemotingClientMockRecorder) InvokeAsync(addr, request, timeout, 
callback interface{}) *gomock.Call {
-       return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "InvokeAsync", 
reflect.TypeOf((*MockRemotingClient)(nil).InvokeAsync), addr, request, timeout, 
callback)
+func (mr *MockRemotingClientMockRecorder) InvokeAsync(ctx, addr, request, 
timeout, callback interface{}) *gomock.Call {
+       return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "InvokeAsync", 
reflect.TypeOf((*MockRemotingClient)(nil).InvokeAsync), ctx, addr, request, 
timeout, callback)
 }
 
 // InvokeOneWay mocks base method
-func (m *MockRemotingClient) InvokeOneWay(addr string, request 
*RemotingCommand, timeout time.Duration) error {
-       ret := m.ctrl.Call(m, "InvokeOneWay", addr, request, timeout)
+func (m *MockRemotingClient) InvokeOneWay(ctx context.Context, addr string, 
request *RemotingCommand, timeout time.Duration) error {
+       ret := m.ctrl.Call(m, "InvokeOneWay", ctx, addr, request, timeout)
        ret0, _ := ret[0].(error)
        return ret0
 }
 
 // InvokeOneWay indicates an expected call of InvokeOneWay
-func (mr *MockRemotingClientMockRecorder) InvokeOneWay(addr, request, timeout 
interface{}) *gomock.Call {
-       return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "InvokeOneWay", 
reflect.TypeOf((*MockRemotingClient)(nil).InvokeOneWay), addr, request, timeout)
+func (mr *MockRemotingClientMockRecorder) InvokeOneWay(ctx, addr, request, 
timeout interface{}) *gomock.Call {
+       return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "InvokeOneWay", 
reflect.TypeOf((*MockRemotingClient)(nil).InvokeOneWay), ctx, addr, request, 
timeout)
 }
 
 // ShutDown mocks base method
diff --git a/internal/remote/remote_client.go b/internal/remote/remote_client.go
index 01688fe..96f67a6 100644
--- a/internal/remote/remote_client.go
+++ b/internal/remote/remote_client.go
@@ -41,9 +41,9 @@ type TcpOption struct {
 type RemotingClient interface {
        RegisterRequestFunc(code int16, f ClientRequestFunc)
        RegisterInterceptor(interceptors ...primitive.Interceptor)
-       InvokeSync(addr string, request *RemotingCommand, timeout 
time.Duration) (*RemotingCommand, error)
-       InvokeAsync(addr string, request *RemotingCommand, timeout 
time.Duration, callback func(*ResponseFuture)) error
-       InvokeOneWay(addr string, request *RemotingCommand, timeout 
time.Duration) error
+       InvokeSync(ctx context.Context, addr string, request *RemotingCommand, 
timeout time.Duration) (*RemotingCommand, error)
+       InvokeAsync(ctx context.Context, addr string, request *RemotingCommand, 
timeout time.Duration, callback func(*ResponseFuture)) error
+       InvokeOneWay(ctx context.Context, addr string, request 
*RemotingCommand, timeout time.Duration) error
        ShutDown()
 }
 
@@ -69,12 +69,12 @@ func (c *remotingClient) RegisterRequestFunc(code int16, f 
ClientRequestFunc) {
 }
 
 // TODO: merge sync and async model. sync should run on async model by 
blocking on chan
-func (c *remotingClient) InvokeSync(addr string, request *RemotingCommand, 
timeout time.Duration) (*RemotingCommand, error) {
-       conn, err := c.connect(addr)
+func (c *remotingClient) InvokeSync(ctx context.Context, addr string, request 
*RemotingCommand, timeout time.Duration) (*RemotingCommand, error) {
+       conn, err := c.connect(ctx, addr)
        if err != nil {
                return nil, err
        }
-       resp := NewResponseFuture(request.Opaque, timeout, nil)
+       resp := NewResponseFuture(ctx, request.Opaque, timeout, nil)
        c.responseTable.Store(resp.Opaque, resp)
        defer c.responseTable.Delete(request.Opaque)
        err = c.sendRequest(conn, request)
@@ -86,12 +86,12 @@ func (c *remotingClient) InvokeSync(addr string, request 
*RemotingCommand, timeo
 }
 
 // InvokeAsync send request without blocking, just return immediately.
-func (c *remotingClient) InvokeAsync(addr string, request *RemotingCommand, 
timeout time.Duration, callback func(*ResponseFuture)) error {
-       conn, err := c.connect(addr)
+func (c *remotingClient) InvokeAsync(ctx context.Context, addr string, request 
*RemotingCommand, timeout time.Duration, callback func(*ResponseFuture)) error {
+       conn, err := c.connect(ctx, addr)
        if err != nil {
                return err
        }
-       resp := NewResponseFuture(request.Opaque, timeout, callback)
+       resp := NewResponseFuture(ctx, request.Opaque, timeout, callback)
        c.responseTable.Store(resp.Opaque, resp)
        err = c.sendRequest(conn, request)
        if err != nil {
@@ -109,15 +109,15 @@ func (c *remotingClient) receiveAsync(f *ResponseFuture) {
        }
 }
 
-func (c *remotingClient) InvokeOneWay(addr string, request *RemotingCommand, 
timeout time.Duration) error {
-       conn, err := c.connect(addr)
+func (c *remotingClient) InvokeOneWay(ctx context.Context, addr string, 
request *RemotingCommand, timeout time.Duration) error {
+       conn, err := c.connect(ctx, addr)
        if err != nil {
                return err
        }
        return c.sendRequest(conn, request)
 }
 
-func (c *remotingClient) connect(addr string) (net.Conn, error) {
+func (c *remotingClient) connect(ctx context.Context, addr string) (net.Conn, 
error) {
        //it needs additional locker.
        c.connectionLocker.Lock()
        defer c.connectionLocker.Unlock()
@@ -125,7 +125,8 @@ func (c *remotingClient) connect(addr string) (net.Conn, 
error) {
        if ok {
                return conn.(net.Conn), nil
        }
-       tcpConn, err := net.Dial("tcp", addr)
+       var d net.Dialer
+       tcpConn, err := d.DialContext(ctx, "tcp", addr)
        if err != nil {
                return nil, err
        }
diff --git a/internal/remote/remote_client_test.go 
b/internal/remote/remote_client_test.go
index 9cbb117..1d2c033 100644
--- a/internal/remote/remote_client_test.go
+++ b/internal/remote/remote_client_test.go
@@ -18,6 +18,7 @@ package remote
 
 import (
        "bytes"
+       "context"
        "errors"
        "math/rand"
        "net"
@@ -32,7 +33,7 @@ import (
 )
 
 func TestNewResponseFuture(t *testing.T) {
-       future := NewResponseFuture(10, time.Duration(1000), nil)
+       future := NewResponseFuture(context.Background(), 10, 
time.Duration(1000), nil)
        if future.Opaque != 10 {
                t.Errorf("wrong ResponseFuture's opaque. want=%d, got=%d", 10, 
future.Opaque)
        }
@@ -62,7 +63,7 @@ func TestResponseFutureTimeout(t *testing.T) {
                        r.ResponseCommand.Remark = r.ResponseCommand.Remark + 
"Go Client"
                }
        }
-       future := NewResponseFuture(10, time.Duration(1000), callback)
+       future := NewResponseFuture(context.Background(), 10, 
time.Duration(1000), callback)
        future.ResponseCommand = NewRemotingCommand(200,
                nil, nil)
 
@@ -83,7 +84,7 @@ func TestResponseFutureTimeout(t *testing.T) {
 }
 
 func TestResponseFutureIsTimeout(t *testing.T) {
-       future := NewResponseFuture(10, 500*time.Millisecond, nil)
+       future := NewResponseFuture(context.Background(), 10, 
500*time.Millisecond, nil)
        if future.isTimeout() != false {
                t.Errorf("wrong ResponseFuture's istimeout. want=%t, got=%t", 
false, future.isTimeout())
        }
@@ -92,12 +93,12 @@ func TestResponseFutureIsTimeout(t *testing.T) {
 }
 
 func TestResponseFutureWaitResponse(t *testing.T) {
-       future := NewResponseFuture(10, 500*time.Millisecond, nil)
+       future := NewResponseFuture(context.Background(), 10, 
500*time.Millisecond, nil)
        if _, err := future.waitResponse(); err != utils.ErrRequestTimeout {
                t.Errorf("wrong ResponseFuture waitResponse. want=%v, got=%v",
                        utils.ErrRequestTimeout, err)
        }
-       future = NewResponseFuture(10, 500*time.Millisecond, nil)
+       future = NewResponseFuture(context.Background(), 10, 
500*time.Millisecond, nil)
        responseError := errors.New("response error")
        go func() {
                time.Sleep(100 * time.Millisecond)
@@ -108,7 +109,7 @@ func TestResponseFutureWaitResponse(t *testing.T) {
                t.Errorf("wrong ResponseFuture waitResponse. want=%v. got=%v",
                        responseError, err)
        }
-       future = NewResponseFuture(10, 500*time.Millisecond, nil)
+       future = NewResponseFuture(context.Background(), 10, 
500*time.Millisecond, nil)
        responseRemotingCommand := NewRemotingCommand(202, nil, nil)
        go func() {
                time.Sleep(100 * time.Millisecond)
@@ -173,7 +174,7 @@ func TestInvokeSync(t *testing.T) {
 
        go func() {
                clientSend.Wait()
-               receiveCommand, err := client.InvokeSync(addr,
+               receiveCommand, err := client.InvokeSync(context.Background(), 
addr,
                        clientSendRemtingCommand, time.Second)
                if err != nil {
                        t.Fatalf("failed to invoke synchronous. %s", err)
@@ -237,7 +238,7 @@ func TestInvokeAsync(t *testing.T) {
                        time.Sleep(time.Duration(rand.Intn(100)) * 
time.Millisecond)
                        t.Logf("[Send: %d] asychronous message", index)
                        sendRemotingCommand := randomNewRemotingCommand()
-                       err := client.InvokeAsync(addr, sendRemotingCommand, 
time.Second, func(r *ResponseFuture) {
+                       err := client.InvokeAsync(context.Background(), addr, 
sendRemotingCommand, time.Second, func(r *ResponseFuture) {
                                t.Logf("[Receive: %d] asychronous message 
response", index)
                                if string(sendRemotingCommand.Body) != 
string(r.ResponseCommand.Body) {
                                        t.Errorf("wrong response message. 
want=%s, got=%s", string(sendRemotingCommand.Body),
@@ -303,7 +304,7 @@ func TestInvokeAsyncTimeout(t *testing.T) {
        clientSend.Add(1)
        go func() {
                clientSend.Wait()
-               err := client.InvokeAsync(addr, clientSendRemtingCommand,
+               err := client.InvokeAsync(context.Background(), addr, 
clientSendRemtingCommand,
                        time.Duration(1000), func(r *ResponseFuture) {
                                assert.NotNil(t, r.Err)
                                assert.Equal(t, utils.ErrRequestTimeout, r.Err)
@@ -348,7 +349,7 @@ func TestInvokeOneWay(t *testing.T) {
        clientSend.Add(1)
        go func() {
                clientSend.Wait()
-               err := client.InvokeOneWay(addr, clientSendRemtingCommand, 
3*time.Second)
+               err := client.InvokeOneWay(context.Background(), addr, 
clientSendRemtingCommand, 3*time.Second)
                if err != nil {
                        t.Fatalf("failed to invoke synchronous. %s", err)
                }
diff --git a/internal/route.go b/internal/route.go
index c6b7e70..3611bb9 100644
--- a/internal/route.go
+++ b/internal/route.go
@@ -18,6 +18,7 @@ limitations under the License.
 package internal
 
 import (
+       "context"
        "encoding/json"
        "errors"
        "math/rand"
@@ -304,7 +305,7 @@ func (s *namesrvs) queryTopicRouteInfoFromServer(topic 
string) (*TopicRouteData,
        )
        for i := 0; i < s.Size(); i++ {
                rc := remote.NewRemotingCommand(ReqGetRouteInfoByTopic, 
request, nil)
-               response, err = 
s.nameSrvClient.InvokeSync(s.getNameServerAddress(), rc, requestTimeout)
+               response, err = 
s.nameSrvClient.InvokeSync(context.Background(), s.getNameServerAddress(), rc, 
requestTimeout)
 
                if err == nil {
                        break
diff --git a/internal/route_test.go b/internal/route_test.go
index 77f79c0..aa944b0 100644
--- a/internal/route_test.go
+++ b/internal/route_test.go
@@ -18,6 +18,7 @@ limitations under the License.
 package internal
 
 import (
+       "context"
        "testing"
        "time"
 
@@ -48,8 +49,8 @@ func TestQueryTopicRouteInfoFromServer(t *testing.T) {
                Convey("When marshal producer trace data", func() {
 
                        count := 0
-                       remotingCli.EXPECT().InvokeSync(gomock.Any(), 
gomock.Any(), gomock.Any()).DoAndReturn(
-                               func(addr string, request 
*remote.RemotingCommand, timeout time.Duration) (*remote.RemotingCommand, 
error) {
+                       remotingCli.EXPECT().InvokeSync(gomock.Any(), 
gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn(
+                               func(ctx context.Context, addr string, request 
*remote.RemotingCommand, timeout time.Duration) (*remote.RemotingCommand, 
error) {
                                        count++
                                        if count < 3 {
                                                return nil, errors.New("not 
existed")
diff --git a/internal/trace.go b/internal/trace.go
index ca60eea..8bc6748 100644
--- a/internal/trace.go
+++ b/internal/trace.go
@@ -397,7 +397,7 @@ func (td *traceDispatcher) sendTraceDataByMQ(keyset Keyset, 
regionID string, dat
        }
 
        var req = td.buildSendRequest(mq, msg)
-       td.cli.InvokeAsync(addr, req, 5000*time.Millisecond, func(command 
*remote.RemotingCommand, e error) {
+       td.cli.InvokeAsync(context.Background(), addr, req, 
5000*time.Millisecond, func(command *remote.RemotingCommand, e error) {
                if e != nil {
                        rlog.Error("send trace data ,the traceData is %v", data)
                }
diff --git a/producer/producer.go b/producer/producer.go
index 0b0367f..25e3210 100644
--- a/producer/producer.go
+++ b/producer/producer.go
@@ -166,7 +166,7 @@ func (p *defaultProducer) sendSync(ctx context.Context, msg 
*primitive.Message,
                        producerCtx.MQ = *mq
                }
 
-               res, _err := p.client.InvokeSync(addr, p.buildSendRequest(mq, 
msg), 3*time.Second)
+               res, _err := p.client.InvokeSync(ctx, addr, 
p.buildSendRequest(mq, msg), 3*time.Second)
                if _err != nil {
                        err = _err
                        continue
@@ -205,7 +205,7 @@ func (p *defaultProducer) sendAsync(ctx context.Context, 
msg *primitive.Message,
                return errors.Errorf("topic=%s route info not found", mq.Topic)
        }
 
-       return p.client.InvokeAsync(addr, p.buildSendRequest(mq, msg), 
3*time.Second, func(command *remote.RemotingCommand, err error) {
+       return p.client.InvokeAsync(ctx, addr, p.buildSendRequest(mq, msg), 
3*time.Second, func(command *remote.RemotingCommand, err error) {
                resp := new(primitive.SendResult)
                if err != nil {
                        h(ctx, nil, err)
@@ -251,7 +251,7 @@ func (p *defaultProducer) sendOneWay(ctx context.Context, 
msg *primitive.Message
                        return fmt.Errorf("topic=%s route info not found", 
mq.Topic)
                }
 
-               _err := p.client.InvokeOneWay(addr, p.buildSendRequest(mq, 
msg), 3*time.Second)
+               _err := p.client.InvokeOneWay(ctx, addr, p.buildSendRequest(mq, 
msg), 3*time.Second)
                if _err != nil {
                        err = _err
                        continue
@@ -398,7 +398,7 @@ func (tp *transactionProducer) checkTransactionState() {
                        req := 
remote.NewRemotingCommand(internal.ReqENDTransaction, header, nil)
                        req.Remark = tp.errRemark(nil)
 
-                       tp.producer.client.InvokeOneWay(callback.Addr.String(), 
req, tp.producer.options.SendMsgTimeout)
+                       tp.producer.client.InvokeOneWay(context.Background(), 
callback.Addr.String(), req, tp.producer.options.SendMsgTimeout)
                default:
                        rlog.Error("unknow type %v", ch)
                }
@@ -467,7 +467,7 @@ func (tp *transactionProducer) endTransaction(result 
primitive.SendResult, err e
        req := remote.NewRemotingCommand(internal.ReqENDTransaction, 
requestHeader, nil)
        req.Remark = tp.errRemark(err)
 
-       return tp.producer.client.InvokeOneWay(brokerAddr, req, 
tp.producer.options.SendMsgTimeout)
+       return tp.producer.client.InvokeOneWay(context.Background(), 
brokerAddr, req, tp.producer.options.SendMsgTimeout)
 }
 
 func (tp *transactionProducer) errRemark(err error) string {
diff --git a/producer/producer_test.go b/producer/producer_test.go
index 849978a..d4c58c2 100644
--- a/producer/producer_test.go
+++ b/producer/producer_test.go
@@ -130,7 +130,7 @@ func TestSync(t *testing.T) {
 
        mockB4Send(p)
 
-       client.EXPECT().InvokeSync(gomock.Any(), gomock.Any(), 
gomock.Any()).Return(nil, nil)
+       client.EXPECT().InvokeSync(gomock.Any(), gomock.Any(), gomock.Any(), 
gomock.Any()).Return(nil, nil)
        client.EXPECT().ProcessSendResponse(gomock.Any(), gomock.Any(), 
gomock.Any(), gomock.Any()).Do(
                func(brokerName string, cmd *remote.RemotingCommand, resp 
*primitive.SendResult, msgs ...*primitive.Message) {
                        resp.Status = expectedResp.Status
@@ -181,8 +181,8 @@ func TestASync(t *testing.T) {
 
        mockB4Send(p)
 
-       client.EXPECT().InvokeAsync(gomock.Any(), gomock.Any(), gomock.Any(), 
gomock.Any()).DoAndReturn(
-               func(addr string, request *remote.RemotingCommand,
+       client.EXPECT().InvokeAsync(gomock.Any(), gomock.Any(), gomock.Any(), 
gomock.Any(), gomock.Any()).DoAndReturn(
+               func(ctx context.Context, addr string, request 
*remote.RemotingCommand,
                        timeoutMillis time.Duration, f 
func(*remote.RemotingCommand, error)) error {
                        // mock invoke callback
                        f(nil, nil)
@@ -226,7 +226,7 @@ func TestOneway(t *testing.T) {
 
        mockB4Send(p)
 
-       client.EXPECT().InvokeOneWay(gomock.Any(), gomock.Any(), 
gomock.Any()).Return(nil).AnyTimes()
+       client.EXPECT().InvokeOneWay(gomock.Any(), gomock.Any(), gomock.Any(), 
gomock.Any()).Return(nil).AnyTimes()
 
        err = p.SendOneWay(ctx, msg)
        assert.Nil(t, err)

Reply via email to