This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
The following commit(s) were added to refs/heads/master by this push:
new 601aee5b refactor(lite_push_consumer): use `sync.Map` to replace regular map and fix related method calls (#1171)
601aee5b is described below
commit 601aee5be5318dd903bf8c69ae6ababb88cad183
Author: Quan <[email protected]>
AuthorDate: Mon Jan 12 14:24:25 2026 +0800
refactor(lite_push_consumer): use `sync.Map` to replace regular map and fix related method calls (#1171)
test(lite_push_consumer_test): update test cases to match new `sync.Map` implementation
docs(README.md): update development guide and build steps
Change-Id: I08e1483141fd4f4e21629be86160f9e42de0ffcb
Co-authored-by: 靖泉 <[email protected]>
---
golang/README.md | 27 ++++++++
golang/README_dev.md | 4 --
golang/lite_push_consumer.go | 25 ++++---
golang/lite_push_consumer_options.go | 13 ++--
golang/lite_push_consumer_test.go | 128 +++++++++--------------------------
golang/push_consumer_test.go | 17 +++++
6 files changed, 96 insertions(+), 118 deletions(-)
diff --git a/golang/README.md b/golang/README.md
index 347b8713..41df54a6 100644
--- a/golang/README.md
+++ b/golang/README.md
@@ -24,5 +24,32 @@ Otherwise, to install the `golang` package, run the following command:
go get -u github.com/apache/rocketmq-clients/golang/v5
```
+## Development
+
+### Protocol Buffers Generation
+
+If you need to regenerate the Protocol Buffers code after modifying the `.proto` files, run the following commands:
+
+```bash
+protoc --go-grpc_out=. apache/rocketmq/v2/*.proto
+protoc --go_out=. apache/rocketmq/v2/*.proto
+```
+
+### Building
+
+To build the entire project:
+
+```bash
+go build ./...
+```
+
+### Running Tests
+
+To run all tests:
+
+```bash
+go test ./...
+```
+
[codecov-golang-image]: https://img.shields.io/codecov/c/gh/apache/rocketmq-clients/master?flag=golang&label=Golang%20Coverage&logo=codecov
[codecov-url]: https://app.codecov.io/gh/apache/rocketmq-clients
diff --git a/golang/README_dev.md b/golang/README_dev.md
deleted file mode 100644
index 1c21a789..00000000
--- a/golang/README_dev.md
+++ /dev/null
@@ -1,4 +0,0 @@
-
-# golang pb generate:
-protoc --go-grpc_out=. apache/rocketmq/v2/*.proto
-protoc --go_out=. apache/rocketmq/v2/*.proto
\ No newline at end of file
diff --git a/golang/lite_push_consumer.go b/golang/lite_push_consumer.go
index 2789ba93..3fa620fe 100644
--- a/golang/lite_push_consumer.go
+++ b/golang/lite_push_consumer.go
@@ -54,15 +54,15 @@ func NewLitePushConsumerConfig(bindTopic string,
invisibleDuration time.Duration
}
var NewLitePushConsumer = func(config *Config, liteConfig
*LitePushConsumerConfig, opts ...PushConsumerOption) (LitePushConsumer, error) {
+ if liteConfig == nil {
+ return nil, errors.New("LitePushConsumerConfig is required")
+ }
if liteConfig.bindTopic == "" {
return nil, errors.New("LitePushConsumerConfig.bindTopic is
required")
}
filterExpressionMap := map[string]*FilterExpression{
liteConfig.bindTopic: SUB_ALL,
}
- if liteConfig == nil {
- return nil, errors.New("LitePushConsumerConfig is required")
- }
opts = append(opts,
WithPushSubscriptionExpressions(filterExpressionMap))
if pushConsumer, err := newPushConsumer(config, opts...); err != nil {
return nil, err
@@ -104,7 +104,7 @@ func (lpc *defaultLitePushConsumer)
notifyUnsubscribeLite(command *v2.NotifyUnsu
if liteTopic == "" {
return
}
- delete(lpc.litePushConsumerSettings.liteTopicSet, liteTopic)
+ lpc.litePushConsumerSettings.liteTopicSet.Delete(liteTopic)
}
func (lpc *defaultLitePushConsumer) SubscribeLite(topic string) error {
@@ -115,7 +115,7 @@ func (lpc *defaultLitePushConsumer) SubscribeLite(topic
string) error {
sugarBaseLogger.Errorf("LitePushConsumer SubscribeLite topic:%s
err:%v", topic, err)
return err
}
- lpc.litePushConsumerSettings.liteTopicSet[topic] = struct{}{}
+ lpc.litePushConsumerSettings.liteTopicSet.Store(topic, struct{}{})
return nil
}
@@ -127,7 +127,7 @@ func (lpc *defaultLitePushConsumer) UnSubscribeLite(topic
string) error {
sugarBaseLogger.Errorf("LitePushConsumer UnSubscribeLite
topic:%s err:%v", topic, err)
return err
}
- delete(lpc.litePushConsumerSettings.liteTopicSet, topic)
+ lpc.litePushConsumerSettings.liteTopicSet.Delete(topic)
return nil
}
@@ -140,10 +140,13 @@ func (lpc *defaultLitePushConsumer) checkRunning() error {
}
func (lpc *defaultLitePushConsumer) syncAllLiteSubscription() {
- var liteTopicSet = make([]string, 0,
len(lpc.litePushConsumerSettings.liteTopicSet))
- for k := range lpc.litePushConsumerSettings.liteTopicSet {
- liteTopicSet = append(liteTopicSet, k)
- }
+ liteTopicSet := make([]string, 0,
lpc.litePushConsumerSettings.maxLiteTopicSize)
+ lpc.litePushConsumerSettings.liteTopicSet.Range(func(key, value
interface{}) bool {
+ if liteTopic, ok := key.(string); ok {
+ liteTopicSet = append(liteTopicSet, liteTopic)
+ }
+ return true
+ })
if len(liteTopicSet) == 0 {
return
}
@@ -157,7 +160,7 @@ func (lpc *defaultLitePushConsumer)
syncLiteSubscription(context context.Context
request := v2.SyncLiteSubscriptionRequest{
Action: action,
Topic: &v2.Resource{
- Name:
lpc.litePushConsumerSettings.bingTopic,
+ Name:
lpc.litePushConsumerSettings.bindTopic,
ResourceNamespace: lpc.cli.config.NameSpace,
},
Group: lpc.litePushConsumerSettings.groupName,
diff --git a/golang/lite_push_consumer_options.go
b/golang/lite_push_consumer_options.go
index 0daeb589..e01ae90f 100644
--- a/golang/lite_push_consumer_options.go
+++ b/golang/lite_push_consumer_options.go
@@ -19,6 +19,7 @@ package golang
import (
"fmt"
+ "sync"
"time"
v2 "github.com/apache/rocketmq-clients/golang/v5/protocol/v2"
@@ -29,9 +30,9 @@ var _ = ClientSettings(&litePushConsumerSettings{})
type litePushConsumerSettings struct {
*pushConsumerSettings
- bingTopic string
- liteTopicSet map[string]struct{}
- liteSubscriptionQuota int32 // default 1200
+ bindTopic string
+ liteTopicSet *sync.Map
+ liteSubscriptionQuota int32
maxLiteTopicSize int32
invisibleDuration time.Duration
}
@@ -39,11 +40,11 @@ type litePushConsumerSettings struct {
func newLitePushConsumerSettings(settings *pushConsumerSettings, bindTopic
string, invisibleDuration time.Duration) *litePushConsumerSettings {
return &litePushConsumerSettings{
pushConsumerSettings: settings,
- bingTopic: bindTopic,
- liteTopicSet: map[string]struct{}{},
+ bindTopic: bindTopic,
+ liteTopicSet: &sync.Map{},
invisibleDuration: invisibleDuration,
// default value
- liteSubscriptionQuota: 1200,
+ liteSubscriptionQuota: 2000,
maxLiteTopicSize: 64,
}
}
diff --git a/golang/lite_push_consumer_test.go
b/golang/lite_push_consumer_test.go
index f952de09..953159f0 100644
--- a/golang/lite_push_consumer_test.go
+++ b/golang/lite_push_consumer_test.go
@@ -29,31 +29,26 @@ import (
"google.golang.org/protobuf/types/known/durationpb"
)
-// 全局变量,用于在测试过程中设置和初始化
var (
mockCtrl *gomock.Controller
mockRpcClient *MockRpcClient
)
-// 设置测试环境,创建并初始化 mock 对象
func setupTest(t *testing.T) {
mockCtrl = gomock.NewController(t)
mockRpcClient = NewMockRpcClient(mockCtrl)
}
-// 清理测试环境
func teardownTest() {
mockCtrl.Finish()
}
-// 用于测试的辅助函数,设置标准的成功响应
func setupSuccessResponse() *v2.SyncLiteSubscriptionResponse {
return &v2.SyncLiteSubscriptionResponse{
Status: &v2.Status{Code: v2.Code_OK},
}
}
-// 用于测试的辅助函数,设置错误响应
func setupErrorResponse(code v2.Code, message string)
*v2.SyncLiteSubscriptionResponse {
return &v2.SyncLiteSubscriptionResponse{
Status: &v2.Status{
@@ -64,13 +59,11 @@ func setupErrorResponse(code v2.Code, message string)
*v2.SyncLiteSubscriptionRe
}
func TestNewLitePushConsumer(t *testing.T) {
- // 设置测试环境
setupTest(t)
defer teardownTest()
config := &Config{Endpoint: fakeAddress, NameSpace: "test-namespace",
ConsumerGroup: "test-group"}
- // 测试成功创建 LitePushConsumer
liteConfig := &LitePushConsumerConfig{bindTopic: "bind-topic"}
lpc, err := NewLitePushConsumer(config, liteConfig,
WithPushMessageListener(&FuncMessageListener{
Consume: func(*MessageView) ConsumerResult { return SUCCESS },
@@ -80,29 +73,25 @@ func TestNewLitePushConsumer(t *testing.T) {
}
dlpc := lpc.(*defaultLitePushConsumer)
- if dlpc.litePushConsumerSettings.bingTopic != "bind-topic" {
- t.Errorf("expected bind topic 'bind-topic', got %s",
dlpc.litePushConsumerSettings.bingTopic)
+ if dlpc.litePushConsumerSettings.bindTopic != "bind-topic" {
+ t.Errorf("expected bind topic 'bind-topic', got %s",
dlpc.litePushConsumerSettings.bindTopic)
}
- // 验证 client type 是否正确设置为 LITE_PUSH_CONSUMER
if int32(dlpc.pcSettings.clientType) !=
int32(v2.ClientType_LITE_PUSH_CONSUMER) {
t.Errorf("expected client type LITE_PUSH_CONSUMER, got %v",
dlpc.pcSettings.clientType)
}
- // 验证 isFifo 被强制设置为 true
if !dlpc.pcSettings.isFifo {
t.Error("expected isFifo to be true for lite push consumer")
}
}
func TestNewLitePushConsumer_EmptyBindTopic(t *testing.T) {
- // 设置测试环境
setupTest(t)
defer teardownTest()
config := &Config{Endpoint: fakeAddress, NameSpace: "test-namespace",
ConsumerGroup: "test-group"}
- // 测试空 BindTopic 的错误情况
liteConfig := &LitePushConsumerConfig{bindTopic: ""}
_, err := NewLitePushConsumer(config, liteConfig)
if err == nil {
@@ -115,7 +104,6 @@ func TestNewLitePushConsumer_EmptyBindTopic(t *testing.T) {
}
}
-// 辅助方法: 创建测试用的 LitePushConsumer 实例
func createTestLitePushConsumer(t *testing.T) (*defaultLitePushConsumer,
error) {
config := &Config{Endpoint: fakeAddress, NameSpace: "test-namespace",
ConsumerGroup: "test-group"}
liteConfig := &LitePushConsumerConfig{bindTopic: "bind-topic"}
@@ -129,18 +117,14 @@ func createTestLitePushConsumer(t *testing.T)
(*defaultLitePushConsumer, error)
dlpc := lpc.(*defaultLitePushConsumer)
- // 模拟客户端已经启动
dlpc.cli.on.Store(true)
- // 创建完全自定义的 mockedClientManager
mockedClientManager := &mockedClientManager{
mockRpcClient: mockRpcClient,
}
- // 只需要替换一个 clientManager,因为 dlpc.cli 和 dlpc.defaultPushConsumer.cli
是同一个实例
dlpc.cli.clientManager = mockedClientManager
- // 验证两个 cli 是否是同一个实例
if dlpc.cli != dlpc.defaultPushConsumer.cli {
t.Errorf("Expected dlpc.cli and dlpc.defaultPushConsumer.cli to
be the same instance")
}
@@ -148,12 +132,10 @@ func createTestLitePushConsumer(t *testing.T)
(*defaultLitePushConsumer, error)
return dlpc, nil
}
-// mockedClientManager 完全实现 ClientManager 接口
type mockedClientManager struct {
mockRpcClient *MockRpcClient
}
-// 实现 ClientManager 接口的所有方法
func (m *mockedClientManager) RegisterClient(client Client) {}
func (m *mockedClientManager) UnRegisterClient(client Client) {}
func (m *mockedClientManager) QueryRoute(ctx context.Context, endpoints
*v2.Endpoints, request *v2.QueryRouteRequest, duration time.Duration)
(*v2.QueryRouteResponse, error) {
@@ -190,15 +172,12 @@ func (m *mockedClientManager)
ForwardMessageToDeadLetterQueue(ctx context.Contex
return nil, nil
}
-// SyncLiteSubscription 是关键方法,直接使用 mockRpcClient
func (m *mockedClientManager) SyncLiteSubscription(ctx context.Context,
endpoints *v2.Endpoints, request *v2.SyncLiteSubscriptionRequest, duration
time.Duration) (*v2.SyncLiteSubscriptionResponse, error) {
- // 添加调试日志
fmt.Printf("DEBUG: mockedClientManager.SyncLiteSubscription called with
request: %+v\n", request)
return m.mockRpcClient.SyncLiteSubscription(ctx, request)
}
func TestLitePushConsumer_SubscribeLite(t *testing.T) {
- // 设置测试环境
setupTest(t)
defer teardownTest()
@@ -207,12 +186,10 @@ func TestLitePushConsumer_SubscribeLite(t *testing.T) {
t.Fatalf("failed to create test lite push consumer: %v", err)
}
- // 验证 mock 对象是否正确注入
if dlpc.defaultPushConsumer.cli.clientManager == nil {
t.Fatal("clientManager should not be nil")
}
- // Mock SyncLiteSubscription 成功响应 - 简化版本
mockRpcClient.EXPECT().SyncLiteSubscription(gomock.Any(),
gomock.Any()).Return(setupSuccessResponse(), nil).Times(1)
err = dlpc.SubscribeLite("lite-topic-1")
@@ -220,14 +197,12 @@ func TestLitePushConsumer_SubscribeLite(t *testing.T) {
t.Fatalf("expected no error for SubscribeLite, got %v", err)
}
- // 验证 lite topic 被添加到 set 中
- if _, exists :=
dlpc.litePushConsumerSettings.liteTopicSet["lite-topic-1"]; !exists {
+ if _, exists :=
dlpc.litePushConsumerSettings.liteTopicSet.Load("lite-topic-1"); !exists {
t.Error("expected lite topic to be added to set")
}
}
func TestLitePushConsumer_SubscribeLite_NotRunning(t *testing.T) {
- // 设置测试环境
setupTest(t)
defer teardownTest()
@@ -236,7 +211,6 @@ func TestLitePushConsumer_SubscribeLite_NotRunning(t
*testing.T) {
t.Fatalf("failed to create test lite push consumer: %v", err)
}
- // 将客户端状态设置为未运行
dlpc.cli.on.Store(false)
err = dlpc.SubscribeLite("lite-topic-1")
@@ -251,7 +225,6 @@ func TestLitePushConsumer_SubscribeLite_NotRunning(t
*testing.T) {
}
func TestLitePushConsumer_SubscribeLite_RpcError(t *testing.T) {
- // 设置测试环境
setupTest(t)
defer teardownTest()
@@ -260,24 +233,19 @@ func TestLitePushConsumer_SubscribeLite_RpcError(t
*testing.T) {
t.Fatalf("failed to create test lite push consumer: %v", err)
}
- // Mock RPC 错误
- mockRpcClient.EXPECT().SyncLiteSubscription(gomock.Any(),
gomock.Any()).Return(
- nil, errors.New("rpc error"),
- )
+ mockRpcClient.EXPECT().SyncLiteSubscription(gomock.Any(),
gomock.Any()).Return(nil, errors.New("rpc error"))
err = dlpc.SubscribeLite("lite-topic-1")
if err == nil {
t.Fatal("expected rpc error")
}
- // 验证 lite topic 没有被添加到 set 中
- if _, exists :=
dlpc.litePushConsumerSettings.liteTopicSet["lite-topic-1"]; exists {
+ if _, exists :=
dlpc.litePushConsumerSettings.liteTopicSet.Load("lite-topic-1"); exists {
t.Error("lite topic should not be added when rpc fails")
}
}
func TestLitePushConsumer_UnSubscribeLite(t *testing.T) {
- // 设置测试环境
setupTest(t)
defer teardownTest()
@@ -286,35 +254,29 @@ func TestLitePushConsumer_UnSubscribeLite(t *testing.T) {
t.Fatalf("failed to create test lite push consumer: %v", err)
}
- // 预先添加一个 lite topic
- dlpc.litePushConsumerSettings.liteTopicSet["lite-topic-1"] = struct{}{}
+ dlpc.litePushConsumerSettings.liteTopicSet.Store("lite-topic-1",
struct{}{})
- // Mock SyncLiteSubscription 成功响应
- mockRpcClient.EXPECT().SyncLiteSubscription(gomock.Any(),
gomock.Any()).DoAndReturn(
- func(ctx context.Context, req *v2.SyncLiteSubscriptionRequest)
(*v2.SyncLiteSubscriptionResponse, error) {
- if req.Action !=
v2.LiteSubscriptionAction_PARTIAL_REMOVE {
- t.Errorf("expected action INCREMENTAL_REMOVE,
got %v", req.Action)
- }
- if len(req.LiteTopicSet) != 1 || req.LiteTopicSet[0] !=
"lite-topic-1" {
- t.Errorf("expected lite topic set
['lite-topic-1'], got %v", req.LiteTopicSet)
- }
- return setupSuccessResponse(), nil
- },
- )
+ mockRpcClient.EXPECT().SyncLiteSubscription(gomock.Any(),
gomock.Any()).DoAndReturn(func(ctx context.Context, req
*v2.SyncLiteSubscriptionRequest) (*v2.SyncLiteSubscriptionResponse, error) {
+ if req.Action != v2.LiteSubscriptionAction_PARTIAL_REMOVE {
+ t.Errorf("expected action INCREMENTAL_REMOVE, got %v",
req.Action)
+ }
+ if len(req.LiteTopicSet) != 1 || req.LiteTopicSet[0] !=
"lite-topic-1" {
+ t.Errorf("expected lite topic set ['lite-topic-1'], got
%v", req.LiteTopicSet)
+ }
+ return setupSuccessResponse(), nil
+ })
err = dlpc.UnSubscribeLite("lite-topic-1")
if err != nil {
t.Fatalf("expected no error for UnSubscribeLite, got %v", err)
}
- // 验证 lite topic 被从 set 中删除
- if _, exists :=
dlpc.litePushConsumerSettings.liteTopicSet["lite-topic-1"]; exists {
+ if _, exists :=
dlpc.litePushConsumerSettings.liteTopicSet.Load("lite-topic-1"); exists {
t.Error("expected lite topic to be removed from set")
}
}
func TestLitePushConsumer_notifyUnsubscribeLite(t *testing.T) {
- // 设置测试环境
setupTest(t)
defer teardownTest()
@@ -323,8 +285,7 @@ func TestLitePushConsumer_notifyUnsubscribeLite(t
*testing.T) {
t.Fatalf("failed to create test lite push consumer: %v", err)
}
- // 预先添加一个 lite topic
- dlpc.litePushConsumerSettings.liteTopicSet["lite-topic-notify"] =
struct{}{}
+ dlpc.litePushConsumerSettings.liteTopicSet.Store("lite-topic-notify",
struct{}{})
cmd := &v2.NotifyUnsubscribeLiteCommand{
LiteTopic: "lite-topic-notify",
@@ -332,14 +293,12 @@ func TestLitePushConsumer_notifyUnsubscribeLite(t
*testing.T) {
dlpc.notifyUnsubscribeLite(cmd)
- // 验证 lite topic 被从 set 中删除
- if _, exists :=
dlpc.litePushConsumerSettings.liteTopicSet["lite-topic-notify"]; exists {
+ if _, exists :=
dlpc.litePushConsumerSettings.liteTopicSet.Load("lite-topic-notify"); exists {
t.Error("expected lite topic to be removed from set after
notify")
}
}
func TestLitePushConsumer_notifyUnsubscribeLite_EmptyLiteTopic(t *testing.T) {
- // 设置测试环境
setupTest(t)
defer teardownTest()
@@ -348,23 +307,20 @@ func
TestLitePushConsumer_notifyUnsubscribeLite_EmptyLiteTopic(t *testing.T) {
t.Fatalf("failed to create test lite push consumer: %v", err)
}
- // 预先添加一个 lite topic
- dlpc.litePushConsumerSettings.liteTopicSet["lite-topic-keep"] =
struct{}{}
+ dlpc.litePushConsumerSettings.liteTopicSet.Store("lite-topic-keep",
struct{}{})
cmd := &v2.NotifyUnsubscribeLiteCommand{
- LiteTopic: "", // 空的 lite topic
+ LiteTopic: "",
}
dlpc.notifyUnsubscribeLite(cmd)
- // 验证 lite topic 仍然存在(因为 LiteTopic 为空,函数会提前返回)
- if _, exists :=
dlpc.litePushConsumerSettings.liteTopicSet["lite-topic-keep"]; !exists {
+ if _, exists :=
dlpc.litePushConsumerSettings.liteTopicSet.Load("lite-topic-keep"); !exists {
t.Error("lite topic should not be removed when command has
empty lite topic")
}
}
func TestLitePushConsumer_syncLiteSubscription_StatusError(t *testing.T) {
- // 设置测试环境
setupTest(t)
defer teardownTest()
@@ -373,10 +329,7 @@ func
TestLitePushConsumer_syncLiteSubscription_StatusError(t *testing.T) {
t.Fatalf("failed to create test lite push consumer: %v", err)
}
- // Mock SyncLiteSubscription 返回错误状态码
- mockRpcClient.EXPECT().SyncLiteSubscription(gomock.Any(),
gomock.Any()).Return(
- setupErrorResponse(v2.Code_INTERNAL_SERVER_ERROR, "internal
error"), nil,
- )
+ mockRpcClient.EXPECT().SyncLiteSubscription(gomock.Any(),
gomock.Any()).Return(setupErrorResponse(v2.Code_INTERNAL_SERVER_ERROR,
"internal error"), nil)
err = dlpc.syncLiteSubscription(context.TODO(),
v2.LiteSubscriptionAction_PARTIAL_ADD, []string{"test"})
if err == nil {
@@ -398,7 +351,6 @@ func
TestLitePushConsumer_syncLiteSubscription_StatusError(t *testing.T) {
}
func TestLitePushConsumer_WrapReceiveMessageRequest(t *testing.T) {
- // 设置测试环境
setupTest(t)
defer teardownTest()
@@ -441,14 +393,12 @@ func TestLitePushConsumer_WrapReceiveMessageRequest(t
*testing.T) {
t.Error("expected attempt id to be set")
}
- // 修正 WrapReceiveMessageRequest 的 duration 断言
if req.GetLongPollingTimeout().GetSeconds() != int64(10) {
t.Errorf("expected polling timeout 10s, got %v",
req.GetLongPollingTimeout().GetSeconds())
}
}
func TestLitePushConsumer_WrapHeartbeatRequest(t *testing.T) {
- // 设置测试环境
setupTest(t)
defer teardownTest()
@@ -467,14 +417,12 @@ func TestLitePushConsumer_WrapHeartbeatRequest(t
*testing.T) {
t.Errorf("expected namespace 'test-namespace', got %s",
req.GetGroup().GetResourceNamespace())
}
- // 修正 WrapHeartbeatRequest 的 clientType 断言
if int32(req.GetClientType()) !=
int32(v2.ClientType_LITE_PUSH_CONSUMER) {
t.Errorf("expected client type LITE_PUSH_CONSUMER, got %v",
req.GetClientType())
}
}
func TestLitePushConsumerSettings_applySettingsCommand(t *testing.T) {
- // 设置测试环境
setupTest(t)
defer teardownTest()
@@ -485,7 +433,6 @@ func TestLitePushConsumerSettings_applySettingsCommand(t
*testing.T) {
settings := dlpc.litePushConsumerSettings
- // 创建测试 Settings
liteQuota := int32(100)
maxSize := int32(1024)
fifoVal := false
@@ -495,7 +442,7 @@ func TestLitePushConsumerSettings_applySettingsCommand(t
*testing.T) {
Subscription: &v2.Subscription{
LiteSubscriptionQuota: &liteQuota,
MaxLiteTopicSize: &maxSize,
- Fifo: &fifoVal, // 这会被强制设置为
true
+ Fifo: &fifoVal,
},
},
BackoffPolicy: &v2.RetryPolicy{
@@ -515,24 +462,20 @@ func TestLitePushConsumerSettings_applySettingsCommand(t
*testing.T) {
t.Fatalf("applySettingsCommand failed: %v", err)
}
- // 验证 lite subscription quota 被设置
if settings.liteSubscriptionQuota != liteQuota {
t.Errorf("expected lite subscription quota %d, got %d",
liteQuota, settings.liteSubscriptionQuota)
}
- // 验证 max lite topic size 被设置
if settings.maxLiteTopicSize != maxSize {
t.Errorf("expected max lite topic size %d, got %d", maxSize,
settings.maxLiteTopicSize)
}
- // 验证 isFifo 被强制设置为 true
if !settings.isFifo {
t.Error("expected isFifo to be forced to true")
}
}
func TestLitePushConsumerSettings_toProtobuf(t *testing.T) {
- // 设置测试环境
setupTest(t)
defer teardownTest()
@@ -543,18 +486,15 @@ func TestLitePushConsumerSettings_toProtobuf(t
*testing.T) {
settings := dlpc.litePushConsumerSettings
- // 设置一些测试值
settings.liteSubscriptionQuota = 50
settings.maxLiteTopicSize = 512
protobuf := settings.toProtobuf()
- // 验证 ClientType
if int32(protobuf.GetClientType()) !=
int32(v2.ClientType_LITE_PUSH_CONSUMER) {
t.Errorf("expected client type LITE_PUSH_CONSUMER, got %v",
protobuf.GetClientType())
}
- // 验证 PubSub 设置
pubsub := protobuf.GetPubSub()
if pubsub == nil {
t.Fatal("expected PubSub to be set")
@@ -565,27 +505,21 @@ func TestLitePushConsumerSettings_toProtobuf(t
*testing.T) {
t.Fatal("expected subscription to be set")
}
- // 验证 LiteSubscriptionQuota
if subscription.Subscription.GetLiteSubscriptionQuota() != 50 {
t.Errorf("expected lite subscription quota 50, got %d",
subscription.Subscription.GetLiteSubscriptionQuota())
}
- // 验证 MaxLiteTopicSize
if subscription.Subscription.GetMaxLiteTopicSize() != 512 {
t.Errorf("expected max lite topic size 512, got %d",
subscription.Subscription.GetMaxLiteTopicSize())
}
- // 验证订阅信息
- if len(subscription.Subscription.GetSubscriptions()) == 0 {
- t.Error("expected at least one subscription entry")
- } else {
- entry := subscription.Subscription.GetSubscriptions()[0]
- // 修正 subscription entry 断言
- if entry.GetTopic().GetName() != "bind-topic" {
- t.Errorf("expected topic name 'bind-topic', got %s",
entry.GetTopic().GetName())
- }
- if entry.GetTopic().GetResourceNamespace() != "test-namespace" {
- t.Errorf("expected namespace 'test-namespace', got %s",
entry.GetTopic().GetResourceNamespace())
- }
+ entry := subscription.Subscription.GetSubscriptions()[0]
+
+ if entry.GetTopic().GetName() != "bind-topic" {
+ t.Errorf("expected topic name 'bind-topic', got %s",
entry.GetTopic().GetName())
+ }
+
+ if entry.GetTopic().GetResourceNamespace() != "test-namespace" {
+ t.Errorf("expected namespace 'test-namespace', got %s",
entry.GetTopic().GetResourceNamespace())
}
}
diff --git a/golang/push_consumer_test.go b/golang/push_consumer_test.go
index ae40917e..45b0ddbd 100644
--- a/golang/push_consumer_test.go
+++ b/golang/push_consumer_test.go
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package golang
import (