This is an automated email from the ASF dual-hosted git repository.

wenfeng pushed a commit to branch native
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git


The following commit(s) were added to refs/heads/native by this push:
     new d1e89b9  [ISSUE #272] Structured Log (#273)
d1e89b9 is described below

commit d1e89b96ce4c4e201af1802e175514415b8d2aeb
Author: wenfeng <[email protected]>
AuthorDate: Fri Nov 1 10:41:32 2019 +0800

    [ISSUE #272] Structured Log (#273)
    
    * structured log
---
 consumer/consumer.go             | 134 ++++++++++++++++++------
 consumer/offset_store.go         |  41 ++++++--
 consumer/process_queue.go        |   9 +-
 consumer/pull_consumer.go        |   5 +-
 consumer/push_consumer.go        | 221 ++++++++++++++++++++++++++-------------
 consumer/statistics.go           |  28 +++--
 consumer/strategy.go             |  38 +++++--
 examples/consumer/pull/main.go   |   4 +-
 go.sum                           |   2 -
 internal/client.go               |  44 ++++----
 internal/model.go                |   6 +-
 internal/remote/remote_client.go |  37 +++++--
 internal/route.go                |  41 ++++++--
 internal/trace.go                |  26 +++--
 internal/utils/errors.go         |   7 +-
 internal/validators.go           |   4 +-
 primitive/ctx.go                 |   3 +-
 producer/producer.go             |  17 ++-
 rlog/log.go                      | 110 ++++++++++---------
 19 files changed, 528 insertions(+), 249 deletions(-)

diff --git a/consumer/consumer.go b/consumer/consumer.go
index 0044bf6..55fe08f 100644
--- a/consumer/consumer.go
+++ b/consumer/consumer.go
@@ -363,7 +363,10 @@ func (dc *defaultConsumer) doBalance() {
                topic := key.(string)
                v, exist := dc.topicSubscribeInfoTable.Load(topic)
                if !exist {
-                       rlog.Warnf("do balance of group: %s, but topic: %s does 
not exist.", dc.consumerGroup, topic)
+                       rlog.Warning("do balance in group failed, the topic 
does not exist", map[string]interface{}{
+                               rlog.LogKeyConsumerGroup: dc.consumerGroup,
+                               rlog.LogKeyTopic:         topic,
+                       })
                        return true
                }
                mqs := v.([]*primitive.MessageQueue)
@@ -372,14 +375,19 @@ func (dc *defaultConsumer) doBalance() {
                        changed := dc.updateProcessQueueTable(topic, mqs)
                        if changed {
                                dc.mqChanged(topic, mqs, mqs)
-                               rlog.Infof("messageQueueChanged, Group: %s, 
Topic: %s, MessageQueues: %v",
-                                       dc.consumerGroup, topic, mqs)
+                               rlog.Debug("MessageQueue changed", 
map[string]interface{}{
+                                       rlog.LogKeyConsumerGroup: 
dc.consumerGroup,
+                                       rlog.LogKeyTopic:         topic,
+                                       rlog.LogKeyMessageQueue:  
fmt.Sprintf("%v", mqs),
+                               })
                        }
                case Clustering:
                        cidAll := dc.findConsumerList(topic)
                        if cidAll == nil {
-                               rlog.Warnf("do balance for Group: %s, Topic: %s 
get consumer id list failed",
-                                       dc.consumerGroup, topic)
+                               rlog.Warning("do balance in group failed, get 
consumer id list failed", map[string]interface{}{
+                                       rlog.LogKeyConsumerGroup: 
dc.consumerGroup,
+                                       rlog.LogKeyTopic:         topic,
+                               })
                                return true
                        }
                        mqAll := make([]*primitive.MessageQueue, len(mqs))
@@ -401,10 +409,15 @@ func (dc *defaultConsumer) doBalance() {
                        changed := dc.updateProcessQueueTable(topic, 
allocateResult)
                        if changed {
                                dc.mqChanged(topic, mqAll, allocateResult)
-                               rlog.Infof("do balance result changed, 
group=%s, "+
-                                       "topic=%s, clientId=%s, mqAllSize=%d, 
cidAllSize=%d, rebalanceResultSize=%d, "+
-                                       "rebalanceResultSet=%v", 
dc.consumerGroup, topic, dc.client.ClientID(), len(mqAll),
-                                       len(cidAll), len(allocateResult), 
allocateResult)
+                               rlog.Debug("MessageQueue do balance done", 
map[string]interface{}{
+                                       rlog.LogKeyConsumerGroup: 
dc.consumerGroup,
+                                       rlog.LogKeyTopic:         topic,
+                                       "clientID":               
dc.client.ClientID(),
+                                       "mqAllSize":              len(mqAll),
+                                       "cidAllSize":             len(cidAll),
+                                       "rebalanceResultSize":    
len(allocateResult),
+                                       "rebalanceResultSet":     
allocateResult,
+                               })
                        }
                }
                return true
@@ -460,7 +473,16 @@ func (dc *defaultConsumer) lock(mq 
*primitive.MessageQueue) bool {
                        lockOK = true
                }
        }
-       rlog.Debugf("the message queue lock %v, %s %s", lockOK, 
dc.consumerGroup, mq.String())
+       fields := map[string]interface{}{
+               "lockOK":                 lockOK,
+               rlog.LogKeyConsumerGroup: dc.consumerGroup,
+               rlog.LogKeyMessageQueue:  mq.String(),
+       }
+       if lockOK {
+               rlog.Debug("lock MessageQueue", fields)
+       } else {
+               rlog.Info("lock MessageQueue", fields)
+       }
        return lockOK
 }
 
@@ -477,8 +499,11 @@ func (dc *defaultConsumer) unlock(mq 
*primitive.MessageQueue, oneway bool) {
                MQs:           []*primitive.MessageQueue{mq},
        }
        dc.doUnlock(brokerResult.BrokerAddr, body, oneway)
-       rlog.Warnf("unlock messageQueue. group:%s, clientId:%s, mq:%s",
-               dc.consumerGroup, dc.client.ClientID(), mq.String())
+       rlog.Info("unlock MessageQueue", map[string]interface{}{
+               rlog.LogKeyConsumerGroup: dc.consumerGroup,
+               "clientID":               dc.client.ClientID(),
+               rlog.LogKeyMessageQueue:  mq.String(),
+       })
 }
 
 func (dc *defaultConsumer) lockAll() {
@@ -516,7 +541,11 @@ func (dc *defaultConsumer) lockAll() {
                                        pq := v.(*processQueue)
                                        pq.locked = true
                                        pq.lastLockTime = time.Now()
-                                       rlog.Warnf("the message queue: %s 
locked Failed, Group: %s", _mq.String(), dc.consumerGroup)
+                                       rlog.Info("lock MessageQueue", 
map[string]interface{}{
+                                               "lockOK":                 false,
+                                               rlog.LogKeyConsumerGroup: 
dc.consumerGroup,
+                                               rlog.LogKeyMessageQueue:  
_mq.String(),
+                                       })
                                }
                        }
                }
@@ -544,7 +573,11 @@ func (dc *defaultConsumer) unlockAll(oneway bool) {
                        v, exist := dc.processQueueTable.Load(_mq)
                        if exist {
                                v.(*processQueue).locked = false
-                               rlog.Warnf("the message queue: %s locked 
Failed, Group: %s", _mq.String(), dc.consumerGroup)
+                               rlog.Info("lock MessageQueue", 
map[string]interface{}{
+                                       "lockOK":                 false,
+                                       rlog.LogKeyConsumerGroup: 
dc.consumerGroup,
+                                       rlog.LogKeyMessageQueue:  _mq.String(),
+                               })
                        }
                }
        }
@@ -555,7 +588,10 @@ func (dc *defaultConsumer) doLock(addr string, body 
*lockBatchRequestBody) []pri
        request := remote.NewRemotingCommand(internal.ReqLockBatchMQ, nil, data)
        response, err := dc.client.InvokeSync(context.Background(), addr, 
request, 1*time.Second)
        if err != nil {
-               rlog.Errorf("lock mq to broker: %s error %s", addr, err.Error())
+               rlog.Error("lock MessageQueue to broker invoke error", 
map[string]interface{}{
+                       rlog.LogKeyBroker:        addr,
+                       rlog.LogKeyUnderlayError: err,
+               })
                return nil
        }
        lockOKMQSet := struct {
@@ -563,7 +599,9 @@ func (dc *defaultConsumer) doLock(addr string, body 
*lockBatchRequestBody) []pri
        }{}
        err = json.Unmarshal(response.Body, &lockOKMQSet)
        if err != nil {
-               rlog.Errorf("Unmarshal lock mq body error %s", err.Error())
+               rlog.Error("Unmarshal lock mq body error", 
map[string]interface{}{
+                       rlog.LogKeyUnderlayError: err,
+               })
                return nil
        }
        return lockOKMQSet.MQs
@@ -575,13 +613,17 @@ func (dc *defaultConsumer) doUnlock(addr string, body 
*lockBatchRequestBody, one
        if oneway {
                err := dc.client.InvokeOneWay(context.Background(), addr, 
request, 3*time.Second)
                if err != nil {
-                       rlog.Errorf("lock mq to broker with oneway: %s error 
%s", addr, err.Error())
+                       rlog.Error("lock MessageQueue to broker invoke oneway 
error", map[string]interface{}{
+                               rlog.LogKeyBroker:        addr,
+                               rlog.LogKeyUnderlayError: err,
+                       })
                }
        } else {
                response, err := dc.client.InvokeSync(context.Background(), 
addr, request, 1*time.Second)
-               if err != nil {
-                       rlog.Errorf("lock mq to broker: %s error %s", addr, 
err.Error())
-               }
+               rlog.Error("lock MessageQueue to broker invoke error", 
map[string]interface{}{
+                       rlog.LogKeyBroker:        addr,
+                       rlog.LogKeyUnderlayError: err,
+               })
                if response.Code != internal.ResSuccess {
                        // TODO error
                }
@@ -623,15 +665,20 @@ func (dc *defaultConsumer) updateProcessQueueTable(topic 
string, mqs []*primitiv
                                        //delete(mqSet, mq)
                                        dc.processQueueTable.Delete(key)
                                        changed = true
-                                       rlog.Debugf("do defaultConsumer, 
Group:%s, remove unnecessary mq: %s", dc.consumerGroup, mq.String())
+                                       rlog.Debug("remove unnecessary mq when 
updateProcessQueueTable", map[string]interface{}{
+                                               rlog.LogKeyConsumerGroup: 
dc.consumerGroup,
+                                               rlog.LogKeyMessageQueue:  
mq.String(),
+                                       })
                                }
                        } else if pq.isPullExpired() && dc.cType == 
_PushConsume {
                                pq.dropped = true
                                if dc.removeUnnecessaryMessageQueue(&mq, pq) {
                                        delete(mqSet, mq)
                                        changed = true
-                                       rlog.Debugf("do defaultConsumer, 
Group:%s, remove unnecessary mq: %s, "+
-                                               "because pull was paused, so 
try to fixed it", dc.consumerGroup, mq)
+                                       rlog.Debug("remove unnecessary mq 
because pull was paused, prepare to fix it", map[string]interface{}{
+                                               rlog.LogKeyConsumerGroup: 
dc.consumerGroup,
+                                               rlog.LogKeyMessageQueue:  
mq.String(),
+                                       })
                                }
                        }
                }
