This is an automated email from the ASF dual-hosted git repository.

piotr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git


The following commit(s) were added to refs/heads/master by this push:
     new f1588b33d refactor(java): unify deserialization and serialization code for tcp clients (#2477)
f1588b33d is described below

commit f1588b33d3ca08adff47edb5278f8d4e4f953b29
Author: Maciej Modzelewski <[email protected]>
AuthorDate: Fri Dec 12 13:35:24 2025 +0100

    refactor(java): unify deserialization and serialization code for tcp clients (#2477)
    
    resolves #2224
---
 .../client/async/tcp/AsyncBytesDeserializer.java   | 210 ------------------
 .../client/async/tcp/AsyncBytesSerializer.java     | 237 ---------------------
 .../client/async/tcp/ConsumerGroupsTcpClient.java  |  15 +-
 .../iggy/client/async/tcp/MessagesTcpClient.java   |   8 +-
 .../iggy/client/async/tcp/StreamsTcpClient.java    |  19 +-
 .../iggy/client/async/tcp/TopicsTcpClient.java     |  19 +-
 .../iggy/client/async/tcp/UsersTcpClient.java      |   7 +-
 .../blocking/tcp/ConsumerGroupsTcpClient.java      |   8 +-
 .../blocking/tcp/ConsumerOffsetTcpClient.java      |   6 +-
 .../client/blocking/tcp/InternalTcpClient.java     |   1 +
 .../client/blocking/tcp/MessagesTcpClient.java     |   4 +-
 .../client/blocking/tcp/PartitionsTcpClient.java   |   3 +-
 .../tcp/PersonalAccessTokensTcpClient.java         |  12 +-
 .../iggy/client/blocking/tcp/StreamsTcpClient.java |  10 +-
 .../iggy/client/blocking/tcp/SystemTcpClient.java  |   2 +
 .../iggy/client/blocking/tcp/TopicsTcpClient.java  |  12 +-
 .../iggy/client/blocking/tcp/UsersTcpClient.java   |  19 +-
 .../apache/iggy/message/BigIntegerMessageId.java   |   2 +-
 .../blocking/tcp => serde}/BytesDeserializer.java  |  48 +++--
 .../blocking/tcp => serde}/BytesSerializer.java    |  49 +++--
 .../blocking/tcp => serde}/CommandCode.java        |   6 +-
 .../iggy/client/async/AsyncPollMessageTest.java    |  25 ---
 .../client/blocking/tcp/BytesSerializerTest.java   |  24 +++
 .../apache/iggy/serde/BytesDeserializerTest.java   |  81 +++++++
 24 files changed, 247 insertions(+), 580 deletions(-)

