This is an automated email from the ASF dual-hosted git repository.
piotr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/master by this push:
new f272171d chore(sdk): Add type safety to CommandCode (#1751)
f272171d is described below
commit f272171d3136210cdb7c957a5e8d6e91e74fb322
Author: Nandakumar Vadivelu <[email protected]>
AuthorDate: Wed May 14 22:43:06 2025 +0530
chore(sdk): Add type safety to CommandCode (#1751)
Fixes #1750
---
.../iggy/client/blocking/tcp/CommandCode.java | 192 +++++++++++++++++++++
.../blocking/tcp/ConsumerGroupsTcpClient.java | 19 +-
.../blocking/tcp/ConsumerOffsetTcpClient.java | 7 +-
.../client/blocking/tcp/InternalTcpClient.java | 12 ++
.../client/blocking/tcp/MessagesTcpClient.java | 7 +-
.../client/blocking/tcp/PartitionsTcpClient.java | 6 +-
.../tcp/PersonalAccessTokensTcpClient.java | 13 +-
.../iggy/client/blocking/tcp/StreamsTcpClient.java | 15 +-
.../iggy/client/blocking/tcp/SystemTcpClient.java | 15 +-
.../iggy/client/blocking/tcp/TopicsTcpClient.java | 16 +-
.../iggy/client/blocking/tcp/UsersTcpClient.java | 28 +--
11 files changed, 244 insertions(+), 86 deletions(-)
diff --git
a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/CommandCode.java
b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/CommandCode.java
new file mode 100644
index 00000000..70125056
--- /dev/null
+++
b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/CommandCode.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iggy.client.blocking.tcp;
+
+public interface CommandCode {
+
+ int getValue();
+
+ enum System implements CommandCode {
+ PING(1),
+ GET_STATS(10),
+ GET_ME(20),
+ GET_CLIENT(21),
+ GET_ALL_CLIENTS(22);
+
+ private final int value;
+
+ System(int value) {
+ this.value = value;
+ }
+
+ @Override
+ public int getValue() {
+ return value;
+ }
+ }
+
+ enum User implements CommandCode {
+ GET(31),
+ GET_ALL(32),
+ CREATE(33),
+ DELETE(34),
+ UPDATE(35),
+ UPDATE_PERMISSIONS(36),
+ CHANGE_PASSWORD(37),
+ LOGIN(38),
+ LOGOUT(39);
+
+ private final int value;
+
+ User(int value) {
+ this.value = value;
+ }
+
+ @Override
+ public int getValue() {
+ return value;
+ }
+ }
+
+ enum PersonalAccessToken implements CommandCode {
+ GET_ALL(41),
+ CREATE(42),
+ DELETE(43),
+ LOGIN(44);
+
+ private final int value;
+
+ PersonalAccessToken(int value) {
+ this.value = value;
+ }
+
+ @Override
+ public int getValue() {
+ return value;
+ }
+ }
+
+ enum Messages implements CommandCode {
+ POLL(100),
+ SEND(101);
+
+ private final int value;
+
+ Messages(int value) {
+ this.value = value;
+ }
+
+ @Override
+ public int getValue() {
+ return value;
+ }
+ }
+
+ enum ConsumerOffset implements CommandCode {
+ GET(120),
+ STORE(121);
+
+ private final int value;
+
+ ConsumerOffset(int value) {
+ this.value = value;
+ }
+
+ @Override
+ public int getValue() {
+ return value;
+ }
+ }
+
+ enum Stream implements CommandCode {
+ GET(200),
+ GET_ALL(201),
+ CREATE(202),
+ DELETE(203),
+ UPDATE(204);
+
+ private final int value;
+
+ Stream(int value) {
+ this.value = value;
+ }
+
+ @Override
+ public int getValue() {
+ return value;
+ }
+ }
+
+ enum Topic implements CommandCode {
+ GET(300),
+ GET_ALL(301),
+ CREATE(302),
+ DELETE(303),
+ UPDATE(304),
+ PURGE(305);
+
+ private final int value;
+
+ Topic(int value) {
+ this.value = value;
+ }
+
+ @Override
+ public int getValue() {
+ return value;
+ }
+ }
+
+ enum Partition implements CommandCode {
+ CREATE(402),
+ DELETE(403);
+
+ private final int value;
+
+ Partition(int value) {
+ this.value = value;
+ }
+
+ @Override
+ public int getValue() {
+ return value;
+ }
+ }
+
+ enum ConsumerGroup implements CommandCode {
+ GET(600),
+ GET_ALL(601),
+ CREATE(602),
+ DELETE(603),
+ JOIN(604),
+ LEAVE(605);
+
+ private final int value;
+
+ ConsumerGroup(int value) {
+ this.value = value;
+ }
+
+ @Override
+ public int getValue() {
+ return value;
+ }
+ }
+}
\ No newline at end of file
diff --git
a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/ConsumerGroupsTcpClient.java
b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/ConsumerGroupsTcpClient.java
index 3e45230b..7a7a1951 100644
---
a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/ConsumerGroupsTcpClient.java
+++
b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/ConsumerGroupsTcpClient.java
@@ -37,13 +37,6 @@ import static
org.apache.iggy.client.blocking.tcp.BytesSerializer.toBytes;
class ConsumerGroupsTcpClient implements ConsumerGroupsClient {
- private static final int GET_CONSUMER_GROUP_CODE = 600;
- private static final int GET_CONSUMER_GROUPS_CODE = 601;
- private static final int CREATE_CONSUMER_GROUP_CODE = 602;
- private static final int DELETE_CONSUMER_GROUP_CODE = 603;
- private static final int JOIN_CONSUMER_GROUP_CODE = 604;
- private static final int LEAVE_CONSUMER_GROUP_CODE = 605;
-
private final InternalTcpClient tcpClient;
public ConsumerGroupsTcpClient(InternalTcpClient tcpClient) {
@@ -55,7 +48,7 @@ class ConsumerGroupsTcpClient implements ConsumerGroupsClient
{
var payload = toBytes(streamId);
payload.writeBytes(toBytes(topicId));
payload.writeBytes(toBytes(groupId));
- var response = tcpClient.send(GET_CONSUMER_GROUP_CODE, payload);
+ var response = tcpClient.send(CommandCode.ConsumerGroup.GET, payload);
if (response.isReadable()) {
return Optional.of(readConsumerGroupDetails(response));
}
@@ -66,7 +59,7 @@ class ConsumerGroupsTcpClient implements ConsumerGroupsClient
{
public List<ConsumerGroup> getConsumerGroups(StreamId streamId, TopicId
topicId) {
var payload = toBytes(streamId);
payload.writeBytes(toBytes(topicId));
- var response = tcpClient.send(GET_CONSUMER_GROUPS_CODE, payload);
+ var response = tcpClient.send(CommandCode.ConsumerGroup.GET_ALL,
payload);
List<ConsumerGroup> groups = new ArrayList<>();
while (response.isReadable()) {
groups.add(readConsumerGroup(response));
@@ -85,7 +78,7 @@ class ConsumerGroupsTcpClient implements ConsumerGroupsClient
{
payload.writeIntLE(groupId.orElse(0L).intValue());
payload.writeBytes(nameToBytes(name));
- ByteBuf response = tcpClient.send(CREATE_CONSUMER_GROUP_CODE, payload);
+ ByteBuf response = tcpClient.send(CommandCode.ConsumerGroup.CREATE,
payload);
return readConsumerGroupDetails(response);
}
@@ -94,7 +87,7 @@ class ConsumerGroupsTcpClient implements ConsumerGroupsClient
{
var payload = toBytes(streamId);
payload.writeBytes(toBytes(topicId));
payload.writeBytes(toBytes(groupId));
- tcpClient.send(DELETE_CONSUMER_GROUP_CODE, payload);
+ tcpClient.send(CommandCode.ConsumerGroup.DELETE, payload);
}
@Override
@@ -103,7 +96,7 @@ class ConsumerGroupsTcpClient implements
ConsumerGroupsClient {
payload.writeBytes(toBytes(topicId));
payload.writeBytes(toBytes(groupId));
- tcpClient.send(JOIN_CONSUMER_GROUP_CODE, payload);
+ tcpClient.send(CommandCode.ConsumerGroup.JOIN, payload);
}
@Override
@@ -112,7 +105,7 @@ class ConsumerGroupsTcpClient implements
ConsumerGroupsClient {
payload.writeBytes(toBytes(topicId));
payload.writeBytes(toBytes(groupId));
- tcpClient.send(LEAVE_CONSUMER_GROUP_CODE, payload);
+ tcpClient.send(CommandCode.ConsumerGroup.LEAVE, payload);
}
}
diff --git
a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/ConsumerOffsetTcpClient.java
b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/ConsumerOffsetTcpClient.java
index 342cb110..24e1c5a6 100644
---
a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/ConsumerOffsetTcpClient.java
+++
b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/ConsumerOffsetTcpClient.java
@@ -32,9 +32,6 @@ import static
org.apache.iggy.client.blocking.tcp.BytesSerializer.toBytesAsU64;
class ConsumerOffsetTcpClient implements ConsumerOffsetsClient {
- private static final int GET_CONSUMER_OFFSET_CODE = 120;
- private static final int STORE_CONSUMER_OFFSET_CODE = 121;
-
private final InternalTcpClient tcpClient;
public ConsumerOffsetTcpClient(InternalTcpClient tcpClient) {
@@ -49,7 +46,7 @@ class ConsumerOffsetTcpClient implements
ConsumerOffsetsClient {
payload.writeIntLE(partitionId.orElse(0L).intValue());
payload.writeBytes(toBytesAsU64(offset));
- tcpClient.send(STORE_CONSUMER_OFFSET_CODE, payload);
+ tcpClient.send(CommandCode.ConsumerOffset.STORE, payload);
}
@Override
@@ -59,7 +56,7 @@ class ConsumerOffsetTcpClient implements
ConsumerOffsetsClient {
payload.writeBytes(toBytes(topicId));
payload.writeIntLE(partitionId.orElse(0L).intValue());
- var response = tcpClient.send(GET_CONSUMER_OFFSET_CODE, payload);
+ var response = tcpClient.send(CommandCode.ConsumerOffset.GET, payload);
if (response.isReadable()) {
return Optional.of(readConsumerOffsetInfo(response));
}
diff --git
a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/InternalTcpClient.java
b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/InternalTcpClient.java
index ceccee99..59b89c3b 100644
---
a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/InternalTcpClient.java
+++
b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/InternalTcpClient.java
@@ -52,10 +52,22 @@ final class InternalTcpClient {
this.connection.inbound().receiveObject().ofType(IggyResponse.class).subscribe(responses::add);
}
+ ByteBuf send(CommandCode code) {
+ return send(code.getValue());
+ }
+
+ /** Use {@link #send(CommandCode)} instead. */
+ @Deprecated
ByteBuf send(int command) {
return send(command, Unpooled.EMPTY_BUFFER);
}
+ ByteBuf send(CommandCode code, ByteBuf payload) {
+ return send(code.getValue(), payload);
+ }
+
+ /** Use {@link #send(CommandCode, ByteBuf)} instead. */
+ @Deprecated
ByteBuf send(int command, ByteBuf payload) {
var payloadSize = payload.readableBytes() + COMMAND_LENGTH;
var buffer = Unpooled.buffer(REQUEST_INITIAL_BYTES_LENGTH +
payloadSize);
diff --git
a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/MessagesTcpClient.java
b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/MessagesTcpClient.java
index 04149ea5..1bc88b4f 100644
---
a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/MessagesTcpClient.java
+++
b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/MessagesTcpClient.java
@@ -33,9 +33,6 @@ import static
org.apache.iggy.client.blocking.tcp.BytesSerializer.toBytes;
class MessagesTcpClient implements MessagesClient {
- private static final int POLL_MESSAGES_CODE = 100;
- private static final int SEND_MESSAGES_CODE = 101;
-
private final InternalTcpClient tcpClient;
public MessagesTcpClient(InternalTcpClient tcpClient) {
@@ -52,7 +49,7 @@ class MessagesTcpClient implements MessagesClient {
payload.writeIntLE(count.intValue());
payload.writeByte(autoCommit ? 1 : 0);
- var response = tcpClient.send(POLL_MESSAGES_CODE, payload);
+ var response = tcpClient.send(CommandCode.Messages.POLL, payload);
return BytesDeserializer.readPolledMessages(response);
}
@@ -66,6 +63,6 @@ class MessagesTcpClient implements MessagesClient {
payload.writeBytes(toBytes(message));
}
- tcpClient.send(SEND_MESSAGES_CODE, payload);
+ tcpClient.send(CommandCode.Messages.SEND, payload);
}
}
diff --git
a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/PartitionsTcpClient.java
b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/PartitionsTcpClient.java
index 7d0e6e96..e0874c48 100644
---
a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/PartitionsTcpClient.java
+++
b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/PartitionsTcpClient.java
@@ -26,8 +26,6 @@ import static
org.apache.iggy.client.blocking.tcp.BytesSerializer.toBytes;
class PartitionsTcpClient implements PartitionsClient {
- private static final int CREATE_PARTITION_CODE = 402;
- private static final int DELETE_PARTITION_CODE = 403;
private final InternalTcpClient tcpClient;
PartitionsTcpClient(InternalTcpClient tcpClient) {
@@ -39,7 +37,7 @@ class PartitionsTcpClient implements PartitionsClient {
var payload = toBytes(streamId);
payload.writeBytes(toBytes(topicId));
payload.writeIntLE(partitionsCount.intValue());
- tcpClient.send(CREATE_PARTITION_CODE, payload);
+ tcpClient.send(CommandCode.Partition.CREATE, payload);
}
@Override
@@ -47,6 +45,6 @@ class PartitionsTcpClient implements PartitionsClient {
var payload = toBytes(streamId);
payload.writeBytes(toBytes(topicId));
payload.writeIntLE(partitionsCount.intValue());
- tcpClient.send(DELETE_PARTITION_CODE, payload);
+ tcpClient.send(CommandCode.Partition.DELETE, payload);
}
}
diff --git
a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/PersonalAccessTokensTcpClient.java
b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/PersonalAccessTokensTcpClient.java
index 6a96a87d..bdb5cccf 100644
---
a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/PersonalAccessTokensTcpClient.java
+++
b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/PersonalAccessTokensTcpClient.java
@@ -35,11 +35,6 @@ import static
org.apache.iggy.client.blocking.tcp.BytesSerializer.toBytesAsU64;
class PersonalAccessTokensTcpClient implements PersonalAccessTokensClient {
- private static final int GET_PERSONAL_ACCESS_TOKENS_CODE = 41;
- private static final int CREATE_PERSONAL_ACCESS_TOKEN_CODE = 42;
- private static final int DELETE_PERSONAL_ACCESS_TOKEN_CODE = 43;
- private static final int LOGIN_WITH_PERSONAL_ACCESS_TOKEN_CODE = 44;
-
private final InternalTcpClient tcpClient;
public PersonalAccessTokensTcpClient(InternalTcpClient tcpClient) {
@@ -51,13 +46,13 @@ class PersonalAccessTokensTcpClient implements
PersonalAccessTokensClient {
var payload = Unpooled.buffer();
payload.writeBytes(nameToBytes(name));
payload.writeBytes(toBytesAsU64(expiry));
- var response = tcpClient.send(CREATE_PERSONAL_ACCESS_TOKEN_CODE,
payload);
+ var response = tcpClient.send(CommandCode.PersonalAccessToken.CREATE,
payload);
return readRawPersonalAccessToken(response);
}
@Override
public List<PersonalAccessTokenInfo> getPersonalAccessTokens() {
- var response = tcpClient.send(GET_PERSONAL_ACCESS_TOKENS_CODE);
+ var response = tcpClient.send(CommandCode.PersonalAccessToken.GET_ALL);
var tokens = new ArrayList<PersonalAccessTokenInfo>();
while (response.isReadable()) {
tokens.add(readPersonalAccessTokenInfo(response));
@@ -68,13 +63,13 @@ class PersonalAccessTokensTcpClient implements
PersonalAccessTokensClient {
@Override
public void deletePersonalAccessToken(String name) {
var payload = nameToBytes(name);
- tcpClient.send(DELETE_PERSONAL_ACCESS_TOKEN_CODE, payload);
+ tcpClient.send(CommandCode.PersonalAccessToken.DELETE, payload);
}
@Override
public IdentityInfo loginWithPersonalAccessToken(String token) {
var payload = nameToBytes(token);
- var response = tcpClient.send(LOGIN_WITH_PERSONAL_ACCESS_TOKEN_CODE,
payload);
+ var response = tcpClient.send(CommandCode.PersonalAccessToken.LOGIN,
payload);
var userId = response.readUnsignedIntLE();
return new IdentityInfo(userId, Optional.empty());
}
diff --git
a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/StreamsTcpClient.java
b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/StreamsTcpClient.java
index feca3417..03e1292a 100644
---
a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/StreamsTcpClient.java
+++
b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/StreamsTcpClient.java
@@ -35,11 +35,6 @@ import static
org.apache.iggy.client.blocking.tcp.BytesSerializer.toBytes;
class StreamsTcpClient implements StreamsClient {
- private static final int GET_STREAM_CODE = 200;
- private static final int GET_STREAMS_CODE = 201;
- private static final int CREATE_STREAM_CODE = 202;
- private static final int DELETE_STREAM_CODE = 203;
- private static final int UPDATE_STREAM_CODE = 204;
private final InternalTcpClient tcpClient;
StreamsTcpClient(InternalTcpClient tcpClient) {
@@ -53,14 +48,14 @@ class StreamsTcpClient implements StreamsClient {
payload.writeIntLE(streamId.orElse(0L).intValue());
payload.writeBytes(nameToBytes(name));
- var response = tcpClient.send(CREATE_STREAM_CODE, payload);
+ var response = tcpClient.send(CommandCode.Stream.CREATE, payload);
return readStreamDetails(response);
}
@Override
public Optional<StreamDetails> getStream(StreamId streamId) {
var payload = toBytes(streamId);
- var response = tcpClient.send(GET_STREAM_CODE, payload);
+ var response = tcpClient.send(CommandCode.Stream.GET, payload);
if (response.isReadable()) {
return Optional.of(readStreamDetails(response));
}
@@ -69,7 +64,7 @@ class StreamsTcpClient implements StreamsClient {
@Override
public List<StreamBase> getStreams() {
- ByteBuf response = tcpClient.send(GET_STREAMS_CODE);
+ ByteBuf response = tcpClient.send(CommandCode.Stream.GET_ALL);
List<StreamBase> streams = new ArrayList<>();
while (response.isReadable()) {
streams.add(readStreamBase(response));
@@ -85,13 +80,13 @@ class StreamsTcpClient implements StreamsClient {
payload.writeBytes(idBytes);
payload.writeBytes(nameToBytes(name));
- tcpClient.send(UPDATE_STREAM_CODE, payload);
+ tcpClient.send(CommandCode.Stream.UPDATE, payload);
}
@Override
public void deleteStream(StreamId streamId) {
var payload = toBytes(streamId);
- tcpClient.send(DELETE_STREAM_CODE, payload);
+ tcpClient.send(CommandCode.Stream.DELETE, payload);
}
}
diff --git
a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/SystemTcpClient.java
b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/SystemTcpClient.java
index 1e57a380..17920ea2 100644
---
a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/SystemTcpClient.java
+++
b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/SystemTcpClient.java
@@ -29,11 +29,6 @@ import java.util.List;
import static org.apache.iggy.client.blocking.tcp.BytesDeserializer.*;
class SystemTcpClient implements SystemClient {
- private static final int PING_CODE = 1;
- private static final int GET_STATS_CODE = 10;
- private static final int GET_ME_CODE = 20;
- private static final int GET_CLIENT_CODE = 21;
- private static final int GET_CLIENTS_CODE = 22;
private final InternalTcpClient tcpClient;
@@ -43,13 +38,13 @@ class SystemTcpClient implements SystemClient {
@Override
public Stats getStats() {
- var response = tcpClient.send(GET_STATS_CODE);
+ var response = tcpClient.send(CommandCode.System.GET_STATS);
return readStats(response);
}
@Override
public ClientInfoDetails getMe() {
- var response = tcpClient.send(GET_ME_CODE);
+ var response = tcpClient.send(CommandCode.System.GET_ME);
return readClientInfoDetails(response);
}
@@ -57,13 +52,13 @@ class SystemTcpClient implements SystemClient {
public ClientInfoDetails getClient(Long clientId) {
var payload = Unpooled.buffer(4);
payload.writeIntLE(clientId.intValue());
- var response = tcpClient.send(GET_CLIENT_CODE, payload);
+ var response = tcpClient.send(CommandCode.System.GET_CLIENT, payload);
return readClientInfoDetails(response);
}
@Override
public List<ClientInfo> getClients() {
- var response = tcpClient.send(GET_CLIENTS_CODE);
+ var response = tcpClient.send(CommandCode.System.GET_ALL_CLIENTS);
List<ClientInfo> clients = new ArrayList<>();
while (response.isReadable()) {
clients.add(readClientInfo(response));
@@ -73,7 +68,7 @@ class SystemTcpClient implements SystemClient {
@Override
public String ping() {
- tcpClient.send(PING_CODE);
+ tcpClient.send(CommandCode.System.PING);
return "";
}
}
diff --git
a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/TopicsTcpClient.java
b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/TopicsTcpClient.java
index f23c39a6..971f5bb0 100644
---
a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/TopicsTcpClient.java
+++
b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/TopicsTcpClient.java
@@ -36,12 +36,6 @@ import static
org.apache.iggy.client.blocking.tcp.BytesSerializer.*;
class TopicsTcpClient implements TopicsClient {
- private static final int GET_TOPIC_CODE = 300;
- private static final int GET_TOPICS_CODE = 301;
- private static final int CREATE_TOPIC_CODE = 302;
- private static final int DELETE_TOPIC_CODE = 303;
- private static final int UPDATE_TOPIC_CODE = 304;
- private static final int PURGE_TOPIC_CODE = 305;
private final InternalTcpClient tcpClient;
TopicsTcpClient(InternalTcpClient tcpClient) {
@@ -52,7 +46,7 @@ class TopicsTcpClient implements TopicsClient {
public Optional<TopicDetails> getTopic(StreamId streamId, TopicId topicId)
{
var payload = toBytes(streamId);
payload.writeBytes(toBytes(topicId));
- var response = tcpClient.send(GET_TOPIC_CODE, payload);
+ var response = tcpClient.send(CommandCode.Topic.GET, payload);
if (response.isReadable()) {
return Optional.of(readTopicDetails(response));
}
@@ -62,7 +56,7 @@ class TopicsTcpClient implements TopicsClient {
@Override
public List<Topic> getTopics(StreamId streamId) {
var payload = toBytes(streamId);
- var response = tcpClient.send(GET_TOPICS_CODE, payload);
+ var response = tcpClient.send(CommandCode.Topic.GET_ALL, payload);
List<Topic> topics = new ArrayList<>();
while (response.isReadable()) {
topics.add(readTopic(response));
@@ -84,7 +78,7 @@ class TopicsTcpClient implements TopicsClient {
payload.writeByte(replicationFactor.orElse((short) 0));
payload.writeBytes(nameToBytes(name));
- var response = tcpClient.send(CREATE_TOPIC_CODE, payload);
+ var response = tcpClient.send(CommandCode.Topic.CREATE, payload);
return readTopicDetails(response);
}
@@ -105,13 +99,13 @@ class TopicsTcpClient implements TopicsClient {
payload.writeByte(replicationFactor.orElse((short) 0));
payload.writeBytes(nameToBytes(name));
- tcpClient.send(UPDATE_TOPIC_CODE, payload);
+ tcpClient.send(CommandCode.Topic.UPDATE, payload);
}
@Override
public void deleteTopic(StreamId streamId, TopicId topicId) {
var payload = toBytes(streamId);
payload.writeBytes(toBytes(topicId));
- tcpClient.send(DELETE_TOPIC_CODE, payload);
+ tcpClient.send(CommandCode.Topic.DELETE, payload);
}
}
diff --git
a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/UsersTcpClient.java
b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/UsersTcpClient.java
index e856ddf1..2bff56c4 100644
---
a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/UsersTcpClient.java
+++
b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/UsersTcpClient.java
@@ -36,16 +36,6 @@ import static
org.apache.iggy.client.blocking.tcp.BytesSerializer.toBytes;
class UsersTcpClient implements UsersClient {
- private static final int GET_USER_CODE = 31;
- private static final int GET_USERS_CODE = 32;
- private static final int CREATE_USER_CODE = 33;
- private static final int DELETE_USER_CODE = 34;
- private static final int UPDATE_USER_CODE = 35;
- private static final int UPDATE_PERMISSIONS_CODE = 36;
- private static final int CHANGE_PASSWORD_CODE = 37;
- private static final int LOGIN_USER_CODE = 38;
- private static final int LOGOUT_USER_CODE = 39;
-
private final InternalTcpClient tcpClient;
UsersTcpClient(InternalTcpClient tcpClient) {
@@ -55,7 +45,7 @@ class UsersTcpClient implements UsersClient {
@Override
public Optional<UserInfoDetails> getUser(UserId userId) {
var payload = toBytes(userId);
- var response = tcpClient.send(GET_USER_CODE, payload);
+ var response = tcpClient.send(CommandCode.User.GET, payload);
if (response.isReadable()) {
return Optional.of(readUserInfoDetails(response));
}
@@ -64,7 +54,7 @@ class UsersTcpClient implements UsersClient {
@Override
public List<UserInfo> getUsers() {
- var response = tcpClient.send(GET_USERS_CODE);
+ var response = tcpClient.send(CommandCode.User.GET_ALL);
List<UserInfo> users = new ArrayList<>();
while (response.isReadable()) {
users.add(BytesDeserializer.readUserInfo(response));
@@ -85,14 +75,14 @@ class UsersTcpClient implements UsersClient {
payload.writeBytes(permissionBytes);
}, () -> payload.writeByte(0));
- var response = tcpClient.send(CREATE_USER_CODE, payload);
+ var response = tcpClient.send(CommandCode.User.CREATE, payload);
return readUserInfoDetails(response);
}
@Override
public void deleteUser(UserId userId) {
var payload = toBytes(userId);
- tcpClient.send(DELETE_USER_CODE, payload);
+ tcpClient.send(CommandCode.User.DELETE, payload);
}
@Override
@@ -107,7 +97,7 @@ class UsersTcpClient implements UsersClient {
payload.writeByte(status.asCode());
}, () -> payload.writeByte(0));
- tcpClient.send(UPDATE_USER_CODE, payload);
+ tcpClient.send(CommandCode.User.UPDATE, payload);
}
@Override
@@ -121,7 +111,7 @@ class UsersTcpClient implements UsersClient {
payload.writeBytes(permissionBytes);
}, () -> payload.writeByte(0));
- tcpClient.send(UPDATE_PERMISSIONS_CODE, payload);
+ tcpClient.send(CommandCode.User.UPDATE_PERMISSIONS, payload);
}
@Override
@@ -130,7 +120,7 @@ class UsersTcpClient implements UsersClient {
payload.writeBytes(nameToBytes(currentPassword));
payload.writeBytes(nameToBytes(newPassword));
- tcpClient.send(CHANGE_PASSWORD_CODE, payload);
+ tcpClient.send(CommandCode.User.CHANGE_PASSWORD, payload);
}
@Override
@@ -147,7 +137,7 @@ class UsersTcpClient implements UsersClient {
payload.writeIntLE(context.length());
payload.writeBytes(context.getBytes());
- var response = tcpClient.send(LOGIN_USER_CODE, payload);
+ var response = tcpClient.send(CommandCode.User.LOGIN, payload);
var userId = response.readUnsignedIntLE();
return new IdentityInfo(userId, Optional.empty());
@@ -155,6 +145,6 @@ class UsersTcpClient implements UsersClient {
@Override
public void logout() {
- tcpClient.send(LOGOUT_USER_CODE);
+ tcpClient.send(CommandCode.User.LOGOUT);
}
}