This is an automated email from the ASF dual-hosted git repository.
piotr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/master by this push:
new 78ef2b9ee docs(java): add async client JavaDoc and usage guide (#2711)
78ef2b9ee is described below
commit 78ef2b9ee74d9a6073a7834617689c11880dace6
Author: Atharva Lade <[email protected]>
AuthorDate: Tue Feb 17 13:58:59 2026 -0600
docs(java): add async client JavaDoc and usage guide (#2711)
The async client had minimal documentation — interfaces lacked JavaDoc,
there was no usage guide, and developers had no reference for
CompletableFuture patterns, error handling, or migration from the
blocking client.
Closes #2231
---
examples/java/README.md | 95 +++++-
examples/java/build.gradle.kts | 5 +-
.../apache/iggy/examples/async/AsyncConsumer.java | 349 +++++++++++++++++++++
.../iggy/examples/async/AsyncConsumerExample.java | 157 ---------
.../apache/iggy/examples/async/AsyncProducer.java | 240 +++++++-------
foreign/java/README.md | 9 +-
.../iggy/client/async/ConsumerGroupsClient.java | 58 +++-
.../apache/iggy/client/async/MessagesClient.java | 126 ++++++--
.../apache/iggy/client/async/StreamsClient.java | 99 ++++++
.../org/apache/iggy/client/async/TopicsClient.java | 103 ++++--
.../org/apache/iggy/client/async/UsersClient.java | 48 ++-
.../org/apache/iggy/client/async/package-info.java | 57 ++++
.../iggy/client/async/tcp/AsyncIggyTcpClient.java | 112 ++++++-
.../apache/iggy/client/async/tcp/package-info.java | 48 +++
14 files changed, 1123 insertions(+), 383 deletions(-)
diff --git a/examples/java/README.md b/examples/java/README.md
index 93c1e63b3..590eb3ad8 100644
--- a/examples/java/README.md
+++ b/examples/java/README.md
@@ -132,18 +132,103 @@ Building streams with advanced configuration:
Shows how to use the stream builder API to create and configure streams with
custom settings.
-## Async Client
+## Async Client Examples
-The following example demonstrates how to use the asynchronous client:
+### Async Producer
-Async producer example:
+High-throughput async production with pipelining:
```bash
./gradlew runAsyncProducer
```
-Async consumer example:
+Shows:
+
+- CompletableFuture chaining patterns
+- Pipelining multiple sends without blocking
+- Performance comparison with blocking client
+
+### Async Consumer
+
+Non-blocking async consumption with advanced patterns:
```bash
-./gradlew runAsyncConsumerExample
+./gradlew runAsyncConsumer
+```
+
+Shows:
+
+- Backpressure management (don't poll faster than you can process)
+- Error recovery with exponential backoff
+- Thread pool separation (Netty I/O threads vs. processing threads)
+- Offset-based polling with CompletableFuture
+
+**CRITICAL ASYNC PATTERN - Thread Pool Management:**
+
+The async client uses Netty's event loop threads for I/O operations. **NEVER**
block these threads with:
+
+- `.join()` or `.get()` inside `thenApply/thenAccept`
+- `Thread.sleep()`
+- Blocking database calls
+- Long-running computations
+
+If your message processing involves blocking operations, offload to a separate
thread pool using `thenApplyAsync(fn, executor)`.
+
+## Blocking vs. Async - When to Use Each
+
+The Iggy Java SDK provides two client types: **blocking (synchronous)** and
**async (non-blocking)**. Choose based on your use case:
+
+### Use Blocking Client When
+
+- Writing scripts, CLI tools, or simple applications
+- Sequential code is easier to reason about
+- Integration tests
+
+### Use Async Client When
+
+- Need high throughput
+- Application is already async/reactive (Spring WebFlux, Vert.x)
+- Want to pipeline multiple requests over a single connection
+- Building services that handle many concurrent streams
+
+## Key Async Patterns
+
+### CompletableFuture Chaining
+
+```java
+client.connect()
+ .thenCompose(v -> client.login())
+ .thenCompose(identity -> client.streams().createStream("my-stream"))
+ .thenAccept(stream -> System.out.println("Created: " + stream.name()))
+ .exceptionally(ex -> {
+ System.err.println("Error: " + ex.getMessage());
+ return null;
+ });
+```
+
+### Pipelining for Throughput
+
+```java
+List<CompletableFuture<Void>> sends = new ArrayList<>();
+for (int i = 0; i < 10; i++) {
+ sends.add(client.messages().sendMessages(...));
+}
+CompletableFuture.allOf(sends.toArray(new CompletableFuture[0])).join();
+```
+
+### Thread Pool Offloading
+
+```java
+// WRONG - blocks Netty event loop
+client.messages().pollMessages(...)
+ .thenAccept(polled -> {
+ saveToDatabase(polled); // blocking I/O!
+ });
+
+// CORRECT - offloads to processing pool
+var processingPool = Executors.newFixedThreadPool(8);
+client.messages().pollMessages(...)
+ .thenAcceptAsync(polled -> {
+ saveToDatabase(polled); // runs on processingPool
+ }, processingPool);
```
diff --git a/examples/java/build.gradle.kts b/examples/java/build.gradle.kts
index 258a90978..95bfd195b 100644
--- a/examples/java/build.gradle.kts
+++ b/examples/java/build.gradle.kts
@@ -105,7 +105,8 @@ tasks.register<JavaExec>("runAsyncProducer") {
mainClass.set("org.apache.iggy.examples.async.AsyncProducer")
}
-tasks.register<JavaExec>("runAsyncConsumerExample") {
+tasks.register<JavaExec>("runAsyncConsumer") {
classpath = sourceSets["main"].runtimeClasspath
- mainClass.set("org.apache.iggy.examples.async.AsyncConsumerExample")
+ mainClass.set("org.apache.iggy.examples.async.AsyncConsumer")
}
+
diff --git
a/examples/java/src/main/java/org/apache/iggy/examples/async/AsyncConsumer.java
b/examples/java/src/main/java/org/apache/iggy/examples/async/AsyncConsumer.java
new file mode 100644
index 000000000..99e59c4d2
--- /dev/null
+++
b/examples/java/src/main/java/org/apache/iggy/examples/async/AsyncConsumer.java
@@ -0,0 +1,349 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iggy.examples.async;
+
+import org.apache.iggy.Iggy;
+import org.apache.iggy.client.async.tcp.AsyncIggyTcpClient;
+import org.apache.iggy.consumergroup.Consumer;
+import org.apache.iggy.identifier.StreamId;
+import org.apache.iggy.identifier.TopicId;
+import org.apache.iggy.message.Message;
+import org.apache.iggy.message.PolledMessages;
+import org.apache.iggy.message.PollingStrategy;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.math.BigInteger;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Async Consumer Example - Backpressure and Error Handling
+ *
+ * <p>Demonstrates advanced async message consumption patterns including:
+ * <ul>
+ * <li>Non-blocking continuous polling</li>
+ * <li>Backpressure management (don't poll faster than you can process)</li>
+ * <li>Error recovery with exponential backoff</li>
+ * <li>Offloading CPU-intensive work from Netty threads</li>
+ * <li>Graceful shutdown</li>
+ * </ul>
+ *
+ * <h2>CRITICAL ASYNC PATTERN - Thread Pool Management:</h2>
+ *
+ * <p>The async client uses Netty's event loop threads for I/O operations.
+ * <strong>NEVER</strong> block these threads with:
+ * <ul>
+ * <li>{@code .join()} or {@code .get()} inside {@code
thenApply/thenAccept}</li>
+ * <li>{@code Thread.sleep()}</li>
+ * <li>Blocking database calls</li>
+ * <li>Long-running computations</li>
+ * </ul>
+ *
+ * <p>If your message processing involves blocking operations, offload to a
separate
+ * thread pool using {@code thenApplyAsync(fn, executor)}.
+ *
+ * <p>This example shows the correct pattern.
+ *
+ * <p>Run after AsyncProducer to see messages flow.
+ *
+ * <p>Run with: {@code ./gradlew runAsyncConsumer}
+ */
+public final class AsyncConsumer {
+ private static final Logger log =
LoggerFactory.getLogger(AsyncConsumer.class);
+
+ // Configuration (must match AsyncProducer)
+ private static final String IGGY_HOST = "localhost";
+ private static final int IGGY_PORT = 8090;
+ private static final String USERNAME = "iggy";
+ private static final String PASSWORD = "iggy";
+ private static final String STREAM_NAME = "async-test";
+ private static final String TOPIC_NAME = "events";
+ private static final long PARTITION_ID = 0L;
+ private static final long CONSUMER_ID = 0L;
+
+ // Polling configuration
+ private static final int POLL_BATCH_SIZE = 100;
+ private static final int POLL_INTERVAL_MS = 1000;
+ private static final int BATCHES_LIMIT = 5; // Exit after receiving this
many batches
+ private static final int MAX_EMPTY_POLLS = 5; // Exit if no messages after
consecutive empty polls
+
+ // Error recovery configuration
+ private static final int MAX_RETRY_ATTEMPTS = 5;
+ private static final int INITIAL_BACKOFF_MS = 100;
+ private static final int MAX_BACKOFF_MS = 5000;
+
+ // Thread pool for message processing (separate from Netty threads)
+ // Size based on workload: CPU-bound = availableProcessors, I/O-bound = 2x
or more
+ private static final int PROCESSING_THREADS =
Runtime.getRuntime().availableProcessors();
+
+ private static volatile boolean running = true;
+
+ private AsyncConsumer() {
+ // Utility class
+ }
+
+ public static void main(String[] args) {
+ AsyncIggyTcpClient client = null;
+ ExecutorService processingPool = null;
+
+ // Handle Ctrl+C gracefully
+ Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+ log.info("Shutdown signal received, stopping consumer...");
+ running = false;
+ }));
+
+ try {
+ log.info("=== Async Consumer Example (Backpressure + Error
Handling) ===");
+
+ // Create thread pool for message processing
+ processingPool = Executors.newFixedThreadPool(PROCESSING_THREADS,
r -> {
+ Thread t = new Thread(r, "message-processor");
+ t.setDaemon(true);
+ return t;
+ });
+
+ log.info("Created processing thread pool with {} threads",
PROCESSING_THREADS);
+
+ // 1. Connect and authenticate
+ log.info("Connecting to Iggy server at {}:{}...", IGGY_HOST,
IGGY_PORT);
+ client = Iggy.tcpClientBuilder()
+ .async()
+ .host(IGGY_HOST)
+ .port(IGGY_PORT)
+ .credentials(USERNAME, PASSWORD)
+ .buildAndLogin()
+ .join();
+
+ log.info("Connected successfully");
+
+ // 2. Poll messages continuously with backpressure
+ AsyncIggyTcpClient finalClient = client;
+ pollMessagesAsync(finalClient, processingPool).join();
+
+ log.info("=== Consumer stopped gracefully ===");
+
+ } catch (RuntimeException e) {
+ log.error("Consumer failed", e);
+ System.exit(1);
+ } finally {
+ // Cleanup
+ if (client != null) {
+ try {
+ client.close().join();
+ log.info("Client closed");
+ } catch (RuntimeException e) {
+ log.error("Error closing client", e);
+ }
+ }
+
+ if (processingPool != null) {
+ processingPool.shutdown();
+ try {
+ if (!processingPool.awaitTermination(5, TimeUnit.SECONDS))
{
+ processingPool.shutdownNow();
+ }
+ log.info("Processing thread pool shut down");
+ } catch (InterruptedException e) {
+ processingPool.shutdownNow();
+ }
+ }
+ }
+ }
+
+ private static CompletableFuture<Void> pollMessagesAsync(
+ AsyncIggyTcpClient client, ExecutorService processingPool) {
+ log.info("Starting async polling loop (limit: {} batches)...",
BATCHES_LIMIT);
+
+ AtomicInteger totalReceived = new AtomicInteger(0);
+ AtomicInteger emptyPolls = new AtomicInteger(0);
+ AtomicInteger consumedBatches = new AtomicInteger(0);
+ AtomicReference<BigInteger> offset = new
AtomicReference<>(BigInteger.ZERO);
+
+ // RECURSIVE ASYNC POLLING PATTERN:
+ // Each poll schedules the next poll after processing completes.
+ // This provides natural backpressure - we don't poll for new messages
+ // until we've finished processing the current batch.
+
+ CompletableFuture<Void> pollingLoop = new CompletableFuture<>();
+ pollBatch(client, processingPool, totalReceived, emptyPolls,
consumedBatches, offset, 0, pollingLoop);
+ return pollingLoop;
+ }
+
+ private static void pollBatch(
+ AsyncIggyTcpClient client,
+ ExecutorService processingPool,
+ AtomicInteger totalReceived,
+ AtomicInteger emptyPolls,
+ AtomicInteger consumedBatches,
+ AtomicReference<BigInteger> offset,
+ int retryAttempt,
+ CompletableFuture<Void> loopFuture) {
+ if (!running || consumedBatches.get() >= BATCHES_LIMIT) {
+ log.info(
+ "Finished consuming {} batches. Total messages received:
{}",
+ consumedBatches.get(),
+ totalReceived.get());
+ loopFuture.complete(null);
+ return;
+ }
+
+ StreamId streamId = StreamId.of(STREAM_NAME);
+ TopicId topicId = TopicId.of(TOPIC_NAME);
+ Consumer consumer = Consumer.of(CONSUMER_ID);
+
+ client.messages()
+ .pollMessages(
+ streamId,
+ topicId,
+ Optional.of(PARTITION_ID),
+ consumer,
+ PollingStrategy.offset(offset.get()),
+ (long) POLL_BATCH_SIZE,
+ false)
+ .thenComposeAsync(
+ polled -> {
+ // OFFLOAD TO PROCESSING POOL:
+ // We use thenComposeAsync with processingPool to
move message processing
+ // off the Netty event loop. This is critical for
heavy workloads.
+
+ int messageCount = polled.messages().size();
+
+ if (messageCount > 0) {
+ // Update offset for next poll
+ offset.updateAndGet(current ->
current.add(BigInteger.valueOf(messageCount)));
+ consumedBatches.incrementAndGet();
+
+ return processMessages(polled, totalReceived,
processingPool)
+ .thenRun(() -> emptyPolls.set(0));
+ } else {
+ int empty = emptyPolls.incrementAndGet();
+ if (empty >= MAX_EMPTY_POLLS) {
+ log.info("No more messages after {} empty
polls, finishing.", MAX_EMPTY_POLLS);
+ running = false;
+ return
CompletableFuture.completedFuture(null);
+ }
+ log.info("Caught up - no new messages.
Waiting...");
+ // Sleep without blocking Netty threads
+ return CompletableFuture.runAsync(
+ () -> {
+ try {
+ Thread.sleep(POLL_INTERVAL_MS);
+ } catch (InterruptedException e) {
+
Thread.currentThread().interrupt();
+ }
+ },
+ processingPool);
+ }
+ },
+ processingPool)
+ .thenRun(() -> {
+ // SUCCESS: Reset retry counter and schedule next poll
+ pollBatch(
+ client, processingPool, totalReceived, emptyPolls,
consumedBatches, offset, 0, loopFuture);
+ })
+ .exceptionally(e -> {
+ // ERROR RECOVERY WITH EXPONENTIAL BACKOFF:
+ // Don't give up on the first error. Retry with increasing
delays.
+ log.error(
+ "Error polling messages (attempt {}/{}): {}",
+ retryAttempt + 1,
+ MAX_RETRY_ATTEMPTS,
+ e.getMessage());
+
+ if (retryAttempt < MAX_RETRY_ATTEMPTS) {
+ int backoffMs = Math.min(INITIAL_BACKOFF_MS * (1 <<
retryAttempt), MAX_BACKOFF_MS);
+ log.info("Retrying in {} ms...", backoffMs);
+
+ // Schedule retry after backoff
+ CompletableFuture.runAsync(
+ () -> {
+ try {
+ Thread.sleep(backoffMs);
+ } catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ }
+ pollBatch(
+ client,
+ processingPool,
+ totalReceived,
+ emptyPolls,
+ consumedBatches,
+ offset,
+ retryAttempt + 1,
+ loopFuture);
+ },
+ processingPool);
+ } else {
+ log.error("Max retry attempts reached. Stopping
consumer.");
+ loopFuture.completeExceptionally(e);
+ }
+ return null;
+ });
+ }
+
+ private static CompletableFuture<Void> processMessages(
+ PolledMessages polled, AtomicInteger totalReceived,
ExecutorService processingPool) {
+ // Process each message (this runs on processingPool, not Netty
threads)
+ return CompletableFuture.runAsync(
+ () -> {
+ int messageCount = polled.messages().size();
+
+ for (Message message : polled.messages()) {
+ String payload = new String(message.payload());
+
+ // Simulate message processing (in real app: parse,
validate, store, etc.)
+ // This could be CPU-intensive or involve blocking I/O
(database, HTTP calls)
+ processMessage(payload, message.header().offset());
+ }
+
+ int total = totalReceived.addAndGet(messageCount);
+ log.info("Processed {} messages (total: {})",
messageCount, total);
+ },
+ processingPool);
+ }
+
+ private static void processMessage(String payload, BigInteger offset) {
+ // In a real application, this would be your business logic:
+ // - Parse JSON
+ // - Validate data
+ // - Call external APIs
+ // - Update database
+ // - Send to downstream systems
+
+ // For this example, just log occasionally
+ if (offset.compareTo(BigInteger.valueOf(5)) < 0
+ ||
offset.mod(BigInteger.valueOf(100)).equals(BigInteger.ZERO)) {
+ log.debug("Processed message at offset {}: '{}'", offset, payload);
+ }
+
+ // Simulate some processing time (remove in real app)
+ try {
+ Thread.sleep(1);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ }
+}
diff --git
a/examples/java/src/main/java/org/apache/iggy/examples/async/AsyncConsumerExample.java
b/examples/java/src/main/java/org/apache/iggy/examples/async/AsyncConsumerExample.java
deleted file mode 100644
index 28f396c86..000000000
---
a/examples/java/src/main/java/org/apache/iggy/examples/async/AsyncConsumerExample.java
+++ /dev/null
@@ -1,157 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iggy.examples.async;
-
-import org.apache.iggy.Iggy;
-import org.apache.iggy.client.async.tcp.AsyncIggyTcpClient;
-import org.apache.iggy.consumergroup.Consumer;
-import org.apache.iggy.consumergroup.ConsumerGroupDetails;
-import org.apache.iggy.identifier.ConsumerId;
-import org.apache.iggy.identifier.StreamId;
-import org.apache.iggy.identifier.TopicId;
-import org.apache.iggy.message.PollingStrategy;
-import org.apache.iggy.stream.StreamDetails;
-import org.apache.iggy.topic.CompressionAlgorithm;
-import org.apache.iggy.topic.TopicDetails;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.math.BigInteger;
-import java.util.Optional;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-import static java.util.Optional.empty;
-
-/**
- * Example demonstrating the true async Netty-based client.
- */
-public final class AsyncConsumerExample {
-
- private static final String STREAM_NAME = "async-test";
- private static final StreamId STREAM_ID = StreamId.of(STREAM_NAME);
- private static final String TOPIC_NAME = "events";
- private static final TopicId TOPIC_ID = TopicId.of(TOPIC_NAME);
- private static final String GROUP_NAME = "async-consumer";
- private static final ConsumerId GROUP_ID = ConsumerId.of(GROUP_NAME);
- private static final Logger log =
LoggerFactory.getLogger(AsyncConsumerExample.class);
-
- private AsyncConsumerExample() {}
-
- public static void main(String[] args) throws ExecutionException,
InterruptedException, TimeoutException {
- // First, setup the stream/topic/group using blocking client
- setupWithBlockingClient();
-
- // Now test the async client
- testAsyncClient();
- }
-
- private static void setupWithBlockingClient() {
- log.info("Setting up stream, topic, and consumer group...");
-
- var blockingClient =
- Iggy.tcpClientBuilder().blocking().credentials("iggy",
"iggy").buildAndLogin();
-
- // Create stream if needed
- Optional<StreamDetails> stream =
blockingClient.streams().getStream(STREAM_ID);
- if (!stream.isPresent()) {
- blockingClient.streams().createStream(STREAM_NAME);
- log.info("Created stream: {}", STREAM_NAME);
- }
-
- // Create topic if needed
- Optional<TopicDetails> topic =
blockingClient.topics().getTopic(STREAM_ID, TOPIC_ID);
- if (!topic.isPresent()) {
- blockingClient
- .topics()
- .createTopic(
- STREAM_ID,
- 1L,
- CompressionAlgorithm.None,
- BigInteger.ZERO,
- BigInteger.ZERO,
- empty(),
- TOPIC_NAME);
- log.info("Created topic: {}", TOPIC_NAME);
- }
-
- // Create consumer group if needed
- Optional<ConsumerGroupDetails> group =
- blockingClient.consumerGroups().getConsumerGroup(STREAM_ID,
TOPIC_ID, GROUP_ID);
- if (!group.isPresent()) {
- blockingClient.consumerGroups().createConsumerGroup(STREAM_ID,
TOPIC_ID, GROUP_NAME);
- log.info("Created consumer group: {}", GROUP_NAME);
- }
-
- // Join the consumer group
- blockingClient.consumerGroups().joinConsumerGroup(STREAM_ID, TOPIC_ID,
GROUP_ID);
- log.info("Joined consumer group");
- }
-
- private static void testAsyncClient() throws ExecutionException,
InterruptedException, TimeoutException {
- log.info("Testing async client with Netty...");
-
- // Create async client
- AsyncIggyTcpClient asyncClient = new AsyncIggyTcpClient("localhost",
8090);
-
- // Connect asynchronously
- log.info("Connecting to server...");
- asyncClient
- .connect()
- .thenCompose(v -> {
- log.info("Connected! Logging in...");
- return asyncClient.users().login("iggy", "iggy");
- })
- .thenCompose(v -> {
- log.info("Logged in! Joining consumer group...");
- // Join the consumer group first
- return
asyncClient.consumerGroups().joinConsumerGroup(STREAM_ID, TOPIC_ID, GROUP_ID);
- })
- .thenCompose(v -> {
- log.info("Joined consumer group! Now polling messages...");
- return asyncClient
- .messages()
- .pollMessages(
- STREAM_ID,
- TOPIC_ID,
- Optional.empty(),
- Consumer.group(GROUP_ID),
- PollingStrategy.next(),
- 10L,
- true);
- })
- .thenAccept(messages -> {
- log.info("Received {} messages",
messages.messages().size());
- messages.messages().forEach(msg -> log.info("Message: {}",
new String(msg.payload())));
- })
- .exceptionally(error -> {
- log.error("Error in async operation", error);
- return null;
- })
- .thenCompose(v -> {
- log.info("Closing connection...");
- return asyncClient.close();
- })
- .get(10, TimeUnit.SECONDS);
-
- log.info("Async test completed!");
- }
-}
diff --git
a/examples/java/src/main/java/org/apache/iggy/examples/async/AsyncProducer.java
b/examples/java/src/main/java/org/apache/iggy/examples/async/AsyncProducer.java
index 45329b062..710089ce7 100644
---
a/examples/java/src/main/java/org/apache/iggy/examples/async/AsyncProducer.java
+++
b/examples/java/src/main/java/org/apache/iggy/examples/async/AsyncProducer.java
@@ -19,6 +19,7 @@
package org.apache.iggy.examples.async;
+import org.apache.iggy.Iggy;
import org.apache.iggy.client.async.tcp.AsyncIggyTcpClient;
import org.apache.iggy.identifier.StreamId;
import org.apache.iggy.identifier.TopicId;
@@ -29,23 +30,24 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.math.BigInteger;
+import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
/**
* AsyncProducer demonstrates how to use the async client to send messages to
Apache Iggy.
- * This producer sends messages asynchronously and handles responses using
CompletableFuture.
+ *
+ * <p>This producer sends messages asynchronously and handles responses using
CompletableFuture.
+ *
+ * <p>Run with: {@code ./gradlew runAsyncProducer}
*/
-public class AsyncProducer {
+public final class AsyncProducer {
private static final Logger log =
LoggerFactory.getLogger(AsyncProducer.class);
- private static final String HOST = "127.0.0.1";
+ private static final String HOST = "localhost";
private static final int PORT = 8090;
private static final String USERNAME = "iggy";
private static final String PASSWORD = "iggy";
@@ -55,157 +57,139 @@ public class AsyncProducer {
private static final long PARTITION_ID = 0L;
private static final int MESSAGE_COUNT = 100;
- private static final int MESSAGE_SIZE = 256;
-
- private final AsyncIggyTcpClient client;
- private final AtomicInteger successCount = new AtomicInteger(0);
- private final AtomicInteger errorCount = new AtomicInteger(0);
+ private static final int MESSAGE_BATCH_SIZE = 10;
+ private static final int TOTAL_BATCHES = MESSAGE_COUNT /
MESSAGE_BATCH_SIZE;
- public AsyncProducer() {
- this.client = new AsyncIggyTcpClient(HOST, PORT);
+ private AsyncProducer() {
+ // Utility class
}
- public CompletableFuture<Void> start() {
- log.info("Starting AsyncProducer...");
+ public static void main(String[] args) {
+ AsyncIggyTcpClient client = null;
- return client.connect()
- .thenCompose(v -> {
- log.info("Connected to Iggy server at {}:{}", HOST, PORT);
- return client.users().login(USERNAME, PASSWORD);
- })
- .thenCompose(v -> {
- log.info("Logged in successfully as user: {}", USERNAME);
- return setupStreamAndTopic();
- })
- .thenCompose(v -> {
- log.info("Stream and topic setup complete");
- return sendMessages();
- })
- .thenRun(() -> {
- log.info("All messages sent. Success: {}, Errors: {}",
successCount.get(), errorCount.get());
- })
- .exceptionally(ex -> {
- log.error("Error in producer flow", ex);
- return null;
- });
+ try {
+ log.info("=== Async Producer Example ===");
+
+ // 1. Connect and authenticate using builder
+ log.info("Connecting to Iggy server at {}:{}...", HOST, PORT);
+ client = Iggy.tcpClientBuilder()
+ .async()
+ .host(HOST)
+ .port(PORT)
+ .credentials(USERNAME, PASSWORD)
+ .buildAndLogin()
+ .join();
+
+ log.info("Connected successfully");
+
+ // 2. Set up stream and topic
+ setupStreamAndTopic(client).join();
+ log.info("Stream and topic setup complete");
+
+ // 3. Send messages
+ AtomicInteger successCount = new AtomicInteger(0);
+ AtomicInteger errorCount = new AtomicInteger(0);
+
+ sendMessages(client, successCount, errorCount).join();
+ log.info("All messages sent. Success: {}, Errors: {}",
successCount.get(), errorCount.get());
+
+ log.info("=== Producer completed successfully ===");
+
+ } catch (RuntimeException e) {
+ log.error("Producer failed", e);
+ System.exit(1);
+ } finally {
+ if (client != null) {
+ try {
+ client.close().join();
+ log.info("Client closed");
+ } catch (RuntimeException e) {
+ log.error("Error closing client", e);
+ }
+ }
+ }
}
- private CompletableFuture<Void> setupStreamAndTopic() {
+ private static CompletableFuture<Void>
setupStreamAndTopic(AsyncIggyTcpClient client) {
log.info("Checking stream: {}", STREAM_NAME);
return client.streams()
.getStream(StreamId.of(STREAM_NAME))
- .thenCompose(stream -> {
- if (stream.isEmpty()) {
- log.info("Creating stream: {}", STREAM_NAME);
- return client.streams()
- .createStream(STREAM_NAME)
- .thenAccept(created -> log.info("Stream
created: {}", created.name()));
- } else {
- log.info("Stream exists: {}", STREAM_NAME);
+ .thenCompose(streamOpt -> {
+ if (streamOpt.isPresent()) {
+ log.info("Stream '{}' already exists", STREAM_NAME);
return CompletableFuture.completedFuture(null);
}
+ log.info("Creating stream: {}", STREAM_NAME);
+ return client.streams()
+ .createStream(STREAM_NAME)
+ .thenAccept(created -> log.info("Stream created:
{}", created.name()));
})
.thenCompose(v -> {
log.info("Checking topic: {}", TOPIC_NAME);
return client.topics().getTopic(StreamId.of(STREAM_NAME),
TopicId.of(TOPIC_NAME));
})
- .thenCompose(topic -> {
- if (topic.isEmpty()) {
- log.info("Creating topic: {}", TOPIC_NAME);
- return client.topics()
- .createTopic(
- StreamId.of(STREAM_NAME),
- 1L, // 1 partition
- CompressionAlgorithm.None,
- BigInteger.ZERO,
- BigInteger.ZERO,
- Optional.empty(),
- TOPIC_NAME)
- .thenAccept(created -> log.info("Topic
created: {}", created.name()));
- } else {
- log.info("Topic exists: {}", TOPIC_NAME);
+ .thenCompose(topicOpt -> {
+ if (topicOpt.isPresent()) {
+ log.info("Topic '{}' already exists", TOPIC_NAME);
return CompletableFuture.completedFuture(null);
}
+ log.info("Creating topic: {}", TOPIC_NAME);
+ return client.topics()
+ .createTopic(
+ StreamId.of(STREAM_NAME),
+ 1L,
+ CompressionAlgorithm.None,
+ BigInteger.ZERO,
+ BigInteger.ZERO,
+ Optional.empty(),
+ TOPIC_NAME)
+ .thenAccept(created -> log.info("Topic created:
{}", created.name()));
});
}
- private CompletableFuture<Void> sendMessages() {
- log.info("Sending {} messages...", MESSAGE_COUNT);
-
- CompletableFuture<?>[] futures = new CompletableFuture[MESSAGE_COUNT];
-
- for (int i = 0; i < MESSAGE_COUNT; i++) {
- final int messageIndex = i;
- futures[i] = sendMessage(messageIndex).handle((result, ex) -> {
- if (ex != null) {
- log.error("Failed to send message {}: {}", messageIndex,
ex.getMessage());
- errorCount.incrementAndGet();
- } else {
- if (messageIndex % 10 == 0) {
- log.debug("Sent message {}", messageIndex);
- }
- successCount.incrementAndGet();
- }
- return null;
- });
-
- // Add a small delay between messages to avoid overwhelming the
server
- try {
- Thread.sleep(10);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- break;
- }
- }
-
- return CompletableFuture.allOf(futures);
- }
-
- private CompletableFuture<Void> sendMessage(int index) {
- // Create message payload
- String messageContent =
- String.format("Async message %d - %s - %s", index,
UUID.randomUUID(), System.currentTimeMillis());
-
- // Pad message to desired size
- while (messageContent.length() < MESSAGE_SIZE) {
- messageContent += " ";
- }
-
- // Use the factory method to create a message
- Message message = Message.of(messageContent);
+ private static CompletableFuture<Void> sendMessages(
+ AsyncIggyTcpClient client, AtomicInteger successCount,
AtomicInteger errorCount) {
+ log.info("Sending {} messages in {} batches...", MESSAGE_COUNT,
TOTAL_BATCHES);
- // Create partitioning strategy (use partition ID)
+ StreamId streamId = StreamId.of(STREAM_NAME);
+ TopicId topicId = TopicId.of(TOPIC_NAME);
Partitioning partitioning = Partitioning.partitionId(PARTITION_ID);
- // Send message using async client
- return client.messages()
- .sendMessages(StreamId.of(STREAM_NAME),
TopicId.of(TOPIC_NAME), partitioning, List.of(message));
- }
+ long startTime = System.currentTimeMillis();
- public CompletableFuture<Void> stop() {
- log.info("Stopping AsyncProducer...");
- return client.close().thenRun(() -> log.info("AsyncProducer stopped"));
- }
+ CompletableFuture<?>[] futures = new CompletableFuture[TOTAL_BATCHES];
- public static void main(String[] args) {
- AsyncProducer producer = new AsyncProducer();
+ for (int b = 0; b < TOTAL_BATCHES; b++) {
+ final int batchNum = b;
- CompletableFuture<Void> producerFuture = producer.start()
- .thenCompose(v -> {
- // Keep producer running for a while
- CompletableFuture<Void> delay = new CompletableFuture<>();
- CompletableFuture.delayedExecutor(2,
TimeUnit.SECONDS).execute(() -> delay.complete(null));
- return delay;
- })
- .thenCompose(v -> producer.stop());
+ List<Message> batch = new ArrayList<>(MESSAGE_BATCH_SIZE);
+ for (int i = 0; i < MESSAGE_BATCH_SIZE; i++) {
+ int messageId = batchNum * MESSAGE_BATCH_SIZE + i;
+ String payload = String.format(
+ "Async message %d - %s - %s", messageId,
UUID.randomUUID(), System.currentTimeMillis());
+ batch.add(Message.of(payload));
+ }
- try {
- producerFuture.get(30, TimeUnit.SECONDS);
- log.info("AsyncProducer completed successfully");
- } catch (InterruptedException | ExecutionException | TimeoutException
e) {
- log.error("AsyncProducer failed", e);
- System.exit(1);
+ futures[b] = client.messages()
+ .sendMessages(streamId, topicId, partitioning, batch)
+ .handle((result, ex) -> {
+ if (ex != null) {
+ log.error("Failed to send batch {}: {}", batchNum,
ex.getMessage());
+ errorCount.addAndGet(MESSAGE_BATCH_SIZE);
+ } else {
+ successCount.addAndGet(MESSAGE_BATCH_SIZE);
+ if ((batchNum + 1) % 5 == 0) {
+ log.info("Sent batch {}/{}", batchNum + 1,
TOTAL_BATCHES);
+ }
+ }
+ return null;
+ });
}
+
+ return CompletableFuture.allOf(futures).thenRun(() -> {
+ long elapsed = System.currentTimeMillis() - startTime;
+ log.info("Sent {} messages in {} ms", successCount.get(), elapsed);
+ });
}
}
diff --git a/foreign/java/README.md b/foreign/java/README.md
index 24bddb606..4fe010e9c 100644
--- a/foreign/java/README.md
+++ b/foreign/java/README.md
@@ -195,7 +195,14 @@ All exceptions thrown by the SDK inherit from
`IggyException`. This allows you t
## Examples
-See the [`examples`](examples/) module for basic consumer and producer
implementations using the SDK.
+See the **[Java Examples](../../examples/java/)** directory for runnable
applications demonstrating the SDK:
+
+- **BlockingProducer** — synchronous message production with batch sending
+- **BlockingConsumer** — synchronous consumption with polling loops
+- **AsyncProducer** — high-throughput async production with pipelining
+- **AsyncConsumer** — async consumption with backpressure and error recovery
+
+Each example includes comprehensive documentation on when to use blocking vs.
async clients, CompletableFuture patterns, thread pool management, and
performance characteristics.
For Apache Flink integration, see the [Flink Connector
Library](external-processors/iggy-connector-flink/iggy-connector-library/README.md).
diff --git
a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/ConsumerGroupsClient.java
b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/ConsumerGroupsClient.java
index f27faf1a6..14d3fe68e 100644
---
a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/ConsumerGroupsClient.java
+++
b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/ConsumerGroupsClient.java
@@ -26,27 +26,67 @@ import org.apache.iggy.identifier.TopicId;
import java.util.concurrent.CompletableFuture;
/**
- * Async interface for consumer group operations.
+ * Async client interface for consumer group operations.
+ *
+ * <p>Consumer groups enable coordinated message consumption across multiple
clients.
+ * When a client joins a consumer group, the server assigns topic partitions
to that
+ * client. The partition assignment is rebalanced automatically when members
join or
+ * leave.
+ *
+ * <p><strong>Important:</strong> Consumer group membership is tied to the TCP
connection.
+ * If a client disconnects, it is automatically removed from the group and
partitions
+ * are reassigned to remaining members.
+ *
+ * <p>Usage example:
+ * <pre>{@code
+ * ConsumerGroupsClient groups = client.consumerGroups();
+ *
+ * // Join a group before polling with Consumer.group()
+ * groups.joinConsumerGroup(streamId, topicId, ConsumerId.of(1L))
+ * .thenRun(() -> System.out.println("Joined consumer group"))
+ * .thenCompose(v -> client.messages().pollMessages(
+ * streamId, topicId, Optional.empty(),
+ * Consumer.group(1L), PollingStrategy.next(), 100L, true));
+ *
+ * // Leave the group when done
+ * groups.leaveConsumerGroup(streamId, topicId, ConsumerId.of(1L));
+ * }</pre>
+ *
+ * @see org.apache.iggy.client.async.tcp.AsyncIggyTcpClient#consumerGroups()
*/
public interface ConsumerGroupsClient {
/**
* Joins a consumer group asynchronously.
*
- * @param streamId The stream identifier
- * @param topicId The topic identifier
- * @param groupId The consumer group identifier
- * @return A CompletableFuture that completes when the operation is done
+ * <p>The client becomes a member of the specified consumer group and will
be assigned
+ * one or more partitions to consume from. The membership is tied to this
TCP connection
+ * — disconnecting will automatically remove the client from the group.
+ *
+ * <p>A client must join the consumer group before polling messages with a
+ * {@link org.apache.iggy.consumergroup.Consumer#group(Long)} consumer
type.
+ *
+ * @param streamId the stream identifier containing the topic
+ * @param topicId the topic identifier
+ * @param groupId the consumer group identifier to join
+ * @return a {@link CompletableFuture} that completes when the client has
joined
+ * @throws org.apache.iggy.exception.IggyException if the consumer group
does not exist
*/
CompletableFuture<Void> joinConsumerGroup(StreamId streamId, TopicId
topicId, ConsumerId groupId);
/**
* Leaves a consumer group asynchronously.
*
- * @param streamId The stream identifier
- * @param topicId The topic identifier
- * @param groupId The consumer group identifier
- * @return A CompletableFuture that completes when the operation is done
+ * <p>The client is removed from the group and its assigned partitions are
redistributed
+ * among the remaining members. After leaving, the client can no longer
poll messages
+ * using a group consumer for this group until it joins again.
+ *
+ * @param streamId the stream identifier containing the topic
+ * @param topicId the topic identifier
+ * @param groupId the consumer group identifier to leave
+ * @return a {@link CompletableFuture} that completes when the client has
left
+ * @throws org.apache.iggy.exception.IggyException if the client is not a
member of the
+ * group
*/
CompletableFuture<Void> leaveConsumerGroup(StreamId streamId, TopicId
topicId, ConsumerId groupId);
}
diff --git
a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/MessagesClient.java
b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/MessagesClient.java
index 2654e87c6..8215665b5 100644
---
a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/MessagesClient.java
+++
b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/MessagesClient.java
@@ -32,22 +32,66 @@ import java.util.Optional;
import java.util.concurrent.CompletableFuture;
/**
- * Async client interface for message operations.
- * All methods return CompletableFuture for non-blocking operations.
+ * Async client interface for message operations (producing and consuming).
+ *
+ * <p>This is the core interface for interacting with Iggy's message streaming
+ * capabilities. It supports both sending (producing) and polling (consuming)
+ * messages asynchronously via {@link CompletableFuture}.
+ *
+ * <h2>Producing Messages</h2>
+ * <pre>{@code
+ * MessagesClient messages = client.messages();
+ *
+ * // Send messages with balanced partitioning (round-robin)
+ * var msgs = List.of(Message.of("order-created"),
Message.of("order-updated"));
+ * messages.sendMessages(streamId, topicId, Partitioning.balanced(), msgs)
+ * .thenRun(() -> System.out.println("Messages sent"));
+ *
+ * // Send with message key partitioning (ensures ordering per key)
+ * messages.sendMessages(streamId, topicId,
Partitioning.messagesKey("user-123"), msgs);
+ * }</pre>
+ *
+ * <h2>Consuming Messages</h2>
+ * <pre>{@code
+ * // Poll from the beginning
+ * messages.pollMessages(streamId, topicId, Optional.empty(),
+ * Consumer.of(1L), PollingStrategy.first(), 100L, true)
+ * .thenAccept(polled -> {
+ * for (var msg : polled.messages()) {
+ * System.out.println(new String(msg.payload()));
+ * }
+ * });
+ * }</pre>
+ *
+ * @see Partitioning
+ * @see PollingStrategy
+ * @see Consumer
+ * @see org.apache.iggy.client.async.tcp.AsyncIggyTcpClient#messages()
*/
public interface MessagesClient {
/**
- * Asynchronously polls messages from a topic.
+ * Polls messages from a topic partition asynchronously.
*
- * @param streamId the stream identifier
- * @param topicId the topic identifier
- * @param partitionId optional partition ID
- * @param consumer the consumer
- * @param strategy the polling strategy
- * @param count the number of messages to poll
- * @param autoCommit whether to auto-commit offsets
- * @return CompletableFuture that will complete with the polled messages
+ * <p>Messages are retrieved according to the specified {@link
PollingStrategy}, which
+ * controls where in the partition to start reading (e.g., from the
beginning, end,
+ * a specific offset, or a timestamp).
+ *
+ * <p>When {@code autoCommit} is {@code true}, the server automatically
stores the
+ * consumer's offset after returning the messages. This simplifies offset
management
+ * but provides at-least-once delivery semantics.
+ *
+ * @param streamId the stream identifier (numeric or string-based)
+ * @param topicId the topic identifier (numeric or string-based)
+ * @param partitionId optional partition ID to poll from; if empty, the
server selects
+ * the partition (required when using consumer groups)
+ * @param consumer the consumer identity, either individual ({@link
Consumer#of(Long)})
+ * or group ({@link Consumer#group(Long)})
+ * @param strategy the polling strategy controlling where to start
reading
+ * @param count the maximum number of messages to return
+ * @param autoCommit whether the server should automatically commit the
consumer offset
+ * @return a {@link CompletableFuture} that completes with the {@link
PolledMessages}
+ * containing the retrieved messages and their metadata
*/
CompletableFuture<PolledMessages> pollMessages(
StreamId streamId,
@@ -59,16 +103,20 @@ public interface MessagesClient {
boolean autoCommit);
/**
- * Asynchronously polls messages from a topic (convenience method).
+ * Polls messages from a topic partition asynchronously using numeric
identifiers.
*
- * @param streamId the stream ID
- * @param topicId the topic ID
+ * <p>This is a convenience overload that accepts raw numeric IDs instead
of typed
+ * identifier objects. See {@link #pollMessages(StreamId, TopicId,
Optional, Consumer,
+ * PollingStrategy, Long, boolean)} for full documentation.
+ *
+ * @param streamId the numeric stream ID
+ * @param topicId the numeric topic ID
* @param partitionId optional partition ID
- * @param consumerId the consumer ID
- * @param strategy the polling strategy
- * @param count the number of messages to poll
- * @param autoCommit whether to auto-commit offsets
- * @return CompletableFuture that will complete with the polled messages
+ * @param consumerId the numeric consumer ID
+ * @param strategy the polling strategy
+ * @param count the maximum number of messages to return
+ * @param autoCommit whether to auto-commit offsets
+ * @return a {@link CompletableFuture} that completes with the {@link
PolledMessages}
*/
default CompletableFuture<PolledMessages> pollMessages(
Long streamId,
@@ -89,25 +137,43 @@ public interface MessagesClient {
}
/**
- * Asynchronously sends messages to a topic.
+ * Sends messages to a topic asynchronously.
*
- * @param streamId the stream identifier
- * @param topicId the topic identifier
- * @param partitioning the partitioning strategy
- * @param messages the messages to send
- * @return CompletableFuture that will complete when messages are sent
+ * <p>Messages are routed to partitions according to the specified {@link
Partitioning}
+ * strategy:
+ * <ul>
+ * <li>{@link Partitioning#balanced()} — round-robin distribution across
partitions</li>
+ * <li>{@link Partitioning#partitionId(Long)} — send to a specific
partition</li>
+ * <li>{@link Partitioning#messagesKey(String)} — hash-based routing
that guarantees
+ * messages with the same key always go to the same partition,
preserving order</li>
+ * </ul>
+ *
+ * <p>Messages are batched into a single network request for efficiency.
For high
+ * throughput, accumulate messages and send them in larger batches rather
than one
+ * at a time.
+ *
+ * @param streamId the stream identifier (numeric or string-based)
+ * @param topicId the topic identifier (numeric or string-based)
+ * @param partitioning the partitioning strategy for routing messages
+ * @param messages the list of messages to send
+ * @return a {@link CompletableFuture} that completes when all messages
have been
+ * acknowledged by the server
+ * @throws org.apache.iggy.exception.IggyException if the stream or topic
does not exist
*/
CompletableFuture<Void> sendMessages(
StreamId streamId, TopicId topicId, Partitioning partitioning,
List<Message> messages);
/**
- * Asynchronously sends messages to a topic (convenience method).
+ * Sends messages to a topic asynchronously using numeric identifiers.
+ *
+ * <p>This is a convenience overload that accepts raw numeric IDs. See
+ * {@link #sendMessages(StreamId, TopicId, Partitioning, List)} for full
documentation.
*
- * @param streamId the stream ID
- * @param topicId the topic ID
+ * @param streamId the numeric stream ID
+ * @param topicId the numeric topic ID
* @param partitioning the partitioning strategy
- * @param messages the messages to send
- * @return CompletableFuture that will complete when messages are sent
+ * @param messages the list of messages to send
+ * @return a {@link CompletableFuture} that completes when messages are
acknowledged
*/
default CompletableFuture<Void> sendMessages(
Long streamId, Long topicId, Partitioning partitioning,
List<Message> messages) {
diff --git
a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/StreamsClient.java
b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/StreamsClient.java
index f2cb77e10..c55e0ba9d 100644
---
a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/StreamsClient.java
+++
b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/StreamsClient.java
@@ -27,27 +27,126 @@ import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
+/**
+ * Async client interface for stream management operations.
+ *
+ * <p>Streams are the top-level organizational unit in Iggy. Each stream
contains
+ * topics, which in turn contain partitions that hold the actual messages. All
+ * methods return {@link CompletableFuture} for non-blocking execution.
+ *
+ * <p>Usage example:
+ * <pre>{@code
+ * StreamsClient streams = client.streams();
+ *
+ * // Create a stream
+ * streams.createStream("orders")
+ * .thenAccept(details -> System.out.println("Created stream: " +
details.id()));
+ *
+ * // List all streams
+ * streams.getStreams()
+ * .thenAccept(list -> list.forEach(s -> System.out.println(s.name())));
+ * }</pre>
+ *
+ * @see org.apache.iggy.client.async.tcp.AsyncIggyTcpClient#streams()
+ */
public interface StreamsClient {
+ /**
+ * Gets detailed information about a specific stream by its numeric ID.
+ *
+ * <p>This is a convenience overload that wraps the numeric ID into a
{@link StreamId}.
+ *
+ * @param streamId the numeric stream identifier
+ * @return a {@link CompletableFuture} that completes with an {@link
Optional} containing
+ * the {@link StreamDetails} if the stream exists, or empty if not
found
+ */
default CompletableFuture<Optional<StreamDetails>> getStream(Long
streamId) {
return getStream(StreamId.of(streamId));
}
+ /**
+ * Gets detailed information about a specific stream.
+ *
+ * <p>The returned {@link StreamDetails} includes the stream's metadata
(ID, name, size,
+ * creation time) as well as the list of topics within the stream.
+ *
+ * @param streamId the stream identifier (numeric or string-based)
+ * @return a {@link CompletableFuture} that completes with an {@link
Optional} containing
+ * the {@link StreamDetails} if the stream exists, or empty if not
found
+ */
CompletableFuture<Optional<StreamDetails>> getStream(StreamId streamId);
+ /**
+ * Gets a list of all streams on the server.
+ *
+ * <p>Returns basic information about each stream without topic details.
Use
+ * {@link #getStream(StreamId)} for full details about a specific stream.
+ *
+ * @return a {@link CompletableFuture} that completes with a list of
{@link StreamBase}
+ * objects for all streams
+ */
CompletableFuture<List<StreamBase>> getStreams();
+ /**
+ * Creates a new stream with the given name.
+ *
+ * <p>The stream ID is assigned by the server. Stream names must be unique
across
+ * the server.
+ *
+ * @param name the name for the new stream
+ * @return a {@link CompletableFuture} that completes with the created
{@link StreamDetails}
+ * @throws org.apache.iggy.exception.IggyException if a stream with the
same name
+ * already exists
+ */
CompletableFuture<StreamDetails> createStream(String name);
+ /**
+ * Updates the name of an existing stream identified by its numeric ID.
+ *
+ * <p>This is a convenience overload that wraps the numeric ID into a
{@link StreamId}.
+ *
+ * @param streamId the numeric stream identifier
+ * @param name the new name for the stream
+ * @return a {@link CompletableFuture} that completes when the update is
done
+ */
default CompletableFuture<Void> updateStream(Long streamId, String name) {
return updateStream(StreamId.of(streamId), name);
}
+ /**
+ * Updates the name of an existing stream.
+ *
+ * @param streamId the stream identifier (numeric or string-based)
+ * @param name the new name for the stream
+ * @return a {@link CompletableFuture} that completes when the update is
done
+ * @throws org.apache.iggy.exception.IggyException if the stream does not
exist
+ */
CompletableFuture<Void> updateStream(StreamId streamId, String name);
+ /**
+ * Deletes a stream and all of its topics, partitions, and messages by
numeric ID.
+ *
+ * <p>This is a convenience overload that wraps the numeric ID into a
{@link StreamId}.
+ *
+ * <p><strong>Warning:</strong> This operation is irreversible and will
permanently
+ * delete all data within the stream.
+ *
+ * @param streamId the numeric stream identifier
+ * @return a {@link CompletableFuture} that completes when the deletion is
done
+ */
default CompletableFuture<Void> deleteStream(Long streamId) {
return deleteStream(StreamId.of(streamId));
}
+ /**
+ * Deletes a stream and all of its topics, partitions, and messages.
+ *
+ * <p><strong>Warning:</strong> This operation is irreversible and will
permanently
+ * delete all data within the stream.
+ *
+ * @param streamId the stream identifier (numeric or string-based)
+ * @return a {@link CompletableFuture} that completes when the deletion is
done
+ * @throws org.apache.iggy.exception.IggyException if the stream does not
exist
+ */
CompletableFuture<Void> deleteStream(StreamId streamId);
}
diff --git
a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/TopicsClient.java
b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/TopicsClient.java
index a976ad65c..fcd8761d6 100644
---
a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/TopicsClient.java
+++
b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/TopicsClient.java
@@ -31,38 +31,73 @@ import java.util.Optional;
import java.util.concurrent.CompletableFuture;
/**
- * Async client for topic operations.
+ * Async client interface for topic management operations.
+ *
+ * <p>Topics exist within streams and contain one or more partitions that hold
the actual
+ * messages. Each topic has configurable properties including compression,
message expiry,
+ * maximum size, and replication factor.
+ *
+ * <p>Usage example:
+ * <pre>{@code
+ * TopicsClient topics = client.topics();
+ *
+ * // Create a topic with 3 partitions and no message expiry
+ * topics.createTopic(
+ * StreamId.of(1L), 3L, CompressionAlgorithm.none(),
+ * BigInteger.ZERO, BigInteger.ZERO, Optional.empty(), "events")
+ * .thenAccept(details -> System.out.println("Topic created: " +
details.name()));
+ *
+ * // List all topics in a stream
+ * topics.getTopics(StreamId.of(1L))
+ * .thenAccept(list -> list.forEach(t -> System.out.println(t.name())));
+ * }</pre>
+ *
+ * @see org.apache.iggy.client.async.tcp.AsyncIggyTcpClient#topics()
*/
public interface TopicsClient {
/**
- * Gets topic details by stream ID and topic ID.
+ * Gets detailed information about a specific topic.
*
- * @param streamId The stream identifier
- * @param topicId The topic identifier
- * @return CompletableFuture with Optional TopicDetails
+ * <p>The returned {@link TopicDetails} includes the topic's metadata,
configuration
+ * (compression, expiry, max size), and partition information.
+ *
+ * @param streamId the stream identifier containing the topic
+ * @param topicId the topic identifier
+ * @return a {@link CompletableFuture} that completes with an {@link
Optional} containing
+ * the {@link TopicDetails} if found, or empty if the topic does
not exist
*/
CompletableFuture<Optional<TopicDetails>> getTopic(StreamId streamId,
TopicId topicId);
/**
- * Gets all topics in a stream.
+ * Gets a list of all topics within a stream.
*
- * @param streamId The stream identifier
- * @return CompletableFuture with list of Topics
+ * @param streamId the stream identifier
+ * @return a {@link CompletableFuture} that completes with a list of
{@link Topic} objects
*/
CompletableFuture<List<Topic>> getTopics(StreamId streamId);
/**
- * Creates a new topic.
+ * Creates a new topic within a stream.
+ *
+ * <p>The topic is created with the specified number of partitions and
configuration.
+ * Partition count cannot be changed after creation, but additional
partitions can be
+ * added via the partitions API.
*
- * @param streamId The stream identifier
- * @param partitionsCount Number of partitions
- * @param compressionAlgorithm Compression algorithm to use
- * @param messageExpiry Message expiry time in microseconds
- * @param maxTopicSize Maximum topic size in bytes
- * @param replicationFactor Optional replication factor
- * @param name Topic name
- * @return CompletableFuture with created TopicDetails
+ * @param streamId the stream identifier to create the topic in
+ * @param partitionsCount the initial number of partitions (must be
at least 1)
+ * @param compressionAlgorithm the compression algorithm for stored
messages
+ * (e.g., {@link CompressionAlgorithm#None})
+ * @param messageExpiry message expiry time in microseconds; {@link
BigInteger#ZERO}
+ * means messages never expire
+ * @param maxTopicSize maximum topic size in bytes; {@link
BigInteger#ZERO}
+ * means unlimited
+ * @param replicationFactor optional replication factor for the topic;
if empty,
+ * the server default is used
+ * @param name the topic name (must be unique within the
stream)
+ * @return a {@link CompletableFuture} that completes with the created
{@link TopicDetails}
+ * @throws org.apache.iggy.exception.IggyException if the stream does not
exist or a
+ * topic with the same name already exists
*/
CompletableFuture<TopicDetails> createTopic(
StreamId streamId,
@@ -74,16 +109,20 @@ public interface TopicsClient {
String name);
/**
- * Updates an existing topic.
+ * Updates the configuration of an existing topic.
*
- * @param streamId The stream identifier
- * @param topicId The topic identifier
- * @param compressionAlgorithm Compression algorithm to use
- * @param messageExpiry Message expiry time in microseconds
- * @param maxTopicSize Maximum topic size in bytes
- * @param replicationFactor Optional replication factor
- * @param name Topic name
- * @return CompletableFuture that completes when update is done
+ * <p>This allows changing the topic's name, compression, expiry, size
limit, and
+ * replication factor. Partition count is not affected by this operation.
+ *
+ * @param streamId the stream identifier containing the topic
+ * @param topicId the topic identifier to update
+ * @param compressionAlgorithm the new compression algorithm
+ * @param messageExpiry the new message expiry in microseconds
+ * @param maxTopicSize the new maximum topic size in bytes
+ * @param replicationFactor optional new replication factor
+ * @param name the new topic name
+ * @return a {@link CompletableFuture} that completes when the update is
done
+ * @throws org.apache.iggy.exception.IggyException if the topic does not
exist
*/
CompletableFuture<Void> updateTopic(
StreamId streamId,
@@ -95,11 +134,15 @@ public interface TopicsClient {
String name);
/**
- * Deletes a topic.
+ * Deletes a topic and all of its partitions and messages.
+ *
+ * <p><strong>Warning:</strong> This operation is irreversible and will
permanently
+ * delete all messages within the topic.
*
- * @param streamId The stream identifier
- * @param topicId The topic identifier
- * @return CompletableFuture that completes when deletion is done
+ * @param streamId the stream identifier containing the topic
+ * @param topicId the topic identifier to delete
+ * @return a {@link CompletableFuture} that completes when the deletion is
done
+ * @throws org.apache.iggy.exception.IggyException if the topic does not
exist
*/
CompletableFuture<Void> deleteTopic(StreamId streamId, TopicId topicId);
}
diff --git
a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/UsersClient.java
b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/UsersClient.java
index db39f7d2c..796e21ed6 100644
---
a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/UsersClient.java
+++
b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/UsersClient.java
@@ -24,23 +24,57 @@ import org.apache.iggy.user.IdentityInfo;
import java.util.concurrent.CompletableFuture;
/**
- * Async client for user management operations.
+ * Async client interface for user authentication operations.
+ *
+ * <p>Authentication is required before performing any data operations on the
server.
+ * The client must successfully log in before creating streams, sending
messages, or
+ * consuming data.
+ *
+ * <p>Usage example:
+ * <pre>{@code
+ * UsersClient users = client.users();
+ *
+ * // Login and chain subsequent operations
+ * users.login("iggy", "iggy")
+ * .thenAccept(identity -> System.out.println("Logged in as user: " +
identity.userId()))
+ * .exceptionally(ex -> {
+ * System.err.println("Login failed: " + ex.getMessage());
+ * return null;
+ * });
+ * }</pre>
+ *
+ * <p>For convenience, credentials can be provided at client construction time
and used
+ * with {@link org.apache.iggy.client.async.tcp.AsyncIggyTcpClient#login()},
or the
+ * builder's {@link
org.apache.iggy.client.async.tcp.AsyncIggyTcpClientBuilder#buildAndLogin()}
+ * method can handle connection and login in a single step.
+ *
+ * @see org.apache.iggy.client.async.tcp.AsyncIggyTcpClient#users()
+ * @see
org.apache.iggy.client.async.tcp.AsyncIggyTcpClientBuilder#buildAndLogin()
*/
public interface UsersClient {
/**
- * Logs in to the server with the specified username and password.
+ * Logs in to the Iggy server with the specified credentials.
+ *
+ * <p>A successful login returns the authenticated user's identity
information
+ * and authorizes the connection for subsequent operations. Each TCP
connection
+ * maintains its own authentication state.
*
- * @param username The username to login with
- * @param password The password to login with
- * @return A CompletableFuture that completes with the user's identity
information
+ * @param username the username to authenticate with
+ * @param password the password to authenticate with
+ * @return a {@link CompletableFuture} that completes with the user's
+ * {@link IdentityInfo} on success
+ * @throws org.apache.iggy.exception.IggyException if the credentials are
invalid
*/
CompletableFuture<IdentityInfo> login(String username, String password);
/**
- * Logs out from the server.
+ * Logs out from the Iggy server and invalidates the current session.
+ *
+ * <p>After logout, the connection remains open but no data operations can
be
+ * performed until the client logs in again.
*
- * @return A CompletableFuture that completes when logout is successful
+ * @return a {@link CompletableFuture} that completes when logout is
successful
*/
CompletableFuture<Void> logout();
}
diff --git
a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/package-info.java
b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/package-info.java
new file mode 100644
index 000000000..b93950ff5
--- /dev/null
+++
b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/package-info.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Async client interfaces for Apache Iggy message streaming.
+ *
+ * <p>This package defines the async client API where all operations return
+ * {@link java.util.concurrent.CompletableFuture} for non-blocking execution.
+ * The interfaces decouple the API contract from transport-specific
implementations
+ * (see {@link org.apache.iggy.client.async.tcp} for the TCP/Netty
implementation).
+ *
+ * <h2>Core Interfaces</h2>
+ * <ul>
+ * <li>{@link org.apache.iggy.client.async.MessagesClient} — send and poll
messages</li>
+ * <li>{@link org.apache.iggy.client.async.StreamsClient} — manage
streams</li>
+ * <li>{@link org.apache.iggy.client.async.TopicsClient} — manage topics</li>
+ * <li>{@link org.apache.iggy.client.async.UsersClient} — authentication</li>
+ * <li>{@link org.apache.iggy.client.async.ConsumerGroupsClient} — consumer
group membership</li>
+ * </ul>
+ *
+ * <h2>Getting Started</h2>
+ * <pre>{@code
+ * var client = AsyncIggyTcpClient.builder()
+ * .host("localhost")
+ * .port(8090)
+ * .credentials("iggy", "iggy")
+ * .buildAndLogin()
+ * .join();
+ *
+ * client.messages().sendMessages(
+ * StreamId.of(1L), TopicId.of(1L),
+ * Partitioning.balanced(),
+ * List.of(Message.of("hello")))
+ * .join();
+ *
+ * client.close().join();
+ * }</pre>
+ *
+ * @see org.apache.iggy.client.async.tcp.AsyncIggyTcpClient
+ */
+package org.apache.iggy.client.async;
diff --git
a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/AsyncIggyTcpClient.java
b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/AsyncIggyTcpClient.java
index c2017ddf1..4729f0145 100644
---
a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/AsyncIggyTcpClient.java
+++
b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/AsyncIggyTcpClient.java
@@ -35,8 +35,54 @@ import java.util.Optional;
import java.util.concurrent.CompletableFuture;
/**
- * Async TCP client for Apache Iggy using Netty.
- * This is a true async implementation with non-blocking I/O.
+ * Async TCP client for Apache Iggy message streaming, built on Netty.
+ *
+ * <p>This client provides fully non-blocking I/O for communicating with an
Iggy server
+ * over TCP using the binary protocol. All operations return {@link
CompletableFuture}
+ * instances, enabling efficient concurrent and reactive programming patterns.
+ *
+ * <h2>Lifecycle</h2>
+ * <p>The client follows a three-phase lifecycle:
+ * <ol>
+ * <li><strong>Build</strong> — configure the client via {@link #builder()}
or
+ * {@link org.apache.iggy.Iggy#tcpClientBuilder()}</li>
+ * <li><strong>Connect</strong> — establish the TCP connection with {@link
#connect()}</li>
+ * <li><strong>Login</strong> — authenticate with {@link #login()} or
+ * {@link UsersClient#login(String, String)}</li>
+ * </ol>
+ *
+ * <h2>Quick Start</h2>
+ * <pre>{@code
+ * // One-liner: build, connect, and login
+ * var client = AsyncIggyTcpClient.builder()
+ * .host("localhost")
+ * .port(8090)
+ * .credentials("iggy", "iggy")
+ * .buildAndLogin()
+ * .join();
+ *
+ * // Send a message
+ * client.messages().sendMessages(
+ * StreamId.of(1L), TopicId.of(1L),
+ * Partitioning.balanced(),
+ * List.of(Message.of("hello world")))
+ * .join();
+ *
+ * // Always close when done
+ * client.close().join();
+ * }</pre>
+ *
+ * <h2>Thread Safety</h2>
+ * <p>This client is thread-safe. Multiple threads can invoke operations
concurrently;
+ * the underlying Netty event loop serializes writes to the TCP connection
while
+ * response handling is performed asynchronously.
+ *
+ * <h2>Resource Management</h2>
+ * <p>Always call {@link #close()} when the client is no longer needed. This
shuts down
+ * the Netty event loop group and releases all associated resources.
+ *
+ * @see AsyncIggyTcpClientBuilder
+ * @see org.apache.iggy.Iggy#tcpClientBuilder()
*/
public class AsyncIggyTcpClient {
@@ -57,6 +103,14 @@ public class AsyncIggyTcpClient {
private TopicsClient topicsClient;
private UsersClient usersClient;
+ /**
+ * Creates a new async TCP client with default settings.
+ *
+ * <p>Prefer using {@link #builder()} for configuring the client.
+ *
+ * @param host the server hostname
+ * @param port the server port
+ */
public AsyncIggyTcpClient(String host, int port) {
this(host, port, null, null, null, null, null, null, false,
Optional.empty());
}
@@ -86,9 +140,9 @@ public class AsyncIggyTcpClient {
}
/**
- * Creates a new builder for configuring AsyncIggyTcpClient.
+ * Creates a new builder for configuring an {@code AsyncIggyTcpClient}.
*
- * @return a new Builder instance
+ * @return a new {@link AsyncIggyTcpClientBuilder} instance
*/
public static AsyncIggyTcpClientBuilder builder() {
return new AsyncIggyTcpClientBuilder();
@@ -97,7 +151,11 @@ public class AsyncIggyTcpClient {
/**
* Connects to the Iggy server asynchronously.
*
- * @return a CompletableFuture that completes when connected
+ * <p>This establishes the TCP connection using Netty's non-blocking I/O.
After the
+ * returned future completes, the sub-clients ({@link #messages()}, {@link
#streams()},
+ * etc.) become available. You must call this before performing any
operations.
+ *
+ * @return a {@link CompletableFuture} that completes when the connection
is established
*/
public CompletableFuture<Void> connect() {
connection = new AsyncTcpConnection(host, port, enableTls,
tlsCertificate);
@@ -113,9 +171,15 @@ public class AsyncIggyTcpClient {
/**
* Logs in using the credentials provided during client construction.
*
- * @return a CompletableFuture that completes when logged in
- * @throws IggyMissingCredentialsException if no credentials were provided
- * @throws IggyNotConnectedException if client is not connected
+ * <p>Credentials must have been set via
+ * {@link AsyncIggyTcpClientBuilder#credentials(String, String)} when
building
+ * the client. For explicit credential handling, use
+ * {@link UsersClient#login(String, String)} instead.
+ *
+ * @return a {@link CompletableFuture} that completes with the user's
+ * {@link IdentityInfo} on success
+ * @throws IggyMissingCredentialsException if no credentials were provided
at build time
+ * @throws IggyNotConnectedException if {@link #connect()} has not
been called
*/
public CompletableFuture<IdentityInfo> login() {
if (usersClient == null) {
@@ -128,7 +192,10 @@ public class AsyncIggyTcpClient {
}
/**
- * Gets the async users client.
+ * Returns the async users client for authentication operations.
+ *
+ * @return the {@link UsersClient} instance
+ * @throws IggyNotConnectedException if the client is not connected
*/
public UsersClient users() {
if (usersClient == null) {
@@ -138,7 +205,10 @@ public class AsyncIggyTcpClient {
}
/**
- * Gets the async messages client.
+ * Returns the async messages client for producing and consuming messages.
+ *
+ * @return the {@link MessagesClient} instance
+ * @throws IggyNotConnectedException if the client is not connected
*/
public MessagesClient messages() {
if (messagesClient == null) {
@@ -148,7 +218,10 @@ public class AsyncIggyTcpClient {
}
/**
- * Gets the async consumer groups client.
+ * Returns the async consumer groups client for group membership
management.
+ *
+ * @return the {@link ConsumerGroupsClient} instance
+ * @throws IggyNotConnectedException if the client is not connected
*/
public ConsumerGroupsClient consumerGroups() {
if (consumerGroupsClient == null) {
@@ -158,7 +231,10 @@ public class AsyncIggyTcpClient {
}
/**
- * Gets the async streams client.
+ * Returns the async streams client for stream management.
+ *
+ * @return the {@link StreamsClient} instance
+ * @throws IggyNotConnectedException if the client is not connected
*/
public StreamsClient streams() {
if (streamsClient == null) {
@@ -168,7 +244,10 @@ public class AsyncIggyTcpClient {
}
/**
- * Gets the async topics client.
+ * Returns the async topics client for topic management.
+ *
+ * @return the {@link TopicsClient} instance
+ * @throws IggyNotConnectedException if the client is not connected
*/
public TopicsClient topics() {
if (topicsClient == null) {
@@ -178,7 +257,12 @@ public class AsyncIggyTcpClient {
}
/**
- * Closes the connection and releases resources.
+ * Closes the TCP connection and releases all Netty resources.
+ *
+ * <p>This shuts down the event loop group gracefully. After calling this
method,
+ * the client cannot be reused — create a new instance if needed.
+ *
+ * @return a {@link CompletableFuture} that completes when all resources
are released
*/
public CompletableFuture<Void> close() {
if (connection != null) {
diff --git
a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/package-info.java
b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/package-info.java
new file mode 100644
index 000000000..261a1cbd4
--- /dev/null
+++
b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/package-info.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Netty-based TCP implementation of the async Iggy client.
+ *
+ * <p>This package provides the concrete implementation of the async client
interfaces
+ * using Netty for non-blocking TCP communication with the Iggy server binary
protocol.
+ *
+ * <h2>Key Classes</h2>
+ * <ul>
+ * <li>{@link org.apache.iggy.client.async.tcp.AsyncIggyTcpClient} — main
client entry
+ * point; provides access to all sub-clients</li>
+ * <li>{@link org.apache.iggy.client.async.tcp.AsyncIggyTcpClientBuilder} —
fluent builder
+ * for configuring and constructing the client</li>
+ * <li>{@link org.apache.iggy.client.async.tcp.AsyncTcpConnection} — manages
the Netty
+ * channel, request serialization, and response correlation</li>
+ * </ul>
+ *
+ * <h2>Protocol Details</h2>
+ * <p>The Iggy binary protocol uses a simple framing scheme:
+ * <ul>
+ * <li><strong>Request:</strong> {@code [payload_size:4 LE][command:4
LE][payload:N]}</li>
+ * <li><strong>Response:</strong> {@code [status:4 LE][length:4
LE][payload:N]}</li>
+ * </ul>
+ * <p>Responses are matched to requests in FIFO order (the protocol does not
include
+ * request IDs). The {@link
org.apache.iggy.client.async.tcp.AsyncTcpConnection}
+ * serializes all writes through Netty's event loop to maintain ordering.
+ *
+ * @see org.apache.iggy.client.async.tcp.AsyncIggyTcpClient
+ */
+package org.apache.iggy.client.async.tcp;