diff --git a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/AsyncBytesDeserializer.java b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/AsyncBytesDeserializer.java
deleted file mode 100644
index ae3037f18..000000000
--- a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/AsyncBytesDeserializer.java
+++ /dev/null
@@ -1,210 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iggy.client.async.tcp;
-
-import io.netty.buffer.ByteBuf;
-import org.apache.commons.lang3.ArrayUtils;
-import org.apache.iggy.message.BytesMessageId;
-import org.apache.iggy.message.Message;
-import org.apache.iggy.message.MessageHeader;
-import org.apache.iggy.message.PolledMessages;
-import org.apache.iggy.partition.Partition;
-import org.apache.iggy.stream.StreamBase;
-import org.apache.iggy.stream.StreamDetails;
-import org.apache.iggy.topic.CompressionAlgorithm;
-import org.apache.iggy.topic.Topic;
-import org.apache.iggy.topic.TopicDetails;
-
-import java.math.BigInteger;
-import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Optional;
-
-/**
- * Async version of BytesDeserializer for the async package.
- * Provides the same wire protocol deserialization as the blocking version.
- */
-public final class AsyncBytesDeserializer {
-
-    private AsyncBytesDeserializer() {}
-
-    /**
-     * Reads PolledMessages from the response buffer.
-     * @param response The ByteBuf containing the response data
-     * @return The deserialized PolledMessages
-     */
-    public static PolledMessages readPolledMessages(ByteBuf response) {
-        var partitionId = response.readUnsignedIntLE();
-        var currentOffset = readU64AsBigInteger(response);
-        var messagesCount = response.readUnsignedIntLE();
-        var messages = new ArrayList<Message>();
-        while (response.isReadable()) {
-            messages.add(readPolledMessage(response));
-        }
-        return new PolledMessages(partitionId, currentOffset, messagesCount, messages);
-    }
-
-    /**
-     * Reads a single Message from the buffer.
-     */
-    private static Message readPolledMessage(ByteBuf response) {
-        var checksum = readU64AsBigInteger(response);
-        var id = readBytesMessageId(response);
-        var offset = readU64AsBigInteger(response);
-        var timestamp = readU64AsBigInteger(response);
-        var originTimestamp = readU64AsBigInteger(response);
-        var userHeadersLength = response.readUnsignedIntLE();
-        var payloadLength = response.readUnsignedIntLE();
-        var header =
-                new MessageHeader(checksum, id, offset, timestamp, originTimestamp, userHeadersLength, payloadLength);
-        var payload = new byte[toInt(payloadLength)];
-        response.readBytes(payload);
-        // TODO: Add support for user headers.
-        return new Message(header, payload, Optional.empty());
-    }
-
-    /**
-     * Reads an unsigned 64-bit integer as BigInteger.
-     */
-    private static BigInteger readU64AsBigInteger(ByteBuf buffer) {
-        var bytesArray = new byte[8];
-        buffer.readBytes(bytesArray, 0, 8);
-        ArrayUtils.reverse(bytesArray);
-        // Ensure it's treated as unsigned
-        byte[] unsigned = new byte[9];
-        System.arraycopy(bytesArray, 0, unsigned, 1, 8);
-        return new BigInteger(unsigned);
-    }
-
-    /**
-     * Reads a 16-byte message ID.
-     */
-    private static BytesMessageId readBytesMessageId(ByteBuf buffer) {
-        var bytesArray = new byte[16];
-        buffer.readBytes(bytesArray);
-        ArrayUtils.reverse(bytesArray);
-        return new BytesMessageId(bytesArray);
-    }
-
-    /**
-     * Converts a long to int safely.
-     */
-    private static int toInt(long value) {
-        if (value > Integer.MAX_VALUE) {
-            throw new IllegalArgumentException("Value too large for int: " + value);
-        }
-        return (int) value;
-    }
-
-    /**
-     * Reads StreamBase from the buffer.
-     * @param response The ByteBuf containing the response data
-     * @return The deserialized StreamBase
-     */
-    public static StreamBase readStreamBase(ByteBuf response) {
-        var streamId = response.readUnsignedIntLE();
-        var createdAt = readU64AsBigInteger(response);
-        var topicsCount = response.readUnsignedIntLE();
-        var size = readU64AsBigInteger(response);
-        var messagesCount = readU64AsBigInteger(response);
-        var nameLength = response.readByte();
-        var name = response.readCharSequence(nameLength, StandardCharsets.UTF_8).toString();
-
-        return new StreamBase(streamId, createdAt, name, size.toString(), messagesCount, topicsCount);
-    }
-
-    /**
-     * Reads StreamDetails from the buffer.
-     * @param response The ByteBuf containing the response data
-     * @return The deserialized StreamDetails
-     */
-    public static StreamDetails readStreamDetails(ByteBuf response) {
-        var streamBase = readStreamBase(response);
-
-        List<Topic> topics = new ArrayList<>();
-        while (response.isReadable()) {
-            topics.add(readTopic(response));
-        }
-
-        return new StreamDetails(streamBase, topics);
-    }
-
-    /**
-     * Reads Topic from the buffer.
-     * @param response The ByteBuf containing the response data
-     * @return The deserialized Topic
-     */
-    public static Topic readTopic(ByteBuf response) {
-        var topicId = response.readUnsignedIntLE();
-        var createdAt = readU64AsBigInteger(response);
-        var partitionsCount = response.readUnsignedIntLE();
-        var messageExpiry = readU64AsBigInteger(response);
-        var compressionAlgorithmCode = response.readByte();
-        var maxTopicSize = readU64AsBigInteger(response);
-        var replicationFactor = response.readByte();
-        var size = readU64AsBigInteger(response);
-        var messagesCount = readU64AsBigInteger(response);
-        var nameLength = response.readByte();
-        var name = response.readCharSequence(nameLength, StandardCharsets.UTF_8).toString();
-
-        return new Topic(
-                topicId,
-                createdAt,
-                name,
-                size.toString(),
-                messageExpiry,
-                CompressionAlgorithm.fromCode(compressionAlgorithmCode),
-                maxTopicSize,
-                (short) replicationFactor,
-                messagesCount,
-                partitionsCount);
-    }
-
-    /**
-     * Reads TopicDetails from the buffer.
-     * @param response The ByteBuf containing the response data
-     * @return The deserialized TopicDetails
-     */
-    public static TopicDetails readTopicDetails(ByteBuf response) {
-        var topic = readTopic(response);
-
-        List<Partition> partitions = new ArrayList<>();
-        while (response.isReadable()) {
-            partitions.add(readPartition(response));
-        }
-
-        return new TopicDetails(topic, partitions);
-    }
-
-    /**
-     * Reads Partition from the buffer.
-     */
-    private static Partition readPartition(ByteBuf response) {
-        var partitionId = response.readUnsignedIntLE();
-        var createdAt = readU64AsBigInteger(response);
-        var segmentsCount = response.readUnsignedIntLE();
-        var currentOffset = readU64AsBigInteger(response);
-        var size = readU64AsBigInteger(response);
-        var messagesCount = readU64AsBigInteger(response);
-
-        return new Partition(partitionId, createdAt, segmentsCount, currentOffset, size.toString(), messagesCount);
-    }
-}
diff --git a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/AsyncBytesSerializer.java b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/AsyncBytesSerializer.java
deleted file mode 100644
index 7b1c87bf2..000000000
--- a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/AsyncBytesSerializer.java
+++ /dev/null
@@ -1,237 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iggy.client.async.tcp;
-
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.Unpooled;
-import org.apache.commons.lang3.ArrayUtils;
-import org.apache.iggy.consumergroup.Consumer;
-import org.apache.iggy.identifier.Identifier;
-import org.apache.iggy.message.Message;
-import org.apache.iggy.message.MessageHeader;
-import org.apache.iggy.message.Partitioning;
-import org.apache.iggy.message.PollingStrategy;
-
-import java.math.BigInteger;
-import java.nio.charset.StandardCharsets;
-import java.util.Optional;
-
-/**
- * Async version of BytesSerializer for the async package.
- * Provides the same wire protocol serialization as the blocking version.
- */
-public final class AsyncBytesSerializer {
-
-    private AsyncBytesSerializer() {}
-
-    /**
-     * Serializes a Consumer to bytes.
-     *
-     * @param consumer The consumer to serialize (can be null)
-     * @return ByteBuf containing the serialized consumer
-     */
-    public static ByteBuf toBytes(Consumer consumer) {
-        ByteBuf buffer = Unpooled.buffer();
-        if (consumer == null) {
-            // No consumer - use code 0 (1 byte) and empty identifier (4 bytes)
-            buffer.writeByte(0);
-            buffer.writeIntLE(0);
-        } else {
-            buffer.writeByte(consumer.kind().asCode());
-            buffer.writeBytes(toBytes(consumer.id()));
-        }
-        return buffer;
-    }
-
-    /**
-     * Serializes an Identifier to bytes.
-     *
-     * @param identifier The identifier to serialize
-     * @return ByteBuf containing the serialized identifier
-     */
-    public static ByteBuf toBytes(Identifier identifier) {
-        if (identifier.getKind() == 1) {
-            ByteBuf buffer = Unpooled.buffer(6);
-            buffer.writeByte(1);
-            buffer.writeByte(4);
-            buffer.writeIntLE(identifier.getId().intValue());
-            return buffer;
-        } else if (identifier.getKind() == 2) {
-            ByteBuf buffer = Unpooled.buffer(2 + identifier.getName().length());
-            buffer.writeByte(2);
-            buffer.writeByte(identifier.getName().length());
-            buffer.writeBytes(identifier.getName().getBytes());
-            return buffer;
-        } else {
-            throw new IllegalArgumentException("Unknown identifier kind: " + identifier.getKind());
-        }
-    }
-
-    /**
-     * Serializes a String to bytes with length prefix.
-     *
-     * @param str The string to serialize
-     * @return ByteBuf containing the serialized string
-     */
-    public static ByteBuf toBytes(String str) {
-        ByteBuf buffer = Unpooled.buffer(1 + str.length());
-        buffer.writeByte(str.length());
-        buffer.writeBytes(str.getBytes());
-        return buffer;
-    }
-
-    /**
-     * Serializes a Partitioning to bytes.
-     *
-     * @param partitioning The partitioning to serialize
-     * @return ByteBuf containing the serialized partitioning
-     */
-    public static ByteBuf toBytes(Partitioning partitioning) {
-        ByteBuf buffer = Unpooled.buffer(2 + partitioning.value().length);
-        buffer.writeByte(partitioning.kind().asCode());
-        buffer.writeByte(partitioning.value().length);
-        buffer.writeBytes(partitioning.value());
-        return buffer;
-    }
-
-    /**
-     * Serializes a Message to bytes.
-     *
-     * @param message The message to serialize
-     * @return ByteBuf containing the serialized message
-     */
-    public static ByteBuf toBytes(Message message) {
-        var buffer = Unpooled.buffer(MessageHeader.SIZE + message.payload().length);
-        buffer.writeBytes(toBytes(message.header()));
-        buffer.writeBytes(message.payload());
-        return buffer;
-    }
-
-    /**
-     * Serializes a MessageHeader to bytes.
-     *
-     * @param header The message header to serialize
-     * @return ByteBuf containing the serialized header
-     */
-    public static ByteBuf toBytes(MessageHeader header) {
-        var buffer = Unpooled.buffer(MessageHeader.SIZE);
-        buffer.writeBytes(toBytesAsU64(header.checksum()));
-        // Convert MessageId to BigInteger and serialize as U128
-        buffer.writeBytes(toBytesAsU128(header.id().toBigInteger()));
-        buffer.writeBytes(toBytesAsU64(header.offset()));
-        buffer.writeBytes(toBytesAsU64(header.timestamp()));
-        buffer.writeBytes(toBytesAsU64(header.originTimestamp()));
-        buffer.writeIntLE(header.userHeadersLength().intValue());
-        buffer.writeIntLE(header.payloadLength().intValue());
-        return buffer;
-    }
-
-    /**
-     * Serializes a PollingStrategy to bytes.
-     *
-     * @param strategy The polling strategy to serialize
-     * @return ByteBuf containing the serialized strategy
-     */
-    public static ByteBuf toBytes(PollingStrategy strategy) {
-        var buffer = Unpooled.buffer(9);
-        buffer.writeByte(strategy.kind().asCode());
-        buffer.writeBytes(toBytesAsU64(strategy.value()));
-        return buffer;
-    }
-
-    static ByteBuf toBytes(Optional<Long> optionalLong) {
-        var buffer = Unpooled.buffer(5);
-        if (optionalLong.isPresent()) {
-            buffer.writeByte(1);
-            buffer.writeIntLE(optionalLong.get().intValue());
-        } else {
-            buffer.writeByte(0);
-            buffer.writeIntLE(0);
-        }
-        return buffer;
-    }
-
-    /**
-     * Converts a BigInteger to bytes as unsigned 64-bit integer.
-     *
-     * @param value The BigInteger value to convert
-     * @return ByteBuf containing the value as 8 bytes in little-endian format
-     */
-    public static ByteBuf toBytesAsU64(BigInteger value) {
-        if (value.signum() == -1) {
-            throw new IllegalArgumentException("Negative value cannot be serialized to unsigned 64: " + value);
-        }
-        ByteBuf buffer = Unpooled.buffer(8);
-        byte[] valueAsBytes = value.toByteArray();
-        if (valueAsBytes.length > 9) {
-            throw new IllegalArgumentException("Value too large for U64: " + value);
-        }
-        // Handle sign byte if present
-        if (valueAsBytes.length == 9 && valueAsBytes[0] == 0) {
-            valueAsBytes = ArrayUtils.subarray(valueAsBytes, 1, 9);
-        }
-        ArrayUtils.reverse(valueAsBytes);
-        buffer.writeBytes(valueAsBytes, 0, Math.min(8, valueAsBytes.length));
-        if (valueAsBytes.length < 8) {
-            buffer.writeZero(8 - valueAsBytes.length);
-        }
-        return buffer;
-    }
-
-    /**
-     * Converts a name string to bytes with length prefix.
-     *
-     * @param name The name string to convert
-     * @return ByteBuf containing the serialized name
-     */
-    public static ByteBuf nameToBytes(String name) {
-        var buffer = Unpooled.buffer(1 + name.length());
-        buffer.writeByte(name.length());
-        buffer.writeBytes(name.getBytes(StandardCharsets.UTF_8));
-        return buffer;
-    }
-
-    /**
-     * Converts a BigInteger to bytes as unsigned 128-bit integer.
-     *
-     * @param value The BigInteger value to convert
-     * @return ByteBuf containing the value as 16 bytes in little-endian format
-     */
-    public static ByteBuf toBytesAsU128(BigInteger value) {
-        if (value.signum() == -1) {
-            throw new IllegalArgumentException("Negative value cannot be serialized to unsigned 128: " + value);
-        }
-        ByteBuf buffer = Unpooled.buffer(16, 16);
-        byte[] valueAsBytes = value.toByteArray();
-        if (valueAsBytes.length > 17) {
-            throw new IllegalArgumentException("Value too large for U128: " + value);
-        }
-        // Remove leading zero byte if present (from positive sign bit)
-        if (valueAsBytes.length == 17 && valueAsBytes[0] == 0) {
-            valueAsBytes = ArrayUtils.subarray(valueAsBytes, 1, 17);
-        }
-        ArrayUtils.reverse(valueAsBytes);
-        buffer.writeBytes(valueAsBytes, 0, Math.min(16, valueAsBytes.length));
-        if (valueAsBytes.length < 16) {
-            buffer.writeZero(16 - valueAsBytes.length);
-        }
-        return buffer;
-    }
-}
diff --git a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/ConsumerGroupsTcpClient.java b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/ConsumerGroupsTcpClient.java
index 06e778871..e671539f8 100644
--- a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/ConsumerGroupsTcpClient.java
+++ b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/ConsumerGroupsTcpClient.java
@@ -21,10 +21,11 @@ package org.apache.iggy.client.async.tcp;
 
 import io.netty.buffer.Unpooled;
 import org.apache.iggy.client.async.ConsumerGroupsClient;
