This is an automated email from the ASF dual-hosted git repository.
wenfeng pushed a commit to branch native
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git
The following commit(s) were added to refs/heads/native by this push:
new d1e89b9 [ISSUE #272] Structured Log (#273)
d1e89b9 is described below
commit d1e89b96ce4c4e201af1802e175514415b8d2aeb
Author: wenfeng <[email protected]>
AuthorDate: Fri Nov 1 10:41:32 2019 +0800
[ISSUE #272] Structured Log (#273)
* structured log
---
consumer/consumer.go | 134 ++++++++++++++++++------
consumer/offset_store.go | 41 ++++++--
consumer/process_queue.go | 9 +-
consumer/pull_consumer.go | 5 +-
consumer/push_consumer.go | 221 ++++++++++++++++++++++++++-------------
consumer/statistics.go | 28 +++--
consumer/strategy.go | 38 +++++--
examples/consumer/pull/main.go | 4 +-
go.sum | 2 -
internal/client.go | 44 ++++----
internal/model.go | 6 +-
internal/remote/remote_client.go | 37 +++++--
internal/route.go | 41 ++++++--
internal/trace.go | 26 +++--
internal/utils/errors.go | 7 +-
internal/validators.go | 4 +-
primitive/ctx.go | 3 +-
producer/producer.go | 17 ++-
rlog/log.go | 110 ++++++++++---------
19 files changed, 528 insertions(+), 249 deletions(-)
diff --git a/consumer/consumer.go b/consumer/consumer.go
index 0044bf6..55fe08f 100644
--- a/consumer/consumer.go
+++ b/consumer/consumer.go
@@ -363,7 +363,10 @@ func (dc *defaultConsumer) doBalance() {
topic := key.(string)
v, exist := dc.topicSubscribeInfoTable.Load(topic)
if !exist {
- rlog.Warnf("do balance of group: %s, but topic: %s does
not exist.", dc.consumerGroup, topic)
+ rlog.Warning("do balance in group failed, the topic
does not exist", map[string]interface{}{
+ rlog.LogKeyConsumerGroup: dc.consumerGroup,
+ rlog.LogKeyTopic: topic,
+ })
return true
}
mqs := v.([]*primitive.MessageQueue)
@@ -372,14 +375,19 @@ func (dc *defaultConsumer) doBalance() {
changed := dc.updateProcessQueueTable(topic, mqs)
if changed {
dc.mqChanged(topic, mqs, mqs)
- rlog.Infof("messageQueueChanged, Group: %s,
Topic: %s, MessageQueues: %v",
- dc.consumerGroup, topic, mqs)
+ rlog.Debug("MessageQueue changed",
map[string]interface{}{
+ rlog.LogKeyConsumerGroup:
dc.consumerGroup,
+ rlog.LogKeyTopic: topic,
+ rlog.LogKeyMessageQueue:
fmt.Sprintf("%v", mqs),
+ })
}
case Clustering:
cidAll := dc.findConsumerList(topic)
if cidAll == nil {
- rlog.Warnf("do balance for Group: %s, Topic: %s
get consumer id list failed",
- dc.consumerGroup, topic)
+ rlog.Warning("do balance in group failed, get
consumer id list failed", map[string]interface{}{
+ rlog.LogKeyConsumerGroup:
dc.consumerGroup,
+ rlog.LogKeyTopic: topic,
+ })
return true
}
mqAll := make([]*primitive.MessageQueue, len(mqs))
@@ -401,10 +409,15 @@ func (dc *defaultConsumer) doBalance() {
changed := dc.updateProcessQueueTable(topic,
allocateResult)
if changed {
dc.mqChanged(topic, mqAll, allocateResult)
- rlog.Infof("do balance result changed,
group=%s, "+
- "topic=%s, clientId=%s, mqAllSize=%d,
cidAllSize=%d, rebalanceResultSize=%d, "+
- "rebalanceResultSet=%v",
dc.consumerGroup, topic, dc.client.ClientID(), len(mqAll),
- len(cidAll), len(allocateResult),
allocateResult)
+ rlog.Debug("MessageQueue do balance done",
map[string]interface{}{
+ rlog.LogKeyConsumerGroup:
dc.consumerGroup,
+ rlog.LogKeyTopic: topic,
+ "clientID":
dc.client.ClientID(),
+ "mqAllSize": len(mqAll),
+ "cidAllSize": len(cidAll),
+ "rebalanceResultSize":
len(allocateResult),
+ "rebalanceResultSet":
allocateResult,
+ })
}
}
return true
@@ -460,7 +473,16 @@ func (dc *defaultConsumer) lock(mq
*primitive.MessageQueue) bool {
lockOK = true
}
}
- rlog.Debugf("the message queue lock %v, %s %s", lockOK,
dc.consumerGroup, mq.String())
+ fields := map[string]interface{}{
+ "lockOK": lockOK,
+ rlog.LogKeyConsumerGroup: dc.consumerGroup,
+ rlog.LogKeyMessageQueue: mq.String(),
+ }
+ if lockOK {
+ rlog.Debug("lock MessageQueue", fields)
+ } else {
+ rlog.Info("lock MessageQueue", fields)
+ }
return lockOK
}
@@ -477,8 +499,11 @@ func (dc *defaultConsumer) unlock(mq
*primitive.MessageQueue, oneway bool) {
MQs: []*primitive.MessageQueue{mq},
}
dc.doUnlock(brokerResult.BrokerAddr, body, oneway)
- rlog.Warnf("unlock messageQueue. group:%s, clientId:%s, mq:%s",
- dc.consumerGroup, dc.client.ClientID(), mq.String())
+ rlog.Info("unlock MessageQueue", map[string]interface{}{
+ rlog.LogKeyConsumerGroup: dc.consumerGroup,
+ "clientID": dc.client.ClientID(),
+ rlog.LogKeyMessageQueue: mq.String(),
+ })
}
func (dc *defaultConsumer) lockAll() {
@@ -516,7 +541,11 @@ func (dc *defaultConsumer) lockAll() {
pq := v.(*processQueue)
pq.locked = true
pq.lastLockTime = time.Now()
- rlog.Warnf("the message queue: %s
locked Failed, Group: %s", _mq.String(), dc.consumerGroup)
+ rlog.Info("lock MessageQueue",
map[string]interface{}{
+ "lockOK": false,
+ rlog.LogKeyConsumerGroup:
dc.consumerGroup,
+ rlog.LogKeyMessageQueue:
_mq.String(),
+ })
}
}
}
@@ -544,7 +573,11 @@ func (dc *defaultConsumer) unlockAll(oneway bool) {
v, exist := dc.processQueueTable.Load(_mq)
if exist {
v.(*processQueue).locked = false
- rlog.Warnf("the message queue: %s locked
Failed, Group: %s", _mq.String(), dc.consumerGroup)
+ rlog.Info("lock MessageQueue",
map[string]interface{}{
+ "lockOK": false,
+ rlog.LogKeyConsumerGroup:
dc.consumerGroup,
+ rlog.LogKeyMessageQueue: _mq.String(),
+ })
}
}
}
@@ -555,7 +588,10 @@ func (dc *defaultConsumer) doLock(addr string, body
*lockBatchRequestBody) []pri
request := remote.NewRemotingCommand(internal.ReqLockBatchMQ, nil, data)
response, err := dc.client.InvokeSync(context.Background(), addr,
request, 1*time.Second)
if err != nil {
- rlog.Errorf("lock mq to broker: %s error %s", addr, err.Error())
+ rlog.Error("lock MessageQueue to broker invoke error",
map[string]interface{}{
+ rlog.LogKeyBroker: addr,
+ rlog.LogKeyUnderlayError: err,
+ })
return nil
}
lockOKMQSet := struct {
@@ -563,7 +599,9 @@ func (dc *defaultConsumer) doLock(addr string, body
*lockBatchRequestBody) []pri
}{}
err = json.Unmarshal(response.Body, &lockOKMQSet)
if err != nil {
- rlog.Errorf("Unmarshal lock mq body error %s", err.Error())
+ rlog.Error("Unmarshal lock mq body error",
map[string]interface{}{
+ rlog.LogKeyUnderlayError: err,
+ })
return nil
}
return lockOKMQSet.MQs
@@ -575,13 +613,17 @@ func (dc *defaultConsumer) doUnlock(addr string, body
*lockBatchRequestBody, one
if oneway {
err := dc.client.InvokeOneWay(context.Background(), addr,
request, 3*time.Second)
if err != nil {
- rlog.Errorf("lock mq to broker with oneway: %s error
%s", addr, err.Error())
+ rlog.Error("lock MessageQueue to broker invoke oneway
error", map[string]interface{}{
+ rlog.LogKeyBroker: addr,
+ rlog.LogKeyUnderlayError: err,
+ })
}
} else {
response, err := dc.client.InvokeSync(context.Background(),
addr, request, 1*time.Second)
-		if err != nil {
-			rlog.Errorf("lock mq to broker: %s error %s", addr, err.Error())
-		}
+		if err != nil {
+			rlog.Error("lock MessageQueue to broker invoke error", map[string]interface{}{
+				rlog.LogKeyBroker:        addr,
+				rlog.LogKeyUnderlayError: err,
+			})
+		}
if response.Code != internal.ResSuccess {
// TODO error
}
@@ -623,15 +665,20 @@ func (dc *defaultConsumer) updateProcessQueueTable(topic
string, mqs []*primitiv
//delete(mqSet, mq)
dc.processQueueTable.Delete(key)
changed = true
- rlog.Debugf("do defaultConsumer,
Group:%s, remove unnecessary mq: %s", dc.consumerGroup, mq.String())
+ rlog.Debug("remove unnecessary mq when
updateProcessQueueTable", map[string]interface{}{
+ rlog.LogKeyConsumerGroup:
dc.consumerGroup,
+ rlog.LogKeyMessageQueue:
mq.String(),
+ })
}
} else if pq.isPullExpired() && dc.cType ==
_PushConsume {
pq.dropped = true
if dc.removeUnnecessaryMessageQueue(&mq, pq) {
delete(mqSet, mq)
changed = true
- rlog.Debugf("do defaultConsumer,
Group:%s, remove unnecessary mq: %s, "+
- "because pull was paused, so
try to fixed it", dc.consumerGroup, mq)
+ rlog.Debug("remove unnecessary mq
because pull was paused, prepare to fix it", map[string]interface{}{
+ rlog.LogKeyConsumerGroup:
dc.consumerGroup,
+ rlog.LogKeyMessageQueue:
mq.String(),
+ })
}
}
}
@@ -647,8 +694,10 @@ func (dc *defaultConsumer) updateProcessQueueTable(topic
string, mqs []*primitiv
continue
}
if dc.consumeOrderly && !dc.lock(&mq) {
- rlog.Warnf("do defaultConsumer, Group:%s add a
new mq failed, %s, because lock failed",
- dc.consumerGroup, mq.String())
+ rlog.Warning("do defaultConsumer, add a new mq
failed, because lock failed", map[string]interface{}{
+ rlog.LogKeyConsumerGroup:
dc.consumerGroup,
+ rlog.LogKeyMessageQueue: mq.String(),
+ })
continue
}
dc.storage.remove(&mq)
@@ -656,9 +705,15 @@ func (dc *defaultConsumer) updateProcessQueueTable(topic
string, mqs []*primitiv
if nextOffset >= 0 {
_, exist := dc.processQueueTable.Load(mq)
if exist {
- rlog.Debugf("do defaultConsumer, Group:
%s, mq already exist, %s", dc.consumerGroup, mq.String())
+ rlog.Debug("do defaultConsumer, mq
already exist", map[string]interface{}{
+ rlog.LogKeyConsumerGroup:
dc.consumerGroup,
+ rlog.LogKeyMessageQueue:
mq.String(),
+ })
} else {
- rlog.Debugf("do defaultConsumer, Group:
%s, add a new mq, %s", dc.consumerGroup, mq.String())
+ rlog.Debug("do defaultConsumer, add a
new mq", map[string]interface{}{
+ rlog.LogKeyConsumerGroup:
dc.consumerGroup,
+ rlog.LogKeyMessageQueue:
mq.String(),
+ })
pq := newProcessQueue(dc.consumeOrderly)
dc.processQueueTable.Store(mq, pq)
pr := PullRequest{
@@ -671,7 +726,10 @@ func (dc *defaultConsumer) updateProcessQueueTable(topic
string, mqs []*primitiv
changed = true
}
} else {
- rlog.Warnf("do defaultConsumer failed,
Group:%s, add new mq failed, {}", dc.consumerGroup, mq)
+ rlog.Warning("do defaultConsumer, add a new mq
failed", map[string]interface{}{
+ rlog.LogKeyConsumerGroup:
dc.consumerGroup,
+ rlog.LogKeyMessageQueue: mq.String(),
+ })
}
}
}
@@ -704,7 +762,10 @@ func (dc *defaultConsumer) computePullFromWhere(mq
*primitive.MessageQueue) int6
if err == nil {
result = lastOffset
} else {
- rlog.Warnf("query max offset
of: [%s:%d] error, %s", mq.Topic, mq.QueueId, err.Error())
+ rlog.Warning("query max offset
error", map[string]interface{}{
+
rlog.LogKeyMessageQueue: mq,
+
rlog.LogKeyUnderlayError: err,
+ })
}
}
} else {
@@ -722,7 +783,10 @@ func (dc *defaultConsumer) computePullFromWhere(mq
*primitive.MessageQueue) int6
result = lastOffset
} else {
result = -1
- rlog.Warnf("query max offset
of: [%s:%d] error, %s", mq.Topic, mq.QueueId, err.Error())
+ rlog.Warning("query max offset
error", map[string]interface{}{
+
rlog.LogKeyMessageQueue: mq,
+
rlog.LogKeyUnderlayError: err,
+ })
}
} else {
t, err := time.Parse("20060102150405",
dc.option.ConsumeTimestamp)
@@ -749,7 +813,9 @@ func (dc *defaultConsumer) pullInner(ctx context.Context,
queue *primitive.Messa
brokerResult := dc.tryFindBroker(queue)
if brokerResult == nil {
- rlog.Warnf("no broker found for %s", queue.String())
+ rlog.Warning("no broker found for mq", map[string]interface{}{
+ rlog.LogKeyMessageQueue: queue,
+ })
return nil, ErrBrokerNotFound
}
@@ -838,7 +904,11 @@ func (dc *defaultConsumer) findConsumerList(topic string)
[]string {
cmd :=
remote.NewRemotingCommand(internal.ReqGetConsumerListByGroup, req, nil)
res, err := dc.client.InvokeSync(context.Background(),
brokerAddr, cmd, 3*time.Second) // TODO 超时机制有问题
if err != nil {
- rlog.Errorf("get consumer list of [%s] from %s error:
%s", dc.consumerGroup, brokerAddr, err.Error())
+ rlog.Error("get consumer list of group from broker
error", map[string]interface{}{
+ rlog.LogKeyConsumerGroup: dc.consumerGroup,
+ rlog.LogKeyBroker: brokerAddr,
+ rlog.LogKeyUnderlayError: err,
+ })
return nil
}
result := gjson.ParseBytes(res.Body)
diff --git a/consumer/offset_store.go b/consumer/offset_store.go
index 7b230a0..2db940c 100644
--- a/consumer/offset_store.go
+++ b/consumer/offset_store.go
@@ -123,11 +123,15 @@ func (local *localFileOffsetStore) load() {
return
}
if err != nil {
- rlog.Errorf("read from store failed. err: %v \n", err)
+ rlog.Info("read from local store error, try to use bak file",
map[string]interface{}{
+ rlog.LogKeyUnderlayError: err,
+ })
data, err = utils.FileReadAll(filepath.Join(local.path, ".bak"))
}
if err != nil {
- rlog.Debugf("load local offset: %s error: %s", local.path,
err.Error())
+ rlog.Info("read from local store bak file error",
map[string]interface{}{
+ rlog.LogKeyUnderlayError: err,
+ })
return
}
datas := make(map[MessageQueueKey]int64)
@@ -138,7 +142,10 @@ func (local *localFileOffsetStore) load() {
err = json.Unmarshal(data, &wrapper)
if err != nil {
- rlog.Debugf("unmarshal local offset: %s error: %s", local.path,
err.Error())
+ rlog.Warning("unmarshal local offset error",
map[string]interface{}{
+ "local_path": local.path,
+ rlog.LogKeyUnderlayError: err.Error(),
+ })
return
}
@@ -166,7 +173,10 @@ func (local *localFileOffsetStore) read(mq
*primitive.MessageQueue, t readType)
func (local *localFileOffsetStore) update(mq *primitive.MessageQueue, offset
int64, increaseOnly bool) {
local.mutex.Lock()
defer local.mutex.Unlock()
- rlog.Debugf("update offset: %s to %d", mq, offset)
+ rlog.Debug("update offset", map[string]interface{}{
+ rlog.LogKeyMessageQueue: mq,
+ "new_offset": offset,
+ })
key := MessageQueueKey(*mq)
localOffset, exist := local.OffsetTable[key]
if !exist {
@@ -237,10 +247,18 @@ func (r *remoteBrokerOffsetStore) persist(mqs
[]*primitive.MessageQueue) {
}
err := r.updateConsumeOffsetToBroker(r.group, mq, off)
if err != nil {
- rlog.Warnf("update offset to broker error: %s, group:
%s, queue: %s, offset: %d",
- err.Error(), r.group, mq.String(), off)
+ rlog.Warning("update offset to broker error",
map[string]interface{}{
+ rlog.LogKeyConsumerGroup: r.group,
+ rlog.LogKeyMessageQueue: mq.String(),
+ rlog.LogKeyUnderlayError: err.Error(),
+ "offset": off,
+ })
} else {
- rlog.Debugf("update offset to broker success, group:
%s, topic: %s, queue: %v offset: %v", r.group, mq.Topic, mq, off)
+ rlog.Info("update offset to broker success",
map[string]interface{}{
+ rlog.LogKeyConsumerGroup: r.group,
+ rlog.LogKeyMessageQueue: mq.String(),
+ "offset": off,
+ })
}
}
}
@@ -250,7 +268,9 @@ func (r *remoteBrokerOffsetStore) remove(mq
*primitive.MessageQueue) {
defer r.mutex.Unlock()
delete(r.OffsetTable, *mq)
- rlog.Infof("delete queueID %v of brokerName: %v \n", mq.QueueId,
mq.BrokerName)
+ rlog.Info("delete mq from offset table", map[string]interface{}{
+ rlog.LogKeyMessageQueue: mq,
+ })
}
func (r *remoteBrokerOffsetStore) read(mq *primitive.MessageQueue, t readType)
int64 {
@@ -268,7 +288,10 @@ func (r *remoteBrokerOffsetStore) read(mq
*primitive.MessageQueue, t readType) i
case _ReadFromStore:
off, err := r.fetchConsumeOffsetFromBroker(r.group, mq)
if err != nil {
- rlog.Errorf("fetch offset of %s error: %s",
mq.String(), err.Error())
+			rlog.Error("fetch offset of mq error", map[string]interface{}{
+ rlog.LogKeyMessageQueue: mq.String(),
+ rlog.LogKeyUnderlayError: err,
+ })
r.mutex.RUnlock()
return -1
}
diff --git a/consumer/process_queue.go b/consumer/process_queue.go
index 19fbafa..eeabbb2 100644
--- a/consumer/process_queue.go
+++ b/consumer/process_queue.go
@@ -176,7 +176,10 @@ func (pq *processQueue) cleanExpiredMsg(consumer
defaultConsumer) {
if startTime != "" {
st, err := strconv.ParseInt(startTime, 10, 64)
if err != nil {
- rlog.Warnf("parse message start consume time
error: %s, origin str is: %s", startTime)
+ rlog.Warning("parse message start consume time
error", map[string]interface{}{
+ "time": startTime,
+ rlog.LogKeyUnderlayError: err,
+ })
continue
}
if time.Now().Unix()-st <=
int64(consumer.option.ConsumeTimeout) {
@@ -188,7 +191,9 @@ func (pq *processQueue) cleanExpiredMsg(consumer
defaultConsumer) {
err := consumer.sendBack(msg, 3)
if err != nil {
- rlog.Errorf("send message back to broker error: %s when
clean expired messages", err.Error())
+ rlog.Error("send message back to broker error when
clean expired messages", map[string]interface{}{
+ rlog.LogKeyUnderlayError: err,
+ })
continue
}
pq.removeMessage(msg)
diff --git a/consumer/pull_consumer.go b/consumer/pull_consumer.go
index 7d47b8a..6a6fa64 100644
--- a/consumer/pull_consumer.go
+++ b/consumer/pull_consumer.go
@@ -132,7 +132,10 @@ func (c *defaultPullConsumer) Pull(ctx context.Context,
topic string, selector M
func (c *defaultPullConsumer) getNextQueueOf(topic string)
*primitive.MessageQueue {
queues, err :=
c.defaultConsumer.namesrv.FetchSubscribeMessageQueues(topic)
if err != nil && len(queues) > 0 {
- rlog.Error(err.Error())
+ rlog.Error("get next mq error", map[string]interface{}{
+ rlog.LogKeyTopic: topic,
+ rlog.LogKeyUnderlayError: err.Error(),
+ })
return nil
}
var index int64
diff --git a/consumer/push_consumer.go b/consumer/push_consumer.go
index c04fd4b..d75f7da 100644
--- a/consumer/push_consumer.go
+++ b/consumer/push_consumer.go
@@ -111,15 +111,20 @@ func NewPushConsumer(opts ...Option) (*pushConsumer,
error) {
func (pc *pushConsumer) Start() error {
var err error
pc.once.Do(func() {
- rlog.Infof("the consumerGroup=%s start beginning.
messageModel=%v, unitMode=%v",
- pc.consumerGroup, pc.model, pc.unitMode)
+ rlog.Info("the consumer start beginning",
map[string]interface{}{
+ rlog.LogKeyConsumerGroup: pc.consumerGroup,
+ "messageModel": pc.model,
+ "unitMode": pc.unitMode,
+ })
pc.state = internal.StateStartFailed
pc.validate()
err := pc.client.RegisterConsumer(pc.consumerGroup, pc)
if err != nil {
pc.state = internal.StateStartFailed
- rlog.Errorf("the consumer group: [%s] has been created,
specify another name.", pc.consumerGroup)
+ rlog.Error("the consumer group has been created,
specify another one", map[string]interface{}{
+ rlog.LogKeyConsumerGroup: pc.consumerGroup,
+ })
err = ErrCreated
}
@@ -232,7 +237,10 @@ func (pc *pushConsumer) messageQueueChanged(topic string,
mqAll, mqDivided []*pr
}
data := v.(*internal.SubscriptionData)
newVersion := time.Now().UnixNano()
- rlog.Infof("the MessageQueue changed, also update version: %d to %d",
data.SubVersion, newVersion)
+ rlog.Info("the MessageQueue changed, version also updated",
map[string]interface{}{
+ rlog.LogKeyValueChangedFrom: data.SubVersion,
+ rlog.LogKeyValueChangedTo: newVersion,
+ })
data.SubVersion = newVersion
// TODO: optimize
@@ -247,7 +255,10 @@ func (pc *pushConsumer) messageQueueChanged(topic string,
mqAll, mqDivided []*pr
if newVal == 0 {
newVal = 1
}
- rlog.Infof("The PullThresholdForTopic is changed from
%d to %d", pc.option.PullThresholdForTopic, newVal)
+ rlog.Info("The PullThresholdForTopic is changed",
map[string]interface{}{
+ rlog.LogKeyValueChangedFrom:
pc.option.PullThresholdForTopic,
+ rlog.LogKeyValueChangedTo: newVal,
+ })
pc.option.PullThresholdForTopic = newVal
}
@@ -256,8 +267,10 @@ func (pc *pushConsumer) messageQueueChanged(topic string,
mqAll, mqDivided []*pr
if newVal == 0 {
newVal = 1
}
-			rlog.Infof("The PullThresholdSizeForTopic is changed from %d to %d", pc.option.PullThresholdSizeForTopic, newVal)
-			pc.option.PullThresholdSizeForTopic = newVal
+			rlog.Info("The PullThresholdSizeForTopic is changed", map[string]interface{}{
+				rlog.LogKeyValueChangedFrom: pc.option.PullThresholdSizeForTopic,
+				rlog.LogKeyValueChangedTo:   newVal,
+			})
+			pc.option.PullThresholdSizeForTopic = newVal
}
}
pc.client.SendHeartbeatToAllBrokerWithLock()
@@ -268,18 +281,18 @@ func (pc *pushConsumer) validate() {
if pc.consumerGroup == internal.DefaultConsumerGroup {
// TODO FQA
- rlog.Errorf("consumerGroup can't equal [%s], please specify
another one.", internal.DefaultConsumerGroup)
+ rlog.Error(fmt.Sprintf("consumerGroup can't equal [%s], please
specify another one.", internal.DefaultConsumerGroup), nil)
}
if len(pc.subscribedTopic) == 0 {
- rlog.Error("number of subscribed topics is 0.")
+ rlog.Error("number of subscribed topics is 0.", nil)
}
if pc.option.ConsumeConcurrentlyMaxSpan < 1 ||
pc.option.ConsumeConcurrentlyMaxSpan > 65535 {
if pc.option.ConsumeConcurrentlyMaxSpan == 0 {
pc.option.ConsumeConcurrentlyMaxSpan = 1000
} else {
- rlog.Error("option.ConsumeConcurrentlyMaxSpan out of
range [1, 65535]")
+ rlog.Error("option.ConsumeConcurrentlyMaxSpan out of
range [1, 65535]", nil)
}
}
@@ -287,7 +300,7 @@ func (pc *pushConsumer) validate() {
if pc.option.PullThresholdForQueue == 0 {
pc.option.PullThresholdForQueue = 1024
} else {
- rlog.Error("option.PullThresholdForQueue out of range
[1, 65535]")
+ rlog.Error("option.PullThresholdForQueue out of range
[1, 65535]", nil)
}
}
@@ -295,7 +308,7 @@ func (pc *pushConsumer) validate() {
if pc.option.PullThresholdForTopic == 0 {
pc.option.PullThresholdForTopic = 102400
} else {
- rlog.Error("option.PullThresholdForTopic out of range
[1, 6553500]")
+ rlog.Error("option.PullThresholdForTopic out of range
[1, 6553500]", nil)
}
}
@@ -303,7 +316,7 @@ func (pc *pushConsumer) validate() {
if pc.option.PullThresholdSizeForQueue == 0 {
pc.option.PullThresholdSizeForQueue = 512
} else {
- rlog.Error("option.PullThresholdSizeForQueue out of
range [1, 1024]")
+ rlog.Error("option.PullThresholdSizeForQueue out of
range [1, 1024]", nil)
}
}
@@ -311,19 +324,19 @@ func (pc *pushConsumer) validate() {
if pc.option.PullThresholdSizeForTopic == 0 {
pc.option.PullThresholdSizeForTopic = 51200
} else {
- rlog.Error("option.PullThresholdSizeForTopic out of
range [1, 102400]")
+ rlog.Error("option.PullThresholdSizeForTopic out of
range [1, 102400]", nil)
}
}
if pc.option.PullInterval < 0 || pc.option.PullInterval > 65535 {
- rlog.Error("option.PullInterval out of range [0, 65535]")
+ rlog.Error("option.PullInterval out of range [0, 65535]", nil)
}
if pc.option.ConsumeMessageBatchMaxSize < 1 ||
pc.option.ConsumeMessageBatchMaxSize > 1024 {
if pc.option.ConsumeMessageBatchMaxSize == 0 {
pc.option.ConsumeMessageBatchMaxSize = 512
} else {
- rlog.Error("option.ConsumeMessageBatchMaxSize out of
range [1, 1024]")
+ rlog.Error("option.ConsumeMessageBatchMaxSize out of
range [1, 1024]", nil)
}
}
@@ -331,13 +344,15 @@ func (pc *pushConsumer) validate() {
if pc.option.PullBatchSize == 0 {
pc.option.PullBatchSize = 32
} else {
- rlog.Error("option.PullBatchSize out of range [1,
1024]")
+ rlog.Error("option.PullBatchSize out of range [1,
1024]", nil)
}
}
}
func (pc *pushConsumer) pullMessage(request *PullRequest) {
- rlog.Debugf("start a new Pull Message task %s for [%s]",
request.String(), pc.consumerGroup)
+ rlog.Debug("start a new Pull Message task for PullRequest",
map[string]interface{}{
+ rlog.LogKeyPullRequest: request.String(),
+ })
var sleepTime time.Duration
pq := request.pq
go func() {
@@ -350,11 +365,13 @@ func (pc *pushConsumer) pullMessage(request *PullRequest)
{
for {
NEXT:
if pq.dropped {
- rlog.Infof("the request: [%s] was dropped, so stop
task", request.String())
+ rlog.Debug("the request was dropped, so stop task",
map[string]interface{}{
+ rlog.LogKeyPullRequest: request.String(),
+ })
return
}
if sleepTime > 0 {
- rlog.Infof("pull MessageQueue: %d sleep %d ms for mq:
%v", request.mq.QueueId, sleepTime/time.Millisecond, request.mq)
+ rlog.Debug(fmt.Sprintf("pull MessageQueue: %d sleep %d
ms for mq: %v", request.mq.QueueId, sleepTime/time.Millisecond, request.mq),
nil)
time.Sleep(sleepTime)
}
// reset time
@@ -362,14 +379,16 @@ func (pc *pushConsumer) pullMessage(request *PullRequest)
{
pq.lastPullTime = time.Now()
err := pc.makeSureStateOK()
if err != nil {
- rlog.Warnf("consumer state error: %s", err.Error())
+ rlog.Warning("consumer state error",
map[string]interface{}{
+ rlog.LogKeyUnderlayError: err.Error(),
+ })
sleepTime = _PullDelayTimeWhenError
goto NEXT
}
if pc.pause {
- rlog.Infof("consumer [%s] of [%s] was paused, execute
pull request [%s] later",
- pc.option.InstanceName, pc.consumerGroup,
request.String())
+ rlog.Debug(fmt.Sprintf("consumer [%s] of [%s] was
paused, execute pull request [%s] later",
+ pc.option.InstanceName, pc.consumerGroup,
request.String()), nil)
sleepTime = _PullDelayTimeWhenSuspend
goto NEXT
}
@@ -377,10 +396,15 @@ func (pc *pushConsumer) pullMessage(request *PullRequest)
{
cachedMessageSizeInMiB := int(pq.cachedMsgSize / Mb)
if pq.cachedMsgCount > pc.option.PullThresholdForQueue {
if pc.queueFlowControlTimes%1000 == 0 {
- rlog.Warnf("the cached message count exceeds
the threshold %d, so do flow control, "+
- "minOffset=%d, maxOffset=%d, count=%d,
size=%d MiB, pullRequest=%s, flowControlTimes=%d",
- pc.option.PullThresholdForQueue, 0,
pq.Min(), pq.Max(),
- pq.msgCache, cachedMessageSizeInMiB,
request.String(), pc.queueFlowControlTimes)
+ rlog.Warning("the cached message count exceeds
the threshold, so do flow control", map[string]interface{}{
+ "PullThresholdForQueue":
pc.option.PullThresholdForQueue,
+ "minOffset": pq.Min(),
+ "maxOffset": pq.Max(),
+ "count": pq.msgCache,
+ "size(MiB)":
cachedMessageSizeInMiB,
+ "flowControlTimes":
pc.queueFlowControlTimes,
+ rlog.LogKeyPullRequest:
request.String(),
+ })
}
pc.queueFlowControlTimes++
sleepTime = _PullDelayTimeWhenFlowControl
@@ -389,10 +413,15 @@ func (pc *pushConsumer) pullMessage(request *PullRequest)
{
if cachedMessageSizeInMiB > pc.option.PullThresholdSizeForQueue
{
if pc.queueFlowControlTimes%1000 == 0 {
- rlog.Warnf("the cached message size exceeds the
threshold %d MiB, so do flow control, "+
- "minOffset=%d, maxOffset=%d, count=%d,
size=%d MiB, pullRequest=%s, flowControlTimes=%d",
- pc.option.PullThresholdSizeForQueue,
pq.Min(), pq.Max(),
- pq.msgCache, cachedMessageSizeInMiB,
request.String(), pc.queueFlowControlTimes)
+ rlog.Warning("the cached message size exceeds
the threshold, so do flow control", map[string]interface{}{
+ "PullThresholdSizeForQueue":
pc.option.PullThresholdSizeForQueue,
+ "minOffset": pq.Min(),
+ "maxOffset": pq.Max(),
+ "count":
pq.msgCache,
+ "size(MiB)":
cachedMessageSizeInMiB,
+ "flowControlTimes":
pc.queueFlowControlTimes,
+ rlog.LogKeyPullRequest:
request.String(),
+ })
}
pc.queueFlowControlTimes++
sleepTime = _PullDelayTimeWhenFlowControl
@@ -401,11 +430,15 @@ func (pc *pushConsumer) pullMessage(request *PullRequest)
{
if !pc.consumeOrderly {
if pq.getMaxSpan() >
pc.option.ConsumeConcurrentlyMaxSpan {
-
if pc.queueMaxSpanFlowControlTimes%1000 == 0 {
- rlog.Warnf("the queue's messages, span
too long, limit=%d, so do flow control, minOffset=%d, "+
- "maxOffset=%d, maxSpan=%d,
pullRequest=%s, flowControlTimes=%d", pc.option.ConsumeConcurrentlyMaxSpan,
- pq.Min(), pq.Max(),
pq.getMaxSpan(), request.String(), pc.queueMaxSpanFlowControlTimes)
+ rlog.Warning("the queue's messages span
too long, so do flow control", map[string]interface{}{
+ "ConsumeConcurrentlyMaxSpan":
pc.option.ConsumeConcurrentlyMaxSpan,
+ "minOffset":
pq.Min(),
+ "maxOffset":
pq.Max(),
+ "maxSpan":
pq.getMaxSpan(),
+ "flowControlTimes":
pc.queueFlowControlTimes,
+ rlog.LogKeyPullRequest:
request.String(),
+ })
}
sleepTime = _PullDelayTimeWhenFlowControl
goto NEXT
@@ -415,19 +448,23 @@ func (pc *pushConsumer) pullMessage(request *PullRequest)
{
if !request.lockedFirst {
offset :=
pc.computePullFromWhere(request.mq)
brokerBusy := offset <
request.nextOffset
- rlog.Infof("the first time to pull
message, so fix offset from broker. "+
- "pullRequest: [%s] NewOffset:
%d brokerBusy: %v",
- request.String(), offset,
brokerBusy)
+ rlog.Info("the first time to pull
message, so fix offset from broker, offset maybe changed",
map[string]interface{}{
+ rlog.LogKeyPullRequest:
request.String(),
+ rlog.LogKeyValueChangedFrom:
request.nextOffset,
+ rlog.LogKeyValueChangedTo:
offset,
+ "brokerBusy":
brokerBusy,
+ })
if brokerBusy {
- rlog.Infof("[NOTIFY_ME]the
first time to pull message, but pull request offset"+
- " larger than broker
consume offset. pullRequest: [%s] NewOffset: %d",
- request.String(),
offset)
+ rlog.Info("[NOTIFY_ME] the
first time to pull message, but pull request offset larger than "+
+ "broker consume
offset", map[string]interface{}{"offset": offset})
}
request.lockedFirst = true
request.nextOffset = offset
}
} else {
- rlog.Infof("pull message later because not
locked in broker, [%s]", request.String())
+ rlog.Info("pull message later because not
locked in broker", map[string]interface{}{
+ rlog.LogKeyPullRequest:
request.String(),
+ })
sleepTime = _PullDelayTimeWhenError
goto NEXT
}
@@ -435,7 +472,9 @@ func (pc *pushConsumer) pullMessage(request *PullRequest) {
v, exist := pc.subscriptionDataTable.Load(request.mq.Topic)
if !exist {
- rlog.Warnf("find the consumer's subscription failed,
%s", request.String())
+ rlog.Info("find the consumer's subscription failed",
map[string]interface{}{
+ rlog.LogKeyPullRequest: request.String(),
+ })
sleepTime = _PullDelayTimeWhenError
goto NEXT
}
@@ -481,27 +520,34 @@ func (pc *pushConsumer) pullMessage(request *PullRequest)
{
brokerResult := pc.defaultConsumer.tryFindBroker(request.mq)
if brokerResult == nil {
- rlog.Warnf("no broker found for %s",
request.mq.String())
+ rlog.Warning("no broker found for mq",
map[string]interface{}{
+ rlog.LogKeyPullRequest: request.mq.String(),
+ })
sleepTime = _PullDelayTimeWhenError
goto NEXT
}
result, err := pc.client.PullMessage(context.Background(),
brokerResult.BrokerAddr, pullRequest)
if err != nil {
- rlog.Warnf("pull message from %s error: %s",
brokerResult.BrokerAddr, err.Error())
+ rlog.Warning("pull message from broker error",
map[string]interface{}{
+ rlog.LogKeyBroker:
brokerResult.BrokerAddr,
+ rlog.LogKeyUnderlayError: err.Error(),
+ })
sleepTime = _PullDelayTimeWhenError
goto NEXT
}
if result.Status == primitive.PullBrokerTimeout {
- rlog.Warnf("pull broker: %s timeout",
brokerResult.BrokerAddr)
+ rlog.Warning("pull broker timeout",
map[string]interface{}{
+ rlog.LogKeyBroker: brokerResult.BrokerAddr,
+ })
sleepTime = _PullDelayTimeWhenError
goto NEXT
}
switch result.Status {
case primitive.PullFound:
- rlog.Debugf("Topic: %s, QueueId: %d found messages.",
request.mq.Topic, request.mq.QueueId)
+ rlog.Debug(fmt.Sprintf("Topic: %s, QueueId: %d found
messages.", request.mq.Topic, request.mq.QueueId), nil)
prevRequestOffset := request.nextOffset
request.nextOffset = result.NextBeginOffset
@@ -518,16 +564,23 @@ func (pc *pushConsumer) pullMessage(request *PullRequest)
{
pq.putMessage(msgFounded...)
}
if result.NextBeginOffset < prevRequestOffset ||
firstMsgOffset < prevRequestOffset {
- rlog.Warnf("[BUG] pull message result maybe
data wrong, [nextBeginOffset=%d, "+
- "firstMsgOffset=%d,
prevRequestOffset=%d]", result.NextBeginOffset, firstMsgOffset,
prevRequestOffset)
+ rlog.Warning("[BUG] pull message result maybe
data wrong", map[string]interface{}{
+ "nextBeginOffset":
result.NextBeginOffset,
+ "firstMsgOffset": firstMsgOffset,
+ "prevRequestOffset": prevRequestOffset,
+ })
}
case primitive.PullNoNewMsg:
- rlog.Debugf("Topic: %s, QueueId: %d no more msg,
current offset: %d, next offset: %d", request.mq.Topic, request.mq.QueueId,
pullRequest.QueueOffset, result.NextBeginOffset)
+ rlog.Debug(fmt.Sprintf("Topic: %s, QueueId: %d no more
msg, current offset: %d, next offset: %d",
+ request.mq.Topic, request.mq.QueueId,
pullRequest.QueueOffset, result.NextBeginOffset), nil)
case primitive.PullNoMsgMatched:
request.nextOffset = result.NextBeginOffset
pc.correctTagsOffset(request)
case primitive.PullOffsetIllegal:
- rlog.Warnf("the pull request offset illegal, request:
%s, result: %s", request.String(), result.String())
+ rlog.Warning("the pull request offset illegal",
map[string]interface{}{
+ rlog.LogKeyPullRequest: request.String(),
+ "result": result.String(),
+ })
request.nextOffset = result.NextBeginOffset
pq.dropped = true
go func() {
@@ -535,10 +588,10 @@ func (pc *pushConsumer) pullMessage(request *PullRequest)
{
pc.storage.update(request.mq,
request.nextOffset, false)
pc.storage.persist([]*primitive.MessageQueue{request.mq})
pc.storage.remove(request.mq)
- rlog.Warnf("fix the pull request offset: %s",
request.String())
+ rlog.Warning(fmt.Sprintf("fix the pull request
offset: %s", request.String()), nil)
}()
default:
- rlog.Warnf("unknown pull status: %v", result.Status)
+ rlog.Warning(fmt.Sprintf("unknown pull status: %v",
result.Status), nil)
sleepTime = _PullDelayTimeWhenError
}
}
@@ -577,25 +630,25 @@ func (pc *pushConsumer) buildSendBackRequest(msg
*primitive.MessageExt, delayLev
func (pc *pushConsumer) suspend() {
pc.pause = true
- rlog.Infof("suspend consumer: %s", pc.consumerGroup)
+ rlog.Info(fmt.Sprintf("suspend consumer: %s", pc.consumerGroup), nil)
}
func (pc *pushConsumer) resume() {
pc.pause = false
pc.doBalance()
- rlog.Infof("resume consumer: %s", pc.consumerGroup)
+ rlog.Info(fmt.Sprintf("resume consumer: %s", pc.consumerGroup), nil)
}
func (pc *pushConsumer) resetOffset(topic string, table
map[primitive.MessageQueue]int64) {
//topic := cmd.ExtFields["topic"]
//group := cmd.ExtFields["group"]
//if topic == "" || group == "" {
- // rlog.Warnf("received reset offset command from: %s, but missing
params.", from)
+ // rlog.Warning("received reset offset command from: %s, but
missing params.", from)
// return
//}
//t, err := strconv.ParseInt(cmd.ExtFields["timestamp"], 10, 64)
//if err != nil {
- // rlog.Warnf("received reset offset command from: %s, but parse
time error: %s", err.Error())
+ // rlog.Warning("received reset offset command from: %s, but parse
time error: %s", err.Error())
// return
//}
//rlog.Infof("invoke reset offset operation from broker. brokerAddr=%s,
topic=%s, group=%s, timestamp=%v",
@@ -604,7 +657,7 @@ func (pc *pushConsumer) resetOffset(topic string, table
map[primitive.MessageQue
//offsetTable := make(map[MessageQueue]int64, 0)
//err = json.Unmarshal(cmd.Body, &offsetTable)
//if err != nil {
- // rlog.Warnf("received reset offset command from: %s, but parse
offset table: %s", err.Error())
+ // rlog.Warning("received reset offset command from: %s, but parse
offset table: %s", err.Error())
// return
//}
//v, exist := c.consumerMap.Load(group)
@@ -710,12 +763,13 @@ func (pc *pushConsumer) consumeMessageCurrently(pq
*processQueue, mq *primitive.
go func() {
RETRY:
if pq.dropped {
- rlog.Infof("the message queue not be able to
consume, because it was dropped. group=%s, mq=%s",
- pc.consumerGroup, mq.String())
+ rlog.Info("the message queue not be able to
consume, because it was dropped", map[string]interface{}{
+ rlog.LogKeyMessageQueue: mq.String(),
+ rlog.LogKeyConsumerGroup:
pc.consumerGroup,
+ })
return
}
- // TODO hook
beginTime := time.Now()
pc.resetRetryAndNamespace(subMsgs)
var result ConsumeResult
@@ -757,7 +811,9 @@ func (pc *pushConsumer) consumeMessageCurrently(pq
*processQueue, mq *primitive.
increaseConsumeFailedTPS(pc.consumerGroup, mq.Topic, len(subMsgs))
if pc.model == BroadCasting {
for i := 0; i < len(msgs); i++ {
-
rlog.Warnf("BROADCASTING, the message=%s consume failed, drop it, {}",
subMsgs[i])
+
rlog.Warning("BROADCASTING, the message consume failed, drop it",
map[string]interface{}{
+ "message":
subMsgs[i],
+ })
}
} else {
for i := 0; i < len(msgs); i++ {
@@ -781,8 +837,10 @@ func (pc *pushConsumer) consumeMessageCurrently(pq
*processQueue, mq *primitive.
goto RETRY
}
} else {
- rlog.Warnf("processQueue is dropped without
process consume result. messageQueue=%s, msgs=%+v",
- mq, msgs)
+ rlog.Warning("processQueue is dropped without
process consume result.", map[string]interface{}{
+ rlog.LogKeyMessageQueue: mq,
+ "message": msgs,
+ })
}
}()
}
@@ -790,7 +848,9 @@ func (pc *pushConsumer) consumeMessageCurrently(pq
*processQueue, mq *primitive.
func (pc *pushConsumer) consumeMessageOrderly(pq *processQueue, mq
*primitive.MessageQueue) {
if pq.dropped {
- rlog.Warn("the message queue not be able to consume, because
it's dropped.")
+ rlog.Warning("the message queue not be able to consume, because
it's dropped.", map[string]interface{}{
+ rlog.LogKeyMessageQueue: mq.String(),
+ })
return
}
@@ -803,17 +863,23 @@ func (pc *pushConsumer) consumeMessageOrderly(pq
*processQueue, mq *primitive.Me
continueConsume := true
for continueConsume {
if pq.dropped {
- rlog.Warnf("the message queue not be able to
consume, because it's dropped. %v", mq)
+ rlog.Warning("the message queue not be able to
consume, because it's dropped.", map[string]interface{}{
+ rlog.LogKeyMessageQueue: mq.String(),
+ })
break
}
if pc.model == Clustering {
if !pq.locked {
- rlog.Warnf("the message queue not
locked, so consume later: %v", mq)
+ rlog.Warning("the message queue not
locked, so consume later", map[string]interface{}{
+ rlog.LogKeyMessageQueue:
mq.String(),
+ })
pc.tryLockLaterAndReconsume(mq, 10)
return
}
if pq.isLockExpired() {
- rlog.Warnf("the message queue lock
expired, so consume later: %v", mq)
+ rlog.Warning("the message queue lock
expired, so consume later", map[string]interface{}{
+ rlog.LogKeyMessageQueue:
mq.String(),
+ })
pc.tryLockLaterAndReconsume(mq, 10)
return
}
@@ -855,8 +921,11 @@ func (pc *pushConsumer) consumeMessageOrderly(pq
*processQueue, mq *primitive.Me
pq.lockConsume.Unlock()
if result == Rollback || result ==
SuspendCurrentQueueAMoment {
- rlog.Warnf("consumeMessage Orderly return not
OK, Group: %v Msgs: %v MQ: %v",
- pc.consumerGroup, msgs, mq)
+ rlog.Warning("consumeMessage Orderly return not
OK", map[string]interface{}{
+ rlog.LogKeyConsumerGroup:
pc.consumerGroup,
+ "messages": msgs,
+ rlog.LogKeyMessageQueue: mq,
+ })
}
// just put consumeResult in consumerMessageCtx
@@ -875,7 +944,9 @@ func (pc *pushConsumer) consumeMessageOrderly(pq
*processQueue, mq *primitive.Me
if pc.option.AutoCommit {
switch result {
case Commit, Rollback:
- rlog.Warnf("the message queue consume
result is illegal, we think you want to ack these message: %v", mq)
+ rlog.Warning("the message queue consume
result is illegal, we think you want to ack these messages",
map[string]interface{}{
+ rlog.LogKeyMessageQueue: mq,
+ })
case ConsumeSuccess:
commitOffset = pq.commit()
case SuspendCurrentQueueAMoment:
@@ -906,12 +977,14 @@ func (pc *pushConsumer) consumeMessageOrderly(pq
*processQueue, mq *primitive.Me
}
}
if commitOffset > 0 && !pq.dropped {
- pc.updateOffset(mq, commitOffset)
+ _ = pc.updateOffset(mq, commitOffset)
}
}
} else {
if pq.dropped {
- rlog.Warnf("the message queue not be able to consume,
because it's dropped. %v", mq)
+ rlog.Warning("the message queue not be able to consume,
because it's dropped.", map[string]interface{}{
+ rlog.LogKeyMessageQueue: mq.String(),
+ })
}
pc.tryLockLaterAndReconsume(mq, 100)
}
@@ -923,7 +996,7 @@ func (pc *pushConsumer) checkReconsumeTimes(msgs
[]*primitive.MessageExt) bool {
maxReconsumeTimes := pc.getOrderlyMaxReconsumeTimes()
for _, msg := range msgs {
if msg.ReconsumeTimes > maxReconsumeTimes {
- rlog.Warnf("msg will be send to retry topic due
to ReconsumeTimes > %d, \n", maxReconsumeTimes)
+ rlog.Warning(fmt.Sprintf("msg will be send to
retry topic due to ReconsumeTimes > %d, \n", maxReconsumeTimes), nil)
msg.WithProperty("RECONSUME_TIME",
strconv.Itoa(int(msg.ReconsumeTimes)))
if !pc.sendMessageBack("", msg, -1) {
suspend = true
diff --git a/consumer/statistics.go b/consumer/statistics.go
index 43f547a..2448c74 100644
--- a/consumer/statistics.go
+++ b/consumer/statistics.go
@@ -19,6 +19,7 @@ package consumer
import (
"container/list"
+ "fmt"
"sync"
"sync/atomic"
"time"
@@ -358,20 +359,35 @@ func (si *statsItem) samplingInHour() {
func (si *statsItem) printAtMinutes() {
ss := computeStatsData(si.csListMinute)
- rlog.Infof("[%s] [%s] Stats In One Minute, SUM: %d TPS: %.2f AVGPT:
%.2f",
- si.statsName, si.statsKey, ss.sum, ss.tps, ss.avgpt)
+ rlog.Info("Stats In One Minute",
map[string]interface{}{
+ "statsName": si.statsName,
+ "statsKey": si.statsKey,
+ "SUM": ss.sum,
+ "TPS": fmt.Sprintf("%.2f", ss.tps),
+ "AVGPT": ss.avgpt,
+ })
}
func (si *statsItem) printAtHour() {
ss := computeStatsData(si.csListHour)
- rlog.Infof("[%s] [%s] Stats In One Hour, SUM: %d TPS: %.2f AVGPT: %.2f",
- si.statsName, si.statsKey, ss.sum, ss.tps, ss.avgpt)
+ rlog.Info("Stats In One Hour",
map[string]interface{}{
+ "statsName": si.statsName,
+ "statsKey": si.statsKey,
+ "SUM": ss.sum,
+ "TPS": fmt.Sprintf("%.2f", ss.tps),
+ "AVGPT": ss.avgpt,
+ })
}
func (si *statsItem) printAtDay() {
ss := computeStatsData(si.csListDay)
- rlog.Infof("[%s] [%s] Stats In One Day, SUM: %d TPS: %.2f AVGPT: %.2f",
- si.statsName, si.statsKey, ss.sum, ss.tps, ss.avgpt)
+ rlog.Info("Stats In One Day",
map[string]interface{}{
+ "statsName": si.statsName,
+ "statsKey": si.statsKey,
+ "SUM": ss.sum,
+ "TPS": fmt.Sprintf("%.2f", ss.tps),
+ "AVGPT": ss.avgpt,
+ })
}
func nextMinutesTime() time.Time {
diff --git a/consumer/strategy.go b/consumer/strategy.go
index f326f32..ff7055f 100644
--- a/consumer/strategy.go
+++ b/consumer/strategy.go
@@ -61,7 +61,11 @@ func AllocateByAveragely(consumerGroup, currentCID string,
mqAll []*primitive.Me
}
}
if !find {
- rlog.Warnf("[BUG] ConsumerGroup=%s, ConsumerId=%s not in
cidAll:%+v", consumerGroup, currentCID, cidAll)
+ rlog.Warning("[BUG] ConsumerId not in cidAll",
map[string]interface{}{
+ rlog.LogKeyConsumerGroup: consumerGroup,
+ "consumerId": currentCID,
+ "cidAll": cidAll,
+ })
return nil
}
@@ -88,7 +92,7 @@ func AllocateByAveragely(consumerGroup, currentCID string,
mqAll []*primitive.Me
}
num := utils.MinInt(averageSize, mqSize-startIndex)
- result := []*primitive.MessageQueue{}
+ result := make([]*primitive.MessageQueue, 0)
for i := 0; i < num; i++ {
result = append(result, mqAll[(startIndex+i)%mqSize])
}
@@ -113,11 +117,15 @@ func AllocateByAveragelyCircle(consumerGroup, currentCID
string, mqAll []*primit
}
}
if !find {
- rlog.Warnf("[BUG] ConsumerGroup=%s, ConsumerId=%s not in
cidAll:%+v", consumerGroup, currentCID, cidAll)
+ rlog.Warning("[BUG] ConsumerId not in cidAll",
map[string]interface{}{
+ rlog.LogKeyConsumerGroup: consumerGroup,
+ "consumerId": currentCID,
+ "cidAll": cidAll,
+ })
return nil
}
- result := []*primitive.MessageQueue{}
+ result := make([]*primitive.MessageQueue, 0)
for i := index; i < len(mqAll); i++ {
if i%len(cidAll) == index {
result = append(result, mqAll[i])
@@ -156,11 +164,15 @@ func AllocateByMachineRoom(consumeridcs []string)
AllocateStrategy {
}
}
if !find {
- rlog.Warnf("[BUG] ConsumerGroup=%s, ConsumerId=%s not
in cidAll:%+v", consumerGroup, currentCID, cidAll)
+ rlog.Warning("[BUG] ConsumerId not in cidAll",
map[string]interface{}{
+ rlog.LogKeyConsumerGroup: consumerGroup,
+ "consumerId": currentCID,
+ "cidAll": cidAll,
+ })
return nil
}
- premqAll := []*primitive.MessageQueue{}
+ var premqAll []*primitive.MessageQueue
for _, mq := range mqAll {
temp := strings.Split(mq.BrokerName, "@")
if len(temp) == 2 {
@@ -177,7 +189,7 @@ func AllocateByMachineRoom(consumeridcs []string)
AllocateStrategy {
startIndex := mod * index
endIndex := startIndex + mod
- result := []*primitive.MessageQueue{}
+ result := make([]*primitive.MessageQueue, 0)
for i := startIndex; i < endIndex; i++ {
result = append(result, mqAll[i])
}
@@ -204,7 +216,11 @@ func AllocateByConsistentHash(virtualNodeCnt int)
AllocateStrategy {
}
}
if !find {
- rlog.Warnf("[BUG] ConsumerGroup=%s, ConsumerId=%s not
in cidAll:%+v", consumerGroup, currentCID, cidAll)
+ rlog.Warning("[BUG] ConsumerId not in cidAll",
map[string]interface{}{
+ rlog.LogKeyConsumerGroup: consumerGroup,
+ "consumerId": currentCID,
+ "cidAll": cidAll,
+ })
return nil
}
@@ -214,11 +230,13 @@ func AllocateByConsistentHash(virtualNodeCnt int)
AllocateStrategy {
c.Add(cid)
}
- result := []*primitive.MessageQueue{}
+ result := make([]*primitive.MessageQueue, 0)
for _, mq := range mqAll {
clientNode, err := c.Get(mq.String())
if err != nil {
- rlog.Warnf("[BUG] AllocateByConsistentHash
err:%s", err.Error())
+ rlog.Warning("[BUG] AllocateByConsistentHash
error", map[string]interface{}{
+ rlog.LogKeyUnderlayError: err,
+ })
}
if currentCID == clientNode {
result = append(result, mq)
diff --git a/examples/consumer/pull/main.go b/examples/consumer/pull/main.go
index 35b970c..2b0a9f2 100644
--- a/examples/consumer/pull/main.go
+++ b/examples/consumer/pull/main.go
@@ -34,11 +34,11 @@ func main() {
consumer.WithNameServer([]string{"127.0.0.1:9876"}),
)
if err != nil {
- rlog.Fatal("fail to new pullConsumer: ", err)
+ rlog.Fatal(fmt.Sprintf("fail to new pullConsumer: %s", err),
nil)
}
err = c.Start()
if err != nil {
- rlog.Fatal("fail to new pullConsumer: ", err)
+ rlog.Fatal(fmt.Sprintf("fail to new pullConsumer: %s", err),
nil)
}
ctx := context.Background()
diff --git a/go.sum b/go.sum
index b8f3ab3..9d64cc9 100644
--- a/go.sum
+++ b/go.sum
@@ -45,5 +45,3 @@ golang.org/x/text v0.3.0/go.mod
h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/tools v0.0.0-20190328211700-ab21143f2384/go.mod
h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
golang.org/x/tools v0.0.0-20190425150028-36563e24a262
h1:qsl9y/CJx34tuA7QCPNp86JNJe4spst6Ff8MjvPUdPg=
golang.org/x/tools v0.0.0-20190425150028-36563e24a262/go.mod
h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q=
-stathat.com/c/consistent v1.0.0 h1:ezyc51EGcRPJUxfHGSgJjWzJdj3NiMU9pNfLNGiXV0c=
-stathat.com/c/consistent v1.0.0/go.mod
h1:QkzMWzcbB+yQBL2AttO6sgsQS/JSTapcDISJalmCDS0=
diff --git a/internal/client.go b/internal/client.go
index 99227ef..debc87c 100644
--- a/internal/client.go
+++ b/internal/client.go
@@ -177,7 +177,9 @@ func GetOrNewRocketMQClient(option ClientOptions,
callbackCh chan interface{}) *
actual, loaded := clientMap.LoadOrStore(client.ClientID(), client)
if !loaded {
client.remoteClient.RegisterRequestFunc(ReqNotifyConsumerIdsChanged, func(req
*remote.RemotingCommand, addr net.Addr) *remote.RemotingCommand {
- rlog.Infof("receive broker's notification, the consumer
group: %s", req.ExtFields["consumerGroup"])
+ rlog.Info("receive broker's notification to consumer
group", map[string]interface{}{
+ rlog.LogKeyConsumerGroup:
req.ExtFields["consumerGroup"],
+ })
client.RebalanceImmediately()
return nil
})
@@ -186,7 +188,7 @@ func GetOrNewRocketMQClient(option ClientOptions,
callbackCh chan interface{}) *
header.Decode(req.ExtFields)
msgExts := primitive.DecodeMessage(req.Body)
if len(msgExts) == 0 {
- rlog.Warn("checkTransactionState, decode
message failed")
+ rlog.Warning("checkTransactionState, decode
message failed", nil)
return nil
}
msgExt := msgExts[0]
@@ -197,11 +199,11 @@ func GetOrNewRocketMQClient(option ClientOptions,
callbackCh chan interface{}) *
}
group :=
msgExt.GetProperty(primitive.PropertyProducerGroup)
if group == "" {
- rlog.Warn("checkTransactionState, pick producer
group failed")
+ rlog.Warning("checkTransactionState, pick
producer group failed", nil)
return nil
}
if option.GroupName != group {
- rlog.Warn("producer group is not equal.")
+ rlog.Warning("producer group is not equal", nil)
return nil
}
callback := CheckTransactionStateCallback{
@@ -214,7 +216,7 @@ func GetOrNewRocketMQClient(option ClientOptions,
callbackCh chan interface{}) *
})
client.remoteClient.RegisterRequestFunc(ReqGetConsumerRunningInfo, func(req
*remote.RemotingCommand, addr net.Addr) *remote.RemotingCommand {
- rlog.Info("receive get consumer running info
request...")
+ rlog.Info("receive get consumer running info
request...", nil)
res := remote.NewRemotingCommand(ResError, nil, nil)
res.Remark = "the go client has not supported consumer
running info"
return res
@@ -261,7 +263,9 @@ func (c *rmqClient) Start() {
consumer := value.(InnerConsumer)
err := consumer.PersistConsumerOffset()
if err != nil {
- rlog.Errorf("persist offset
failed. err: %v", err)
+ rlog.Error("persist offset
failed", map[string]interface{}{
+
rlog.LogKeyUnderlayError: err,
+ })
}
return true
})
@@ -349,7 +353,7 @@ func (c *rmqClient) SendHeartbeatToAllBrokerWithLock() {
return true
})
if hbData.ProducerDatas.Len() == 0 && hbData.ConsumerDatas.Len() == 0 {
- rlog.Info("sending heartbeat, but no producer and no consumer")
+ rlog.Info("sending heartbeat, but no producer and no consumer",
nil)
return
}
c.namesrvs.brokerAddressesMap.Range(func(key, value interface{}) bool {
@@ -359,7 +363,9 @@ func (c *rmqClient) SendHeartbeatToAllBrokerWithLock() {
cmd := remote.NewRemotingCommand(ReqHeartBeat, nil,
hbData.encode())
response, err :=
c.remoteClient.InvokeSync(context.Background(), addr, cmd, 3*time.Second)
if err != nil {
- rlog.Warnf("send heart beat to broker error:
%s", err.Error())
+ rlog.Warning("send heart beat to broker error",
map[string]interface{}{
+ rlog.LogKeyUnderlayError: err,
+ })
return true
}
if response.Code == ResSuccess {
@@ -372,7 +378,11 @@ func (c *rmqClient) SendHeartbeatToAllBrokerWithLock() {
c.namesrvs.brokerVersionMap.Store(brokerName, m)
}
m[brokerName] = int32(response.Version)
- rlog.Debugf("send heart beat to broker[%s %d
%s] success", brokerName, id, addr)
+ rlog.Debug("send heart beat to broker success",
map[string]interface{}{
+ "brokerName": brokerName,
+ "brokerId": id,
+ "brokerAddr": addr,
+ })
}
}
return true
@@ -408,22 +418,6 @@ func (c *rmqClient) UpdateTopicRouteInfo() {
}
}
-// SendMessageAsync send message with batch by async
-func (c *rmqClient) SendMessageAsync(ctx context.Context, brokerAddrs,
brokerName string, request *SendMessageRequest,
- msgs []*primitive.Message, f func(result *primitive.SendResult)) error {
- return nil
-}
-
-func (c *rmqClient) SendMessageOneWay(ctx context.Context, brokerAddrs string,
request *SendMessageRequest,
- msgs []*primitive.Message) (*primitive.SendResult, error) {
- cmd := remote.NewRemotingCommand(ReqSendBatchMessage, request,
encodeMessages(msgs))
- err := c.remoteClient.InvokeOneWay(ctx, brokerAddrs, cmd, 3*time.Second)
- if err != nil {
- rlog.Warnf("send messages with oneway error: %v", err)
- }
- return nil, err
-}
-
func (c *rmqClient) ProcessSendResponse(brokerName string, cmd
*remote.RemotingCommand, resp *primitive.SendResult, msgs
...*primitive.Message) error {
var status primitive.SendStatus
switch cmd.Code {
diff --git a/internal/model.go b/internal/model.go
index ad86e2e..b7aa125 100644
--- a/internal/model.go
+++ b/internal/model.go
@@ -92,9 +92,11 @@ func NewHeartbeatData(clientID string) *heartbeatData {
func (data *heartbeatData) encode() []byte {
d, err := json.Marshal(data)
if err != nil {
- rlog.Errorf("marshal heartbeatData error: %s", err.Error())
+ rlog.Error("marshal heartbeatData error",
map[string]interface{}{
+ rlog.LogKeyUnderlayError: err,
+ })
return nil
}
- rlog.Debugf("heartbeat: " + string(d))
+ rlog.Debug("heartbeat: " + string(d), nil)
return d
}
diff --git a/internal/remote/remote_client.go b/internal/remote/remote_client.go
index 703e958..6d34a8b 100644
--- a/internal/remote/remote_client.go
+++ b/internal/remote/remote_client.go
@@ -140,34 +140,44 @@ func (c *remotingClient) receiveResponse(r net.Conn) {
header := make([]byte, 4)
for {
if err != nil {
- rlog.Errorf("conn err: %s so close", err.Error())
+ rlog.Error("conn error, close connection",
map[string]interface{}{
+ rlog.LogKeyUnderlayError: err,
+ })
c.closeConnection(r)
break
}
_, err = io.ReadFull(r, header)
if err != nil {
- rlog.Errorf("io readfull error: %s", err.Error())
+ rlog.Error("io ReadFull error", map[string]interface{}{
+ rlog.LogKeyUnderlayError: err,
+ })
continue
}
var length int32
err = binary.Read(bytes.NewReader(header), binary.BigEndian,
&length)
if err != nil {
- rlog.Errorf("binary decode header: %s", err.Error())
+ rlog.Error("binary decode header error",
map[string]interface{}{
+ rlog.LogKeyUnderlayError: err,
+ })
continue
}
buf := make([]byte, length)
_, err = io.ReadFull(r, buf)
if err != nil {
- rlog.Errorf("io readfull error: %s", err.Error())
+ rlog.Error("io ReadFull error", map[string]interface{}{
+ rlog.LogKeyUnderlayError: err,
+ })
continue
}
cmd, err := decode(buf)
if err != nil {
- rlog.Errorf("decode RemotingCommand error: %s",
err.Error())
+ rlog.Error("decode RemotingCommand error",
map[string]interface{}{
+ rlog.LogKeyUnderlayError: err,
+ })
continue
}
c.processCMD(cmd, r)
@@ -198,12 +208,17 @@ func (c *remotingClient) processCMD(cmd *RemotingCommand,
r net.Conn) {
res.Flag |= 1 << 0
err := c.sendRequest(r, res)
if err != nil {
- rlog.Warnf("send response to
broker error: %s, type is: %d", err, res.Code)
+ rlog.Warning("send response to
broker error", map[string]interface{}{
+
rlog.LogKeyUnderlayError: err,
+ "responseCode":
res.Code,
+ })
}
}
}()
} else {
- rlog.Warnf("receive broker's requests, but no func to
handle, code is: %d", cmd.Code)
+ rlog.Warning("receive broker's requests, but no func to
handle", map[string]interface{}{
+ "responseCode": cmd.Code,
+ })
}
}
}
@@ -216,7 +231,9 @@ func (c *remotingClient) createScanner(r io.Reader)
*bufio.Scanner {
scanner.Split(func(data []byte, atEOF bool) (int, []byte, error) {
defer func() {
if err := recover(); err != nil {
- rlog.Errorf("panic: %v", err)
+ rlog.Error("scanner split panic",
map[string]interface{}{
+ "panic": err,
+ })
}
}()
if !atEOF {
@@ -224,7 +241,9 @@ func (c *remotingClient) createScanner(r io.Reader)
*bufio.Scanner {
var length int32
err := binary.Read(bytes.NewReader(data[0:4]),
binary.BigEndian, &length)
if err != nil {
- rlog.Errorf("split data error: %s",
err.Error())
+ rlog.Error("split data error",
map[string]interface{}{
+ rlog.LogKeyUnderlayError: err,
+ })
return 0, nil, err
}
diff --git a/internal/route.go b/internal/route.go
index 3611bb9..e05034f 100644
--- a/internal/route.go
+++ b/internal/route.go
@@ -72,12 +72,18 @@ func (s *namesrvs) cleanOfflineBroker() {
})
if !isBrokerAddrExistInTopicRoute {
delete(bd.BrokerAddresses, k)
- rlog.Infof("the broker: [name=%s, ID=%d,
addr=%s,] is offline, remove it", brokerName, k, v)
+ rlog.Info("the broker is offline, remove
it", map[string]interface{}{
+ "brokerName": brokerName,
+ "brokerID": k,
+ "brokerAddr": v,
+ })
}
}
if len(bd.BrokerAddresses) == 0 {
s.brokerAddressesMap.Delete(brokerName)
- rlog.Infof("the broker [name=%s] name's host is
offline, remove it", brokerName)
+ rlog.Info("the broker name's host is offline, remove
it", map[string]interface{}{
+ "brokerName": brokerName,
+ })
}
return true
})
@@ -113,12 +119,16 @@ func (s *namesrvs) UpdateTopicRouteInfo(topic string)
*TopicRouteData {
routeData, err := s.queryTopicRouteInfoFromServer(topic)
if err != nil {
- rlog.Warnf("query topic route from server error: %s", err)
+ rlog.Warning("query topic route from server error",
map[string]interface{}{
+ rlog.LogKeyUnderlayError: err,
+ })
return nil
}
if routeData == nil {
- rlog.Warnf("queryTopicRouteInfoFromServer return nil, Topic:
%s", topic)
+ rlog.Warning("queryTopicRouteInfoFromServer return nil",
map[string]interface{}{
+ rlog.LogKeyTopic: topic,
+ })
return nil
}
@@ -130,8 +140,11 @@ func (s *namesrvs) UpdateTopicRouteInfo(topic string)
*TopicRouteData {
if changed {
s.routeDataMap.Store(topic, routeData)
- rlog.Infof("the topic [%s] route info changed, old %v ,new %s",
topic,
- oldRouteData, routeData.String())
+ rlog.Info("the topic route info changed",
map[string]interface{}{
+ rlog.LogKeyTopic: topic,
+ rlog.LogKeyValueChangedFrom: oldRouteData,
+ rlog.LogKeyValueChangedTo: routeData.String(),
+ })
for _, brokerData := range routeData.BrokerDataList {
s.brokerAddressesMap.Store(brokerData.BrokerName,
brokerData)
}
@@ -261,7 +274,9 @@ func (s *namesrvs) FetchPublishMessageQueues(topic string)
([]*primitive.Message
if !exist {
routeData, err = s.queryTopicRouteInfoFromServer(topic)
if err != nil {
- rlog.Error("queryTopicRouteInfoFromServer failed.
topic: ", topic)
+ rlog.Error("queryTopicRouteInfoFromServer failed",
map[string]interface{}{
+ rlog.LogKeyTopic: topic,
+ })
return nil, err
}
s.routeDataMap.Store(topic, routeData)
@@ -273,9 +288,9 @@ func (s *namesrvs) FetchPublishMessageQueues(topic string)
([]*primitive.Message
if err != nil {
return nil, err
}
- publishinfo := s.routeData2PublishInfo(topic, routeData)
+ publishInfo := s.routeData2PublishInfo(topic, routeData)
- return publishinfo.MqList, nil
+ return publishInfo.MqList, nil
}
func (s *namesrvs) findBrokerVersion(brokerName, brokerAddr string) int32 {
@@ -312,7 +327,9 @@ func (s *namesrvs) queryTopicRouteInfoFromServer(topic
string) (*TopicRouteData,
}
}
if err != nil {
- rlog.Errorf("connect to namesrv: %v failed.", s)
+ rlog.Error("connect to namesrv failed.", map[string]interface{}{
+ "namesrv": s,
+ })
return nil, err
}
@@ -325,7 +342,9 @@ func (s *namesrvs) queryTopicRouteInfoFromServer(topic
string) (*TopicRouteData,
err = routeData.decode(string(response.Body))
if err != nil {
- rlog.Warnf("decode TopicRouteData error: %s", err)
+ rlog.Warning("decode TopicRouteData error",
map[string]interface{}{
+ rlog.LogKeyUnderlayError: err,
+ })
return nil, err
}
return routeData, nil
diff --git a/internal/trace.go b/internal/trace.go
index 0fa4307..53ff5a2 100644
--- a/internal/trace.go
+++ b/internal/trace.go
@@ -268,14 +268,17 @@ func (td *traceDispatcher) Close() {
func (td *traceDispatcher) Append(ctx TraceContext) bool {
if !td.running {
- rlog.Error("traceDispatcher is closed.")
+ rlog.Error("traceDispatcher is closed.", nil)
return false
}
select {
case td.input <- ctx:
return true
default:
- rlog.Warnf("buffer full: %d, ctx is %v",
atomic.AddInt64(&td.discardCount, 1), ctx)
+ rlog.Warning("buffer full", map[string]interface{}{
+ "discardCount": atomic.AddInt64(&td.discardCount, 1),
+ "TraceContext": ctx,
+ })
return false
}
}
@@ -319,7 +322,7 @@ func (td *traceDispatcher) process() {
now = time.Now().UnixNano() /
int64(time.Millisecond)
runtime.Gosched()
}
- rlog.Infof("------end trace send %v %v", td.input,
td.batchCh)
+ rlog.Info(fmt.Sprintf("------end trace send %v %v",
td.input, td.batchCh), nil)
}
}
}
@@ -391,9 +394,9 @@ func (td *traceDispatcher) flush(topic, regionID string,
data []TraceTransferBea
}
}
-func (td *traceDispatcher) sendTraceDataByMQ(keyset Keyset, regionID string,
data string) {
+func (td *traceDispatcher) sendTraceDataByMQ(keySet Keyset, regionID string,
data string) {
msg := primitive.NewMessage(td.traceTopic, []byte(data))
- msg.WithKeys(keyset.slice())
+ msg.WithKeys(keySet.slice())
mq, addr := td.findMq()
if mq == nil {
@@ -401,17 +404,24 @@ func (td *traceDispatcher) sendTraceDataByMQ(keyset
Keyset, regionID string, dat
}
var req = td.buildSendRequest(mq, msg)
- td.cli.InvokeAsync(context.Background(), addr, req,
5000*time.Millisecond, func(command *remote.RemotingCommand, e error) {
+ err := td.cli.InvokeAsync(context.Background(), addr, req, 5 *
time.Second, func(command *remote.RemotingCommand, e error) {
if e != nil {
- rlog.Error("send trace data ,the traceData is %v", data)
+ rlog.Error("send trace data error",
map[string]interface{}{
+ "traceData": data,
+ })
}
})
+ rlog.Error("send trace data error when invoke", map[string]interface{}{
+ rlog.LogKeyUnderlayError: err,
+ })
}
func (td *traceDispatcher) findMq() (*primitive.MessageQueue, string) {
mqs, err := td.namesrvs.FetchPublishMessageQueues(td.traceTopic)
if err != nil {
- rlog.Error("fetch publish message queues failed. err: %v", err)
+ rlog.Error("fetch publish message queues failed",
map[string]interface{}{
+ rlog.LogKeyUnderlayError: err,
+ })
return nil, ""
}
i := atomic.AddInt32(&td.rrindex, 1)
diff --git a/internal/utils/errors.go b/internal/utils/errors.go
index 45dd1c8..c689f04 100644
--- a/internal/utils/errors.go
+++ b/internal/utils/errors.go
@@ -18,8 +18,9 @@ limitations under the License.
package utils
import (
+ "errors"
+
"github.com/apache/rocketmq-client-go/rlog"
- "github.com/pkg/errors"
)
var (
@@ -32,6 +33,8 @@ var (
func CheckError(action string, err error) {
if err != nil {
- rlog.Errorf("%s error: %s", action, err.Error())
+ rlog.Error(action, map[string]interface{}{
+ rlog.LogKeyUnderlayError: err.Error(),
+ })
}
}
diff --git a/internal/validators.go b/internal/validators.go
index 8b6f122..7753942 100644
--- a/internal/validators.go
+++ b/internal/validators.go
@@ -34,7 +34,7 @@ var (
func ValidateGroup(group string) {
if group == "" {
- rlog.Fatal("consumerGroup is empty")
+ rlog.Fatal("consumerGroup is empty", nil)
}
//if !_Pattern.Match([]byte(group)) {
@@ -42,6 +42,6 @@ func ValidateGroup(group string) {
//}
if len(group) > _CharacterMaxLength {
- rlog.Fatal("the specified group is longer than group max length
255.")
+ rlog.Fatal("the specified group is longer than group max length
255.", nil)
}
}
diff --git a/primitive/ctx.go b/primitive/ctx.go
index 0937498..5761bd6 100644
--- a/primitive/ctx.go
+++ b/primitive/ctx.go
@@ -22,6 +22,7 @@ package primitive
import (
"context"
+ "fmt"
"math"
"github.com/apache/rocketmq-client-go/rlog"
@@ -46,7 +47,7 @@ func (c ConsumeReturnType) Ordinal() int {
case FailedReturn:
return 4
default:
- rlog.Error("illegal ConsumeReturnType: %v", c)
+ rlog.Error(fmt.Sprintf("illegal ConsumeReturnType: %v", c), nil)
return 0
}
}
diff --git a/producer/producer.go b/producer/producer.go
index 5590f76..3fb8041 100644
--- a/producer/producer.go
+++ b/producer/producer.go
@@ -310,7 +310,7 @@ func (p *defaultProducer) selectMessageQueue(msg
*primitive.Message) *primitive.
}
if result.MqList != nil && len(result.MqList) <= 0 {
- rlog.Error("can not find proper message queue")
+ rlog.Error("can not find proper message queue", nil)
return nil
}
@@ -394,9 +394,15 @@ func (tp *transactionProducer) checkTransactionState() {
req :=
remote.NewRemotingCommand(internal.ReqENDTransaction, header, nil)
req.Remark = tp.errRemark(nil)
- tp.producer.client.InvokeOneWay(context.Background(),
callback.Addr.String(), req, tp.producer.options.SendMsgTimeout)
+ err :=
tp.producer.client.InvokeOneWay(context.Background(), callback.Addr.String(),
req,
+ tp.producer.options.SendMsgTimeout)
+ rlog.Error("send ReqENDTransaction to broker error",
map[string]interface{}{
+ "callback": callback.Addr.String(),
+ "request": req.String(),
+ rlog.LogKeyUnderlayError: err,
+ })
default:
- rlog.Error("unknow type %v", ch)
+ rlog.Error(fmt.Sprintf("unknown type %v", ch), nil)
}
}
}
@@ -421,7 +427,10 @@ func (tp *transactionProducer)
SendMessageInTransaction(ctx context.Context, msg
}
localTransactionState =
tp.listener.ExecuteLocalTransaction(*msg)
if localTransactionState != primitive.CommitMessageState {
- rlog.Errorf("executeLocalTransactionBranch return %v
with msg: %v\n", localTransactionState, msg)
+ rlog.Error("executeLocalTransaction but state
unexpected", map[string]interface{}{
+ "localState": localTransactionState,
+ "message": msg,
+ })
}
case primitive.SendFlushDiskTimeout, primitive.SendFlushSlaveTimeout,
primitive.SendSlaveNotAvailable:
diff --git a/rlog/log.go b/rlog/log.go
index 28615e9..72a4a01 100644
--- a/rlog/log.go
+++ b/rlog/log.go
@@ -18,82 +18,98 @@
package rlog
import (
- "io"
-
"github.com/sirupsen/logrus"
)
+const (
+ LogKeyConsumerGroup = "consumerGroup"
+ LogKeyTopic = "topic"
+ LogKeyMessageQueue = "MessageQueue"
+ LogKeyUnderlayError = "underlayError"
+ LogKeyBroker = "broker"
+ LogKeyValueChangedFrom = "changedFrom"
+ LogKeyValueChangedTo = "changeTo"
+ LogKeyPullRequest = "PullRequest"
+)
+
type Logger interface {
- SetLevel(l logrus.Level)
- SetOutput(output io.Writer)
- Debug(i ...interface{})
- Debugf(format string, args ...interface{})
- Info(i ...interface{})
- Infof(format string, args ...interface{})
- Warn(i ...interface{})
- Warnf(format string, args ...interface{})
- Error(i ...interface{})
- Errorf(format string, args ...interface{})
- Fatal(i ...interface{})
- Fatalf(format string, args ...interface{})
+ Debug(msg string, fields map[string]interface{})
+ Info(msg string, fields map[string]interface{})
+ Warning(msg string, fields map[string]interface{})
+ Error(msg string, fields map[string]interface{})
+ Fatal(msg string, fields map[string]interface{})
}
-var rLog Logger
-
func init() {
- r := logrus.New()
- r.SetLevel(logrus.InfoLevel)
+ r := &defaultLogger{
+ logger: logrus.New(),
+ }
rLog = r
}
-func SetLogger(log Logger) {
- rLog = log
-}
-
-func SetLevel(l logrus.Level) {
- rLog.SetLevel(l)
-}
+var rLog *defaultLogger
-func SetOutput(output io.Writer) {
- rLog.SetOutput(output)
+type defaultLogger struct {
+ logger *logrus.Logger
}
-func Debug(i ...interface{}) {
- rLog.Debug(i...)
+func (l *defaultLogger) Debug(msg string, fields map[string]interface{}) {
+ if msg == "" && len(fields) == 0 {
+ return
+ }
+ l.logger.WithFields(fields).Debug(msg)
}
-func Debugf(format string, args ...interface{}) {
- rLog.Debugf(format, args...)
+func (l *defaultLogger) Info(msg string, fields map[string]interface{}) {
+ if msg == "" && len(fields) == 0 {
+ return
+ }
+ l.logger.WithFields(fields).Info(msg)
}
-func Info(i ...interface{}) {
- rLog.Info(i...)
+func (l *defaultLogger) Warning(msg string, fields map[string]interface{}) {
+ if msg == "" && len(fields) == 0 {
+ return
+ }
+ l.logger.WithFields(fields).Warning(msg)
}
-func Infof(format string, args ...interface{}) {
- rLog.Infof(format, args...)
+func (l *defaultLogger) Error(msg string, fields map[string]interface{}) {
+ if msg == "" && len(fields) == 0 {
+ return
+ }
+ l.logger.WithFields(fields).Error(msg)
}
-func Warn(i ...interface{}) {
- rLog.Warn(i...)
+func (l *defaultLogger) Fatal(msg string, fields map[string]interface{}) {
+ if msg == "" && len(fields) == 0 {
+ return
+ }
+ l.logger.WithFields(fields).Fatal(msg)
}
-func Warnf(format string, args ...interface{}) {
- rLog.Warnf(format, args...)
+func Debug(msg string, fields map[string]interface{}) {
+ rLog.Debug(msg, fields)
}
-func Error(i ...interface{}) {
- rLog.Error(i...)
+func Info(msg string, fields map[string]interface{}) {
+ if msg == "" && len(fields) == 0 {
+ return
+ }
+ rLog.Info(msg, fields)
}
-func Errorf(format string, args ...interface{}) {
- rLog.Errorf(format, args...)
+func Warning(msg string, fields map[string]interface{}) {
+ if msg == "" && len(fields) == 0 {
+ return
+ }
+ rLog.Warning(msg, fields)
}
-func Fatal(i ...interface{}) {
- rLog.Fatal(i...)
+func Error(msg string, fields map[string]interface{}) {
+ rLog.Error(msg, fields)
}
-func Fatalf(format string, args ...interface{}) {
- rLog.Fatalf(format, args...)
+func Fatal(msg string, fields map[string]interface{}) {
+ rLog.Fatal(msg, fields)
}