@@ -647,8 +694,10 @@ func (dc *defaultConsumer) updateProcessQueueTable(topic 
string, mqs []*primitiv
                                continue
                        }
                        if dc.consumeOrderly && !dc.lock(&mq) {
-                               rlog.Warnf("do defaultConsumer, Group:%s add a 
new mq failed, %s, because lock failed",
-                                       dc.consumerGroup, mq.String())
+                               rlog.Warning("do defaultConsumer, add a new mq 
failed, because lock failed", map[string]interface{}{
+                                       rlog.LogKeyConsumerGroup: 
dc.consumerGroup,
+                                       rlog.LogKeyMessageQueue:  mq.String(),
+                               })
                                continue
                        }
                        dc.storage.remove(&mq)
@@ -656,9 +705,15 @@ func (dc *defaultConsumer) updateProcessQueueTable(topic 
string, mqs []*primitiv
                        if nextOffset >= 0 {
                                _, exist := dc.processQueueTable.Load(mq)
                                if exist {
-                                       rlog.Debugf("do defaultConsumer, Group: 
%s, mq already exist, %s", dc.consumerGroup, mq.String())
+                                       rlog.Debug("do defaultConsumer, mq 
already exist", map[string]interface{}{
+                                               rlog.LogKeyConsumerGroup: 
dc.consumerGroup,
+                                               rlog.LogKeyMessageQueue:  
mq.String(),
+                                       })
                                } else {
-                                       rlog.Debugf("do defaultConsumer, Group: 
%s, add a new mq, %s", dc.consumerGroup, mq.String())
+                                       rlog.Debug("do defaultConsumer, add a 
new mq", map[string]interface{}{
+                                               rlog.LogKeyConsumerGroup: 
dc.consumerGroup,
+                                               rlog.LogKeyMessageQueue:  
mq.String(),
+                                       })
                                        pq := newProcessQueue(dc.consumeOrderly)
                                        dc.processQueueTable.Store(mq, pq)
                                        pr := PullRequest{
@@ -671,7 +726,10 @@ func (dc *defaultConsumer) updateProcessQueueTable(topic 
string, mqs []*primitiv
                                        changed = true
                                }
                        } else {
-                               rlog.Warnf("do defaultConsumer failed, 
Group:%s, add new mq failed, {}", dc.consumerGroup, mq)
+                               rlog.Warning("do defaultConsumer, add a new mq 
failed", map[string]interface{}{
+                                       rlog.LogKeyConsumerGroup: 
dc.consumerGroup,
+                                       rlog.LogKeyMessageQueue:  mq.String(),
+                               })
                        }
                }
        }
@@ -704,7 +762,10 @@ func (dc *defaultConsumer) computePullFromWhere(mq 
*primitive.MessageQueue) int6
                                        if err == nil {
                                                result = lastOffset
                                        } else {
-                                               rlog.Warnf("query max offset 
of: [%s:%d] error, %s", mq.Topic, mq.QueueId, err.Error())
+                                               rlog.Warning("query max offset 
error", map[string]interface{}{
+                                                       
rlog.LogKeyMessageQueue:  mq,
+                                                       
rlog.LogKeyUnderlayError: err,
+                                               })
                                        }
                                }
                        } else {
@@ -722,7 +783,10 @@ func (dc *defaultConsumer) computePullFromWhere(mq 
*primitive.MessageQueue) int6
                                                result = lastOffset
                                        } else {
                                                result = -1
-                                               rlog.Warnf("query max offset 
of: [%s:%d] error, %s", mq.Topic, mq.QueueId, err.Error())
+                                               rlog.Warning("query max offset 
error", map[string]interface{}{
+                                                       
rlog.LogKeyMessageQueue:  mq,
+                                                       
rlog.LogKeyUnderlayError: err,
+                                               })
                                        }
                                } else {
                                        t, err := time.Parse("20060102150405", 
dc.option.ConsumeTimestamp)
@@ -749,7 +813,9 @@ func (dc *defaultConsumer) pullInner(ctx context.Context, 
queue *primitive.Messa
 
        brokerResult := dc.tryFindBroker(queue)
        if brokerResult == nil {
-               rlog.Warnf("no broker found for %s", queue.String())
+               rlog.Warning("no broker found for mq", map[string]interface{}{
+                       rlog.LogKeyMessageQueue: queue,
+               })
                return nil, ErrBrokerNotFound
        }
 
@@ -838,7 +904,11 @@ func (dc *defaultConsumer) findConsumerList(topic string) 
[]string {
                cmd := 
remote.NewRemotingCommand(internal.ReqGetConsumerListByGroup, req, nil)
                res, err := dc.client.InvokeSync(context.Background(), 
brokerAddr, cmd, 3*time.Second) // TODO 超时机制有问题
                if err != nil {
-                       rlog.Errorf("get consumer list of [%s] from %s error: 
%s", dc.consumerGroup, brokerAddr, err.Error())
+                       rlog.Error("get consumer list of group from broker 
error", map[string]interface{}{
+                               rlog.LogKeyConsumerGroup: dc.consumerGroup,
+                               rlog.LogKeyBroker:        brokerAddr,
+                               rlog.LogKeyUnderlayError: err,
+                       })
                        return nil
                }
                result := gjson.ParseBytes(res.Body)
diff --git a/consumer/offset_store.go b/consumer/offset_store.go
index 7b230a0..2db940c 100644
--- a/consumer/offset_store.go
+++ b/consumer/offset_store.go
@@ -123,11 +123,15 @@ func (local *localFileOffsetStore) load() {
                return
        }
        if err != nil {
-               rlog.Errorf("read from store failed. err: %v \n", err)
+               rlog.Info("read from local store error, try to use bak file", 
map[string]interface{}{
+                       rlog.LogKeyUnderlayError: err,
+               })
                data, err = utils.FileReadAll(filepath.Join(local.path, ".bak"))
        }
        if err != nil {
-               rlog.Debugf("load local offset: %s error: %s", local.path, 
err.Error())
+               rlog.Info("read from local store bak file error", 
map[string]interface{}{
+                       rlog.LogKeyUnderlayError: err,
+               })
                return
        }
        datas := make(map[MessageQueueKey]int64)
@@ -138,7 +142,10 @@ func (local *localFileOffsetStore) load() {
 
        err = json.Unmarshal(data, &wrapper)
        if err != nil {
-               rlog.Debugf("unmarshal local offset: %s error: %s", local.path, 
err.Error())
+               rlog.Warning("unmarshal local offset error", 
map[string]interface{}{
+                       "local_path":             local.path,
+                       rlog.LogKeyUnderlayError: err.Error(),
+               })
                return
        }
 
@@ -166,7 +173,10 @@ func (local *localFileOffsetStore) read(mq 
*primitive.MessageQueue, t readType)
 func (local *localFileOffsetStore) update(mq *primitive.MessageQueue, offset 
int64, increaseOnly bool) {
        local.mutex.Lock()
        defer local.mutex.Unlock()
-       rlog.Debugf("update offset: %s to %d", mq, offset)
+       rlog.Debug("update offset", map[string]interface{}{
+               rlog.LogKeyMessageQueue: mq,
+               "new_offset":            offset,
+       })
        key := MessageQueueKey(*mq)
        localOffset, exist := local.OffsetTable[key]
        if !exist {
@@ -237,10 +247,18 @@ func (r *remoteBrokerOffsetStore) persist(mqs 
[]*primitive.MessageQueue) {
                }
                err := r.updateConsumeOffsetToBroker(r.group, mq, off)
                if err != nil {
-                       rlog.Warnf("update offset to broker error: %s, group: 
%s, queue: %s, offset: %d",
-                               err.Error(), r.group, mq.String(), off)
+                       rlog.Warning("update offset to broker error", 
map[string]interface{}{
+                               rlog.LogKeyConsumerGroup: r.group,
+                               rlog.LogKeyMessageQueue:  mq.String(),
+                               rlog.LogKeyUnderlayError: err.Error(),
+                               "offset":                 off,
+                       })
                } else {
-                       rlog.Debugf("update offset to broker success, group: 
%s, topic: %s, queue: %v offset: %v", r.group, mq.Topic, mq, off)
+                       rlog.Info("update offset to broker success", 
map[string]interface{}{
+                               rlog.LogKeyConsumerGroup: r.group,
+                               rlog.LogKeyMessageQueue:  mq.String(),
+                               "offset":                 off,
+                       })
                }
        }
 }
@@ -250,7 +268,9 @@ func (r *remoteBrokerOffsetStore) remove(mq 
*primitive.MessageQueue) {
        defer r.mutex.Unlock()
 
        delete(r.OffsetTable, *mq)
-       rlog.Infof("delete queueID %v of brokerName: %v \n", mq.QueueId, 
mq.BrokerName)
+       rlog.Info("delete mq from offset table", map[string]interface{}{
+               rlog.LogKeyMessageQueue: mq,
+       })
 }
 
 func (r *remoteBrokerOffsetStore) read(mq *primitive.MessageQueue, t readType) 
int64 {
@@ -268,7 +288,10 @@ func (r *remoteBrokerOffsetStore) read(mq 
*primitive.MessageQueue, t readType) i
        case _ReadFromStore:
                off, err := r.fetchConsumeOffsetFromBroker(r.group, mq)
                if err != nil {
-                       rlog.Errorf("fetch offset of %s error: %s", 
mq.String(), err.Error())
+                       rlog.Error("fecth offset of mq error", 
map[string]interface{}{
+                               rlog.LogKeyMessageQueue:  mq.String(),
+                               rlog.LogKeyUnderlayError: err,
+                       })
                        r.mutex.RUnlock()
                        return -1
                }
diff --git a/consumer/process_queue.go b/consumer/process_queue.go
index 19fbafa..eeabbb2 100644
--- a/consumer/process_queue.go
+++ b/consumer/process_queue.go
@@ -176,7 +176,10 @@ func (pq *processQueue) cleanExpiredMsg(consumer 
defaultConsumer) {
                if startTime != "" {
                        st, err := strconv.ParseInt(startTime, 10, 64)
                        if err != nil {
-                               rlog.Warnf("parse message start consume time 
error: %s, origin str is: %s", startTime)
+                               rlog.Warning("parse message start consume time 
error", map[string]interface{}{
+                                       "time":                   startTime,
+                                       rlog.LogKeyUnderlayError: err,
+                               })
                                continue
                        }
                        if time.Now().Unix()-st <= 
int64(consumer.option.ConsumeTimeout) {
@@ -188,7 +191,9 @@ func (pq *processQueue) cleanExpiredMsg(consumer 
defaultConsumer) {
 
                err := consumer.sendBack(msg, 3)
                if err != nil {
-                       rlog.Errorf("send message back to broker error: %s when 
clean expired messages", err.Error())
+                       rlog.Error("send message back to broker error when 
clean expired messages", map[string]interface{}{
+                               rlog.LogKeyUnderlayError: err,
+                       })
                        continue
                }
                pq.removeMessage(msg)
diff --git a/consumer/pull_consumer.go b/consumer/pull_consumer.go
index 7d47b8a..6a6fa64 100644
--- a/consumer/pull_consumer.go
+++ b/consumer/pull_consumer.go
@@ -132,7 +132,10 @@ func (c *defaultPullConsumer) Pull(ctx context.Context, 
topic string, selector M
 func (c *defaultPullConsumer) getNextQueueOf(topic string) 
*primitive.MessageQueue {
        queues, err := 
c.defaultConsumer.namesrv.FetchSubscribeMessageQueues(topic)
        if err != nil && len(queues) > 0 {
-               rlog.Error(err.Error())
+               rlog.Error("get next mq error", map[string]interface{}{
+                       rlog.LogKeyTopic:         topic,
+                       rlog.LogKeyUnderlayError: err.Error(),
+               })
                return nil
        }
        var index int64
diff --git a/consumer/push_consumer.go b/consumer/push_consumer.go
index c04fd4b..d75f7da 100644
--- a/consumer/push_consumer.go
+++ b/consumer/push_consumer.go
@@ -111,15 +111,20 @@ func NewPushConsumer(opts ...Option) (*pushConsumer, 
error) {
 func (pc *pushConsumer) Start() error {
        var err error
        pc.once.Do(func() {
-               rlog.Infof("the consumerGroup=%s start beginning. 
messageModel=%v, unitMode=%v",
-                       pc.consumerGroup, pc.model, pc.unitMode)
+               rlog.Info("the consumer start beginning", 
map[string]interface{}{
+                       rlog.LogKeyConsumerGroup: pc.consumerGroup,
+                       "messageModel":           pc.model,
+                       "unitMode":               pc.unitMode,
+               })
                pc.state = internal.StateStartFailed
                pc.validate()
 
                err := pc.client.RegisterConsumer(pc.consumerGroup, pc)
                if err != nil {
                        pc.state = internal.StateStartFailed
-                       rlog.Errorf("the consumer group: [%s] has been created, 
specify another name.", pc.consumerGroup)
+                       rlog.Error("the consumer group has been created, 
specify another one", map[string]interface{}{
+                               rlog.LogKeyConsumerGroup: pc.consumerGroup,
+                       })
                        err = ErrCreated
                }
 
@@ -232,7 +237,10 @@ func (pc *pushConsumer) messageQueueChanged(topic string, 
mqAll, mqDivided []*pr
        }
        data := v.(*internal.SubscriptionData)
        newVersion := time.Now().UnixNano()
-       rlog.Infof("the MessageQueue changed, also update version: %d to %d", 
data.SubVersion, newVersion)
+       rlog.Info("the MessageQueue changed, version also updated", 
map[string]interface{}{
+               rlog.LogKeyValueChangedFrom: data.SubVersion,
+               rlog.LogKeyValueChangedTo:   newVersion,
+       })
        data.SubVersion = newVersion
 
        // TODO: optimize
@@ -247,7 +255,10 @@ func (pc *pushConsumer) messageQueueChanged(topic string, 
mqAll, mqDivided []*pr
                        if newVal == 0 {
                                newVal = 1
                        }
-                       rlog.Infof("The PullThresholdForTopic is changed from 
%d to %d", pc.option.PullThresholdForTopic, newVal)
+                       rlog.Info("The PullThresholdForTopic is changed", 
map[string]interface{}{
+                               rlog.LogKeyValueChangedFrom: 
pc.option.PullThresholdForTopic,
+                               rlog.LogKeyValueChangedTo:   newVal,
+                       })
                        pc.option.PullThresholdForTopic = newVal
                }
 
@@ -256,8 +267,10 @@ func (pc *pushConsumer) messageQueueChanged(topic string, 
mqAll, mqDivided []*pr
                        if newVal == 0 {
                                newVal = 1
                        }
-                       rlog.Infof("The PullThresholdSizeForTopic is changed 
from %d to %d", pc.option.PullThresholdSizeForTopic, newVal)
-                       pc.option.PullThresholdSizeForTopic = newVal
+                       rlog.Info("The PullThresholdSizeForTopic is changed", 
map[string]interface{}{
+                               rlog.LogKeyValueChangedFrom: 
pc.option.PullThresholdSizeForTopic,
+                               rlog.LogKeyValueChangedTo:   newVal,
+                       })
                }
        }
        pc.client.SendHeartbeatToAllBrokerWithLock()
@@ -268,18 +281,18 @@ func (pc *pushConsumer) validate() {
 
        if pc.consumerGroup == internal.DefaultConsumerGroup {
                // TODO FQA
-               rlog.Errorf("consumerGroup can't equal [%s], please specify 
another one.", internal.DefaultConsumerGroup)
+               rlog.Error(fmt.Sprintf("consumerGroup can't equal [%s], please 
specify another one.", internal.DefaultConsumerGroup), nil)
        }
 
        if len(pc.subscribedTopic) == 0 {
-               rlog.Error("number of subscribed topics is 0.")
+               rlog.Error("number of subscribed topics is 0.", nil)
        }
 
        if pc.option.ConsumeConcurrentlyMaxSpan < 1 || 
pc.option.ConsumeConcurrentlyMaxSpan > 65535 {
                if pc.option.ConsumeConcurrentlyMaxSpan == 0 {
                        pc.option.ConsumeConcurrentlyMaxSpan = 1000
                } else {
-                       rlog.Error("option.ConsumeConcurrentlyMaxSpan out of 
range [1, 65535]")
+                       rlog.Error("option.ConsumeConcurrentlyMaxSpan out of 
range [1, 65535]", nil)
                }
        }
 
@@ -287,7 +300,7 @@ func (pc *pushConsumer) validate() {
                if pc.option.PullThresholdForQueue == 0 {
                        pc.option.PullThresholdForQueue = 1024
                } else {
-                       rlog.Error("option.PullThresholdForQueue out of range 
[1, 65535]")
+                       rlog.Error("option.PullThresholdForQueue out of range 
[1, 65535]", nil)
                }
        }
 
@@ -295,7 +308,7 @@ func (pc *pushConsumer) validate() {
                if pc.option.PullThresholdForTopic == 0 {
                        pc.option.PullThresholdForTopic = 102400
                } else {
-                       rlog.Error("option.PullThresholdForTopic out of range 
[1, 6553500]")
+                       rlog.Error("option.PullThresholdForTopic out of range 
[1, 6553500]", nil)
                }
        }
 
@@ -303,7 +316,7 @@ func (pc *pushConsumer) validate() {
                if pc.option.PullThresholdSizeForQueue == 0 {
                        pc.option.PullThresholdSizeForQueue = 512
                } else {
-                       rlog.Error("option.PullThresholdSizeForQueue out of 
range [1, 1024]")
+                       rlog.Error("option.PullThresholdSizeForQueue out of 
range [1, 1024]", nil)
                }
        }
 
@@ -311,19 +324,19 @@ func (pc *pushConsumer) validate() {
                if pc.option.PullThresholdSizeForTopic == 0 {
                        pc.option.PullThresholdSizeForTopic = 51200
                } else {
-                       rlog.Error("option.PullThresholdSizeForTopic out of 
range [1, 102400]")
+                       rlog.Error("option.PullThresholdSizeForTopic out of 
range [1, 102400]", nil)
                }
        }
 
        if pc.option.PullInterval < 0 || pc.option.PullInterval > 65535 {
-               rlog.Error("option.PullInterval out of range [0, 65535]")
+               rlog.Error("option.PullInterval out of range [0, 65535]", nil)
        }
 
        if pc.option.ConsumeMessageBatchMaxSize < 1 || 
pc.option.ConsumeMessageBatchMaxSize > 1024 {
                if pc.option.ConsumeMessageBatchMaxSize == 0 {
                        pc.option.ConsumeMessageBatchMaxSize = 512
                } else {
-                       rlog.Error("option.ConsumeMessageBatchMaxSize out of 
range [1, 1024]")
+                       rlog.Error("option.ConsumeMessageBatchMaxSize out of 
range [1, 1024]", nil)
                }
        }
 
@@ -331,13 +344,15 @@ func (pc *pushConsumer) validate() {
                if pc.option.PullBatchSize == 0 {
                        pc.option.PullBatchSize = 32
                } else {
-                       rlog.Error("option.PullBatchSize out of range [1, 
1024]")
+                       rlog.Error("option.PullBatchSize out of range [1, 
1024]", nil)
                }
        }
 }
 
 func (pc *pushConsumer) pullMessage(request *PullRequest) {
-       rlog.Debugf("start a new Pull Message task %s for [%s]", 
request.String(), pc.consumerGroup)
+       rlog.Debug("start a new Pull Message task for PullRequest", 
map[string]interface{}{
+               rlog.LogKeyPullRequest: request.String(),
+       })
        var sleepTime time.Duration
        pq := request.pq
        go func() {
@@ -350,11 +365,13 @@ func (pc *pushConsumer) pullMessage(request *PullRequest) 
{
        for {
        NEXT:
                if pq.dropped {
-                       rlog.Infof("the request: [%s] was dropped, so stop 
task", request.String())
+                       rlog.Debug("the request was dropped, so stop task", 
map[string]interface{}{
+                               rlog.LogKeyPullRequest: request.String(),
+                       })
                        return
                }
                if sleepTime > 0 {
-                       rlog.Infof("pull MessageQueue: %d sleep %d ms for mq: 
%v", request.mq.QueueId, sleepTime/time.Millisecond, request.mq)
+                       rlog.Debug(fmt.Sprintf("pull MessageQueue: %d sleep %d 
ms for mq: %v", request.mq.QueueId, sleepTime/time.Millisecond, request.mq), 
nil)
                        time.Sleep(sleepTime)
                }
                // reset time
@@ -362,14 +379,16 @@ func (pc *pushConsumer) pullMessage(request *PullRequest) 
{
                pq.lastPullTime = time.Now()
                err := pc.makeSureStateOK()
                if err != nil {
-                       rlog.Warnf("consumer state error: %s", err.Error())
+                       rlog.Warning("consumer state error", 
map[string]interface{}{
+                               rlog.LogKeyUnderlayError: err.Error(),
+                       })
                        sleepTime = _PullDelayTimeWhenError
                        goto NEXT
                }
 
                if pc.pause {
-                       rlog.Infof("consumer [%s] of [%s] was paused, execute 
pull request [%s] later",
-                               pc.option.InstanceName, pc.consumerGroup, 
request.String())
+                       rlog.Debug(fmt.Sprintf("consumer [%s] of [%s] was 
paused, execute pull request [%s] later",
+                               pc.option.InstanceName, pc.consumerGroup, 
request.String()), nil)
                        sleepTime = _PullDelayTimeWhenSuspend
                        goto NEXT
                }
@@ -377,10 +396,15 @@ func (pc *pushConsumer) pullMessage(request *PullRequest) 
{
                cachedMessageSizeInMiB := int(pq.cachedMsgSize / Mb)
                if pq.cachedMsgCount > pc.option.PullThresholdForQueue {
                        if pc.queueFlowControlTimes%1000 == 0 {
-                               rlog.Warnf("the cached message count exceeds 
the threshold %d, so do flow control, "+
-                                       "minOffset=%d, maxOffset=%d, count=%d, 
size=%d MiB, pullRequest=%s, flowControlTimes=%d",
-                                       pc.option.PullThresholdForQueue, 0, 
pq.Min(), pq.Max(),
-                                       pq.msgCache, cachedMessageSizeInMiB, 
request.String(), pc.queueFlowControlTimes)
+                               rlog.Warning("the cached message count exceeds 
the threshold, so do flow control", map[string]interface{}{
+                                       "PullThresholdForQueue": 
pc.option.PullThresholdForQueue,
+                                       "minOffset":             pq.Min(),
+                                       "maxOffset":             pq.Max(),
+                                       "count":                 pq.cachedMsgCount,
+                                       "size(MiB)":             
cachedMessageSizeInMiB,
+                                       "flowControlTimes":      
pc.queueFlowControlTimes,
+                                       rlog.LogKeyPullRequest:  
request.String(),
+                               })
                        }
                        pc.queueFlowControlTimes++
                        sleepTime = _PullDelayTimeWhenFlowControl
@@ -389,10 +413,15 @@ func (pc *pushConsumer) pullMessage(request *PullRequest) 
{
 
                if cachedMessageSizeInMiB > pc.option.PullThresholdSizeForQueue 
{
                        if pc.queueFlowControlTimes%1000 == 0 {
-                               rlog.Warnf("the cached message size exceeds the 
threshold %d MiB, so do flow control, "+
-                                       "minOffset=%d, maxOffset=%d, count=%d, 
size=%d MiB, pullRequest=%s, flowControlTimes=%d",
-                                       pc.option.PullThresholdSizeForQueue, 
pq.Min(), pq.Max(),
-                                       pq.msgCache, cachedMessageSizeInMiB, 
request.String(), pc.queueFlowControlTimes)
+                               rlog.Warning("the cached message size exceeds 
the threshold, so do flow control", map[string]interface{}{
+                                       "PullThresholdSizeForQueue": 
pc.option.PullThresholdSizeForQueue,
+                                       "minOffset":                 pq.Min(),
+                                       "maxOffset":                 pq.Max(),
+                                       "count":                     
pq.cachedMsgCount,
+                                       "size(MiB)":                 
cachedMessageSizeInMiB,
+                                       "flowControlTimes":          
pc.queueFlowControlTimes,
+                                       rlog.LogKeyPullRequest:      
request.String(),
+                               })
                        }
                        pc.queueFlowControlTimes++
                        sleepTime = _PullDelayTimeWhenFlowControl
@@ -401,11 +430,15 @@ func (pc *pushConsumer) pullMessage(request *PullRequest) 
{
 
                if !pc.consumeOrderly {
                        if pq.getMaxSpan() > 
pc.option.ConsumeConcurrentlyMaxSpan {
-
                                if pc.queueMaxSpanFlowControlTimes%1000 == 0 {
-                                       rlog.Warnf("the queue's messages, span 
too long, limit=%d, so do flow control, minOffset=%d, "+
-                                               "maxOffset=%d, maxSpan=%d, 
pullRequest=%s, flowControlTimes=%d", pc.option.ConsumeConcurrentlyMaxSpan,
-                                               pq.Min(), pq.Max(), 
pq.getMaxSpan(), request.String(), pc.queueMaxSpanFlowControlTimes)
+                                       rlog.Warning("the queue's messages span 
too long, so do flow control", map[string]interface{}{
+                                               "ConsumeConcurrentlyMaxSpan": 
pc.option.ConsumeConcurrentlyMaxSpan,
+                                               "minOffset":                  
pq.Min(),
+                                               "maxOffset":                  
pq.Max(),
+                                               "maxSpan":                    
pq.getMaxSpan(),
+                                               "flowControlTimes":           
pc.queueMaxSpanFlowControlTimes,
+                                               rlog.LogKeyPullRequest:       
request.String(),
+                                       })
                                }
                                sleepTime = _PullDelayTimeWhenFlowControl
                                goto NEXT
@@ -415,19 +448,23 @@ func (pc *pushConsumer) pullMessage(request *PullRequest) 
{
                                if !request.lockedFirst {
                                        offset := 
pc.computePullFromWhere(request.mq)
                                        brokerBusy := offset < 
request.nextOffset
-                                       rlog.Infof("the first time to pull 
message, so fix offset from broker. "+
-                                               "pullRequest: [%s] NewOffset: 
%d brokerBusy: %v",
-                                               request.String(), offset, 
brokerBusy)
+                                       rlog.Info("the first time to pull 
message, so fix offset from broker, offset maybe changed", 
map[string]interface{}{
+                                               rlog.LogKeyPullRequest:      
request.String(),
+                                               rlog.LogKeyValueChangedFrom: 
request.nextOffset,
+                                               rlog.LogKeyValueChangedTo:   
offset,
+                                               "brokerBusy":                
brokerBusy,
+                                       })
                                        if brokerBusy {
-                                               rlog.Infof("[NOTIFY_ME]the 
first time to pull message, but pull request offset"+
-                                                       " larger than broker 
consume offset. pullRequest: [%s] NewOffset: %d",
-                                                       request.String(), 
offset)
+                                               rlog.Info("[NOTIFY_ME] the 
first time to pull message, but pull request offset larger than "+
+                                                       "broker consume 
offset", map[string]interface{}{"offset": offset})
                                        }
                                        request.lockedFirst = true
                                        request.nextOffset = offset
                                }
                        } else {
-                               rlog.Infof("pull message later because not 
locked in broker, [%s]", request.String())
+                               rlog.Info("pull message later because not 
locked in broker", map[string]interface{}{
+                                       rlog.LogKeyPullRequest: 
request.String(),
+                               })
                                sleepTime = _PullDelayTimeWhenError
                                goto NEXT
                        }
@@ -435,7 +472,9 @@ func (pc *pushConsumer) pullMessage(request *PullRequest) {
 
                v, exist := pc.subscriptionDataTable.Load(request.mq.Topic)
                if !exist {
-                       rlog.Warnf("find the consumer's subscription failed, 
%s", request.String())
+                       rlog.Info("find the consumer's subscription failed", 
map[string]interface{}{
+                               rlog.LogKeyPullRequest: request.String(),
+                       })
                        sleepTime = _PullDelayTimeWhenError
                        goto NEXT
                }
@@ -481,27 +520,34 @@ func (pc *pushConsumer) pullMessage(request *PullRequest) 
{
 
                brokerResult := pc.defaultConsumer.tryFindBroker(request.mq)
                if brokerResult == nil {
-                       rlog.Warnf("no broker found for %s", 
request.mq.String())
+                       rlog.Warning("no broker found for mq", 
map[string]interface{}{
+                               rlog.LogKeyMessageQueue: request.mq.String(),
+                       })
                        sleepTime = _PullDelayTimeWhenError
                        goto NEXT
                }
 
                result, err := pc.client.PullMessage(context.Background(), 
brokerResult.BrokerAddr, pullRequest)
                if err != nil {
-                       rlog.Warnf("pull message from %s error: %s", 
brokerResult.BrokerAddr, err.Error())
+                       rlog.Warning("pull message from broker error", 
map[string]interface{}{
+                               rlog.LogKeyBroker:        
brokerResult.BrokerAddr,
+                               rlog.LogKeyUnderlayError: err.Error(),
+                       })
                        sleepTime = _PullDelayTimeWhenError
                        goto NEXT
                }
 
                if result.Status == primitive.PullBrokerTimeout {
-                       rlog.Warnf("pull broker: %s timeout", 
brokerResult.BrokerAddr)
+                       rlog.Warning("pull broker timeout", 
map[string]interface{}{
+                               rlog.LogKeyBroker: brokerResult.BrokerAddr,
+                       })
                        sleepTime = _PullDelayTimeWhenError
                        goto NEXT
                }
 
                switch result.Status {
                case primitive.PullFound:
-                       rlog.Debugf("Topic: %s, QueueId: %d found messages.", 
request.mq.Topic, request.mq.QueueId)
+                       rlog.Debug(fmt.Sprintf("Topic: %s, QueueId: %d found 
messages.", request.mq.Topic, request.mq.QueueId), nil)
                        prevRequestOffset := request.nextOffset
                        request.nextOffset = result.NextBeginOffset
 
@@ -518,16 +564,23 @@ func (pc *pushConsumer) pullMessage(request *PullRequest) 
{
                                pq.putMessage(msgFounded...)
                        }
                        if result.NextBeginOffset < prevRequestOffset || 
firstMsgOffset < prevRequestOffset {
-                               rlog.Warnf("[BUG] pull message result maybe 
data wrong, [nextBeginOffset=%d, "+
-                                       "firstMsgOffset=%d, 
prevRequestOffset=%d]", result.NextBeginOffset, firstMsgOffset, 
prevRequestOffset)
+                               rlog.Warning("[BUG] pull message result maybe 
data wrong", map[string]interface{}{
+                                       "nextBeginOffset":   
result.NextBeginOffset,
+                                       "firstMsgOffset":    firstMsgOffset,
+                                       "prevRequestOffset": prevRequestOffset,
+                               })
                        }
                case primitive.PullNoNewMsg:
-                       rlog.Debugf("Topic: %s, QueueId: %d no more msg, 
current offset: %d, next offset: %d", request.mq.Topic, request.mq.QueueId, 
pullRequest.QueueOffset, result.NextBeginOffset)
+                       rlog.Debug(fmt.Sprintf("Topic: %s, QueueId: %d no more 
msg, current offset: %d, next offset: %d",
+                               request.mq.Topic, request.mq.QueueId, 
pullRequest.QueueOffset, result.NextBeginOffset), nil)
                case primitive.PullNoMsgMatched:
                        request.nextOffset = result.NextBeginOffset
                        pc.correctTagsOffset(request)
                case primitive.PullOffsetIllegal:
-                       rlog.Warnf("the pull request offset illegal, request: 
%s, result: %s", request.String(), result.String())
+                       rlog.Warning("the pull request offset illegal", 
map[string]interface{}{
+                               rlog.LogKeyPullRequest: request.String(),
+                               "result":               result.String(),
+                       })
                        request.nextOffset = result.NextBeginOffset
                        pq.dropped = true
                        go func() {
@@ -535,10 +588,10 @@ func (pc *pushConsumer) pullMessage(request *PullRequest) 
{
                                pc.storage.update(request.mq, 
request.nextOffset, false)
                                
pc.storage.persist([]*primitive.MessageQueue{request.mq})
                                pc.storage.remove(request.mq)
-                               rlog.Warnf("fix the pull request offset: %s", 
request.String())
+                               rlog.Warning(fmt.Sprintf("fix the pull request 
offset: %s", request.String()), nil)
                        }()
                default:
-                       rlog.Warnf("unknown pull status: %v", result.Status)
+                       rlog.Warning(fmt.Sprintf("unknown pull status: %v", 
result.Status), nil)
                        sleepTime = _PullDelayTimeWhenError
                }
        }
@@ -577,25 +630,25 @@ func (pc *pushConsumer) buildSendBackRequest(msg 
*primitive.MessageExt, delayLev
 
 func (pc *pushConsumer) suspend() {
        pc.pause = true
-       rlog.Infof("suspend consumer: %s", pc.consumerGroup)
+       rlog.Info(fmt.Sprintf("suspend consumer: %s", pc.consumerGroup), nil)
 }
 
 func (pc *pushConsumer) resume() {
        pc.pause = false
        pc.doBalance()
-       rlog.Infof("resume consumer: %s", pc.consumerGroup)
+       rlog.Info(fmt.Sprintf("resume consumer: %s", pc.consumerGroup), nil)
 }
 
 func (pc *pushConsumer) resetOffset(topic string, table 
map[primitive.MessageQueue]int64) {
        //topic := cmd.ExtFields["topic"]
        //group := cmd.ExtFields["group"]
        //if topic == "" || group == "" {
-       //      rlog.Warnf("received reset offset command from: %s, but missing 
params.", from)
+       //      rlog.Warning("received reset offset command from: %s, but 
missing params.", from)
        //      return
        //}
        //t, err := strconv.ParseInt(cmd.ExtFields["timestamp"], 10, 64)
        //if err != nil {
-       //      rlog.Warnf("received reset offset command from: %s, but parse 
time error: %s", err.Error())
+       //      rlog.Warning("received reset offset command from: %s, but parse 
time error: %s", err.Error())
        //      return
        //}
        //rlog.Infof("invoke reset offset operation from broker. brokerAddr=%s, 
topic=%s, group=%s, timestamp=%v",
@@ -604,7 +657,7 @@ func (pc *pushConsumer) resetOffset(topic string, table 
map[primitive.MessageQue
        //offsetTable := make(map[MessageQueue]int64, 0)
        //err = json.Unmarshal(cmd.Body, &offsetTable)
        //if err != nil {
-       //      rlog.Warnf("received reset offset command from: %s, but parse 
offset table: %s", err.Error())
+       //      rlog.Warning("received reset offset command from: %s, but parse 
offset table: %s", err.Error())
        //      return
        //}
        //v, exist := c.consumerMap.Load(group)
@@ -710,12 +763,13 @@ func (pc *pushConsumer) consumeMessageCurrently(pq 
*processQueue, mq *primitive.
                go func() {
                RETRY:
                        if pq.dropped {
-                               rlog.Infof("the message queue not be able to 
consume, because it was dropped. group=%s, mq=%s",
-                                       pc.consumerGroup, mq.String())
+                               rlog.Info("the message queue not be able to 
consume, because it was dropped", map[string]interface{}{
+                                       rlog.LogKeyMessageQueue:  mq.String(),
+                                       rlog.LogKeyConsumerGroup: 
pc.consumerGroup,
+                               })
                                return
                        }
 
-                       // TODO hook
                        beginTime := time.Now()
                        pc.resetRetryAndNamespace(subMsgs)
                        var result ConsumeResult
@@ -757,7 +811,9 @@ func (pc *pushConsumer) consumeMessageCurrently(pq 
*processQueue, mq *primitive.
                                        
increaseConsumeFailedTPS(pc.consumerGroup, mq.Topic, len(subMsgs))
                                        if pc.model == BroadCasting {
                                                for i := 0; i < len(msgs); i++ {
-                                                       
rlog.Warnf("BROADCASTING, the message=%s consume failed, drop it, {}", 
subMsgs[i])
+                                                       
rlog.Warning("BROADCASTING, the message consume failed, drop it", 
map[string]interface{}{
+                                                               "message": 
subMsgs[i],
+                                                       })
                                                }
                                        } else {
                                                for i := 0; i < len(msgs); i++ {
@@ -781,8 +837,10 @@ func (pc *pushConsumer) consumeMessageCurrently(pq 
*processQueue, mq *primitive.
                                        goto RETRY
                                }
                        } else {
-                               rlog.Warnf("processQueue is dropped without 
process consume result. messageQueue=%s, msgs=%+v",
-                                       mq, msgs)
+                               rlog.Warning("processQueue is dropped without 
process consume result.", map[string]interface{}{
+                                       rlog.LogKeyMessageQueue: mq,
+                                       "message":               msgs,
+                               })
                        }
                }()
        }
@@ -790,7 +848,9 @@ func (pc *pushConsumer) consumeMessageCurrently(pq 
*processQueue, mq *primitive.
 
 func (pc *pushConsumer) consumeMessageOrderly(pq *processQueue, mq 
*primitive.MessageQueue) {
        if pq.dropped {
-               rlog.Warn("the message queue not be able to consume, because 
it's dropped.")
+               rlog.Warning("the message queue not be able to consume, because 
it's dropped.", map[string]interface{}{
+                       rlog.LogKeyMessageQueue: mq.String(),
+               })
                return
        }
 
@@ -803,17 +863,23 @@ func (pc *pushConsumer) consumeMessageOrderly(pq 
*processQueue, mq *primitive.Me
                continueConsume := true
                for continueConsume {
                        if pq.dropped {
-                               rlog.Warnf("the message queue not be able to 
consume, because it's dropped. %v", mq)
+                               rlog.Warning("the message queue not be able to 
consume, because it's dropped.", map[string]interface{}{
+                                       rlog.LogKeyMessageQueue: mq.String(),
+                               })
                                break
                        }
                        if pc.model == Clustering {
                                if !pq.locked {
-                                       rlog.Warnf("the message queue not 
locked, so consume later: %v", mq)
+                                       rlog.Warning("the message queue not 
locked, so consume later", map[string]interface{}{
+                                               rlog.LogKeyMessageQueue: 
mq.String(),
+                                       })
                                        pc.tryLockLaterAndReconsume(mq, 10)
                                        return
                                }
                                if pq.isLockExpired() {
-                                       rlog.Warnf("the message queue lock 
expired, so consume later: %v", mq)
+                                       rlog.Warning("the message queue lock 
expired, so consume later", map[string]interface{}{
+                                               rlog.LogKeyMessageQueue: 
mq.String(),
+                                       })
                                        pc.tryLockLaterAndReconsume(mq, 10)
                                        return
                                }
@@ -855,8 +921,11 @@ func (pc *pushConsumer) consumeMessageOrderly(pq 
*processQueue, mq *primitive.Me
                        pq.lockConsume.Unlock()
 
                        if result == Rollback || result == 
SuspendCurrentQueueAMoment {
-                               rlog.Warnf("consumeMessage Orderly return not 
OK, Group: %v Msgs: %v MQ: %v",
-                                       pc.consumerGroup, msgs, mq)
+                               rlog.Warning("consumeMessage Orderly return not 
OK", map[string]interface{}{
+                                       rlog.LogKeyConsumerGroup: 
pc.consumerGroup,
+                                       "messages":               msgs,
+                                       rlog.LogKeyMessageQueue:  mq,
+                               })
                        }
 
                        // jsut put consumeResult in consumerMessageCtx
@@ -875,7 +944,9 @@ func (pc *pushConsumer) consumeMessageOrderly(pq 
*processQueue, mq *primitive.Me
                        if pc.option.AutoCommit {
                                switch result {
                                case Commit, Rollback:
-                                       rlog.Warnf("the message queue consume 
result is illegal, we think you want to ack these message: %v", mq)
+                                       rlog.Warning("the message queue consume result is illegal, we think you want to ack these message", map[string]interface{}{
+                                               rlog.LogKeyMessageQueue: mq.String(),
+                                       })
                                case ConsumeSuccess:
                                        commitOffset = pq.commit()
                                case SuspendCurrentQueueAMoment:
@@ -906,12 +977,14 @@ func (pc *pushConsumer) consumeMessageOrderly(pq 
*processQueue, mq *primitive.Me
                                }
                        }
                        if commitOffset > 0 && !pq.dropped {
-                               pc.updateOffset(mq, commitOffset)
+                               _ = pc.updateOffset(mq, commitOffset)
                        }
                }
        } else {
                if pq.dropped {
-                       rlog.Warnf("the message queue not be able to consume, 
because it's dropped. %v", mq)
+                       rlog.Warning("the message queue not be able to consume, 
because it's dropped.", map[string]interface{}{
+                               rlog.LogKeyMessageQueue: mq.String(),
+                       })
                }
                pc.tryLockLaterAndReconsume(mq, 100)
        }
@@ -923,7 +996,7 @@ func (pc *pushConsumer) checkReconsumeTimes(msgs 
[]*primitive.MessageExt) bool {
                maxReconsumeTimes := pc.getOrderlyMaxReconsumeTimes()
                for _, msg := range msgs {
                        if msg.ReconsumeTimes > maxReconsumeTimes {
-                               rlog.Warnf("msg will be send to retry topic due 
to ReconsumeTimes > %d, \n", maxReconsumeTimes)
+                               rlog.Warning(fmt.Sprintf("msg will be send to retry topic due to ReconsumeTimes > %d", maxReconsumeTimes), nil)
                                msg.WithProperty("RECONSUME_TIME", 
strconv.Itoa(int(msg.ReconsumeTimes)))
                                if !pc.sendMessageBack("", msg, -1) {
                                        suspend = true
diff --git a/consumer/statistics.go b/consumer/statistics.go
index 43f547a..2448c74 100644
--- a/consumer/statistics.go
+++ b/consumer/statistics.go
@@ -19,6 +19,7 @@ package consumer
 
 import (
        "container/list"
+       "fmt"
        "sync"
        "sync/atomic"
        "time"
@@ -358,20 +359,35 @@ func (si *statsItem) samplingInHour() {
 
 func (si *statsItem) printAtMinutes() {
        ss := computeStatsData(si.csListMinute)
-       rlog.Infof("[%s] [%s] Stats In One Minute, SUM: %d TPS: %.2f AVGPT: 
%.2f",
-               si.statsName, si.statsKey, ss.sum, ss.tps, ss.avgpt)
+       rlog.Info("Stats In One Minute", map[string]interface{}{
+               "statsName": si.statsName,
+               "statsKey":  si.statsKey,
+               "SUM":       ss.sum,
+               "TPS":       fmt.Sprintf("%.2f", ss.tps),
+               "AVGPT":     fmt.Sprintf("%.2f", ss.avgpt),
+       })
 }
 
 func (si *statsItem) printAtHour() {
        ss := computeStatsData(si.csListHour)
-       rlog.Infof("[%s] [%s] Stats In One Hour, SUM: %d TPS: %.2f AVGPT: %.2f",
-               si.statsName, si.statsKey, ss.sum, ss.tps, ss.avgpt)
+       rlog.Info("Stats In One Hour", map[string]interface{}{
+               "statsName": si.statsName,
+               "statsKey":  si.statsKey,
+               "SUM":       ss.sum,
+               "TPS":       fmt.Sprintf("%.2f", ss.tps),
+               "AVGPT":     fmt.Sprintf("%.2f", ss.avgpt),
+       })
 }
 
 func (si *statsItem) printAtDay() {
        ss := computeStatsData(si.csListDay)
-       rlog.Infof("[%s] [%s] Stats In One Day, SUM: %d TPS: %.2f AVGPT: %.2f",
-               si.statsName, si.statsKey, ss.sum, ss.tps, ss.avgpt)
+       rlog.Info("Stats In One Day", map[string]interface{}{
+               "statsName": si.statsName,
+               "statsKey":  si.statsKey,
+               "SUM":       ss.sum,
+               "TPS":       fmt.Sprintf("%.2f", ss.tps),
+               "AVGPT":     fmt.Sprintf("%.2f", ss.avgpt),
+       })
 }
 
 func nextMinutesTime() time.Time {
diff --git a/consumer/strategy.go b/consumer/strategy.go
index f326f32..ff7055f 100644
--- a/consumer/strategy.go
+++ b/consumer/strategy.go
@@ -61,7 +61,11 @@ func AllocateByAveragely(consumerGroup, currentCID string, 
mqAll []*primitive.Me
                }
        }
        if !find {
-               rlog.Warnf("[BUG] ConsumerGroup=%s, ConsumerId=%s not in 
cidAll:%+v", consumerGroup, currentCID, cidAll)
+               rlog.Warning("[BUG] ConsumerId not in cidAll", 
map[string]interface{}{
+                       rlog.LogKeyConsumerGroup: consumerGroup,
+                       "consumerId":             currentCID,
+                       "cidAll":                 cidAll,
+               })
                return nil
        }
 
@@ -88,7 +92,7 @@ func AllocateByAveragely(consumerGroup, currentCID string, 
mqAll []*primitive.Me
        }
 
        num := utils.MinInt(averageSize, mqSize-startIndex)
-       result := []*primitive.MessageQueue{}
+       result := make([]*primitive.MessageQueue, 0)
        for i := 0; i < num; i++ {
                result = append(result, mqAll[(startIndex+i)%mqSize])
        }
@@ -113,11 +117,15 @@ func AllocateByAveragelyCircle(consumerGroup, currentCID 
string, mqAll []*primit
                }
        }
        if !find {
-               rlog.Warnf("[BUG] ConsumerGroup=%s, ConsumerId=%s not in 
cidAll:%+v", consumerGroup, currentCID, cidAll)
+               rlog.Warning("[BUG] ConsumerId not in cidAll", 
map[string]interface{}{
+                       rlog.LogKeyConsumerGroup: consumerGroup,
+                       "consumerId":             currentCID,
+                       "cidAll":                 cidAll,
+               })
                return nil
        }
 
-       result := []*primitive.MessageQueue{}
+       result := make([]*primitive.MessageQueue, 0)
        for i := index; i < len(mqAll); i++ {
                if i%len(cidAll) == index {
                        result = append(result, mqAll[i])
@@ -156,11 +164,15 @@ func AllocateByMachineRoom(consumeridcs []string) 
AllocateStrategy {
                        }
                }
                if !find {
-                       rlog.Warnf("[BUG] ConsumerGroup=%s, ConsumerId=%s not 
in cidAll:%+v", consumerGroup, currentCID, cidAll)
+                       rlog.Warning("[BUG] ConsumerId not in cidAll", 
map[string]interface{}{
+                               rlog.LogKeyConsumerGroup: consumerGroup,
+                               "consumerId":             currentCID,
+                               "cidAll":                 cidAll,
+                       })
                        return nil
                }
 
-               premqAll := []*primitive.MessageQueue{}
+               var premqAll []*primitive.MessageQueue
                for _, mq := range mqAll {
                        temp := strings.Split(mq.BrokerName, "@")
                        if len(temp) == 2 {
@@ -177,7 +189,7 @@ func AllocateByMachineRoom(consumeridcs []string) 
AllocateStrategy {
                startIndex := mod * index
                endIndex := startIndex + mod
 
-               result := []*primitive.MessageQueue{}
+               result := make([]*primitive.MessageQueue, 0)
                for i := startIndex; i < endIndex; i++ {
                        result = append(result, mqAll[i])
                }
@@ -204,7 +216,11 @@ func AllocateByConsistentHash(virtualNodeCnt int) 
AllocateStrategy {
                        }
                }
                if !find {
-                       rlog.Warnf("[BUG] ConsumerGroup=%s, ConsumerId=%s not 
in cidAll:%+v", consumerGroup, currentCID, cidAll)
+                       rlog.Warning("[BUG] ConsumerId not in cidAll", 
map[string]interface{}{
+                               rlog.LogKeyConsumerGroup: consumerGroup,
+                               "consumerId":             currentCID,
+                               "cidAll":                 cidAll,
+                       })
                        return nil
                }
 
@@ -214,11 +230,13 @@ func AllocateByConsistentHash(virtualNodeCnt int) 
AllocateStrategy {
                        c.Add(cid)
                }
 
-               result := []*primitive.MessageQueue{}
+               result := make([]*primitive.MessageQueue, 0)
                for _, mq := range mqAll {
                        clientNode, err := c.Get(mq.String())
                        if err != nil {
-                               rlog.Warnf("[BUG] AllocateByConsistentHash 
err:%s", err.Error())
+                               rlog.Warning("[BUG] AllocateByConsistentHash error", map[string]interface{}{
+                                       rlog.LogKeyUnderlayError: err,
+                               })
                        }
                        if currentCID == clientNode {
                                result = append(result, mq)
diff --git a/examples/consumer/pull/main.go b/examples/consumer/pull/main.go
index 35b970c..2b0a9f2 100644
--- a/examples/consumer/pull/main.go
+++ b/examples/consumer/pull/main.go
@@ -34,11 +34,11 @@ func main() {
                consumer.WithNameServer([]string{"127.0.0.1:9876"}),
        )
        if err != nil {
-               rlog.Fatal("fail to new pullConsumer: ", err)
+               rlog.Fatal(fmt.Sprintf("fail to new pullConsumer: %s", err), 
nil)
        }
        err = c.Start()
        if err != nil {
-               rlog.Fatal("fail to new pullConsumer: ", err)
+               rlog.Fatal(fmt.Sprintf("fail to start pullConsumer: %s", err), nil)
        }
 
        ctx := context.Background()
diff --git a/go.sum b/go.sum
index b8f3ab3..9d64cc9 100644
--- a/go.sum
+++ b/go.sum
@@ -45,5 +45,3 @@ golang.org/x/text v0.3.0/go.mod 
h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
 golang.org/x/tools v0.0.0-20190328211700-ab21143f2384/go.mod 
h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
 golang.org/x/tools v0.0.0-20190425150028-36563e24a262 
h1:qsl9y/CJx34tuA7QCPNp86JNJe4spst6Ff8MjvPUdPg=
 golang.org/x/tools v0.0.0-20190425150028-36563e24a262/go.mod 
h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q=
-stathat.com/c/consistent v1.0.0 h1:ezyc51EGcRPJUxfHGSgJjWzJdj3NiMU9pNfLNGiXV0c=
-stathat.com/c/consistent v1.0.0/go.mod 
h1:QkzMWzcbB+yQBL2AttO6sgsQS/JSTapcDISJalmCDS0=
diff --git a/internal/client.go b/internal/client.go
index 99227ef..debc87c 100644
--- a/internal/client.go
+++ b/internal/client.go
@@ -177,7 +177,9 @@ func GetOrNewRocketMQClient(option ClientOptions, 
callbackCh chan interface{}) *
        actual, loaded := clientMap.LoadOrStore(client.ClientID(), client)
        if !loaded {
                
client.remoteClient.RegisterRequestFunc(ReqNotifyConsumerIdsChanged, func(req 
*remote.RemotingCommand, addr net.Addr) *remote.RemotingCommand {
-                       rlog.Infof("receive broker's notification, the consumer 
group: %s", req.ExtFields["consumerGroup"])
+                       rlog.Info("receive broker's notification to consumer 
group", map[string]interface{}{
+                               rlog.LogKeyConsumerGroup: 
req.ExtFields["consumerGroup"],
+                       })
                        client.RebalanceImmediately()
                        return nil
                })
@@ -186,7 +188,7 @@ func GetOrNewRocketMQClient(option ClientOptions, 
callbackCh chan interface{}) *
                        header.Decode(req.ExtFields)
                        msgExts := primitive.DecodeMessage(req.Body)
                        if len(msgExts) == 0 {
-                               rlog.Warn("checkTransactionState, decode 
message failed")
+                               rlog.Warning("checkTransactionState, decode 
message failed", nil)
                                return nil
                        }
                        msgExt := msgExts[0]
@@ -197,11 +199,11 @@ func GetOrNewRocketMQClient(option ClientOptions, 
callbackCh chan interface{}) *
                        }
                        group := 
msgExt.GetProperty(primitive.PropertyProducerGroup)
                        if group == "" {
-                               rlog.Warn("checkTransactionState, pick producer 
group failed")
+                               rlog.Warning("checkTransactionState, pick 
producer group failed", nil)
                                return nil
                        }
                        if option.GroupName != group {
-                               rlog.Warn("producer group is not equal.")
+                               rlog.Warning("producer group is not equal", nil)
                                return nil
                        }
                        callback := CheckTransactionStateCallback{
@@ -214,7 +216,7 @@ func GetOrNewRocketMQClient(option ClientOptions, 
callbackCh chan interface{}) *
                })
 
                
client.remoteClient.RegisterRequestFunc(ReqGetConsumerRunningInfo, func(req 
*remote.RemotingCommand, addr net.Addr) *remote.RemotingCommand {
-                       rlog.Info("receive get consumer running info 
request...")
+                       rlog.Info("receive get consumer running info 
request...", nil)
                        res := remote.NewRemotingCommand(ResError, nil, nil)
                        res.Remark = "the go client has not supported consumer 
running info"
                        return res
@@ -261,7 +263,9 @@ func (c *rmqClient) Start() {
                                        consumer := value.(InnerConsumer)
                                        err := consumer.PersistConsumerOffset()
                                        if err != nil {
-                                               rlog.Errorf("persist offset 
failed. err: %v", err)
+                                               rlog.Error("persist offset 
failed", map[string]interface{}{
+                                                       
rlog.LogKeyUnderlayError: err,
+                                               })
                                        }
                                        return true
                                })
@@ -349,7 +353,7 @@ func (c *rmqClient) SendHeartbeatToAllBrokerWithLock() {
                return true
        })
        if hbData.ProducerDatas.Len() == 0 && hbData.ConsumerDatas.Len() == 0 {
-               rlog.Info("sending heartbeat, but no producer and no consumer")
+               rlog.Info("sending heartbeat, but no producer and no consumer", 
nil)
                return
        }
        c.namesrvs.brokerAddressesMap.Range(func(key, value interface{}) bool {
@@ -359,7 +363,9 @@ func (c *rmqClient) SendHeartbeatToAllBrokerWithLock() {
                        cmd := remote.NewRemotingCommand(ReqHeartBeat, nil, 
hbData.encode())
                        response, err := 
c.remoteClient.InvokeSync(context.Background(), addr, cmd, 3*time.Second)
                        if err != nil {
-                               rlog.Warnf("send heart beat to broker error: 
%s", err.Error())
+                               rlog.Warning("send heart beat to broker error", 
map[string]interface{}{
+                                       rlog.LogKeyUnderlayError: err,
+                               })
                                return true
                        }
                        if response.Code == ResSuccess {
@@ -372,7 +378,11 @@ func (c *rmqClient) SendHeartbeatToAllBrokerWithLock() {
                                        
c.namesrvs.brokerVersionMap.Store(brokerName, m)
                                }
                                m[brokerName] = int32(response.Version)
-                               rlog.Debugf("send heart beat to broker[%s %d 
%s] success", brokerName, id, addr)
+                               rlog.Debug("send heart beat to broker success", 
map[string]interface{}{
+                                       "brokerName": brokerName,
+                                       "brokerId": id,
+                                       "brokerAddr": addr,
+                               })
                        }
                }
                return true
@@ -408,22 +418,6 @@ func (c *rmqClient) UpdateTopicRouteInfo() {
        }
 }
 
-// SendMessageAsync send message with batch by async
-func (c *rmqClient) SendMessageAsync(ctx context.Context, brokerAddrs, 
brokerName string, request *SendMessageRequest,
-       msgs []*primitive.Message, f func(result *primitive.SendResult)) error {
-       return nil
-}
-
-func (c *rmqClient) SendMessageOneWay(ctx context.Context, brokerAddrs string, 
request *SendMessageRequest,
-       msgs []*primitive.Message) (*primitive.SendResult, error) {
-       cmd := remote.NewRemotingCommand(ReqSendBatchMessage, request, 
encodeMessages(msgs))
-       err := c.remoteClient.InvokeOneWay(ctx, brokerAddrs, cmd, 3*time.Second)
-       if err != nil {
-               rlog.Warnf("send messages with oneway error: %v", err)
-       }
-       return nil, err
-}
-
 func (c *rmqClient) ProcessSendResponse(brokerName string, cmd 
*remote.RemotingCommand, resp *primitive.SendResult, msgs 
...*primitive.Message) error {
        var status primitive.SendStatus
        switch cmd.Code {
diff --git a/internal/model.go b/internal/model.go
index ad86e2e..b7aa125 100644
--- a/internal/model.go
+++ b/internal/model.go
@@ -92,9 +92,11 @@ func NewHeartbeatData(clientID string) *heartbeatData {
 func (data *heartbeatData) encode() []byte {
        d, err := json.Marshal(data)
        if err != nil {
-               rlog.Errorf("marshal heartbeatData error: %s", err.Error())
+               rlog.Error("marshal heartbeatData error", 
map[string]interface{}{
+                       rlog.LogKeyUnderlayError:  err,
+               })
                return nil
        }
-       rlog.Debugf("heartbeat: " + string(d))
+       rlog.Debug("heartbeat: " + string(d), nil)
        return d
 }
diff --git a/internal/remote/remote_client.go b/internal/remote/remote_client.go
index 703e958..6d34a8b 100644
--- a/internal/remote/remote_client.go
+++ b/internal/remote/remote_client.go
@@ -140,34 +140,44 @@ func (c *remotingClient) receiveResponse(r net.Conn) {
        header := make([]byte, 4)
        for {
                if err != nil {
-                       rlog.Errorf("conn err: %s so close", err.Error())
+                       rlog.Error("conn error, close connection", 
map[string]interface{}{
+                               rlog.LogKeyUnderlayError: err,
+                       })
                        c.closeConnection(r)
                        break
                }
 
                _, err = io.ReadFull(r, header)
                if err != nil {
-                       rlog.Errorf("io readfull error: %s", err.Error())
+                       rlog.Error("io ReadFull error", map[string]interface{}{
+                               rlog.LogKeyUnderlayError: err,
+                       })
                        continue
                }
 
                var length int32
                err = binary.Read(bytes.NewReader(header), binary.BigEndian, 
&length)
                if err != nil {
-                       rlog.Errorf("binary decode header: %s", err.Error())
+                       rlog.Error("binary decode header error", 
map[string]interface{}{
+                               rlog.LogKeyUnderlayError: err,
+                       })
                        continue
                }
 
                buf := make([]byte, length)
                _, err = io.ReadFull(r, buf)
                if err != nil {
-                       rlog.Errorf("io readfull error: %s", err.Error())
+                       rlog.Error("io ReadFull error", map[string]interface{}{
+                               rlog.LogKeyUnderlayError: err,
+                       })
                        continue
                }
 
                cmd, err := decode(buf)
                if err != nil {
-                       rlog.Errorf("decode RemotingCommand error: %s", 
err.Error())
+                       rlog.Error("decode RemotingCommand error", 
map[string]interface{}{
+                               rlog.LogKeyUnderlayError: err,
+                       })
                        continue
                }
                c.processCMD(cmd, r)
@@ -198,12 +208,17 @@ func (c *remotingClient) processCMD(cmd *RemotingCommand, 
r net.Conn) {
                                        res.Flag |= 1 << 0
                                        err := c.sendRequest(r, res)
                                        if err != nil {
-                                               rlog.Warnf("send response to 
broker error: %s, type is: %d", err, res.Code)
+                                               rlog.Warning("send response to 
broker error", map[string]interface{}{
+                                                       
rlog.LogKeyUnderlayError: err,
+                                                       "responseCode": 
res.Code,
+                                               })
                                        }
                                }
                        }()
                } else {
-                       rlog.Warnf("receive broker's requests, but no func to 
handle, code is: %d", cmd.Code)
+                       rlog.Warning("receive broker's requests, but no func to 
handle", map[string]interface{}{
+                               "responseCode": cmd.Code,
+                       })
                }
        }
 }
@@ -216,7 +231,9 @@ func (c *remotingClient) createScanner(r io.Reader) 
*bufio.Scanner {
        scanner.Split(func(data []byte, atEOF bool) (int, []byte, error) {
                defer func() {
                        if err := recover(); err != nil {
-                               rlog.Errorf("panic: %v", err)
+                               rlog.Error("scanner split panic", 
map[string]interface{}{
+                                       "panic": err,
+                               })
                        }
                }()
                if !atEOF {
@@ -224,7 +241,9 @@ func (c *remotingClient) createScanner(r io.Reader) 
*bufio.Scanner {
                                var length int32
                                err := binary.Read(bytes.NewReader(data[0:4]), 
binary.BigEndian, &length)
                                if err != nil {
-                                       rlog.Errorf("split data error: %s", 
err.Error())
+                                       rlog.Error("split data error", 
map[string]interface{}{
+                                               rlog.LogKeyUnderlayError: err,
+                                       })
                                        return 0, nil, err
                                }
 
diff --git a/internal/route.go b/internal/route.go
index 3611bb9..e05034f 100644
--- a/internal/route.go
+++ b/internal/route.go
@@ -72,12 +72,18 @@ func (s *namesrvs) cleanOfflineBroker() {
                        })
                        if !isBrokerAddrExistInTopicRoute {
                                delete(bd.BrokerAddresses, k)
-                               rlog.Infof("the broker: [name=%s, ID=%d, 
addr=%s,] is offline, remove it", brokerName, k, v)
+                               rlog.Info("the broker is offline, remove it", map[string]interface{}{
+                                       "brokerName": brokerName,
+                                       "brokerID":   k,
+                                       "brokerAddr": v,
+                               })
                        }
                }
                if len(bd.BrokerAddresses) == 0 {
                        s.brokerAddressesMap.Delete(brokerName)
-                       rlog.Infof("the broker [name=%s] name's host is 
offline, remove it", brokerName)
+                       rlog.Info("the broker name's host is offline, remove 
it", map[string]interface{}{
+                               "brokerName": brokerName,
+                       })
                }
                return true
        })
@@ -113,12 +119,16 @@ func (s *namesrvs) UpdateTopicRouteInfo(topic string) 
*TopicRouteData {
 
        routeData, err := s.queryTopicRouteInfoFromServer(topic)
        if err != nil {
-               rlog.Warnf("query topic route from server error: %s", err)
+               rlog.Warning("query topic route from server error", map[string]interface{}{
+                       rlog.LogKeyUnderlayError: err,
+               })
                return nil
        }
 
        if routeData == nil {
-               rlog.Warnf("queryTopicRouteInfoFromServer return nil, Topic: 
%s", topic)
+               rlog.Warning("queryTopicRouteInfoFromServer return nil", 
map[string]interface{}{
+                       rlog.LogKeyTopic: topic,
+               })
                return nil
        }
 
@@ -130,8 +140,11 @@ func (s *namesrvs) UpdateTopicRouteInfo(topic string) 
*TopicRouteData {
 
        if changed {
                s.routeDataMap.Store(topic, routeData)
-               rlog.Infof("the topic [%s] route info changed, old %v ,new %s", 
topic,
-                       oldRouteData, routeData.String())
+               rlog.Info("the topic route info changed", 
map[string]interface{}{
+                       rlog.LogKeyTopic: topic,
+                       rlog.LogKeyValueChangedFrom: oldRouteData,
+                       rlog.LogKeyValueChangedTo:  routeData.String(),
+               })
                for _, brokerData := range routeData.BrokerDataList {
                        s.brokerAddressesMap.Store(brokerData.BrokerName, 
brokerData)
                }
@@ -261,7 +274,9 @@ func (s *namesrvs) FetchPublishMessageQueues(topic string) 
([]*primitive.Message
        if !exist {
                routeData, err = s.queryTopicRouteInfoFromServer(topic)
                if err != nil {
-                       rlog.Error("queryTopicRouteInfoFromServer failed. 
topic: ", topic)
+                       rlog.Error("queryTopicRouteInfoFromServer failed", 
map[string]interface{}{
+                               rlog.LogKeyTopic: topic,
+                       })
                        return nil, err
                }
                s.routeDataMap.Store(topic, routeData)
@@ -273,9 +288,9 @@ func (s *namesrvs) FetchPublishMessageQueues(topic string) 
([]*primitive.Message
        if err != nil {
                return nil, err
        }
-       publishinfo := s.routeData2PublishInfo(topic, routeData)
+       publishInfo := s.routeData2PublishInfo(topic, routeData)
 
-       return publishinfo.MqList, nil
+       return publishInfo.MqList, nil
 }
 
 func (s *namesrvs) findBrokerVersion(brokerName, brokerAddr string) int32 {
@@ -312,7 +327,9 @@ func (s *namesrvs) queryTopicRouteInfoFromServer(topic 
string) (*TopicRouteData,
                }
        }
        if err != nil {
-               rlog.Errorf("connect to namesrv: %v failed.", s)
+               rlog.Error("connect to namesrv failed.", map[string]interface{}{
+                       "namesrv": s,
+               })
                return nil, err
        }
 
@@ -325,7 +342,9 @@ func (s *namesrvs) queryTopicRouteInfoFromServer(topic 
string) (*TopicRouteData,
 
                err = routeData.decode(string(response.Body))
                if err != nil {
-                       rlog.Warnf("decode TopicRouteData error: %s", err)
+                       rlog.Warning("decode TopicRouteData error", map[string]interface{}{
+                               rlog.LogKeyUnderlayError: err,
+                       })
                        return nil, err
                }
                return routeData, nil
diff --git a/internal/trace.go b/internal/trace.go
index 0fa4307..53ff5a2 100644
--- a/internal/trace.go
+++ b/internal/trace.go
@@ -268,14 +268,17 @@ func (td *traceDispatcher) Close() {
 
 func (td *traceDispatcher) Append(ctx TraceContext) bool {
        if !td.running {
-               rlog.Error("traceDispatcher is closed.")
+               rlog.Error("traceDispatcher is closed.", nil)
                return false
        }
        select {
        case td.input <- ctx:
                return true
        default:
-               rlog.Warnf("buffer full: %d, ctx is %v", 
atomic.AddInt64(&td.discardCount, 1), ctx)
+               rlog.Warning("buffer full", map[string]interface{}{
+                       "discardCount": atomic.AddInt64(&td.discardCount, 1),
+                       "TraceContext": ctx,
+               })
                return false
        }
 }
@@ -319,7 +322,7 @@ func (td *traceDispatcher) process() {
                                now = time.Now().UnixNano() / 
int64(time.Millisecond)
                                runtime.Gosched()
                        }
-                       rlog.Infof("------end trace send %v %v", td.input, 
td.batchCh)
+                       rlog.Info(fmt.Sprintf("------end trace send %v %v", 
td.input, td.batchCh), nil)
                }
        }
 }
@@ -391,9 +394,9 @@ func (td *traceDispatcher) flush(topic, regionID string, 
data []TraceTransferBea
        }
 }
 
-func (td *traceDispatcher) sendTraceDataByMQ(keyset Keyset, regionID string, 
data string) {
+func (td *traceDispatcher) sendTraceDataByMQ(keySet Keyset, regionID string, 
data string) {
        msg := primitive.NewMessage(td.traceTopic, []byte(data))
-       msg.WithKeys(keyset.slice())
+       msg.WithKeys(keySet.slice())
 
        mq, addr := td.findMq()
        if mq == nil {
@@ -401,17 +404,24 @@ func (td *traceDispatcher) sendTraceDataByMQ(keyset 
Keyset, regionID string, dat
        }
 
        var req = td.buildSendRequest(mq, msg)
-       td.cli.InvokeAsync(context.Background(), addr, req, 
5000*time.Millisecond, func(command *remote.RemotingCommand, e error) {
+       err := td.cli.InvokeAsync(context.Background(), addr, req, 5 * 
time.Second, func(command *remote.RemotingCommand, e error) {
                if e != nil {
-                       rlog.Error("send trace data ,the traceData is %v", data)
+                       rlog.Error("send trace data error", 
map[string]interface{}{
+                               "traceData": data,
+                       })
                }
        })
+       if err != nil { // only report when submitting the async request itself failed
+               rlog.Error("send trace data error when invoke", map[string]interface{}{rlog.LogKeyUnderlayError: err})
+       }
 }
 
 func (td *traceDispatcher) findMq() (*primitive.MessageQueue, string) {
        mqs, err := td.namesrvs.FetchPublishMessageQueues(td.traceTopic)
        if err != nil {
-               rlog.Error("fetch publish message queues failed. err: %v", err)
+               rlog.Error("fetch publish message queues failed", 
map[string]interface{}{
+                       rlog.LogKeyUnderlayError: err,
+               })
                return nil, ""
        }
        i := atomic.AddInt32(&td.rrindex, 1)
diff --git a/internal/utils/errors.go b/internal/utils/errors.go
index 45dd1c8..c689f04 100644
--- a/internal/utils/errors.go
+++ b/internal/utils/errors.go
@@ -18,8 +18,9 @@ limitations under the License.
 package utils
 
 import (
+       "errors"
+
        "github.com/apache/rocketmq-client-go/rlog"
-       "github.com/pkg/errors"
 )
 
 var (
@@ -32,6 +33,8 @@ var (
 
 func CheckError(action string, err error) {
        if err != nil {
-               rlog.Errorf("%s error: %s", action, err.Error())
+               rlog.Error( action, map[string]interface{}{
+                       rlog.LogKeyUnderlayError: err.Error(),
+               })
        }
 }
diff --git a/internal/validators.go b/internal/validators.go
index 8b6f122..7753942 100644
--- a/internal/validators.go
+++ b/internal/validators.go
@@ -34,7 +34,7 @@ var (
 
 func ValidateGroup(group string) {
        if group == "" {
-               rlog.Fatal("consumerGroup is empty")
+               rlog.Fatal("consumerGroup is empty", nil)
        }
 
        //if !_Pattern.Match([]byte(group)) {
@@ -42,6 +42,6 @@ func ValidateGroup(group string) {
        //}
 
        if len(group) > _CharacterMaxLength {
-               rlog.Fatal("the specified group is longer than group max length 
255.")
+               rlog.Fatal("the specified group is longer than group max length 
255.", nil)
        }
 }
diff --git a/primitive/ctx.go b/primitive/ctx.go
index 0937498..5761bd6 100644
--- a/primitive/ctx.go
+++ b/primitive/ctx.go
@@ -22,6 +22,7 @@ package primitive
 
 import (
        "context"
+       "fmt"
        "math"
 
        "github.com/apache/rocketmq-client-go/rlog"
@@ -46,7 +47,7 @@ func (c ConsumeReturnType) Ordinal() int {
        case FailedReturn:
                return 4
        default:
-               rlog.Error("illegal ConsumeReturnType: %v", c)
+               rlog.Error(fmt.Sprintf("illegal ConsumeReturnType: %v", c), nil)
                return 0
        }
 }
diff --git a/producer/producer.go b/producer/producer.go
index 5590f76..3fb8041 100644
--- a/producer/producer.go
+++ b/producer/producer.go
@@ -310,7 +310,7 @@ func (p *defaultProducer) selectMessageQueue(msg 
*primitive.Message) *primitive.
        }
 
        if result.MqList != nil && len(result.MqList) <= 0 {
-               rlog.Error("can not find proper message queue")
+               rlog.Error("can not find proper message queue", nil)
                return nil
        }
 
@@ -394,9 +394,15 @@ func (tp *transactionProducer) checkTransactionState() {
                        req := 
remote.NewRemotingCommand(internal.ReqENDTransaction, header, nil)
                        req.Remark = tp.errRemark(nil)
 
-                       tp.producer.client.InvokeOneWay(context.Background(), 
callback.Addr.String(), req, tp.producer.options.SendMsgTimeout)
+                       if err := tp.producer.client.InvokeOneWay(context.Background(), callback.Addr.String(), req, tp.producer.options.SendMsgTimeout); err != nil {
+                               rlog.Error("send ReqENDTransaction to broker error", map[string]interface{}{
+                                       "callback":               callback.Addr.String(),
+                                       "request":                req.String(),
+                                       rlog.LogKeyUnderlayError: err,
+                               })
+                       }
                default:
-                       rlog.Error("unknow type %v", ch)
+                       rlog.Error(fmt.Sprintf("unknown type %v", ch), nil)
                }
        }
 }
@@ -421,7 +427,10 @@ func (tp *transactionProducer) 
SendMessageInTransaction(ctx context.Context, msg
                }
                localTransactionState = 
tp.listener.ExecuteLocalTransaction(*msg)
                if localTransactionState != primitive.CommitMessageState {
-                       rlog.Errorf("executeLocalTransactionBranch return %v 
with msg: %v\n", localTransactionState, msg)
+                       rlog.Error("executeLocalTransaction but state 
unexpected", map[string]interface{}{
+                               "localState": localTransactionState,
+                               "message": msg,
+                       })
                }
 
        case primitive.SendFlushDiskTimeout, primitive.SendFlushSlaveTimeout, 
primitive.SendSlaveNotAvailable:
diff --git a/rlog/log.go b/rlog/log.go
index 28615e9..72a4a01 100644
--- a/rlog/log.go
+++ b/rlog/log.go
@@ -18,82 +18,98 @@
 package rlog
 
 import (
-       "io"
-
        "github.com/sirupsen/logrus"
 )
 
+const (
+       LogKeyConsumerGroup    = "consumerGroup"
+       LogKeyTopic            = "topic"
+       LogKeyMessageQueue     = "MessageQueue"
+       LogKeyUnderlayError    = "underlayError"
+       LogKeyBroker           = "broker"
+       LogKeyValueChangedFrom = "changedFrom"
+       LogKeyValueChangedTo   = "changeTo"
+       LogKeyPullRequest      = "PullRequest"
+)
+
 type Logger interface {
-       SetLevel(l logrus.Level)
-       SetOutput(output io.Writer)
-       Debug(i ...interface{})
-       Debugf(format string, args ...interface{})
-       Info(i ...interface{})
-       Infof(format string, args ...interface{})
-       Warn(i ...interface{})
-       Warnf(format string, args ...interface{})
-       Error(i ...interface{})
-       Errorf(format string, args ...interface{})
-       Fatal(i ...interface{})
-       Fatalf(format string, args ...interface{})
+       Debug(msg string, fields map[string]interface{})
+       Info(msg string, fields map[string]interface{})
+       Warning(msg string, fields map[string]interface{})
+       Error(msg string, fields map[string]interface{})
+       Fatal(msg string, fields map[string]interface{})
 }
 
-var rLog Logger
-
 func init() {
-       r := logrus.New()
-       r.SetLevel(logrus.InfoLevel)
+       r := &defaultLogger{
+               logger: logrus.New(),
+       }
        rLog = r
 }
 
-func SetLogger(log Logger) {
-       rLog = log
-}
-
-func SetLevel(l logrus.Level) {
-       rLog.SetLevel(l)
-}
+var rLog *defaultLogger
 
-func SetOutput(output io.Writer) {
-       rLog.SetOutput(output)
+type defaultLogger struct {
+       logger *logrus.Logger
 }
 
-func Debug(i ...interface{}) {
-       rLog.Debug(i...)
+func (l *defaultLogger) Debug(msg string, fields map[string]interface{}) {
+       if msg == "" && len(fields) == 0 {
+               return
+       }
+       rLog.logger.WithFields(fields).Debug(msg)
 }
 
-func Debugf(format string, args ...interface{}) {
-       rLog.Debugf(format, args...)
+func (l *defaultLogger) Info(msg string, fields map[string]interface{}) {
+       if msg == "" && len(fields) == 0 {
+               return
+       }
+       rLog.logger.WithFields(fields).Info(msg)
 }
 
-func Info(i ...interface{}) {
-       rLog.Info(i...)
+func (l *defaultLogger) Warning(msg string, fields map[string]interface{}) {
+       if msg == "" && len(fields) == 0 {
+               return
+       }
+       rLog.logger.WithFields(fields).Warning(msg)
 }
 
-func Infof(format string, args ...interface{}) {
-       rLog.Infof(format, args...)
+func (l *defaultLogger) Error(msg string, fields map[string]interface{}) {
+       if msg == "" && len(fields) == 0 {
+               return
+       }
+       rLog.logger.WithFields(fields).WithFields(fields).Error(msg)
 }
 
-func Warn(i ...interface{}) {
-       rLog.Warn(i...)
+func (l *defaultLogger) Fatal(msg string, fields map[string]interface{}) {
+       if msg == "" && len(fields) == 0 {
+               return
+       }
+       rLog.logger.WithFields(fields).Fatal(msg)
 }
 
-func Warnf(format string, args ...interface{}) {
-       rLog.Warnf(format, args...)
+func Debug(msg string, fields map[string]interface{}) {
+       rLog.Debug(msg, fields)
 }
 
-func Error(i ...interface{}) {
-       rLog.Error(i...)
+func Info(msg string, fields map[string]interface{}) {
+       if msg == "" && len(fields) == 0 {
+               return
+       }
+       rLog.Info(msg, fields)
 }
 
-func Errorf(format string, args ...interface{}) {
-       rLog.Errorf(format, args...)
+func Warning(msg string, fields map[string]interface{}) {
+       if msg == "" && len(fields) == 0 {
+               return
+       }
+       rLog.Warning(msg, fields)
 }
 
-func Fatal(i ...interface{}) {
-       rLog.Fatal(i...)
+func Error(msg string, fields map[string]interface{}) {
+       rLog.Error(msg, fields)
 }
 
-func Fatalf(format string, args ...interface{}) {
-       rLog.Fatalf(format, args...)
+func Fatal(msg string, fields map[string]interface{}) {
+       rLog.Fatal(msg, fields)
 }

Reply via email to