-import org.apache.iggy.client.blocking.tcp.CommandCode;
 import org.apache.iggy.identifier.ConsumerId;
 import org.apache.iggy.identifier.StreamId;
 import org.apache.iggy.identifier.TopicId;
+import org.apache.iggy.serde.BytesSerializer;
+import org.apache.iggy.serde.CommandCode;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -47,13 +48,13 @@ public class ConsumerGroupsTcpClient implements ConsumerGroupsClient {
         var payload = Unpooled.buffer();
 
         // Serialize stream ID
-        payload.writeBytes(AsyncBytesSerializer.toBytes(streamId));
+        payload.writeBytes(BytesSerializer.toBytes(streamId));
 
         // Serialize topic ID
-        payload.writeBytes(AsyncBytesSerializer.toBytes(topicId));
+        payload.writeBytes(BytesSerializer.toBytes(topicId));
 
         // Serialize consumer group ID
-        payload.writeBytes(AsyncBytesSerializer.toBytes(groupId));
+        payload.writeBytes(BytesSerializer.toBytes(groupId));
 
         log.debug("Joining consumer group - Stream: {}, Topic: {}, Group: {}", streamId, topicId, groupId);
 
@@ -70,13 +71,13 @@ public class ConsumerGroupsTcpClient implements ConsumerGroupsClient {
         var payload = Unpooled.buffer();
 
         // Serialize stream ID
-        payload.writeBytes(AsyncBytesSerializer.toBytes(streamId));
+        payload.writeBytes(BytesSerializer.toBytes(streamId));
 
         // Serialize topic ID
-        payload.writeBytes(AsyncBytesSerializer.toBytes(topicId));
+        payload.writeBytes(BytesSerializer.toBytes(topicId));
 
         // Serialize consumer group ID
-        payload.writeBytes(AsyncBytesSerializer.toBytes(groupId));
+        payload.writeBytes(BytesSerializer.toBytes(groupId));
 
         log.debug("Leaving consumer group - Stream: {}, Topic: {}, Group: {}", streamId, topicId, groupId);
 
diff --git a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/MessagesTcpClient.java b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/MessagesTcpClient.java
index 9e2abec50..74fdc49c2 100644
--- a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/MessagesTcpClient.java
+++ b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/MessagesTcpClient.java
@@ -21,7 +21,6 @@ package org.apache.iggy.client.async.tcp;
 
 import io.netty.buffer.Unpooled;
 import org.apache.iggy.client.async.MessagesClient;
-import org.apache.iggy.client.blocking.tcp.CommandCode;
 import org.apache.iggy.consumergroup.Consumer;
 import org.apache.iggy.identifier.StreamId;
 import org.apache.iggy.identifier.TopicId;
@@ -29,12 +28,14 @@ import org.apache.iggy.message.Message;
 import org.apache.iggy.message.Partitioning;
 import org.apache.iggy.message.PolledMessages;
 import org.apache.iggy.message.PollingStrategy;
+import org.apache.iggy.serde.BytesDeserializer;
+import org.apache.iggy.serde.CommandCode;
 
 import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 
-import static org.apache.iggy.client.async.tcp.AsyncBytesSerializer.toBytes;
+import static org.apache.iggy.serde.BytesSerializer.toBytes;
 
 /**
  * Async TCP implementation of MessagesClient using Netty for non-blocking I/O.
@@ -57,7 +58,6 @@ public class MessagesTcpClient implements MessagesClient {
             Long count,
             boolean autoCommit) {
 
-        // Build the request payload
         var payload = Unpooled.buffer();
 
         var consumerBytes = toBytes(consumer);
@@ -83,7 +83,7 @@ public class MessagesTcpClient implements MessagesClient {
                 .sendAsync(CommandCode.Messages.POLL.getValue(), payload)
                 .thenApply(response -> {
                     try {
-                        return AsyncBytesDeserializer.readPolledMessages(response);
+                        return BytesDeserializer.readPolledMessages(response);
                     } finally {
                         response.release();
                     }
diff --git a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/StreamsTcpClient.java b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/StreamsTcpClient.java
index fdd8ba01b..c9e10ee26 100644
--- a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/StreamsTcpClient.java
+++ b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/StreamsTcpClient.java
@@ -20,9 +20,11 @@
 package org.apache.iggy.client.async.tcp;
 
 import io.netty.buffer.Unpooled;
+import io.netty.util.ReferenceCounted;
 import org.apache.iggy.client.async.StreamsClient;
-import org.apache.iggy.client.blocking.tcp.CommandCode;
 import org.apache.iggy.identifier.StreamId;
+import org.apache.iggy.serde.BytesSerializer;
+import org.apache.iggy.serde.CommandCode;
 import org.apache.iggy.stream.StreamBase;
 import org.apache.iggy.stream.StreamDetails;
 
@@ -31,10 +33,9 @@ import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 
-import static org.apache.iggy.client.async.tcp.AsyncBytesDeserializer.readStreamBase;
-import static org.apache.iggy.client.async.tcp.AsyncBytesDeserializer.readStreamDetails;
-import static org.apache.iggy.client.async.tcp.AsyncBytesSerializer.nameToBytes;
-import static org.apache.iggy.client.async.tcp.AsyncBytesSerializer.toBytes;
+import static org.apache.iggy.serde.BytesDeserializer.readStreamBase;
+import static org.apache.iggy.serde.BytesDeserializer.readStreamDetails;
+import static org.apache.iggy.serde.BytesSerializer.toBytes;
 
 /**
  * Async TCP implementation of StreamsClient using Netty for non-blocking I/O.
@@ -52,7 +53,7 @@ public class StreamsTcpClient implements StreamsClient {
         var payloadSize = 1 + name.length();
         var payload = Unpooled.buffer(payloadSize);
 
-        payload.writeBytes(nameToBytes(name));
+        payload.writeBytes(BytesSerializer.toBytes(name));
 
         return connection
                 .sendAsync(CommandCode.Stream.CREATE.getValue(), payload)
@@ -100,11 +101,11 @@ public class StreamsTcpClient implements StreamsClient {
         var payload = Unpooled.buffer(payloadSize + idBytes.capacity());
 
         payload.writeBytes(idBytes);
-        payload.writeBytes(nameToBytes(name));
+        payload.writeBytes(BytesSerializer.toBytes(name));
 
         return connection
                 .sendAsync(CommandCode.Stream.UPDATE.getValue(), payload)
-                .thenAccept(response -> response.release());
+                .thenAccept(ReferenceCounted::release);
     }
 
     @Override
@@ -113,6 +114,6 @@ public class StreamsTcpClient implements StreamsClient {
 
         return connection
                 .sendAsync(CommandCode.Stream.DELETE.getValue(), payload)
-                .thenAccept(response -> response.release());
+                .thenAccept(ReferenceCounted::release);
     }
 }
diff --git a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/TopicsTcpClient.java b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/TopicsTcpClient.java
index 5128039d2..80e73b0e4 100644
--- a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/TopicsTcpClient.java
+++ b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/TopicsTcpClient.java
@@ -21,9 +21,11 @@ package org.apache.iggy.client.async.tcp;
 
 import io.netty.buffer.Unpooled;
 import org.apache.iggy.client.async.TopicsClient;
-import org.apache.iggy.client.blocking.tcp.CommandCode;
 import org.apache.iggy.identifier.StreamId;
 import org.apache.iggy.identifier.TopicId;
+import org.apache.iggy.serde.BytesDeserializer;
+import org.apache.iggy.serde.BytesSerializer;
+import org.apache.iggy.serde.CommandCode;
 import org.apache.iggy.topic.CompressionAlgorithm;
 import org.apache.iggy.topic.Topic;
 import org.apache.iggy.topic.TopicDetails;
@@ -34,9 +36,8 @@ import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 
-import static org.apache.iggy.client.async.tcp.AsyncBytesSerializer.nameToBytes;
-import static org.apache.iggy.client.async.tcp.AsyncBytesSerializer.toBytes;
-import static org.apache.iggy.client.async.tcp.AsyncBytesSerializer.toBytesAsU64;
+import static org.apache.iggy.serde.BytesSerializer.toBytes;
+import static org.apache.iggy.serde.BytesSerializer.toBytesAsU64;
 
 /**
  * Async TCP implementation of TopicsClient using Netty for non-blocking I/O.
@@ -58,7 +59,7 @@ public class TopicsTcpClient implements TopicsClient {
         return connection.sendAsync(CommandCode.Topic.GET.getValue(), payload).thenApply(response -> {
             try {
                 if (response.isReadable()) {
-                    return Optional.of(AsyncBytesDeserializer.readTopicDetails(response));
+                    return Optional.of(BytesDeserializer.readTopicDetails(response));
                 }
                 return Optional.<TopicDetails>empty();
             } finally {
@@ -77,7 +78,7 @@ public class TopicsTcpClient implements TopicsClient {
                     try {
                         List<Topic> topics = new ArrayList<>();
                         while (response.isReadable()) {
-                            topics.add(AsyncBytesDeserializer.readTopic(response));
+                            topics.add(BytesDeserializer.readTopic(response));
                         }
                         return topics;
                     } finally {
@@ -105,13 +106,13 @@ public class TopicsTcpClient implements TopicsClient {
         payload.writeBytes(toBytesAsU64(messageExpiry));
         payload.writeBytes(toBytesAsU64(maxTopicSize));
         payload.writeByte(replicationFactor.orElse((short) 0));
-        payload.writeBytes(nameToBytes(name));
+        payload.writeBytes(BytesSerializer.toBytes(name));
 
         return connection
                 .sendAsync(CommandCode.Topic.CREATE.getValue(), payload)
                 .thenApply(response -> {
                     try {
-                        return AsyncBytesDeserializer.readTopicDetails(response);
+                        return BytesDeserializer.readTopicDetails(response);
                     } finally {
                         response.release();
                     }
@@ -135,7 +136,7 @@ public class TopicsTcpClient implements TopicsClient {
         payload.writeBytes(toBytesAsU64(messageExpiry));
         payload.writeBytes(toBytesAsU64(maxTopicSize));
         payload.writeByte(replicationFactor.orElse((short) 0));
-        payload.writeBytes(nameToBytes(name));
+        payload.writeBytes(BytesSerializer.toBytes(name));
 
         return connection
                 .sendAsync(CommandCode.Topic.UPDATE.getValue(), payload)
diff --git a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/UsersTcpClient.java b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/UsersTcpClient.java
index 00d8f31db..8f7ded490 100644
--- a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/UsersTcpClient.java
+++ b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/UsersTcpClient.java
@@ -21,7 +21,8 @@ package org.apache.iggy.client.async.tcp;
 
 import io.netty.buffer.Unpooled;
 import org.apache.iggy.client.async.UsersClient;
-import org.apache.iggy.client.blocking.tcp.CommandCode;
+import org.apache.iggy.serde.BytesSerializer;
+import org.apache.iggy.serde.CommandCode;
 import org.apache.iggy.user.IdentityInfo;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -47,8 +48,8 @@ public class UsersTcpClient implements UsersClient {
         String context = "java-sdk";
 
         var payload = Unpooled.buffer();
-        var usernameBytes = AsyncBytesSerializer.toBytes(username);
-        var passwordBytes = AsyncBytesSerializer.toBytes(password);
+        var usernameBytes = BytesSerializer.toBytes(username);
+        var passwordBytes = BytesSerializer.toBytes(password);
 
         payload.writeBytes(usernameBytes);
         payload.writeBytes(passwordBytes);
diff --git a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/ConsumerGroupsTcpClient.java b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/ConsumerGroupsTcpClient.java
index bf2fb9366..7868d1dfb 100644
--- a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/ConsumerGroupsTcpClient.java
+++ b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/ConsumerGroupsTcpClient.java
@@ -26,12 +26,14 @@ import org.apache.iggy.consumergroup.ConsumerGroupDetails;
 import org.apache.iggy.identifier.ConsumerId;
 import org.apache.iggy.identifier.StreamId;
 import org.apache.iggy.identifier.TopicId;
+import org.apache.iggy.serde.BytesDeserializer;
+import org.apache.iggy.serde.BytesSerializer;
+import org.apache.iggy.serde.CommandCode;
 
 import java.util.List;
 import java.util.Optional;
 
-import static org.apache.iggy.client.blocking.tcp.BytesSerializer.nameToBytes;
-import static org.apache.iggy.client.blocking.tcp.BytesSerializer.toBytes;
+import static org.apache.iggy.serde.BytesSerializer.toBytes;
 
 class ConsumerGroupsTcpClient implements ConsumerGroupsClient {
 
@@ -66,7 +68,7 @@ class ConsumerGroupsTcpClient implements ConsumerGroupsClient {
 
         payload.writeBytes(streamIdBytes);
         payload.writeBytes(topicIdBytes);
-        payload.writeBytes(nameToBytes(name));
+        payload.writeBytes(BytesSerializer.toBytes(name));
 
         return tcpClient.exchangeForEntity(
                 CommandCode.ConsumerGroup.CREATE, payload, BytesDeserializer::readConsumerGroupDetails);
diff --git a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/ConsumerOffsetTcpClient.java b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/ConsumerOffsetTcpClient.java
index 2fe86ba6b..08ddd5f3b 100644
--- a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/ConsumerOffsetTcpClient.java
+++ b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/ConsumerOffsetTcpClient.java
@@ -24,12 +24,14 @@ import org.apache.iggy.consumergroup.Consumer;
 import org.apache.iggy.consumeroffset.ConsumerOffsetInfo;
 import org.apache.iggy.identifier.StreamId;
 import org.apache.iggy.identifier.TopicId;
+import org.apache.iggy.serde.BytesDeserializer;
+import org.apache.iggy.serde.CommandCode;
 
 import java.math.BigInteger;
 import java.util.Optional;
 
-import static org.apache.iggy.client.blocking.tcp.BytesSerializer.toBytes;
-import static org.apache.iggy.client.blocking.tcp.BytesSerializer.toBytesAsU64;
+import static org.apache.iggy.serde.BytesSerializer.toBytes;
+import static org.apache.iggy.serde.BytesSerializer.toBytesAsU64;
 
 class ConsumerOffsetTcpClient implements ConsumerOffsetsClient {
 
diff --git a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/InternalTcpClient.java b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/InternalTcpClient.java
index ab4bf684e..234a29455 100644
--- a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/InternalTcpClient.java
+++ b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/InternalTcpClient.java
@@ -25,6 +25,7 @@ import io.netty.channel.ChannelHandlerContext;
 import io.netty.handler.codec.ByteToMessageDecoder;
 import io.netty.handler.ssl.SslContext;
 import io.netty.handler.ssl.SslContextBuilder;
+import org.apache.iggy.serde.CommandCode;
 import reactor.core.publisher.Mono;
 import reactor.netty.Connection;
 import reactor.netty.tcp.TcpClient;
diff --git a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/MessagesTcpClient.java b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/MessagesTcpClient.java
index 172d0c9e6..84e251d26 100644
--- a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/MessagesTcpClient.java
+++ b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/MessagesTcpClient.java
@@ -28,11 +28,13 @@ import org.apache.iggy.message.Message;
 import org.apache.iggy.message.Partitioning;
 import org.apache.iggy.message.PolledMessages;
 import org.apache.iggy.message.PollingStrategy;
+import org.apache.iggy.serde.BytesDeserializer;
+import org.apache.iggy.serde.CommandCode;
 
 import java.util.List;
 import java.util.Optional;
 
-import static org.apache.iggy.client.blocking.tcp.BytesSerializer.toBytes;
+import static org.apache.iggy.serde.BytesSerializer.toBytes;
 
 class MessagesTcpClient implements MessagesClient {
 
diff --git a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/PartitionsTcpClient.java b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/PartitionsTcpClient.java
index 1fa634ad6..7678d7c6b 100644
--- a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/PartitionsTcpClient.java
+++ b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/PartitionsTcpClient.java
@@ -22,8 +22,9 @@ package org.apache.iggy.client.blocking.tcp;
 import org.apache.iggy.client.blocking.PartitionsClient;
 import org.apache.iggy.identifier.StreamId;
 import org.apache.iggy.identifier.TopicId;
+import org.apache.iggy.serde.CommandCode;
 
-import static org.apache.iggy.client.blocking.tcp.BytesSerializer.toBytes;
+import static org.apache.iggy.serde.BytesSerializer.toBytes;
 
 class PartitionsTcpClient implements PartitionsClient {
 
diff --git a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/PersonalAccessTokensTcpClient.java b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/PersonalAccessTokensTcpClient.java
index f0d0b22f3..b3535b888 100644
--- a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/PersonalAccessTokensTcpClient.java
+++ b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/PersonalAccessTokensTcpClient.java
@@ -23,14 +23,16 @@ import io.netty.buffer.Unpooled;
 import org.apache.iggy.client.blocking.PersonalAccessTokensClient;
 import org.apache.iggy.personalaccesstoken.PersonalAccessTokenInfo;
 import org.apache.iggy.personalaccesstoken.RawPersonalAccessToken;
+import org.apache.iggy.serde.BytesDeserializer;
+import org.apache.iggy.serde.CommandCode;
 import org.apache.iggy.user.IdentityInfo;
 
 import java.math.BigInteger;
 import java.util.List;
 import java.util.Optional;
 
-import static org.apache.iggy.client.blocking.tcp.BytesSerializer.nameToBytes;
-import static org.apache.iggy.client.blocking.tcp.BytesSerializer.toBytesAsU64;
+import static org.apache.iggy.serde.BytesSerializer.toBytes;
+import static org.apache.iggy.serde.BytesSerializer.toBytesAsU64;
 
 class PersonalAccessTokensTcpClient implements PersonalAccessTokensClient {
 
@@ -43,7 +45,7 @@ class PersonalAccessTokensTcpClient implements PersonalAccessTokensClient {
     @Override
     public RawPersonalAccessToken createPersonalAccessToken(String name, BigInteger expiry) {
         var payload = Unpooled.buffer();
-        payload.writeBytes(nameToBytes(name));
+        payload.writeBytes(toBytes(name));
         payload.writeBytes(toBytesAsU64(expiry));
         return tcpClient.exchangeForEntity(
                 CommandCode.PersonalAccessToken.CREATE, payload, BytesDeserializer::readRawPersonalAccessToken);
@@ -57,13 +59,13 @@ class PersonalAccessTokensTcpClient implements PersonalAccessTokensClient {
 
     @Override
     public void deletePersonalAccessToken(String name) {
-        var payload = nameToBytes(name);
+        var payload = toBytes(name);
         tcpClient.send(CommandCode.PersonalAccessToken.DELETE, payload);
     }
 
     @Override
     public IdentityInfo loginWithPersonalAccessToken(String token) {
-        var payload = nameToBytes(token);
+        var payload = toBytes(token);
         return tcpClient.exchangeForEntity(CommandCode.PersonalAccessToken.LOGIN, payload, buf -> {
             var userId = buf.readUnsignedIntLE();
             return new IdentityInfo(userId, Optional.empty());
diff --git a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/StreamsTcpClient.java b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/StreamsTcpClient.java
index e7152ddcb..7605f164e 100644
--- a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/StreamsTcpClient.java
+++ b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/StreamsTcpClient.java
@@ -22,14 +22,16 @@ package org.apache.iggy.client.blocking.tcp;
 import io.netty.buffer.Unpooled;
 import org.apache.iggy.client.blocking.StreamsClient;
 import org.apache.iggy.identifier.StreamId;
+import org.apache.iggy.serde.BytesDeserializer;
+import org.apache.iggy.serde.BytesSerializer;
+import org.apache.iggy.serde.CommandCode;
 import org.apache.iggy.stream.StreamBase;
 import org.apache.iggy.stream.StreamDetails;
 
 import java.util.List;
 import java.util.Optional;
 
-import static org.apache.iggy.client.blocking.tcp.BytesSerializer.nameToBytes;
-import static org.apache.iggy.client.blocking.tcp.BytesSerializer.toBytes;
+import static org.apache.iggy.serde.BytesSerializer.toBytes;
 
 class StreamsTcpClient implements StreamsClient {
 
@@ -44,7 +46,7 @@ class StreamsTcpClient implements StreamsClient {
         var payloadSize = 1 + name.length();
         var payload = Unpooled.buffer(payloadSize);
 
-        payload.writeBytes(nameToBytes(name));
+        payload.writeBytes(BytesSerializer.toBytes(name));
         return tcpClient.exchangeForEntity(CommandCode.Stream.CREATE, payload, BytesDeserializer::readStreamDetails);
     }
 
@@ -66,7 +68,7 @@ class StreamsTcpClient implements StreamsClient {
         var payload = Unpooled.buffer(payloadSize + idBytes.capacity());
 
         payload.writeBytes(idBytes);
-        payload.writeBytes(nameToBytes(name));
+        payload.writeBytes(BytesSerializer.toBytes(name));
         tcpClient.send(CommandCode.Stream.UPDATE, payload);
     }
 
diff --git a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/SystemTcpClient.java b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/SystemTcpClient.java
index 447f48b3f..70277b742 100644
--- a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/SystemTcpClient.java
+++ b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/SystemTcpClient.java
@@ -21,6 +21,8 @@ package org.apache.iggy.client.blocking.tcp;
 
 import io.netty.buffer.Unpooled;
 import org.apache.iggy.client.blocking.SystemClient;
+import org.apache.iggy.serde.BytesDeserializer;
+import org.apache.iggy.serde.CommandCode;
 import org.apache.iggy.system.ClientInfo;
 import org.apache.iggy.system.ClientInfoDetails;
 import org.apache.iggy.system.Stats;
diff --git a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/TopicsTcpClient.java b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/TopicsTcpClient.java
index 1ccbd512a..01392dc55 100644
--- a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/TopicsTcpClient.java
+++ b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/TopicsTcpClient.java
@@ -23,6 +23,9 @@ import io.netty.buffer.Unpooled;
 import org.apache.iggy.client.blocking.TopicsClient;
 import org.apache.iggy.identifier.StreamId;
 import org.apache.iggy.identifier.TopicId;
+import org.apache.iggy.serde.BytesDeserializer;
+import org.apache.iggy.serde.BytesSerializer;
+import org.apache.iggy.serde.CommandCode;
 import org.apache.iggy.topic.CompressionAlgorithm;
 import org.apache.iggy.topic.Topic;
 import org.apache.iggy.topic.TopicDetails;
@@ -31,9 +34,8 @@ import java.math.BigInteger;
 import java.util.List;
 import java.util.Optional;
 
-import static org.apache.iggy.client.blocking.tcp.BytesSerializer.nameToBytes;
-import static org.apache.iggy.client.blocking.tcp.BytesSerializer.toBytes;
-import static org.apache.iggy.client.blocking.tcp.BytesSerializer.toBytesAsU64;
+import static org.apache.iggy.serde.BytesSerializer.toBytes;
+import static org.apache.iggy.serde.BytesSerializer.toBytesAsU64;
 
 class TopicsTcpClient implements TopicsClient {
 
@@ -74,7 +76,7 @@ class TopicsTcpClient implements TopicsClient {
         payload.writeBytes(toBytesAsU64(messageExpiry));
         payload.writeBytes(toBytesAsU64(maxTopicSize));
         payload.writeByte(replicationFactor.orElse((short) 0));
-        payload.writeBytes(nameToBytes(name));
+        payload.writeBytes(BytesSerializer.toBytes(name));
 
         return tcpClient.exchangeForEntity(CommandCode.Topic.CREATE, payload, BytesDeserializer::readTopicDetails);
     }
@@ -95,7 +97,7 @@ class TopicsTcpClient implements TopicsClient {
         payload.writeBytes(toBytesAsU64(messageExpiry));
         payload.writeBytes(toBytesAsU64(maxTopicSize));
         payload.writeByte(replicationFactor.orElse((short) 0));
-        payload.writeBytes(nameToBytes(name));
+        payload.writeBytes(BytesSerializer.toBytes(name));
 
         tcpClient.send(CommandCode.Topic.UPDATE, payload);
     }
diff --git a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/UsersTcpClient.java b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/UsersTcpClient.java
index 006de6586..6fb201526 100644
--- a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/UsersTcpClient.java
+++ b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/UsersTcpClient.java
@@ -23,6 +23,8 @@ import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
 import org.apache.iggy.client.blocking.UsersClient;
 import org.apache.iggy.identifier.UserId;
+import org.apache.iggy.serde.BytesDeserializer;
+import org.apache.iggy.serde.CommandCode;
 import org.apache.iggy.user.IdentityInfo;
 import org.apache.iggy.user.Permissions;
 import org.apache.iggy.user.UserInfo;
@@ -32,8 +34,7 @@ import org.apache.iggy.user.UserStatus;
 import java.util.List;
 import java.util.Optional;
 
-import static org.apache.iggy.client.blocking.tcp.BytesSerializer.nameToBytes;
-import static org.apache.iggy.client.blocking.tcp.BytesSerializer.toBytes;
+import static org.apache.iggy.serde.BytesSerializer.toBytes;
 
 class UsersTcpClient implements UsersClient {
 
@@ -58,8 +59,8 @@ class UsersTcpClient implements UsersClient {
     public UserInfoDetails createUser(
             String username, String password, UserStatus status, Optional<Permissions> permissions) {
         var payload = Unpooled.buffer();
-        payload.writeBytes(nameToBytes(username));
-        payload.writeBytes(nameToBytes(password));
+        payload.writeBytes(toBytes(username));
+        payload.writeBytes(toBytes(password));
         payload.writeByte(status.asCode());
         permissions.ifPresentOrElse(
                 perms -> {
@@ -85,7 +86,7 @@ class UsersTcpClient implements UsersClient {
         usernameOptional.ifPresentOrElse(
                 (username) -> {
                     payload.writeByte(1);
-                    payload.writeBytes(nameToBytes(username));
+                    payload.writeBytes(toBytes(username));
                 },
                 () -> payload.writeByte(0));
         statusOptional.ifPresentOrElse(
@@ -117,8 +118,8 @@ class UsersTcpClient implements UsersClient {
     @Override
     public void changePassword(UserId userId, String currentPassword, String newPassword) {
         var payload = toBytes(userId);
-        payload.writeBytes(nameToBytes(currentPassword));
-        payload.writeBytes(nameToBytes(newPassword));
+        payload.writeBytes(toBytes(currentPassword));
+        payload.writeBytes(toBytes(newPassword));
 
         tcpClient.send(CommandCode.User.CHANGE_PASSWORD, payload);
     }
@@ -130,8 +131,8 @@ class UsersTcpClient implements UsersClient {
         var payloadSize = 2 + username.length() + password.length() + 4 + version.length() + 4 + context.length();
         var payload = Unpooled.buffer(payloadSize);
 
-        payload.writeBytes(nameToBytes(username));
-        payload.writeBytes(nameToBytes(password));
+        payload.writeBytes(toBytes(username));
+        payload.writeBytes(toBytes(password));
         payload.writeIntLE(version.length());
         payload.writeBytes(version.getBytes());
         payload.writeIntLE(context.length());
diff --git a/foreign/java/java-sdk/src/main/java/org/apache/iggy/message/BigIntegerMessageId.java b/foreign/java/java-sdk/src/main/java/org/apache/iggy/message/BigIntegerMessageId.java
index 8e1b16691..d8b04e27d 100644
--- a/foreign/java/java-sdk/src/main/java/org/apache/iggy/message/BigIntegerMessageId.java
+++ b/foreign/java/java-sdk/src/main/java/org/apache/iggy/message/BigIntegerMessageId.java
@@ -20,7 +20,7 @@
 package org.apache.iggy.message;
 
 import io.netty.buffer.ByteBuf;
-import org.apache.iggy.client.blocking.tcp.BytesSerializer;
+import org.apache.iggy.serde.BytesSerializer;
 
 import java.math.BigInteger;
 
diff --git a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/BytesDeserializer.java b/foreign/java/java-sdk/src/main/java/org/apache/iggy/serde/BytesDeserializer.java
similarity index 90%
rename from foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/BytesDeserializer.java
rename to foreign/java/java-sdk/src/main/java/org/apache/iggy/serde/BytesDeserializer.java
index b89f6fce9..58e384b1c 100644
--- a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/BytesDeserializer.java
+++ b/foreign/java/java-sdk/src/main/java/org/apache/iggy/serde/BytesDeserializer.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iggy.client.blocking.tcp;
+package org.apache.iggy.serde;
 
 import io.netty.buffer.ByteBuf;
 import org.apache.commons.lang3.ArrayUtils;
@@ -57,11 +57,15 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 
-final class BytesDeserializer {
+/**
+ * Unified deserializer for both blocking and async clients.
+ * Provides deserialization of ByteBuf to domain objects according to Iggy wire protocol.
+ */
+public final class BytesDeserializer {
 
     private BytesDeserializer() {}
 
-    static StreamBase readStreamBase(ByteBuf response) {
+    public static StreamBase readStreamBase(ByteBuf response) {
         var streamId = response.readUnsignedIntLE();
         var createdAt = readU64AsBigInteger(response);
         var topicsCount = response.readUnsignedIntLE();
@@ -73,11 +77,11 @@ final class BytesDeserializer {
         return new StreamBase(streamId, createdAt, name, size.toString(), messagesCount, topicsCount);
     }
 
-    static StreamDetails readStreamDetails(ByteBuf response) {
+    public static StreamDetails readStreamDetails(ByteBuf response) {
         var streamBase = readStreamBase(response);
 
         List<Topic> topics = new ArrayList<>();
-        if (response.isReadable()) {
+        while (response.isReadable()) {
             topics.add(readTopic(response));
         }
 
@@ -95,7 +99,7 @@ final class BytesDeserializer {
         return new TopicDetails(topic, partitions);
     }
 
-    static Partition readPartition(ByteBuf response) {
+    public static Partition readPartition(ByteBuf response) {
         var partitionId = response.readUnsignedIntLE();
         var createdAt = readU64AsBigInteger(response);
         var segmentsCount = response.readUnsignedIntLE();
@@ -141,7 +145,7 @@ final class BytesDeserializer {
         return new ConsumerGroupDetails(consumerGroup, members);
     }
 
-    static ConsumerGroupMember readConsumerGroupMember(ByteBuf response) {
+    public static ConsumerGroupMember readConsumerGroupMember(ByteBuf response) {
         var memberId = response.readUnsignedIntLE();
         var partitionsCount = response.readUnsignedIntLE();
         List<Long> partitionIds = new ArrayList<>();
@@ -178,7 +182,7 @@ final class BytesDeserializer {
         return new PolledMessages(partitionId, currentOffset, messagesCount, messages);
     }
 
-    static Message readPolledMessage(ByteBuf response) {
+    public static Message readPolledMessage(ByteBuf response) {
         var checksum = readU64AsBigInteger(response);
         var id = readBytesMessageId(response);
         var offset = readU64AsBigInteger(response);
@@ -194,7 +198,7 @@ final class BytesDeserializer {
         return new Message(header, payload, Optional.empty());
     }
 
-    static Stats readStats(ByteBuf response) {
+    public static Stats readStats(ByteBuf response) {
         var processId = response.readUnsignedIntLE();
         var cpuUsage = response.readFloatLE();
         var totalCpuUsage = response.readFloatLE();
@@ -251,7 +255,7 @@ final class BytesDeserializer {
                 kernelVersion);
     }
 
-    static ClientInfoDetails readClientInfoDetails(ByteBuf response) {
+    public static ClientInfoDetails readClientInfoDetails(ByteBuf response) {
         var clientInfo = readClientInfo(response);
         var consumerGroups = new ArrayList<ConsumerGroupInfo>();
         for (int i = 0; i < clientInfo.consumerGroupsCount(); i++) {
@@ -261,7 +265,7 @@ final class BytesDeserializer {
         return new ClientInfoDetails(clientInfo, consumerGroups);
     }
 
-    static ClientInfo readClientInfo(ByteBuf response) {
+    public static ClientInfo readClientInfo(ByteBuf response) {
         var clientId = response.readUnsignedIntLE();
         var userId = response.readUnsignedIntLE();
         var userIdOptional = Optional.<Long>empty();
@@ -280,7 +284,7 @@ final class BytesDeserializer {
         return new ClientInfo(clientId, userIdOptional, address, transportString, consumerGroupsCount);
     }
 
-    static ConsumerGroupInfo readConsumerGroupInfo(ByteBuf response) {
+    public static ConsumerGroupInfo readConsumerGroupInfo(ByteBuf response) {
         var streamId = response.readUnsignedIntLE();
         var topicId = response.readUnsignedIntLE();
         var groupId = response.readUnsignedIntLE();
@@ -288,7 +292,7 @@ final class BytesDeserializer {
         return new ConsumerGroupInfo(streamId, topicId, groupId);
     }
 
-    static UserInfoDetails readUserInfoDetails(ByteBuf response) {
+    public static UserInfoDetails readUserInfoDetails(ByteBuf response) {
         var userInfo = readUserInfo(response);
 
         Optional<Permissions> permissionsOptional = Optional.empty();
@@ -300,7 +304,7 @@ final class BytesDeserializer {
         return new UserInfoDetails(userInfo, permissionsOptional);
     }
 
-    static Permissions readPermissions(ByteBuf response) {
+    public static Permissions readPermissions(ByteBuf response) {
         var _permissionsLength = response.readUnsignedIntLE();
         var globalPermissions = readGlobalPermissions(response);
         Map<Long, StreamPermissions> streamPermissionsMap = new HashMap<>();
@@ -312,7 +316,7 @@ final class BytesDeserializer {
         return new Permissions(globalPermissions, streamPermissionsMap);
     }
 
-    static StreamPermissions readStreamPermissions(ByteBuf response) {
+    public static StreamPermissions readStreamPermissions(ByteBuf response) {
         var manageStream = response.readBoolean();
         var readStream = response.readBoolean();
         var manageTopics = response.readBoolean();
@@ -329,7 +333,7 @@ final class BytesDeserializer {
                 manageStream, readStream, manageTopics, readTopics, pollMessages, sendMessages, topicPermissionsMap);
     }
 
-    static TopicPermissions readTopicPermissions(ByteBuf response) {
+    public static TopicPermissions readTopicPermissions(ByteBuf response) {
         var manageTopic = response.readBoolean();
         var readTopic = response.readBoolean();
         var pollMessages = response.readBoolean();
@@ -337,7 +341,7 @@ final class BytesDeserializer {
         return new TopicPermissions(manageTopic, readTopic, pollMessages, sendMessages);
     }
 
-    static GlobalPermissions readGlobalPermissions(ByteBuf response) {
+    public static GlobalPermissions readGlobalPermissions(ByteBuf response) {
         var manageServers = response.readBoolean();
         var readServers = response.readBoolean();
         var manageUsers = response.readBoolean();
@@ -361,7 +365,7 @@ final class BytesDeserializer {
                 sendMessages);
     }
 
-    static UserInfo readUserInfo(ByteBuf response) {
+    public static UserInfo readUserInfo(ByteBuf response) {
         var userId = response.readUnsignedIntLE();
         var createdAt = readU64AsBigInteger(response);
         var statusCode = response.readByte();
@@ -372,14 +376,14 @@ final class BytesDeserializer {
         return new UserInfo(userId, createdAt, status, username);
     }
 
-    static RawPersonalAccessToken readRawPersonalAccessToken(ByteBuf response) {
+    public static RawPersonalAccessToken readRawPersonalAccessToken(ByteBuf response) {
         var tokenLength = response.readByte();
         var token =
                 response.readCharSequence(tokenLength, StandardCharsets.UTF_8).toString();
         return new RawPersonalAccessToken(token);
     }
 
-    static PersonalAccessTokenInfo readPersonalAccessTokenInfo(ByteBuf response) {
+    public static PersonalAccessTokenInfo readPersonalAccessTokenInfo(ByteBuf response) {
         var nameLength = response.readByte();
         var name = response.readCharSequence(nameLength, StandardCharsets.UTF_8).toString();
         var expiry = readU64AsBigInteger(response);
@@ -387,11 +391,11 @@ final class BytesDeserializer {
         return new PersonalAccessTokenInfo(name, expiryOptional);
     }
 
-    private static BigInteger readU64AsBigInteger(ByteBuf buffer) {
+    static BigInteger readU64AsBigInteger(ByteBuf buffer) {
         var bytesArray = new byte[8];
         buffer.readBytes(bytesArray, 0, 8);
         ArrayUtils.reverse(bytesArray);
-        return new BigInteger(bytesArray);
+        return new BigInteger(1, bytesArray);
     }
 
     private static BytesMessageId readBytesMessageId(ByteBuf buffer) {
diff --git a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/BytesSerializer.java b/foreign/java/java-sdk/src/main/java/org/apache/iggy/serde/BytesSerializer.java
similarity index 83%
rename from foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/BytesSerializer.java
rename to foreign/java/java-sdk/src/main/java/org/apache/iggy/serde/BytesSerializer.java
index f95c0a622..267de8491 100644
--- a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/BytesSerializer.java
+++ b/foreign/java/java-sdk/src/main/java/org/apache/iggy/serde/BytesSerializer.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iggy.client.blocking.tcp;
+package org.apache.iggy.serde;
 
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
@@ -35,21 +35,26 @@ import org.apache.iggy.user.StreamPermissions;
 import org.apache.iggy.user.TopicPermissions;
 
 import java.math.BigInteger;
+import java.nio.charset.StandardCharsets;
 import java.util.Map;
 import java.util.Optional;
 
+/**
+ * Unified serializer for both blocking and async clients.
+ * Provides serialization of domain objects to ByteBuf according to Iggy wire protocol.
+ */
 public final class BytesSerializer {
 
     private BytesSerializer() {}
 
-    static ByteBuf toBytes(Consumer consumer) {
+    public static ByteBuf toBytes(Consumer consumer) {
         ByteBuf buffer = Unpooled.buffer();
         buffer.writeByte(consumer.kind().asCode());
         buffer.writeBytes(toBytes(consumer.id()));
         return buffer;
     }
 
-    static ByteBuf toBytes(Identifier identifier) {
+    public static ByteBuf toBytes(Identifier identifier) {
         if (identifier.getKind() == 1) {
             ByteBuf buffer = Unpooled.buffer(6);
             buffer.writeByte(1);
@@ -67,7 +72,7 @@ public final class BytesSerializer {
         }
     }
 
-    static ByteBuf toBytes(Partitioning partitioning) {
+    public static ByteBuf toBytes(Partitioning partitioning) {
         ByteBuf buffer = Unpooled.buffer(2 + partitioning.value().length);
         buffer.writeByte(partitioning.kind().asCode());
         buffer.writeByte(partitioning.value().length);
@@ -75,14 +80,14 @@ public final class BytesSerializer {
         return buffer;
     }
 
-    static ByteBuf toBytes(Message message) {
+    public static ByteBuf toBytes(Message message) {
         var buffer = Unpooled.buffer(MessageHeader.SIZE + message.payload().length);
         buffer.writeBytes(toBytes(message.header()));
         buffer.writeBytes(message.payload());
         return buffer;
     }
 
-    static ByteBuf toBytes(MessageHeader header) {
+    public static ByteBuf toBytes(MessageHeader header) {
         var buffer = Unpooled.buffer(MessageHeader.SIZE);
         buffer.writeBytes(toBytesAsU64(header.checksum()));
         buffer.writeBytes(header.id().toBytes());
@@ -94,14 +99,14 @@ public final class BytesSerializer {
         return buffer;
     }
 
-    static ByteBuf toBytes(PollingStrategy strategy) {
+    public static ByteBuf toBytes(PollingStrategy strategy) {
         var buffer = Unpooled.buffer(9);
         buffer.writeByte(strategy.kind().asCode());
         buffer.writeBytes(toBytesAsU64(strategy.value()));
         return buffer;
     }
 
-    static ByteBuf toBytes(Optional<Long> optionalLong) {
+    public static ByteBuf toBytes(Optional<Long> optionalLong) {
         var buffer = Unpooled.buffer(5);
         if (optionalLong.isPresent()) {
             buffer.writeByte(1);
@@ -113,7 +118,7 @@ public final class BytesSerializer {
         return buffer;
     }
 
-    static ByteBuf toBytes(Map<String, HeaderValue> headers) {
+    public static ByteBuf toBytes(Map<String, HeaderValue> headers) {
         if (headers.isEmpty()) {
             return Unpooled.EMPTY_BUFFER;
         }
@@ -131,7 +136,7 @@ public final class BytesSerializer {
         return buffer;
     }
 
-    static ByteBuf toBytes(Permissions permissions) {
+    public static ByteBuf toBytes(Permissions permissions) {
         var buffer = Unpooled.buffer();
         buffer.writeBytes(toBytes(permissions.global()));
         if (permissions.streams().isEmpty()) {
@@ -149,7 +154,7 @@ public final class BytesSerializer {
         return buffer;
     }
 
-    static ByteBuf toBytes(GlobalPermissions permissions) {
+    public static ByteBuf toBytes(GlobalPermissions permissions) {
         var buffer = Unpooled.buffer();
         buffer.writeBoolean(permissions.manageServers());
         buffer.writeBoolean(permissions.readServers());
@@ -164,7 +169,7 @@ public final class BytesSerializer {
         return buffer;
     }
 
-    static ByteBuf toBytes(StreamPermissions permissions) {
+    public static ByteBuf toBytes(StreamPermissions permissions) {
         var buffer = Unpooled.buffer();
         buffer.writeBoolean(permissions.manageStream());
         buffer.writeBoolean(permissions.readStream());
@@ -187,7 +192,7 @@ public final class BytesSerializer {
         return buffer;
     }
 
-    static ByteBuf toBytes(TopicPermissions permissions) {
+    public static ByteBuf toBytes(TopicPermissions permissions) {
         var buffer = Unpooled.buffer();
         buffer.writeBoolean(permissions.manageTopic());
         buffer.writeBoolean(permissions.readTopic());
@@ -196,21 +201,21 @@ public final class BytesSerializer {
         return buffer;
     }
 
-    static ByteBuf nameToBytes(String name) {
-        ByteBuf buffer = Unpooled.buffer(1 + name.length());
-        buffer.writeByte(name.length());
-        buffer.writeBytes(name.getBytes());
+    public static ByteBuf toBytes(String value) {
+        ByteBuf buffer = Unpooled.buffer(1 + value.length());
+        buffer.writeByte(value.length());
+        buffer.writeBytes(value.getBytes(StandardCharsets.UTF_8));
         return buffer;
     }
 
-    static ByteBuf toBytesAsU64(BigInteger value) {
+    public static ByteBuf toBytesAsU64(BigInteger value) {
         if (value.signum() == -1) {
             throw new IllegalArgumentException("Negative value cannot be serialized to unsigned 64: " + value);
         }
         ByteBuf buffer = Unpooled.buffer(8, 8);
         byte[] valueAsBytes = value.toByteArray();
-        if (valueAsBytes.length > 9) {
-            throw new IllegalArgumentException();
+        if (valueAsBytes.length > 9 || valueAsBytes.length == 9 && valueAsBytes[0] != 0) {
+            throw new IllegalArgumentException("Value too large for U64: " + value);
         }
         ArrayUtils.reverse(valueAsBytes);
         buffer.writeBytes(valueAsBytes, 0, Math.min(8, valueAsBytes.length));
@@ -226,8 +231,8 @@ public final class BytesSerializer {
         }
         ByteBuf buffer = Unpooled.buffer(16, 16);
         byte[] valueAsBytes = value.toByteArray();
-        if (valueAsBytes.length > 17) {
-            throw new IllegalArgumentException();
+        if (valueAsBytes.length > 17 || valueAsBytes.length == 17 && valueAsBytes[0] != 0) {
+            throw new IllegalArgumentException("Value too large for U128: " + value);
         }
         ArrayUtils.reverse(valueAsBytes);
         buffer.writeBytes(valueAsBytes, 0, Math.min(16, valueAsBytes.length));
diff --git a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/CommandCode.java b/foreign/java/java-sdk/src/main/java/org/apache/iggy/serde/CommandCode.java
similarity index 96%
rename from foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/CommandCode.java
rename to foreign/java/java-sdk/src/main/java/org/apache/iggy/serde/CommandCode.java
index 5f7d6b525..e9f1f7905 100644
--- a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/CommandCode.java
+++ b/foreign/java/java-sdk/src/main/java/org/apache/iggy/serde/CommandCode.java
@@ -17,8 +17,12 @@
  * under the License.
  */
 
-package org.apache.iggy.client.blocking.tcp;
+package org.apache.iggy.serde;
 
+/**
+ * TCP command codes for Iggy wire protocol.
+ * Used by both blocking and async TCP clients.
+ */
 public interface CommandCode {
 
     int getValue();
diff --git a/foreign/java/java-sdk/src/test/java/org/apache/iggy/client/async/AsyncPollMessageTest.java b/foreign/java/java-sdk/src/test/java/org/apache/iggy/client/async/AsyncPollMessageTest.java
index d2d905ea8..e2e6b979f 100644
--- a/foreign/java/java-sdk/src/test/java/org/apache/iggy/client/async/AsyncPollMessageTest.java
+++ b/foreign/java/java-sdk/src/test/java/org/apache/iggy/client/async/AsyncPollMessageTest.java
@@ -47,7 +47,6 @@ import java.util.concurrent.TimeoutException;
 import java.util.stream.IntStream;
 
 import static org.assertj.core.api.Assertions.assertThat;
-import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /**
  * Test class specifically for testing poll message functionality with different consumer scenarios.
@@ -163,30 +162,6 @@ public class AsyncPollMessageTest {
         }
     }
 
-    @Test
-    @Order(1)
-    @DisplayName("Poll with NULL consumer - Expected to timeout (server doesn't respond)")
-    void testPollWithNullConsumer() {
-        log.info("TEST 1: Polling with NULL consumer");
-
-        // This test demonstrates the issue: server doesn't respond to null consumer
-        assertThatThrownBy(() -> {
-                    client.messages()
-                            .pollMessagesAsync(
-                                    StreamId.of(testStream),
-                                    TopicId.of(TEST_TOPIC),
-                                    Optional.of(PARTITION_ID),
-                                    null, // NULL consumer causes server to not respond
-                                    PollingStrategy.offset(BigInteger.ZERO),
-                                    10L,
-                                    false)
-                            .get(3, TimeUnit.SECONDS);
-                })
-                .isInstanceOf(TimeoutException.class);
-
-        log.info("CONFIRMED: Null consumer causes timeout (server doesn't respond)");
-    }
-
     @Test
     @Order(2)
     @DisplayName("Poll with various consumer IDs")
diff --git a/foreign/java/java-sdk/src/test/java/org/apache/iggy/client/blocking/tcp/BytesSerializerTest.java b/foreign/java/java-sdk/src/test/java/org/apache/iggy/client/blocking/tcp/BytesSerializerTest.java
index b864e1cc6..2e035979e 100644
--- a/foreign/java/java-sdk/src/test/java/org/apache/iggy/client/blocking/tcp/BytesSerializerTest.java
+++ b/foreign/java/java-sdk/src/test/java/org/apache/iggy/client/blocking/tcp/BytesSerializerTest.java
@@ -21,6 +21,7 @@ package org.apache.iggy.client.blocking.tcp;
 
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
+import org.apache.iggy.serde.BytesSerializer;
 import org.junit.jupiter.api.Nested;
 import org.junit.jupiter.api.Test;
 
@@ -71,6 +72,17 @@ class BytesSerializerTest {
             // when & then
             assertThatThrownBy(() -> BytesSerializer.toBytesAsU64(value)).isInstanceOf(IllegalArgumentException.class);
         }
+
+        @Test
+        void shouldFailForValueLargerThanU64() {
+            // given
+            long maxLong = 0xFFFF_FFFF_FFFF_FFFFL;
+            var maxU64 = new BigInteger(Long.toUnsignedString(maxLong));
+            var value = maxU64.add(BigInteger.ONE);
+
+            // when & then
+            assertThatThrownBy(() -> BytesSerializer.toBytesAsU64(value)).isInstanceOf(IllegalArgumentException.class);
+        }
     }
 
     @Nested
@@ -113,5 +125,17 @@ class BytesSerializerTest {
             // when & then
             assertThatThrownBy(() -> BytesSerializer.toBytesAsU128(value)).isInstanceOf(IllegalArgumentException.class);
         }
+
+        @Test
+        void shouldFailForValueLargerThanU128() {
+            // given
+            byte[] maxU128 = new byte[17];
+            Arrays.fill(maxU128, 1, 17, (byte) 0xFF);
+            var maxU128Value = new BigInteger(maxU128);
+            var value = maxU128Value.add(BigInteger.ONE);
+
+            // when & then
+            assertThatThrownBy(() -> BytesSerializer.toBytesAsU128(value)).isInstanceOf(IllegalArgumentException.class);
+        }
     }
 }
diff --git a/foreign/java/java-sdk/src/test/java/org/apache/iggy/serde/BytesDeserializerTest.java b/foreign/java/java-sdk/src/test/java/org/apache/iggy/serde/BytesDeserializerTest.java
new file mode 100644
index 000000000..42e8b0158
--- /dev/null
+++ b/foreign/java/java-sdk/src/test/java/org/apache/iggy/serde/BytesDeserializerTest.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iggy.serde;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import org.apache.commons.lang3.ArrayUtils;
+import org.junit.jupiter.api.Nested;
+import org.junit.jupiter.api.Test;
+
+import java.math.BigInteger;
+import java.util.HexFormat;
+
+import static org.apache.iggy.serde.BytesDeserializer.readU64AsBigInteger;
+import static org.assertj.core.api.Assertions.assertThat;
+
+class BytesDeserializerTest {
+
+    @Nested
+    class U64 {
+
+        @Test
+        void shouldDeserializeMaxValue() {
+            // given
+            long maxLong = 0xFFFF_FFFF_FFFF_FFFFL;
+            ByteBuf buffer = Unpooled.copyLong(maxLong);
+            var expectedMaxU64 = new BigInteger(Long.toUnsignedString(maxLong));
+
+            // when
+            BigInteger result = readU64AsBigInteger(buffer);
+
+            // then
+            assertThat(result).isEqualTo(expectedMaxU64);
+        }
+
+        @Test
+        void shouldDeserializeZero() {
+            // given
+            ByteBuf buffer = Unpooled.buffer(8);
+            buffer.writeZero(8);
+
+            // when
+            BigInteger result = readU64AsBigInteger(buffer);
+
+            // then
+            assertThat(result).isEqualTo(BigInteger.ZERO);
+        }
+
+        @Test
+        void shouldDeserializeArbitraryValue() {
+            // given
+            byte[] bytes = HexFormat.of().parseHex("8000000000000000");
+            var expected = new BigInteger(1, bytes);
+            ArrayUtils.reverse(bytes);
+            ByteBuf buffer = Unpooled.wrappedBuffer(bytes);
+
+            // when
+            BigInteger result = readU64AsBigInteger(buffer);
+
+            // then
+            assertThat(result).isEqualTo(expected);
+        }
+    }
+}

Reply via email to