This is an automated email from the ASF dual-hosted git repository.
RongtongJin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
The following commit(s) were added to refs/heads/master by this push:
new f038a534 [Go] Support offset option for lite topic subscriptions (#1285)
f038a534 is described below
commit f038a53433fa5f96fe2b3a9c584ab76482aa9dba
Author: Yim <[email protected]>
AuthorDate: Wed Jun 24 14:50:45 2026 +0800
[Go] Support offset option for lite topic subscriptions (#1285)
Add OffsetOption helpers and pass optional offset options through
SyncLiteSubscriptionRequest when subscribing to lite topics.
---
golang/lite_push_consumer.go | 22 ++++++--
golang/lite_push_consumer_mock.go | 13 +++--
golang/lite_push_consumer_test.go | 69 ++++++++++++++++++++++-
golang/offset_option.go | 103 ++++++++++++++++++++++++++++++++++
golang/offset_option_test.go | 113 ++++++++++++++++++++++++++++++++++++++
5 files changed, 309 insertions(+), 11 deletions(-)
diff --git a/golang/lite_push_consumer.go b/golang/lite_push_consumer.go
index c16229ca..63ab3287 100644
--- a/golang/lite_push_consumer.go
+++ b/golang/lite_push_consumer.go
@@ -30,7 +30,7 @@ import (
type LitePushConsumer interface {
PushConsumer
- SubscribeLite(liteTopic string) error
+ SubscribeLite(liteTopic string, offsetOption ...OffsetOption) error
UnSubscribeLite(liteTopic string) error
}
@@ -107,11 +107,18 @@ func (lpc *defaultLitePushConsumer) notifyUnsubscribeLite(command *v2.NotifyUnsu
lpc.litePushConsumerSettings.liteTopicSet.Delete(liteTopic)
}
-func (lpc *defaultLitePushConsumer) SubscribeLite(liteTopic string) error {
+func (lpc *defaultLitePushConsumer) SubscribeLite(liteTopic string, offsetOption ...OffsetOption) error {
if err := lpc.checkRunning(); err != nil {
return err
}
- if err := lpc.syncLiteSubscription(context.TODO(), v2.LiteSubscriptionAction_PARTIAL_ADD, []string{liteTopic}); err != nil {
+ if len(offsetOption) > 1 {
+ return errors.New("only one offset option is supported")
+ }
+ var option *OffsetOption
+ if len(offsetOption) == 1 {
+ option = &offsetOption[0]
+ }
+ if err := lpc.syncLiteSubscription(context.TODO(), v2.LiteSubscriptionAction_PARTIAL_ADD, []string{liteTopic}, option); err != nil {
sugarBaseLogger.Errorf("LitePushConsumer SubscribeLite liteTopic:%s err:%v", liteTopic, err)
return err
}
@@ -123,7 +130,7 @@ func (lpc *defaultLitePushConsumer) UnSubscribeLite(liteTopic string) error {
if err := lpc.checkRunning(); err != nil {
return err
}
- if err := lpc.syncLiteSubscription(context.TODO(), v2.LiteSubscriptionAction_PARTIAL_REMOVE, []string{liteTopic}); err != nil {
+ if err := lpc.syncLiteSubscription(context.TODO(), v2.LiteSubscriptionAction_PARTIAL_REMOVE, []string{liteTopic}, nil); err != nil {
sugarBaseLogger.Errorf("LitePushConsumer UnSubscribeLite liteTopic:%s err:%v", liteTopic, err)
return err
}
@@ -151,12 +158,12 @@ func (lpc *defaultLitePushConsumer) syncAllLiteSubscription() {
//if len(liteTopicSet) == 0 {
// return
//}
- if err := lpc.syncLiteSubscription(context.TODO(), v2.LiteSubscriptionAction_COMPLETE_ADD, liteTopicSet); err != nil {
+ if err := lpc.syncLiteSubscription(context.TODO(), v2.LiteSubscriptionAction_COMPLETE_ADD, liteTopicSet, nil); err != nil {
sugarBaseLogger.Errorf("LitePushConsumer syncAllLiteSubscription:%v, err:%v", liteTopicSet, err)
}
}
-func (lpc *defaultLitePushConsumer) syncLiteSubscription(context context.Context, action v2.LiteSubscriptionAction, diff []string) error {
+func (lpc *defaultLitePushConsumer) syncLiteSubscription(context context.Context, action v2.LiteSubscriptionAction, diff []string, offsetOption *OffsetOption) error {
topic := lpc.litePushConsumerSettings.bindTopic
group := lpc.litePushConsumerSettings.groupName
clientId := lpc.litePushConsumerSettings.clientId
@@ -171,6 +178,9 @@ func (lpc *defaultLitePushConsumer) syncLiteSubscription(context context.Context
Group: group,
LiteTopicSet: diff,
}
+ if offsetOption != nil {
+ request.OffsetOption = offsetOption.toProtobuf()
+ }
if action == v2.LiteSubscriptionAction_COMPLETE_ADD {
sugarBaseLogger.Infof("syncLiteSubscription action:%s, topic:%s, group:%s, clientId:%s, liteTopicCount:%d",
diff --git a/golang/lite_push_consumer_mock.go b/golang/lite_push_consumer_mock.go
index ae15047a..bf75ecd8 100644
--- a/golang/lite_push_consumer_mock.go
+++ b/golang/lite_push_consumer_mock.go
@@ -34,17 +34,22 @@ func (m *MockLitePushConsumer) EXPECT() *MockLitePushConsumerMockRecorder {
}
// SubscribeLite mocks base method.
-func (m *MockLitePushConsumer) SubscribeLite(liteTopic string) error {
+func (m *MockLitePushConsumer) SubscribeLite(liteTopic string, offsetOption ...OffsetOption) error {
m.ctrl.T.Helper()
- ret := m.ctrl.Call(m, "SubscribeLite", liteTopic)
+ varargs := []interface{}{liteTopic}
+ for _, a := range offsetOption {
+ varargs = append(varargs, a)
+ }
+ ret := m.ctrl.Call(m, "SubscribeLite", varargs...)
ret0, _ := ret[0].(error)
return ret0
}
// SubscribeLite indicates an expected call of SubscribeLite.
-func (mr *MockLitePushConsumerMockRecorder) SubscribeLite(liteTopic interface{}) *gomock.Call {
+func (mr *MockLitePushConsumerMockRecorder) SubscribeLite(liteTopic interface{}, offsetOption ...interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
- return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SubscribeLite", reflect.TypeOf((*MockLitePushConsumer)(nil).SubscribeLite), liteTopic)
+ varargs := append([]interface{}{liteTopic}, offsetOption...)
+ return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SubscribeLite", reflect.TypeOf((*MockLitePushConsumer)(nil).SubscribeLite), varargs...)
}
// UnSubscribeLite mocks base method.
diff --git a/golang/lite_push_consumer_test.go b/golang/lite_push_consumer_test.go
index a084794d..a9018c56 100644
--- a/golang/lite_push_consumer_test.go
+++ b/golang/lite_push_consumer_test.go
@@ -206,6 +206,73 @@ func TestLitePushConsumer_SubscribeLite(t *testing.T) {
}
}
+func TestLitePushConsumer_SubscribeLite_WithOffset(t *testing.T) {
+ setupTest(t)
+ defer teardownTest()
+
+ dlpc, err := createTestLitePushConsumer(t)
+ if err != nil {
+ t.Fatalf("failed to create test lite push consumer: %v", err)
+ }
+
+ offsetOption, err := NewOffsetOptionWithOffset(100)
+ if err != nil {
+ t.Fatalf("failed to create offset option: %v", err)
+ }
+
+ mockRpcClient.EXPECT().SyncLiteSubscription(gomock.Any(), gomock.Any()).DoAndReturn(func(ctx context.Context, req *v2.SyncLiteSubscriptionRequest) (*v2.SyncLiteSubscriptionResponse, error) {
+ if req.GetAction() != v2.LiteSubscriptionAction_PARTIAL_ADD {
+ t.Errorf("expected action PARTIAL_ADD, got %v", req.GetAction())
+ }
+ if len(req.GetLiteTopicSet()) != 1 || req.GetLiteTopicSet()[0] != "lite-topic-1" {
+ t.Errorf("expected lite topic set ['lite-topic-1'], got %v", req.GetLiteTopicSet())
+ }
+ if req.GetOffsetOption() == nil {
+ t.Fatal("expected offset option to be set")
+ }
+ if _, ok := req.GetOffsetOption().GetOffsetType().(*v2.OffsetOption_Offset); !ok {
+ t.Fatalf("expected offset option type OFFSET, got %T", req.GetOffsetOption().GetOffsetType())
+ }
+ if req.GetOffsetOption().GetOffset() != 100 {
+ t.Errorf("expected offset 100, got %d", req.GetOffsetOption().GetOffset())
+ }
+ return setupSuccessResponse(), nil
+ }).Times(1)
+
+ err = dlpc.SubscribeLite("lite-topic-1", offsetOption)
+ if err != nil {
+ t.Fatalf("expected no error for SubscribeLite with offset, got %v", err)
+ }
+}
+
+func TestLitePushConsumer_SubscribeLite_WithLastOffset(t *testing.T) {
+ setupTest(t)
+ defer teardownTest()
+
+ dlpc, err := createTestLitePushConsumer(t)
+ if err != nil {
+ t.Fatalf("failed to create test lite push consumer: %v", err)
+ }
+
+ mockRpcClient.EXPECT().SyncLiteSubscription(gomock.Any(), gomock.Any()).DoAndReturn(func(ctx context.Context, req *v2.SyncLiteSubscriptionRequest) (*v2.SyncLiteSubscriptionResponse, error) {
+ if req.GetOffsetOption() == nil {
+ t.Fatal("expected offset option to be set")
+ }
+ if _, ok := req.GetOffsetOption().GetOffsetType().(*v2.OffsetOption_Policy_); !ok {
+ t.Fatalf("expected offset option type POLICY, got %T", req.GetOffsetOption().GetOffsetType())
+ }
+ if req.GetOffsetOption().GetPolicy() != v2.OffsetOption_LAST {
+ t.Errorf("expected policy LAST, got %v", req.GetOffsetOption().GetPolicy())
+ }
+ return setupSuccessResponse(), nil
+ }).Times(1)
+
+ err = dlpc.SubscribeLite("lite-topic-1", LastOffset)
+ if err != nil {
+ t.Fatalf("expected no error for SubscribeLite with last offset, got %v", err)
+ }
+}
+
func TestLitePushConsumer_SubscribeLite_NotRunning(t *testing.T) {
setupTest(t)
defer teardownTest()
@@ -335,7 +402,7 @@ func TestLitePushConsumer_syncLiteSubscription_StatusError(t *testing.T) {
mockRpcClient.EXPECT().SyncLiteSubscription(gomock.Any(), gomock.Any()).Return(setupErrorResponse(v2.Code_INTERNAL_SERVER_ERROR, "internal error"), nil)
- err = dlpc.syncLiteSubscription(context.TODO(), v2.LiteSubscriptionAction_PARTIAL_ADD, []string{"test"})
+ err = dlpc.syncLiteSubscription(context.TODO(), v2.LiteSubscriptionAction_PARTIAL_ADD, []string{"test"}, nil)
if err == nil {
t.Fatal("expected error for non-OK status code")
}
diff --git a/golang/offset_option.go b/golang/offset_option.go
new file mode 100644
index 00000000..1a79d8c6
--- /dev/null
+++ b/golang/offset_option.go
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package golang
+
+import (
+ "fmt"
+
+ v2 "github.com/apache/rocketmq-clients/golang/v5/protocol/v2"
+)
+
+type offsetOptionType int
+
+const (
+ offsetOptionTypePolicy offsetOptionType = iota
+ offsetOptionTypeOffset
+ offsetOptionTypeTailN
+ offsetOptionTypeTimestamp
+)
+
+// OffsetOption specifies the starting offset for lite topic consumption.
+type OffsetOption struct {
+ optionType offsetOptionType
+ value int64
+}
+
+var (
+ // LastOffset starts consuming from the last consumed offset of the consumer group.
+ LastOffset = OffsetOption{optionType: offsetOptionTypePolicy, value: int64(v2.OffsetOption_LAST)}
+ // MinOffset starts consuming from the minimum available offset.
+ MinOffset = OffsetOption{optionType: offsetOptionTypePolicy, value: int64(v2.OffsetOption_MIN)}
+ // MaxOffset starts consuming from the maximum available offset, skipping existing messages.
+ MaxOffset = OffsetOption{optionType: offsetOptionTypePolicy, value: int64(v2.OffsetOption_MAX)}
+)
+
+// NewOffsetOptionWithOffset creates an OffsetOption from a specific offset.
+func NewOffsetOptionWithOffset(offset int64) (OffsetOption, error) {
+ if offset < 0 {
+ return OffsetOption{}, fmt.Errorf("offset must be greater than or equal to 0")
+ }
+ return OffsetOption{optionType: offsetOptionTypeOffset, value: offset}, nil
+}
+
+// NewOffsetOptionWithTailN creates an OffsetOption from the last N messages.
+func NewOffsetOptionWithTailN(tailN int64) (OffsetOption, error) {
+ if tailN < 0 {
+ return OffsetOption{}, fmt.Errorf("tailN must be greater than or equal to 0")
+ }
+ return OffsetOption{optionType: offsetOptionTypeTailN, value: tailN}, nil
+}
+
+// NewOffsetOptionWithTimestamp creates an OffsetOption from a Unix millisecond timestamp.
+func NewOffsetOptionWithTimestamp(timestamp int64) (OffsetOption, error) {
+ if timestamp < 0 {
+ return OffsetOption{}, fmt.Errorf("timestamp must be greater than or equal to 0")
+ }
+ return OffsetOption{optionType: offsetOptionTypeTimestamp, value: timestamp}, nil
+}
+
+func (option OffsetOption) toProtobuf() *v2.OffsetOption {
+ switch option.optionType {
+ case offsetOptionTypePolicy:
+ return &v2.OffsetOption{
+ OffsetType: &v2.OffsetOption_Policy_{
+ Policy: v2.OffsetOption_Policy(option.value),
+ },
+ }
+ case offsetOptionTypeOffset:
+ return &v2.OffsetOption{
+ OffsetType: &v2.OffsetOption_Offset{
+ Offset: option.value,
+ },
+ }
+ case offsetOptionTypeTailN:
+ return &v2.OffsetOption{
+ OffsetType: &v2.OffsetOption_TailN{
+ TailN: option.value,
+ },
+ }
+ case offsetOptionTypeTimestamp:
+ return &v2.OffsetOption{
+ OffsetType: &v2.OffsetOption_Timestamp{
+ Timestamp: option.value,
+ },
+ }
+ default:
+ return nil
+ }
+}
diff --git a/golang/offset_option_test.go b/golang/offset_option_test.go
new file mode 100644
index 00000000..954b3f59
--- /dev/null
+++ b/golang/offset_option_test.go
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package golang
+
+import (
+ "testing"
+
+ v2 "github.com/apache/rocketmq-clients/golang/v5/protocol/v2"
+)
+
+func TestOffsetOptionToProtobuf(t *testing.T) {
+ tests := []struct {
+ name string
+ option OffsetOption
+ offsetType interface{}
+ expectValue int64
+ }{
+ {
+ name: "last offset",
+ option: LastOffset,
+ offsetType: &v2.OffsetOption_Policy_{},
+ expectValue: int64(v2.OffsetOption_LAST),
+ },
+ {
+ name: "offset",
+ option: mustOffsetOption(NewOffsetOptionWithOffset(100)),
+ offsetType: &v2.OffsetOption_Offset{},
+ expectValue: 100,
+ },
+ {
+ name: "tail n",
+ option: mustOffsetOption(NewOffsetOptionWithTailN(10)),
+ offsetType: &v2.OffsetOption_TailN{},
+ expectValue: 10,
+ },
+ {
+ name: "timestamp",
+ option: mustOffsetOption(NewOffsetOptionWithTimestamp(1234567890)),
+ offsetType: &v2.OffsetOption_Timestamp{},
+ expectValue: 1234567890,
+ },
+ }
+
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ protobufOption := tt.option.toProtobuf()
+ switch tt.offsetType.(type) {
+ case *v2.OffsetOption_Policy_:
+ if _, ok := protobufOption.GetOffsetType().(*v2.OffsetOption_Policy_); !ok {
+ t.Fatalf("expected policy offset type, got %T", protobufOption.GetOffsetType())
+ }
+ if int64(protobufOption.GetPolicy()) != tt.expectValue {
+ t.Errorf("expected policy %d, got %d", tt.expectValue, protobufOption.GetPolicy())
+ }
+ case *v2.OffsetOption_Offset:
+ if _, ok := protobufOption.GetOffsetType().(*v2.OffsetOption_Offset); !ok {
+ t.Fatalf("expected offset type, got %T", protobufOption.GetOffsetType())
+ }
+ if protobufOption.GetOffset() != tt.expectValue {
+ t.Errorf("expected offset %d, got %d", tt.expectValue, protobufOption.GetOffset())
+ }
+ case *v2.OffsetOption_TailN:
+ if _, ok := protobufOption.GetOffsetType().(*v2.OffsetOption_TailN); !ok {
+ t.Fatalf("expected tail_n type, got %T", protobufOption.GetOffsetType())
+ }
+ if protobufOption.GetTailN() != tt.expectValue {
+ t.Errorf("expected tail_n %d, got %d", tt.expectValue, protobufOption.GetTailN())
+ }
+ case *v2.OffsetOption_Timestamp:
+ if _, ok := protobufOption.GetOffsetType().(*v2.OffsetOption_Timestamp); !ok {
+ t.Fatalf("expected timestamp type, got %T", protobufOption.GetOffsetType())
+ }
+ if protobufOption.GetTimestamp() != tt.expectValue {
+ t.Errorf("expected timestamp %d, got %d", tt.expectValue, protobufOption.GetTimestamp())
+ }
+ }
+ })
+ }
+}
+
+func TestOffsetOptionRejectsNegativeValue(t *testing.T) {
+ if _, err := NewOffsetOptionWithOffset(-1); err == nil {
+ t.Fatal("expected error for negative offset")
+ }
+ if _, err := NewOffsetOptionWithTailN(-1); err == nil {
+ t.Fatal("expected error for negative tailN")
+ }
+ if _, err := NewOffsetOptionWithTimestamp(-1); err == nil {
+ t.Fatal("expected error for negative timestamp")
+ }
+}
+
+func mustOffsetOption(option OffsetOption, err error) OffsetOption {
+ if err != nil {
+ panic(err)
+ }
+ return option
+}