This is an automated email from the ASF dual-hosted git repository.
RongtongJin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
The following commit(s) were added to refs/heads/master by this push:
new d4de689e [Go] Fix LitePushConsumer.WrapReceiveMessageRequest using
wrong AutoRenew value (#1291)
d4de689e is described below
commit d4de689eff6060db9b326d8a94ea43f04c03bdf9
Author: Quan <[email protected]>
AuthorDate: Wed Jul 1 13:51:37 2026 +0800
[Go] Fix LitePushConsumer.WrapReceiveMessageRequest using wrong AutoRenew
value (#1291)
* fix(go): remove redundant WrapReceiveMessageRequest override in
LitePushConsumer
- Remove the duplicated WrapReceiveMessageRequest from defaultLitePushConsumer
and delegate to the embedded *defaultPushConsumer, which correctly sets
AutoRenew=true and includes FilterExpression
- Drop unused imports: uuid, durationpb
- Fix test assertion: AutoRenew should be true for lite push consumer
* fix(go): remove redundant WrapReceiveMessageRequest tests for
LitePushConsumer
- Delete TestLitePushConsumer_WrapReceiveMessageRequest (the method now
delegates to the parent and is already covered by
TestDefaultPushConsumer_WrapReceiveMessageRequest)
- Rename TestDefaultLitePushConsumer_Wraps to
TestDefaultLitePushConsumer_WrapHeartbeatRequest, keep only the
LitePushConsumer-specific ClientType assertion
---
golang/lite_push_consumer.go | 18 ---------------
golang/lite_push_consumer_test.go | 47 ---------------------------------------
golang/push_consumer_test.go | 23 +------------------
3 files changed, 1 insertion(+), 87 deletions(-)
diff --git a/golang/lite_push_consumer.go b/golang/lite_push_consumer.go
index 63ab3287..a350111c 100644
--- a/golang/lite_push_consumer.go
+++ b/golang/lite_push_consumer.go
@@ -24,8 +24,6 @@ import (
"github.com/apache/rocketmq-clients/golang/v5/pkg/ticker"
v2 "github.com/apache/rocketmq-clients/golang/v5/protocol/v2"
- "github.com/google/uuid"
- "google.golang.org/protobuf/types/known/durationpb"
)
type LitePushConsumer interface {
@@ -206,22 +204,6 @@ func (lpc *defaultLitePushConsumer)
syncLiteSubscription(context context.Context
var _ = PushConsumerExtension(&defaultLitePushConsumer{})
-func (lpc *defaultLitePushConsumer) WrapReceiveMessageRequest(batchSize int,
messageQueue *v2.MessageQueue, filterExpression *FilterExpression,
longPollingTimeout time.Duration) *v2.ReceiveMessageRequest {
- attemptId := uuid.New().String()
-
- return &v2.ReceiveMessageRequest{
- Group: &v2.Resource{
- Name: lpc.groupName,
- ResourceNamespace: lpc.cli.config.NameSpace,
- },
- MessageQueue: messageQueue,
- LongPollingTimeout: durationpb.New(longPollingTimeout),
- BatchSize: int32(batchSize),
- AutoRenew: false,
- AttemptId: &attemptId,
- InvisibleDuration:
durationpb.New(lpc.litePushConsumerSettings.invisibleDuration),
- }
-}
func (lpc *defaultLitePushConsumer) WrapHeartbeatRequest()
*v2.HeartbeatRequest {
return &v2.HeartbeatRequest{
diff --git a/golang/lite_push_consumer_test.go
b/golang/lite_push_consumer_test.go
index a9018c56..a51fc0f2 100644
--- a/golang/lite_push_consumer_test.go
+++ b/golang/lite_push_consumer_test.go
@@ -421,53 +421,6 @@ func
TestLitePushConsumer_syncLiteSubscription_StatusError(t *testing.T) {
}
}
-func TestLitePushConsumer_WrapReceiveMessageRequest(t *testing.T) {
- setupTest(t)
- defer teardownTest()
-
- dlpc, err := createTestLitePushConsumer(t)
- if err != nil {
- t.Fatalf("failed to create test lite push consumer: %v", err)
- }
-
- messageQueue := &v2.MessageQueue{
- Topic: &v2.Resource{
- Name: "bind-topic",
- ResourceNamespace: "test-namespace",
- },
- Id: 1,
- Broker: &v2.Broker{
- Name: "test-broker",
- Endpoints: fakeEndpoints(),
- },
- }
-
- req := dlpc.WrapReceiveMessageRequest(5, messageQueue, SUB_ALL,
time.Second*10)
-
- if req.GetGroup().GetName() != "test-group" {
- t.Errorf("expected group name 'test-group', got %s",
req.GetGroup().GetName())
- }
-
- if req.GetGroup().GetResourceNamespace() != "test-namespace" {
- t.Errorf("expected namespace 'test-namespace', got %s",
req.GetGroup().GetResourceNamespace())
- }
-
- if req.GetBatchSize() != 5 {
- t.Errorf("expected batch size 5, got %d", req.GetBatchSize())
- }
-
- if req.GetAutoRenew() {
- t.Error("expected auto renew to be true for lite push consumer")
- }
-
- if req.GetAttemptId() == "" {
- t.Error("expected attempt id to be set")
- }
-
- if req.GetLongPollingTimeout().GetSeconds() != int64(10) {
- t.Errorf("expected polling timeout 10s, got %v",
req.GetLongPollingTimeout().GetSeconds())
- }
-}
func TestLitePushConsumer_WrapHeartbeatRequest(t *testing.T) {
setupTest(t)
diff --git a/golang/push_consumer_test.go b/golang/push_consumer_test.go
index 85d43037..77c9039f 100644
--- a/golang/push_consumer_test.go
+++ b/golang/push_consumer_test.go
@@ -195,39 +195,18 @@ func TestDefaultPushConsumer_wrapAckMessageRequest_lite(t
*testing.T) {
}
}
-func TestDefaultLitePushConsumer_Wraps(t *testing.T) {
+func TestDefaultLitePushConsumer_WrapHeartbeatRequest(t *testing.T) {
config := &Config{Endpoint: fakeAddress, NameSpace: "test-namespace",
ConsumerGroup: "test-group"}
lpc, err := NewLitePushConsumer(config,
&LitePushConsumerConfig{bindTopic: "bind-topic"},
WithPushMessageListener(&FuncMessageListener{Consume: func(*MessageView)
ConsumerResult { return SUCCESS }}))
if err != nil {
t.Fatalf("failed to create lite push consumer: %v", err)
}
- // 类型断言为具体类型以调用 Wrap* 方法(接口 LitePushConsumer 上未暴露这些方法)
dlpc, ok := lpc.(*defaultLitePushConsumer)
if !ok {
t.Fatalf("failed to assert lite push consumer concrete type")
}
- messageQueue := &v2.MessageQueue{
- Topic: &v2.Resource{
- Name: "bind-topic",
- ResourceNamespace: "test-namespace",
- },
- Id: 1,
- Broker: &v2.Broker{
- Name: "test-broker",
- Endpoints: fakeEndpoints(),
- },
- }
-
- req := dlpc.WrapReceiveMessageRequest(5, messageQueue, SUB_ALL,
time.Second*10)
- if req.GetGroup().GetName() != "test-group" {
- t.Errorf("expected group name 'test-group', got %s",
req.GetGroup().GetName())
- }
- if req.GetAutoRenew() {
- t.Error("expected auto renew to be true")
- }
-
hb := dlpc.WrapHeartbeatRequest()
if hb.GetClientType() != v2.ClientType_LITE_PUSH_CONSUMER {
t.Errorf("expected client type LITE_PUSH_CONSUMER, got %v",
hb.GetClientType())