This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new e3b213e7 [blog] Fluss Java Client Guide (#1253)
e3b213e7 is described below
commit e3b213e7e1ac2dbe6a1072d5878abfac984d3dbf
Author: Giannis Polyzos <[email protected]>
AuthorDate: Thu Jul 10 15:23:30 2025 +0300
[blog] Fluss Java Client Guide (#1253)
---
website/blog/2025-07-07-fluss-java-client.md | 403 +++++++++++++++++++++++++++
website/blog/assets/java_client/banner.png | Bin 0 -> 3143146 bytes
2 files changed, 403 insertions(+)
diff --git a/website/blog/2025-07-07-fluss-java-client.md
b/website/blog/2025-07-07-fluss-java-client.md
new file mode 100644
index 00000000..22bfda8f
--- /dev/null
+++ b/website/blog/2025-07-07-fluss-java-client.md
@@ -0,0 +1,403 @@
+---
+slug: fluss-java-client
+title: "Apache Fluss Java Client: A Deep Dive"
+authors: [giannis]
+---
+
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+
+
+## Introduction
+Apache Fluss is a streaming data storage system built for real-time analytics,
serving as a low-latency data layer in modern data Lakehouses.
+It supports sub-second streaming reads and writes, storing data in a columnar
format for efficiency, and offers two flexible table types: **append-only Log
Tables** and **updatable Primary Key Tables**.
+In practice, this means Fluss can ingest high-throughput event streams *(using
log tables)* while also maintaining *up-to-date* reference data or state
*(using primary key tables)*, a combination ideal for
+scenarios like IoT, where you might stream sensor readings and look up
information for those sensors in real-time, without
+the need for external K/V stores.
+<!-- truncate -->
+
+In this tutorial, we'll introduce the **Fluss Java Client** by walking through
a simple home IoT system example.
+We will use Fluss's `Admin` client to create a primary key table for sensor
information and a log table for sensor readings, then use the client
+to write data to these tables and to read and enrich the streaming sensor data.
+
+By the end, you'll see how a sensor reading can be ingested into a log table
and immediately enriched with information from a primary key table (essentially
performing a real-time lookup join for streaming data enrichment).
+
+## Preflight Check
+The full source code can be found
[here](https://github.com/ververica/ververica-fluss-examples/tree/main/fluss-java-client).
+To follow along, start the local environment with Docker Compose:
+
+```shell
+docker compose up
+```
+
+The first thing we need to do is establish a connection to the Fluss cluster.
+The `Connection` is the main entry point for the Fluss client, from which we
obtain an `Admin` (for metadata operations) and `Table` instances (for data
operations).
+
+```java
+// Configure connection to Fluss cluster
+Configuration conf = new Configuration();
+// Fluss server endpoint
+conf.setString("bootstrap.servers", "localhost:9123");
+Connection connection = ConnectionFactory.createConnection(conf);
+
+// Get Admin client for managing databases and tables
+Admin admin = connection.getAdmin();
+```
+The above code snippet shows the bare minimum required to connect to and
interact with a Fluss cluster.
+To keep things simple, our example uses the following mock data:
+```java
+public static final List<SensorReading> readings = List.of(
+ new SensorReading(1, LocalDateTime.of(2025, 6, 23, 9, 15), 22.5, 45.0,
1013.2, 87.5),
+ new SensorReading(2, LocalDateTime.of(2025, 6, 23, 9, 30), 23.1, 44.5,
1013.1, 88.0),
+ new SensorReading(3, LocalDateTime.of(2025, 6, 23, 9, 45), 21.8, 46.2,
1012.9, 86.9),
+ new SensorReading(4, LocalDateTime.of(2025, 6, 23, 10, 0), 24.0, 43.8,
1013.5, 89.2),
+ new SensorReading(5, LocalDateTime.of(2025, 6, 23, 10, 15), 22.9,
45.3, 1013.0, 87.8),
+ new SensorReading(6, LocalDateTime.of(2025, 6, 23, 10, 30), 23.4,
44.9, 1013.3, 88.3),
+ new SensorReading(7, LocalDateTime.of(2025, 6, 23, 10, 45), 21.7,
46.5, 1012.8, 86.5),
+ new SensorReading(8, LocalDateTime.of(2025, 6, 23, 11, 0), 24.2, 43.5,
1013.6, 89.5),
+ new SensorReading(9, LocalDateTime.of(2025, 6, 23, 11, 15), 23.0,
45.1, 1013.2, 87.9),
+ new SensorReading(10, LocalDateTime.of(2025, 6, 23, 11, 30), 22.6,
45.7, 1013.0, 87.4)
+);
+```
+
+```java
+public static final List<SensorInfo> sensorInfos = List.of(
+ new SensorInfo(1, "Outdoor Temp Sensor", "Temperature", "Roof",
LocalDate.of(2024, 1, 15), "OK", LocalDateTime.of(2025, 6, 23, 9, 15)),
+ new SensorInfo(2, "Main Lobby Sensor", "Humidity", "Lobby",
LocalDate.of(2024, 2, 20), "ERROR", LocalDateTime.of(2025, 6, 23, 9, 30)),
+ new SensorInfo(3, "Server Room Sensor", "Temperature", "Server Room",
LocalDate.of(2024, 3, 10), "MAINTENANCE", LocalDateTime.of(2025, 6, 23, 9, 45)),
+ new SensorInfo(4, "Warehouse Sensor", "Pressure", "Warehouse",
LocalDate.of(2024, 4, 5), "OK", LocalDateTime.of(2025, 6, 23, 10, 0)),
+ new SensorInfo(5, "Conference Room Sensor", "Humidity", "Conference Room",
LocalDate.of(2024, 5, 25), "OK", LocalDateTime.of(2025, 6, 23, 10, 15)),
+ new SensorInfo(6, "Office 1 Sensor", "Temperature", "Office 1",
LocalDate.of(2024, 6, 18), "LOW_BATTERY", LocalDateTime.of(2025, 6, 23, 10,
30)),
+ new SensorInfo(7, "Office 2 Sensor", "Humidity", "Office 2",
LocalDate.of(2024, 7, 12), "OK", LocalDateTime.of(2025, 6, 23, 10, 45)),
+ new SensorInfo(8, "Lab Sensor", "Temperature", "Lab",
LocalDate.of(2024, 8, 30), "ERROR", LocalDateTime.of(2025, 6, 23, 11, 0)),
+ new SensorInfo(9, "Parking Lot Sensor", "Pressure", "Parking Lot",
LocalDate.of(2024, 9, 14), "OK", LocalDateTime.of(2025, 6, 23, 11, 15)),
+ new SensorInfo(10, "Backyard Sensor", "Temperature", "Backyard",
LocalDate.of(2024, 10, 3), "OK", LocalDateTime.of(2025, 6, 23, 11, 30)),
+
+ // SEND SOME UPDATES
+ new SensorInfo(2, "Main Lobby Sensor", "Humidity", "Lobby",
LocalDate.of(2024, 2, 20), "ERROR", LocalDateTime.of(2025, 6, 23, 9, 48)),
+ new SensorInfo(8, "Lab Sensor", "Temperature", "Lab",
LocalDate.of(2024, 8, 30), "ERROR", LocalDateTime.of(2025, 6, 23, 11, 16))
+);
+```
+
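The mock data above relies on two POJOs that the post does not show. A minimal sketch of what they might look like follows, assuming Java records (the accessor style `reading.sensorId()` used later suggests this). The wrapper class `SensorModel` is a hypothetical name used only to keep the sketch in one file, and the field names are inferred from the constructor arguments above.

```java
import java.time.LocalDate;
import java.time.LocalDateTime;

// Hypothetical container class; the example project may organize these differently.
class SensorModel {
    // One time-series reading emitted by a sensor.
    record SensorReading(int sensorId, LocalDateTime timestamp, double temperature,
                         double humidity, double pressure, double batteryLevel) {}

    // Slowly changing metadata about a sensor, keyed by sensorId.
    record SensorInfo(int sensorId, String name, String type, String location,
                      LocalDate installationDate, String state, LocalDateTime lastUpdated) {}

    public static void main(String[] args) {
        SensorReading reading = new SensorReading(
                1, LocalDateTime.of(2025, 6, 23, 9, 15), 22.5, 45.0, 1013.2, 87.5);
        SensorInfo info = new SensorInfo(1, "Outdoor Temp Sensor", "Temperature", "Roof",
                LocalDate.of(2024, 1, 15), "OK", LocalDateTime.of(2025, 6, 23, 9, 15));
        // Records generate accessors, equals/hashCode and toString for us.
        System.out.println(reading);
        System.out.println(info.name() + " @ " + info.location());
    }
}
```

Records keep the model classes short and immutable, which suits data that is only converted to rows and back.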
+## Operating The Cluster
+Let's create a database for our IoT data, and within it define two tables:
+* **Sensor Readings Table:** A log table that collects time-series
readings from sensors (like temperature and humidity readings). This table is
append-only (new records are added continuously, with no updates/deletes), which
is ideal for immutable event streams.
+* **Sensor Information Table:** A primary key table that stores metadata for
each sensor (like sensor ID, location, type). Each `sensorId` is unique
and acts as the primary key. This table can be updated as sensor info changes
(e.g., sensor relocated or reconfigured).
+
+Using the Admin client, we can programmatically create these tables.
+
+First, we'll ensure the database exists (creating it if not), then define
schemas for each table and create them:
+
+### Schema Definitions
+#### Log table (sensor readings)
+
+```java
+public static Schema getSensorReadingsSchema() {
+    return Schema.newBuilder()
+            .column("sensorId", DataTypes.INT())
+            .column("timestamp", DataTypes.TIMESTAMP())
+            .column("temperature", DataTypes.DOUBLE())
+            .column("humidity", DataTypes.DOUBLE())
+            .column("pressure", DataTypes.DOUBLE())
+            .column("batteryLevel", DataTypes.DOUBLE())
+            .build();
+}
+```
+
+#### Primary Key table (sensor information)
+```java
+public static Schema getSensorInfoSchema() {
+    return Schema.newBuilder()
+            .column("sensorId", DataTypes.INT())
+            .column("name", DataTypes.STRING())
+            .column("type", DataTypes.STRING())
+            .column("location", DataTypes.STRING())
+            .column("installationDate", DataTypes.DATE())
+            .column("state", DataTypes.STRING())
+            .column("lastUpdated", DataTypes.TIMESTAMP())
+            .primaryKey("sensorId") // Define a Primary Key
+            .build();
+}
+```
+
+### Table Creation
+```java
+public static void setupTables(Admin admin) throws ExecutionException, InterruptedException {
+    TableDescriptor readingsDescriptor = TableDescriptor.builder()
+            .schema(getSensorReadingsSchema())
+            .distributedBy(3, "sensorId")
+            .comment("This is the sensor readings table")
+            .build();
+
+    // drop the tables if they exist (`true` ignores tables that don't exist)
+    admin.dropTable(readingsTablePath, true).get();
+    admin.dropTable(sensorInfoTablePath, true).get();
+
+    admin.createTable(readingsTablePath, readingsDescriptor, true).get();
+
+    TableDescriptor sensorInfoDescriptor = TableDescriptor.builder()
+            .schema(getSensorInfoSchema())
+            .distributedBy(3, "sensorId")
+            .comment("This is the sensor information table")
+            .build();
+
+    admin.createTable(sensorInfoTablePath, sensorInfoDescriptor, true).get();
+}
+```
+We specify a distribution with `.distributedBy(3, "sensorId")`.
+Fluss tables are split into buckets (similar to partitions in Kafka
topics) for scalability.
+Here we use 3 buckets, so the data is distributed across 3 buckets.
Multiple buckets allow for higher throughput by parallelizing reads and writes.
+Fluss hashes the bucket key (`sensorId` in our case) to assign each record to
a bucket.
+
+For the `sensor_readings` table, we define a schema without any primary key.
In Fluss, a table created without a primary key clause is a Log Table.
+A log table only supports appending new records (no updates or deletes),
making it perfect for immutable time-series data or logs.
+
+In the log table, specifying a bucket key like `sensorId` ensures all readings
from the same sensor end up in the same bucket, which preserves the order of
each sensor's readings.
+
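To make the bucket-key idea concrete, here is a minimal sketch of hash-based bucket assignment in plain Java. It is an illustration only: the hash used here (`Integer.hashCode` with `Math.floorMod`) is a stand-in chosen for simplicity and is not the function Fluss uses internally, so the bucket numbers will not match a real cluster. `BucketAssignmentDemo`, `bucketFor` and `assign` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class BucketAssignmentDemo {
    // Stand-in hash: NOT the hash function Fluss uses internally.
    static int bucketFor(int sensorId, int numBuckets) {
        return Math.floorMod(Integer.hashCode(sensorId), numBuckets);
    }

    // Groups sensor ids by the bucket they map to, keeping arrival order per bucket.
    static Map<Integer, List<Integer>> assign(List<Integer> sensorIds, int numBuckets) {
        Map<Integer, List<Integer>> buckets = new TreeMap<>();
        for (int id : sensorIds) {
            buckets.computeIfAbsent(bucketFor(id, numBuckets), b -> new ArrayList<>()).add(id);
        }
        return buckets;
    }

    public static void main(String[] args) {
        // Sensor 3 appears twice and lands in the same bucket both times.
        System.out.println(assign(List.of(1, 2, 3, 4, 3), 3));
    }
}
```

Because the bucket depends only on the key, every reading of a given sensor is appended to the same bucket, and readers of that bucket see them in append order.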
+With our tables created, let's write some data.
+
+## Table Writes
+Let's insert some data using the Fluss Java API.
+The client supports both writing data to tables and reading data from them.
+We'll demonstrate two patterns:
+* **Upserting** into the primary key table (sensor information).
+* **Appending** to the log table (sensor readings).
+
+Fluss provides specialized writer interfaces for each table type: an
**UpsertWriter** for primary key tables and an **AppendWriter** for log tables.
+Under the hood, the Fluss client currently expects data as **GenericRow**
objects (a generic row data format).
+
+> **Note:** Internally Fluss uses **InternalRow** as an optimized, binary
representation of data for better performance and memory efficiency.
+> **GenericRow** is a generic implementation of InternalRow. This allows
developers to interact with data easily while Fluss processes it efficiently
using the underlying binary format.
+
+Since our data lives in **POJOs**, we need to convert them into `GenericRow`
objects before we can write them to Fluss.
+
+```java
+public static GenericRow energyReadingToRow(SensorReading reading) {
+ GenericRow row = new
GenericRow(SensorReading.class.getDeclaredFields().length);
+ row.setField(0, reading.sensorId());
+ row.setField(1, TimestampNtz.fromLocalDateTime(reading.timestamp()));
+ row.setField(2, reading.temperature());
+ row.setField(3, reading.humidity());
+ row.setField(4, reading.pressure());
+ row.setField(5, reading.batteryLevel());
+ return row;
+}
+public static GenericRow sensorInfoToRow(SensorInfo sensorInfo) {
+ GenericRow row = new
GenericRow(SensorInfo.class.getDeclaredFields().length);
+ row.setField(0, sensorInfo.sensorId());
+ row.setField(1, BinaryString.fromString(sensorInfo.name()));
+ row.setField(2, BinaryString.fromString(sensorInfo.type()));
+ row.setField(3, BinaryString.fromString(sensorInfo.location()));
+ row.setField(4, (int) sensorInfo.installationDate().toEpochDay());
+ row.setField(5, BinaryString.fromString(sensorInfo.state()));
+ row.setField(6, TimestampNtz.fromLocalDateTime(sensorInfo.lastUpdated()));
+ return row;
+}
+```
+**Note:** Some Java types must be converted before they are set on a row.
For example, use `BinaryString.fromString("string_value")` for a `String` and
+`TimestampNtz.fromLocalDateTime(datetime)` for a `LocalDateTime`, and pass a
`LocalDate` as its epoch day (an `int`), as shown above; otherwise you might
+run into conversion exceptions.
+
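The `DATE` conversion is the least obvious of these, so here is a small stdlib-only sketch of the round trip used in `sensorInfoToRow` and later when reading rows back. It assumes, as the code above does, that the `DATE` column is set as an `int` counting days since 1970-01-01. `DateConversionDemo` and its method names are hypothetical.

```java
import java.time.LocalDate;

class DateConversionDemo {
    // LocalDate -> days since 1970-01-01, failing loudly instead of silently
    // truncating if the value ever exceeded the int range.
    static int toEpochDays(LocalDate date) {
        return Math.toIntExact(date.toEpochDay());
    }

    // Days since 1970-01-01 -> LocalDate, the inverse of toEpochDays.
    static LocalDate fromEpochDays(int days) {
        return LocalDate.ofEpochDay(days);
    }

    public static void main(String[] args) {
        LocalDate installed = LocalDate.of(2024, 1, 15);
        int days = toEpochDays(installed);
        System.out.println(days + " -> " + fromEpochDays(days));
    }
}
```

`Math.toIntExact` is used here instead of a plain `(int)` cast; for realistic dates both give the same result.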
+Let's start by writing data to the `Log Table`. This requires getting an
`AppendWriter` as follows:
+
+```java
+logger.info("Creating table writer for table {} ...",
AppUtils.SENSOR_READINGS_TBL);
+Table table = connection.getTable(AppUtils.getSensorReadingsTablePath());
+AppendWriter writer = table.newAppend().createWriter();
+
+AppUtils.readings.forEach(reading -> {
+ GenericRow row = energyReadingToRow(reading);
+ writer.append(row);
+});
+writer.flush();
+
+logger.info("Sensor Readings Written Successfully.");
+```
+At this point we have successfully written 10 sensor readings to our table.
+
+
+Next, let's write data to the `Primary Key Table`. This requires getting an
`UpsertWriter` as follows:
+```java
+logger.info("Creating table writer for table {} ...",
AppUtils.SENSOR_INFORMATION_TBL);
+Table sensorInfoTable = connection.getTable(AppUtils.getSensorInfoTablePath());
+UpsertWriter upsertWriter = sensorInfoTable.newUpsert().createWriter();
+
+AppUtils.sensorInfos.forEach(sensorInfo -> {
+ GenericRow row = sensorInfoToRow(sensorInfo);
+ upsertWriter.upsert(row);
+});
+
+upsertWriter.flush();
+```
+At this point the table holds 10 sensor information records: we sent 12
upserts, but the two updates (for sensors 2 and 8) share a primary key with
+existing rows and are merged into them.
+
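The effect of upserting by primary key can be modeled with a plain map, which is a handy way to reason about how many rows a batch leaves behind. The sketch below is only a model of the semantics, not of how Fluss stores data; `UpsertSemanticsDemo`, `Info` and `applyUpserts` are hypothetical names.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class UpsertSemanticsDemo {
    // Trimmed-down sensor info: just the key and the fields that change.
    record Info(int sensorId, String state, String lastUpdated) {}

    // Applies upserts in order; a later record with the same key replaces the earlier one.
    static Map<Integer, Info> applyUpserts(List<Info> upserts) {
        Map<Integer, Info> table = new LinkedHashMap<>();
        for (Info info : upserts) {
            table.put(info.sensorId(), info);
        }
        return table;
    }

    public static void main(String[] args) {
        Map<Integer, Info> table = applyUpserts(List.of(
                new Info(2, "ERROR", "2025-06-23T09:30"),
                new Info(8, "ERROR", "2025-06-23T11:00"),
                new Info(2, "ERROR", "2025-06-23T09:48"))); // update for sensor 2
        // Three upserts, two distinct keys: sensor 2 keeps only its latest version.
        System.out.println(table.size() + " rows, sensor 2 -> " + table.get(2));
    }
}
```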
+## Scans & Lookups
+Now comes the real-time data enrichment part of our example.
+We want to simulate a process where each incoming sensor reading is
immediately looked up against the sensor information table to add context (like
location and type) to the raw reading.
+This is a common pattern in streaming systems, often achieved with lookup
joins.
+
+With the Fluss Java client, we can do this by combining a **log scanner on the
readings table** with **point lookups on the sensor information table**.
+
+To consume data from a Fluss table, we use a **Scanner**.
+For a log table, Fluss provides a **LogScanner** that allows us to **subscribe
to one or more buckets** and poll for new records.
+
+```java
+LogScanner logScanner = readingsTable.newScan()
+ .createLogScanner();
+```
+
+We also create a **Lookuper** for point lookups on the sensor information
table:
+```java
+Lookuper sensorInforLookuper = sensorInfoTable
+ .newLookup()
+ .createLookuper();
+```
+
+With the scanner on the `sensor_readings` table in place, we subscribe to all
of its buckets:
+```java
+int numBuckets = readingsTable.getTableInfo().getNumBuckets();
+for (int i = 0; i < numBuckets; i++) {
+ logger.info("Subscribing to Bucket {}.", i);
+ logScanner.subscribeFromBeginning(i);
+}
+```
+
+Next, we start polling for records. For each incoming record we use the
**Lookuper** to look up the sensor information in the primary key table
+and create a **SensorReadingEnriched** record.
+```java
+while (true) {
+    logger.info("Polling for records...");
+    ScanRecords scanRecords = logScanner.poll(Duration.ofSeconds(1));
+    for (TableBucket bucket : scanRecords.buckets()) {
+        for (ScanRecord record : scanRecords.records(bucket)) {
+            InternalRow row = record.getRow();
+
+            logger.info("Received reading from sensor '{}' at '{}'.",
+                    row.getInt(0), row.getTimestampNtz(1, 6).toString());
+            logger.info("Performing lookup to get the information for sensor '{}'. ",
+                    row.getInt(0));
+            LookupResult lookupResult = sensorInforLookuper.lookup(row).get();
+            SensorInfo sensorInfo = lookupResult.getRowList().stream().map(r -> new SensorInfo(
+                    r.getInt(0),
+                    r.getString(1).toString(),
+                    r.getString(2).toString(),
+                    r.getString(3).toString(),
+                    LocalDate.ofEpochDay(r.getInt(4)),
+                    r.getString(5).toString(),
+                    LocalDateTime.parse(r.getTimestampNtz(6, 6).toString(), formatter)
+            )).findFirst().get();
+            logger.info("Retrieved information for '{}' with id: {}",
+                    sensorInfo.name(), sensorInfo.sensorId());
+
+            SensorReading reading = new SensorReading(
+                    row.getInt(0),
+                    LocalDateTime.parse(row.getTimestampNtz(1, 6).toString(), formatter),
+                    row.getDouble(2),
+                    row.getDouble(3),
+                    row.getDouble(4),
+                    row.getDouble(5)
+            );
+
+            SensorReadingEnriched readingEnriched = new SensorReadingEnriched(
+                    reading.sensorId(),
+                    reading.timestamp(),
+                    reading.temperature(),
+                    reading.humidity(),
+                    reading.pressure(),
+                    reading.batteryLevel(),
+                    sensorInfo.name(),
+                    sensorInfo.type(),
+                    sensorInfo.location(),
+                    sensorInfo.state()
+            );
+            logger.info("Bucket: {} - {}", bucket, readingEnriched);
+            logger.info("---------------------------------------");
+        }
+    }
+}
+```
+Let's summarize what's happening here:
+* We create a LogScanner for the `sensor_readings` table using
*table.newScan().createLogScanner()*.
+* We subscribe to each bucket of the table from the beginning (offset 0).
Subscribing `from beginning` means we'll read all existing data from the start;
alternatively, one could subscribe from the latest position to only get new
incoming data or based on other attributes like time. In our case, since we
just inserted data, from-beginning will capture those inserts.
+* We then call `poll(Duration)` on the scanner to retrieve available records,
waiting up to the given timeout (1 second here). This returns a `ScanRecords`
batch containing any records that were present. We iterate over each
`TableBucket` and then over each `ScanRecord` within that bucket.
+* For each record, we extract the fields via the InternalRow interface (which
provides typed access to each column in the row) and **convert them into a
Pojo**.
+* Next, for each reading, we perform a **lookup** on the
**sensor_information** table to get the sensor's info. We pass the reading's
row to `sensorInforLookuper.lookup(...)`, which performs a point lookup by
primary key (`sensorId`) and returns a future of a `LookupResult`; we call
`.get()` to wait for the result synchronously. We then take the returned
InternalRow of the sensor information and **convert it into a Pojo**.
+* We then combine the data: logging an enriched message that includes the
sensor's information alongside the reading values.
+
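One detail worth handling in a real application: the loop above ends the lookup with `.findFirst().get()`, and `Optional.get()` throws a `NoSuchElementException` when there is nothing to return, for example if a reading arrived for a sensor with no row in the primary key table. The sketch below shows one way to make the enrichment step tolerate a missing match. A plain `Map` stands in for the Fluss `Lookuper`, and `SafeEnrichmentDemo`, `Reading`, `Enriched` and `enrich` are hypothetical names.

```java
import java.util.Map;
import java.util.Optional;

class SafeEnrichmentDemo {
    record Reading(int sensorId, double temperature) {}
    record Enriched(int sensorId, double temperature, String sensorName) {}

    // Returns the enriched reading, or Optional.empty() when the sensor is unknown.
    // The map is a stand-in for a point lookup against the primary key table.
    static Optional<Enriched> enrich(Reading reading, Map<Integer, String> sensorNames) {
        return Optional.ofNullable(sensorNames.get(reading.sensorId()))
                .map(name -> new Enriched(reading.sensorId(), reading.temperature(), name));
    }

    public static void main(String[] args) {
        Map<Integer, String> sensorNames = Map.of(3, "Server Room Sensor");
        for (Reading reading : new Reading[] {new Reading(3, 21.8), new Reading(99, 20.0)}) {
            enrich(reading, sensorNames).ifPresentOrElse(
                    enriched -> System.out.println("Enriched: " + enriched),
                    () -> System.out.println("No sensor info for id " + reading.sensorId()));
        }
    }
}
```

Whether to skip, log, or buffer unmatched readings is an application decision; the point is only that the missing case is handled explicitly.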
+Fluss's lookup API gives us quick primary-key retrieval from a table, which is
exactly what we need to enrich the streaming data.
+In a real application, this enrichment could be done on the fly in a streaming
job (and indeed **Fluss is designed to support high-QPS lookup joins in
real-time pipelines**), but here we're simulating it with client calls for
clarity.
+
+If you run the above code, which you can find
[here](https://github.com/ververica/ververica-fluss-examples), you should see
output like the following:
+```shell
+16:07:13.594 INFO [DownloadRemoteLog-[sensors_db.sensor_readings_tbl]]
c.a.f.c.t.s.l.RemoteLogDownloader$DownloadRemoteLogThread - Starting
+16:07:13.599 INFO [main] com.ververica.scanner.FlussScanner - Subscribing to
Bucket 0.
+16:07:13.599 INFO [main] com.ververica.scanner.FlussScanner - Subscribing to
Bucket 1.
+16:07:13.600 INFO [main] com.ververica.scanner.FlussScanner - Subscribing to
Bucket 2.
+16:07:13.600 INFO [main] com.ververica.scanner.FlussScanner - Polling for
records...
+16:07:13.965 INFO [main] com.ververica.scanner.FlussScanner - Received
reading from sensor '3' at '2025-06-23T09:45'.
+16:07:13.966 INFO [main] com.ververica.scanner.FlussScanner - Performing
lookup to get the information for sensor '3'.
+16:07:14.032 INFO [main] com.ververica.scanner.FlussScanner - Retrieved
information for 'Server Room Sensor' with id: 3
+16:07:14.033 INFO [main] com.ververica.scanner.FlussScanner - Bucket:
TableBucket{tableId=2, bucket=1} - SensorReadingEnriched[sensorId=3,
timestamp=2025-06-23T09:45, temperature=21.8, humidity=46.2, pressure=1012.9,
batteryLevel=86.9, name=Server Room Sensor, type=Temperature, location=Server
Room, state=MAINTENANCE]
+16:07:14.045 INFO [main] com.ververica.scanner.FlussScanner -
---------------------------------------
+16:07:14.046 INFO [main] com.ververica.scanner.FlussScanner - Received
reading from sensor '4' at '2025-06-23T10:00'.
+16:07:14.046 INFO [main] com.ververica.scanner.FlussScanner - Performing
lookup to get the information for sensor '4'.
+16:07:14.128 INFO [main] com.ververica.scanner.FlussScanner - Retrieved
information for 'Warehouse Sensor' with id: 4
+16:07:14.128 INFO [main] com.ververica.scanner.FlussScanner - Bucket:
TableBucket{tableId=2, bucket=1} - SensorReadingEnriched[sensorId=4,
timestamp=2025-06-23T10:00, temperature=24.0, humidity=43.8, pressure=1013.5,
batteryLevel=89.2, name=Warehouse Sensor, type=Pressure, location=Warehouse,
state=OK]
+16:07:14.129 INFO [main] com.ververica.scanner.FlussScanner -
---------------------------------------
+16:07:14.129 INFO [main] com.ververica.scanner.FlussScanner - Received
reading from sensor '8' at '2025-06-23T11:00'.
+16:07:14.129 INFO [main] com.ververica.scanner.FlussScanner - Performing
lookup to get the information for sensor '8'.
+16:07:14.229 INFO [main] com.ververica.scanner.FlussScanner - Retrieved
information for 'Lab Sensor' with id: 8
+16:07:14.229 INFO [main] com.ververica.scanner.FlussScanner - Bucket:
TableBucket{tableId=2, bucket=1} - SensorReadingEnriched[sensorId=8,
timestamp=2025-06-23T11:00, temperature=24.2, humidity=43.5, pressure=1013.6,
batteryLevel=89.5, name=Lab Sensor, type=Temperature, location=Lab, state=ERROR]
+16:07:14.229 INFO [main] com.ververica.scanner.FlussScanner -
---------------------------------------
+```
+
+## Column Pruning Scans
+Column pruning lets you fetch only the columns you need, **reducing network
overhead and improving read performance**. With Fluss’s Java client, you can
specify a subset of columns in your scan:
+```java
+LogScanner logScanner = readingsTable.newScan()
+ .project(List.of("sensorId", "timestamp", "temperature"))
+ .createLogScanner();
+```
+
+Let's break this down:
+* `.project(...)` instructs the client to request only the specified columns
(`sensorId`, `timestamp`, and `temperature`) from the server.
+* Fluss’s columnar storage means non-requested columns (e.g., humidity, etc.)
**aren’t transmitted, saving bandwidth and reducing client-side parsing
overhead**.
+* You can combine projection with filters or lookups to further optimize your
data access patterns.
+
+Example output:
+```shell
+16:12:35.114 INFO [main] com.ververica.scanner.FlussScanner - Subscribing to
Bucket 0.
+16:12:35.114 INFO [main] com.ververica.scanner.FlussScanner - Subscribing to
Bucket 1.
+16:12:35.114 INFO [main] com.ververica.scanner.FlussScanner - Subscribing to
Bucket 2.
+16:12:35.114 INFO [main] com.ververica.scanner.FlussScanner - Polling for
records...
+16:12:35.171 INFO [main] com.ververica.scanner.FlussScanner - Bucket:
TableBucket{tableId=2, bucket=1} - (3,2025-06-23T09:45,21.8)
+16:12:35.172 INFO [main] com.ververica.scanner.FlussScanner -
---------------------------------------
+16:12:35.172 INFO [main] com.ververica.scanner.FlussScanner - Bucket:
TableBucket{tableId=2, bucket=1} - (4,2025-06-23T10:00,24.0)
+16:12:35.172 INFO [main] com.ververica.scanner.FlussScanner -
---------------------------------------
+16:12:35.172 INFO [main] com.ververica.scanner.FlussScanner - Bucket:
TableBucket{tableId=2, bucket=1} - (8,2025-06-23T11:00,24.2)
+16:12:35.172 INFO [main] com.ververica.scanner.FlussScanner -
---------------------------------------
+16:12:35.172 INFO [main] com.ververica.scanner.FlussScanner - Bucket:
TableBucket{tableId=2, bucket=1} - (10,2025-06-23T11:30,22.6)
+```
+Notice how only the requested columns are returned from the server.
+
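To illustrate what a projection does to the shape of a row, here is a plain-Java sketch that selects columns by name from a full row. In Fluss the pruning is done for you when you call `.project(...)`; this sketch only mimics the result on the client side and assumes, as the output above suggests, that projected values appear in the order the columns were requested. `ProjectionDemo`, `COLUMNS` and `project` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

class ProjectionDemo {
    // Column order of the sensor readings schema.
    static final List<String> COLUMNS = List.of(
            "sensorId", "timestamp", "temperature", "humidity", "pressure", "batteryLevel");

    // Returns only the requested columns, in the order they were requested.
    static List<Object> project(List<Object> row, List<String> wanted) {
        List<Object> projected = new ArrayList<>();
        for (String column : wanted) {
            int index = COLUMNS.indexOf(column);
            if (index < 0) {
                throw new IllegalArgumentException("Unknown column: " + column);
            }
            projected.add(row.get(index));
        }
        return projected;
    }

    public static void main(String[] args) {
        List<Object> row = List.of(3, "2025-06-23T09:45", 21.8, 46.2, 1012.9, 86.9);
        // The projected row has three fields, so temperature now sits at position 2.
        System.out.println(project(row, List.of("sensorId", "timestamp", "temperature")));
    }
}
```

The practical consequence is that code reading a projected row should index fields by their position in the projection, not in the full schema.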
+## Conclusion
+In this blog post, we've introduced the Fluss Java Client by guiding you
through a full example of creating tables, writing data, and reading/enriching
data in real-time.
+We covered how to use the `Admin` client to define a **Primary Key table**
(for reference data that can be updated) and a **Log table** (for immutable
event streams), and how to use the Fluss client to upsert and append data
accordingly.
+We also demonstrated reading from a log table using a scanner and performing a
lookup on a primary key table to enrich the streaming data on the fly.
+
+This IoT sensor scenario is just one example of Fluss in action and also
highlights the **Stream/Table duality** within the same system.
+Fluss's ability to handle high-throughput append streams and fast key-based
lookups makes it well-suited for real-time analytics use cases like this and
many others.
+With this foundation, you can explore more advanced features of Fluss to build
robust real-time data applications. Happy streaming! 🌊
+
+And before you go 😊 don’t forget to give Fluss 🌊 some ❤️ via ⭐ on
[GitHub](https://github.com/alibaba/fluss)
diff --git a/website/blog/assets/java_client/banner.png
b/website/blog/assets/java_client/banner.png
new file mode 100644
index 00000000..0eb13af7
Binary files /dev/null and b/website/blog/assets/java_client/banner.png differ