This is an automated email from the ASF dual-hosted git repository.
piotr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/master by this push:
new 6188b772 refactor(sdk): change argument of CreateTopic and UpdateTopic
method. (#2004)
6188b772 is described below
commit 6188b772d62c2e5b7b6389c85d5f29d76b2c8f36
Author: Chengxi <[email protected]>
AuthorDate: Sun Jul 13 01:28:11 2025 -0400
refactor(sdk): change argument of CreateTopic and UpdateTopic method.
(#2004)
1. Introduce CompressionAlgorithm enum type
Replaced the raw `uint8` type used for the `compressionAlgorithm`
parameter in the `CreateTopic` and `UpdateTopic` functions with a custom
type `CompressionAlgorithm`.
2. Add iggcon.Duration type aligned with Rust SDK
Introduced a new `iggcon.Duration` type to replace `time.Duration`,
based on `uint64`, with microsecond (µs) precision. This provides
consistency with the Rust SDK and helps prevent overflow in future
scenarios where high-resolution durations may be used.
3. Changed the `partitionsCount` type from int to uint32
---
bdd/go/tests/tcp_test/topic_feature_create.go | 24 ++++++-------
bdd/go/tests/tcp_test/topic_feature_update.go | 8 ++---
.../go/benchmarks/send_messages_benchmark_test.go | 4 +--
.../binary_response_deserializer.go | 4 +--
.../create_topic_serializer.go | 26 +++++++-------
.../update_topic_serializer.go | 21 ++++++------
.../update_topic_serializer_test.go | 2 +-
foreign/go/contracts/compression_algorithm.go | 25 ++++++++++++++
foreign/go/contracts/duration.go | 40 ++++++++++++++++++++++
foreign/go/contracts/topics.go | 20 +++++------
foreign/go/iggycli/client.go | 12 +++----
foreign/go/tcp/tcp_topic_managament.go | 12 +++----
12 files changed, 128 insertions(+), 70 deletions(-)
diff --git a/bdd/go/tests/tcp_test/topic_feature_create.go
b/bdd/go/tests/tcp_test/topic_feature_create.go
index abf89dbd..92477f66 100644
--- a/bdd/go/tests/tcp_test/topic_feature_create.go
+++ b/bdd/go/tests/tcp_test/topic_feature_create.go
@@ -39,8 +39,8 @@ var _ = ginkgo.Describe("CREATE TOPIC:", func() {
streamIdentifier,
name,
2,
- 1,
- 1000,
+ iggcon.CompressionAlgorithmNone,
+ iggcon.Millisecond,
math.MaxUint64,
&replicationFactor,
&topicId)
@@ -60,8 +60,8 @@ var _ = ginkgo.Describe("CREATE TOPIC:", func() {
streamIdentifier,
name,
2,
- 1,
- 1000,
+ iggcon.CompressionAlgorithmNone,
+ iggcon.Millisecond,
math.MaxUint64,
&replicationFactor,
&topicId)
@@ -82,8 +82,8 @@ var _ = ginkgo.Describe("CREATE TOPIC:", func() {
streamIdentifier,
name,
2,
- 1,
- 0,
+ iggcon.CompressionAlgorithmNone,
+ iggcon.IggyExpiryServerDefault,
math.MaxUint64,
&replicationFactor,
&topicId)
@@ -101,8 +101,8 @@ var _ = ginkgo.Describe("CREATE TOPIC:", func() {
streamIdentifier,
createRandomString(32),
2,
- 1,
- 0,
+ iggcon.CompressionAlgorithmNone,
+ iggcon.IggyExpiryServerDefault,
math.MaxUint64,
&replicationFactor,
&topicId)
@@ -121,8 +121,8 @@ var _ = ginkgo.Describe("CREATE TOPIC:", func() {
streamIdentifier,
createRandomString(256),
2,
- 1,
- 0,
+ iggcon.CompressionAlgorithmNone,
+ iggcon.IggyExpiryServerDefault,
math.MaxUint64,
&replicationFactor,
&topicId)
@@ -141,8 +141,8 @@ var _ = ginkgo.Describe("CREATE TOPIC:", func() {
streamIdentifier,
"name",
2,
- 1,
- 0,
+ iggcon.CompressionAlgorithmNone,
+ iggcon.IggyExpiryServerDefault,
math.MaxUint64,
&replicationFactor,
&topicId)
diff --git a/bdd/go/tests/tcp_test/topic_feature_update.go
b/bdd/go/tests/tcp_test/topic_feature_update.go
index 8c606404..21a97fb5 100644
--- a/bdd/go/tests/tcp_test/topic_feature_update.go
+++ b/bdd/go/tests/tcp_test/topic_feature_update.go
@@ -40,8 +40,8 @@ var _ = ginkgo.Describe("UPDATE TOPIC:", func() {
streamIdentifier,
topicIdentifier,
newName,
- 1,
- 1,
+ iggcon.CompressionAlgorithmNone,
+ iggcon.Microsecond,
math.MaxUint64,
&replicationFactor)
itShouldNotReturnError(err)
@@ -61,8 +61,8 @@ var _ = ginkgo.Describe("UPDATE TOPIC:", func() {
streamIdentifier,
topic2Identifier,
topic1Name,
- 1,
- 0,
+ iggcon.CompressionAlgorithmNone,
+ iggcon.IggyExpiryServerDefault,
math.MaxUint64,
&replicationFactor)
diff --git a/foreign/go/benchmarks/send_messages_benchmark_test.go
b/foreign/go/benchmarks/send_messages_benchmark_test.go
index 301f5ff8..04dafdf5 100644
--- a/foreign/go/benchmarks/send_messages_benchmark_test.go
+++ b/foreign/go/benchmarks/send_messages_benchmark_test.go
@@ -118,8 +118,8 @@ func ensureInfrastructureIsInitialized(cli iggycli.Client,
streamId uint32) erro
streamIdentifier,
"benchmark",
1,
- 1,
- 0,
+ iggcon.CompressionAlgorithmNone,
+ iggcon.IggyExpiryServerDefault,
1,
nil,
nil,
diff --git a/foreign/go/binary_serialization/binary_response_deserializer.go
b/foreign/go/binary_serialization/binary_response_deserializer.go
index e2b746af..6a46d5bf 100644
--- a/foreign/go/binary_serialization/binary_response_deserializer.go
+++ b/foreign/go/binary_serialization/binary_response_deserializer.go
@@ -201,11 +201,11 @@ func DeserializeToTopic(payload []byte, position int)
(iggcon.Topic, int, error)
topic.Id = binary.LittleEndian.Uint32(payload[position : position+4])
topic.CreatedAt = binary.LittleEndian.Uint64(payload[position+4 :
position+12])
topic.PartitionsCount = binary.LittleEndian.Uint32(payload[position+12
: position+16])
- topic.MessageExpiry = time.Microsecond *
time.Duration(int(binary.LittleEndian.Uint64(payload[position+16:position+24])))
+ topic.MessageExpiry =
iggcon.Duration(binary.LittleEndian.Uint64(payload[position+16 : position+24]))
topic.CompressionAlgorithm = payload[position+24]
topic.MaxTopicSize = binary.LittleEndian.Uint64(payload[position+25 :
position+33])
topic.ReplicationFactor = payload[position+33]
- topic.SizeBytes = binary.LittleEndian.Uint64(payload[position+34 :
position+42])
+ topic.Size = binary.LittleEndian.Uint64(payload[position+34 :
position+42])
topic.MessagesCount = binary.LittleEndian.Uint64(payload[position+42 :
position+50])
nameLength := int(payload[position+50])
diff --git a/foreign/go/binary_serialization/create_topic_serializer.go
b/foreign/go/binary_serialization/create_topic_serializer.go
index 5fc2ed5b..e9cb4587 100644
--- a/foreign/go/binary_serialization/create_topic_serializer.go
+++ b/foreign/go/binary_serialization/create_topic_serializer.go
@@ -19,20 +19,18 @@ package binaryserialization
import (
"encoding/binary"
- "time"
-
iggcon "github.com/apache/iggy/foreign/go/contracts"
)
type TcpCreateTopicRequest struct {
- StreamId iggcon.Identifier `json:"streamId"`
- PartitionsCount int `json:"partitionsCount"`
- CompressionAlgorithm uint8 `json:"compressionAlgorithm"`
- MessageExpiry time.Duration `json:"messageExpiry"`
- MaxTopicSize uint64 `json:"maxTopicSize"`
- Name string `json:"name"`
- ReplicationFactor *uint8 `json:"replicationFactor"`
- TopicId *uint32 `json:"topicId"`
+ StreamId iggcon.Identifier `json:"streamId"`
+ PartitionsCount uint32
`json:"partitionsCount"`
+ CompressionAlgorithm iggcon.CompressionAlgorithm
`json:"compressionAlgorithm"`
+ MessageExpiry iggcon.Duration `json:"messageExpiry"`
+ MaxTopicSize uint64 `json:"maxTopicSize"`
+ Name string `json:"name"`
+ ReplicationFactor *uint8
`json:"replicationFactor"`
+ TopicId *uint32 `json:"topicId"`
}
func (request *TcpCreateTopicRequest) Serialize() []byte {
@@ -64,19 +62,19 @@ func (request *TcpCreateTopicRequest) Serialize() []byte {
position += len(streamIdBytes)
// TopicId
- binary.LittleEndian.PutUint32(bytes[position:],
uint32(*request.TopicId))
+ binary.LittleEndian.PutUint32(bytes[position:], *request.TopicId)
position += 4
// PartitionsCount
- binary.LittleEndian.PutUint32(bytes[position:],
uint32(request.PartitionsCount))
+ binary.LittleEndian.PutUint32(bytes[position:], request.PartitionsCount)
position += 4
// CompressionAlgorithm
- bytes[position] = request.CompressionAlgorithm
+ bytes[position] = byte(request.CompressionAlgorithm)
position++
// MessageExpiry
- binary.LittleEndian.PutUint64(bytes[position:],
uint64(request.MessageExpiry.Microseconds()))
+ binary.LittleEndian.PutUint64(bytes[position:],
uint64(request.MessageExpiry))
position += 8
// MaxTopicSize
diff --git a/foreign/go/binary_serialization/update_topic_serializer.go
b/foreign/go/binary_serialization/update_topic_serializer.go
index 20f85f97..2005cb08 100644
--- a/foreign/go/binary_serialization/update_topic_serializer.go
+++ b/foreign/go/binary_serialization/update_topic_serializer.go
@@ -19,19 +19,18 @@ package binaryserialization
import (
"encoding/binary"
- "time"
iggcon "github.com/apache/iggy/foreign/go/contracts"
)
type TcpUpdateTopicRequest struct {
- StreamId iggcon.Identifier `json:"streamId"`
- TopicId iggcon.Identifier `json:"topicId"`
- CompressionAlgorithm uint8 `json:"compressionAlgorithm"`
- MessageExpiry time.Duration `json:"messageExpiry"`
- MaxTopicSize uint64 `json:"maxTopicSize"`
- ReplicationFactor *uint8 `json:"replicationFactor"`
- Name string `json:"name"`
+ StreamId iggcon.Identifier `json:"streamId"`
+ TopicId iggcon.Identifier `json:"topicId"`
+ CompressionAlgorithm iggcon.CompressionAlgorithm
`json:"compressionAlgorithm"`
+ MessageExpiry iggcon.Duration `json:"messageExpiry"`
+ MaxTopicSize uint64 `json:"maxTopicSize"`
+ ReplicationFactor *uint8
`json:"replicationFactor"`
+ Name string `json:"name"`
}
func (request *TcpUpdateTopicRequest) Serialize() []byte {
@@ -48,13 +47,13 @@ func (request *TcpUpdateTopicRequest) Serialize() []byte {
offset += copy(buffer[offset:], streamIdBytes)
offset += copy(buffer[offset:], topicIdBytes)
- buffer[offset] = request.CompressionAlgorithm
+ buffer[offset] = byte(request.CompressionAlgorithm)
offset++
- binary.LittleEndian.PutUint64(buffer[offset:],
uint64(request.MessageExpiry.Microseconds()))
+ binary.LittleEndian.PutUint64(buffer[offset:],
uint64(request.MessageExpiry))
offset += 8
- binary.LittleEndian.PutUint64(buffer[offset:],
uint64(request.MaxTopicSize))
+ binary.LittleEndian.PutUint64(buffer[offset:], request.MaxTopicSize)
offset += 8
buffer[offset] = *request.ReplicationFactor
diff --git a/foreign/go/binary_serialization/update_topic_serializer_test.go
b/foreign/go/binary_serialization/update_topic_serializer_test.go
index 3c4e14f2..f9df2222 100644
--- a/foreign/go/binary_serialization/update_topic_serializer_test.go
+++ b/foreign/go/binary_serialization/update_topic_serializer_test.go
@@ -30,7 +30,7 @@ func TestSerialize_UpdateTopic(t *testing.T) {
StreamId: streamId,
TopicId: topicId,
Name: "update_topic",
- MessageExpiry: 100000,
+ MessageExpiry: 100 * iggcon.Microsecond,
MaxTopicSize: 100,
}
diff --git a/foreign/go/contracts/compression_algorithm.go
b/foreign/go/contracts/compression_algorithm.go
new file mode 100644
index 00000000..389318a1
--- /dev/null
+++ b/foreign/go/contracts/compression_algorithm.go
@@ -0,0 +1,25 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package iggcon
+
+type CompressionAlgorithm uint8
+
+const (
+ CompressionAlgorithmNone CompressionAlgorithm = 1
+ CompressionAlgorithmGzip CompressionAlgorithm = 2
+)
diff --git a/foreign/go/contracts/duration.go b/foreign/go/contracts/duration.go
new file mode 100644
index 00000000..607347dd
--- /dev/null
+++ b/foreign/go/contracts/duration.go
@@ -0,0 +1,40 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package iggcon
+
+import (
+ "math"
+)
+
+const (
+ // IggyExpiryServerDefault use the default expiry time from the server
+ IggyExpiryServerDefault Duration = 0
+ // IggyExpiryNeverExpire never expire
+ IggyExpiryNeverExpire Duration = math.MaxUint64
+)
+
+// Duration represents the expiration duration in microsecond (µs).
+type Duration uint64
+
+const (
+ Microsecond Duration = 1
+ Millisecond = 1000 * Microsecond
+ Second = 1000 * Millisecond
+ Minute = 60 * Second
+ Hour = 60 * Minute
+)
diff --git a/foreign/go/contracts/topics.go b/foreign/go/contracts/topics.go
index 23305a78..554be7ee 100644
--- a/foreign/go/contracts/topics.go
+++ b/foreign/go/contracts/topics.go
@@ -41,16 +41,16 @@ type UpdateTopicRequest struct {
}
type Topic struct {
- Id uint32 `json:"id"`
- CreatedAt uint64 `json:"createdAt"`
- Name string `json:"name"`
- SizeBytes uint64 `json:"sizeBytes"`
- MessageExpiry time.Duration `json:"messageExpiry"`
- CompressionAlgorithm uint8 `json:"compressionAlgorithm"`
- MaxTopicSize uint64 `json:"maxTopicSize"`
- ReplicationFactor uint8 `json:"replicationFactor"`
- MessagesCount uint64 `json:"messagesCount"`
- PartitionsCount uint32 `json:"partitionsCount"`
+ Id uint32 `json:"id"`
+ CreatedAt uint64 `json:"createdAt"`
+ Name string `json:"name"`
+ Size uint64 `json:"size"`
+ MessageExpiry Duration `json:"messageExpiry"`
+ CompressionAlgorithm uint8 `json:"compressionAlgorithm"`
+ MaxTopicSize uint64 `json:"maxTopicSize"`
+ ReplicationFactor uint8 `json:"replicationFactor"`
+ MessagesCount uint64 `json:"messagesCount"`
+ PartitionsCount uint32 `json:"partitionsCount"`
}
type TopicDetails struct {
diff --git a/foreign/go/iggycli/client.go b/foreign/go/iggycli/client.go
index 6befa52a..cc0cc191 100644
--- a/foreign/go/iggycli/client.go
+++ b/foreign/go/iggycli/client.go
@@ -18,8 +18,6 @@
package iggycli
import (
- "time"
-
iggcon "github.com/apache/iggy/foreign/go/contracts"
)
@@ -57,9 +55,9 @@ type Client interface {
CreateTopic(
streamId iggcon.Identifier,
name string,
- partitionsCount int,
- compressionAlgorithm uint8,
- messageExpiry time.Duration,
+ partitionsCount uint32,
+ compressionAlgorithm iggcon.CompressionAlgorithm,
+ messageExpiry iggcon.Duration,
maxTopicSize uint64,
replicationFactor *uint8,
topicId *uint32,
@@ -71,8 +69,8 @@ type Client interface {
streamId iggcon.Identifier,
topicId iggcon.Identifier,
name string,
- compressionAlgorithm uint8,
- messageExpiry time.Duration,
+ compressionAlgorithm iggcon.CompressionAlgorithm,
+ messageExpiry iggcon.Duration,
maxTopicSize uint64,
replicationFactor *uint8,
) error
diff --git a/foreign/go/tcp/tcp_topic_managament.go
b/foreign/go/tcp/tcp_topic_managament.go
index ebefabee..19c14278 100644
--- a/foreign/go/tcp/tcp_topic_managament.go
+++ b/foreign/go/tcp/tcp_topic_managament.go
@@ -18,8 +18,6 @@
package tcp
import (
- "time"
-
binaryserialization
"github.com/apache/iggy/foreign/go/binary_serialization"
iggcon "github.com/apache/iggy/foreign/go/contracts"
ierror "github.com/apache/iggy/foreign/go/errors"
@@ -56,9 +54,9 @@ func (tms *IggyTcpClient) GetTopic(streamId
iggcon.Identifier, topicId iggcon.Id
func (tms *IggyTcpClient) CreateTopic(
streamId iggcon.Identifier,
name string,
- partitionsCount int,
- compressionAlgorithm uint8,
- messageExpiry time.Duration,
+ partitionsCount uint32,
+ compressionAlgorithm iggcon.CompressionAlgorithm,
+ messageExpiry iggcon.Duration,
maxTopicSize uint64,
replicationFactor *uint8,
topicId *uint32,
@@ -88,8 +86,8 @@ func (tms *IggyTcpClient) UpdateTopic(
streamId iggcon.Identifier,
topicId iggcon.Identifier,
name string,
- compressionAlgorithm uint8,
- messageExpiry time.Duration,
+ compressionAlgorithm iggcon.CompressionAlgorithm,
+ messageExpiry iggcon.Duration,
maxTopicSize uint64,
replicationFactor *uint8,
) error {