This is an automated email from the ASF dual-hosted git repository.
yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new 07847f5ed [Docs] Add Iceberg quickstart guide (#1800)
07847f5ed is described below
commit 07847f5ed43334a18b136b342484d26004f80359
Author: MehulBatra <[email protected]>
AuthorDate: Thu Oct 16 07:04:40 2025 +0530
[Docs] Add Iceberg quickstart guide (#1800)
---------
Co-authored-by: luoyuxia <[email protected]>
---
fluss-dist/src/main/assemblies/plugins.xml | 1 -
.../docs/quickstart/{flink.md => flink-iceberg.md} | 181 +++++++++++----------
website/docs/quickstart/flink.md | 6 +-
3 files changed, 101 insertions(+), 87 deletions(-)
diff --git a/fluss-dist/src/main/assemblies/plugins.xml b/fluss-dist/src/main/assemblies/plugins.xml
index e41d79006..b42e8fe0e 100644
--- a/fluss-dist/src/main/assemblies/plugins.xml
+++ b/fluss-dist/src/main/assemblies/plugins.xml
@@ -39,7 +39,6 @@
<include>org.apache.flink:flink-shaded-hadoop-2-uber</include>
</includes>
</dependencySet>
-
</dependencySets>
<files>
diff --git a/website/docs/quickstart/flink.md b/website/docs/quickstart/flink-iceberg.md
similarity index 73%
copy from website/docs/quickstart/flink.md
copy to website/docs/quickstart/flink-iceberg.md
index 1fc282c2c..b32fc7e54 100644
--- a/website/docs/quickstart/flink.md
+++ b/website/docs/quickstart/flink-iceberg.md
@@ -1,12 +1,12 @@
---
-title: Real-Time Analytics with Flink
-sidebar_position: 1
+title: Real-Time Analytics with Flink (Iceberg)
+sidebar_position: 2
---
-# Real-Time Analytics With Flink
+# Real-Time Analytics With Flink (Iceberg)
This guide will get you up and running with Apache Flink to do real-time
analytics, covering some powerful features of Fluss,
-including integrating with Paimon.
+including integrating with Apache Iceberg.
The guide is derived from [TPC-H](https://www.tpc.org/tpch/) **Q5**.
For more information on working with Flink, refer to the [Apache Flink Engine](engine-flink/getting-started.md) section.
@@ -28,16 +28,38 @@ We will use `docker compose` to spin up the required components for this tutorial.
1. Create a working directory for this guide.
```shell
-mkdir fluss-quickstart-flink
-cd fluss-quickstart-flink
+mkdir fluss-quickstart-flink-iceberg
+cd fluss-quickstart-flink-iceberg
```
-2. Create a `docker-compose.yml` file with the following content:
+2. Create a `lib` directory and download the required Hadoop jar file:
+
+```shell
+mkdir lib
+wget -O lib/hadoop-apache-3.3.5-2.jar https://repo1.maven.org/maven2/io/trino/hadoop/hadoop-apache/3.3.5-2/hadoop-apache-3.3.5-2.jar
+```
+
+This jar file provides Hadoop 3.3.5 dependencies required for Iceberg's Hadoop catalog integration.
+
+:::info
+The `lib` directory serves as a staging area for additional jars needed by the Fluss coordinator server. The docker-compose configuration (see step 3) mounts this directory and copies all jars to `/opt/fluss/plugins/iceberg/` inside the coordinator container at startup.
+
+You can add more jars to this `lib` directory based on your requirements:
+- **Cloud storage support**: For AWS S3 integration with Iceberg, add the corresponding Iceberg bundle jars (e.g., `iceberg-aws-bundle`)
+- **Custom Hadoop configurations**: Add jars for specific HDFS distributions or custom authentication mechanisms
+- **Other catalog backends**: Add jars needed for alternative Iceberg catalog implementations (e.g., REST, Hive, Glue)
+
+Any jar placed in the `lib` directory will be automatically loaded by the Fluss coordinator server, making it available for Iceberg integration.
+:::
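+
+For example, to add S3 support you could stage the Iceberg AWS bundle alongside the Hadoop jar (an illustrative sketch — the version below is an assumption; match it to the Iceberg release your Fluss build targets):
+
+```shell
+# Hypothetical version: pick the iceberg-aws-bundle matching your Iceberg release
+wget -O lib/iceberg-aws-bundle-1.6.1.jar https://repo1.maven.org/maven2/org/apache/iceberg/iceberg-aws-bundle/1.6.1/iceberg-aws-bundle-1.6.1.jar
+```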
+
+3. Create a `docker-compose.yml` file with the following content:
```yaml
services:
- #begin Fluss cluster
+ zookeeper:
+ restart: always
+ image: zookeeper:3.9.2
+
coordinator-server:
image: fluss/fluss:$FLUSS_DOCKER_VERSION$
command: coordinatorServer
@@ -49,11 +71,14 @@ services:
zookeeper.address: zookeeper:2181
bind.listeners: FLUSS://coordinator-server:9123
remote.data.dir: /tmp/fluss/remote-data
- datalake.format: paimon
- datalake.paimon.metastore: filesystem
- datalake.paimon.warehouse: /tmp/paimon
+ datalake.format: iceberg
+ datalake.iceberg.type: hadoop
+ datalake.iceberg.warehouse: /tmp/iceberg
volumes:
- - shared-tmpfs:/tmp/paimon
+ - shared-tmpfs:/tmp/iceberg
+ - ./lib:/tmp/lib
+ entrypoint: ["sh", "-c", "cp -v /tmp/lib/*.jar /opt/fluss/plugins/iceberg/ && exec /docker-entrypoint.sh coordinatorServer"]
+
tablet-server:
image: fluss/fluss:$FLUSS_DOCKER_VERSION$
command: tabletServer
@@ -67,16 +92,12 @@ services:
data.dir: /tmp/fluss/data
remote.data.dir: /tmp/fluss/remote-data
kv.snapshot.interval: 0s
- datalake.format: paimon
- datalake.paimon.metastore: filesystem
- datalake.paimon.warehouse: /tmp/paimon
+ datalake.format: iceberg
+ datalake.iceberg.type: hadoop
+ datalake.iceberg.warehouse: /tmp/iceberg
volumes:
- - shared-tmpfs:/tmp/paimon
- zookeeper:
- restart: always
- image: zookeeper:3.9.2
- #end
- #begin Flink cluster
+ - shared-tmpfs:/tmp/iceberg
+
jobmanager:
image: fluss/quickstart-flink:1.20-$FLUSS_DOCKER_VERSION$
ports:
@@ -87,7 +108,8 @@ services:
FLINK_PROPERTIES=
jobmanager.rpc.address: jobmanager
volumes:
- - shared-tmpfs:/tmp/paimon
+ - shared-tmpfs:/tmp/iceberg
+
taskmanager:
image: fluss/quickstart-flink:1.20-$FLUSS_DOCKER_VERSION$
depends_on:
@@ -101,9 +123,8 @@ services:
taskmanager.memory.process.size: 2048m
taskmanager.memory.framework.off-heap.size: 256m
volumes:
- - shared-tmpfs:/tmp/paimon
- #end
-
+ - shared-tmpfs:/tmp/iceberg
+
volumes:
shared-tmpfs:
driver: local
@@ -117,7 +138,7 @@ The Docker Compose environment consists of the following containers:
- **Flink Cluster**: a Flink `JobManager` and a Flink `TaskManager` container to execute queries.
**Note:** The `fluss/quickstart-flink` image is based on [flink:1.20.1-java17](https://hub.docker.com/layers/library/flink/1.20-java17/images/sha256:bf1af6406c4f4ad8faa46efe2b3d0a0bf811d1034849c42c1e3484712bc83505) and
-includes the [fluss-flink](engine-flink/getting-started.md), [paimon-flink](https://paimon.apache.org/docs/1.0/flink/quick-start/) and
+includes the [fluss-flink](engine-flink/getting-started.md), [iceberg-flink](https://iceberg.apache.org/docs/latest/flink/) and
[flink-connector-faker](https://flink-packages.org/packages/flink-faker) to simplify this guide.
3. To start all containers, run:
@@ -126,7 +147,7 @@ docker compose up -d
```
This command automatically starts all the containers defined in the Docker Compose configuration in detached mode.
-Run
+Run
```shell
docker container ls -a
```
@@ -136,7 +157,7 @@ You can also visit http://localhost:8083/ to see if Flink is running normally.
:::note
- If you want to additionally use an observability stack, follow one of the provided quickstart guides [here](maintenance/observability/quickstart.md) and then continue with this guide.
-- If you want to run with your own Flink environment, remember to download the [fluss-flink connector jar](/downloads), [flink-connector-faker](https://github.com/knaufk/flink-faker/releases), [paimon-flink connector jar](https://paimon.apache.org/docs/1.0/flink/quick-start/) and then put them to `FLINK_HOME/lib/`.
+- If you want to run with your own Flink environment, remember to download the [fluss-flink connector jar](/downloads), [flink-connector-faker](https://github.com/knaufk/flink-faker/releases) and [iceberg-flink connector jar](https://iceberg.apache.org/docs/latest/flink/), and then put them into `FLINK_HOME/lib/` (see the sketch after this note).
- All the following commands involving `docker compose` should be executed in the created working directory that contains the `docker-compose.yml` file.
:::
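+
+A minimal sketch of that last step for a local Flink installation (the jar names are placeholders — use the exact files you downloaded):
+
+```shell
+# Hypothetical jar names; copy the downloaded connector jars into Flink's classpath
+cp fluss-flink-*.jar flink-faker-*.jar iceberg-flink-runtime-*.jar $FLINK_HOME/lib/
+```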
@@ -252,27 +273,27 @@ the `fluss_orders` table with information from the `fluss_customer` and `fluss_nation`
```sql title="Flink SQL"
INSERT INTO enriched_orders
-SELECT o.order_key,
- o.cust_key,
+SELECT o.order_key,
+ o.cust_key,
o.total_price,
- o.order_date,
+ o.order_date,
o.order_priority,
o.clerk,
c.name,
c.phone,
- c.acctbal,
+ c.acctbal,
c.mktsegment,
n.name
-FROM fluss_order o
-LEFT JOIN fluss_customer FOR SYSTEM_TIME AS OF `o`.`ptime` AS `c`
+FROM fluss_order o
+LEFT JOIN fluss_customer FOR SYSTEM_TIME AS OF `o`.`ptime` AS `c`
ON o.cust_key = c.cust_key
-LEFT JOIN fluss_nation FOR SYSTEM_TIME AS OF `o`.`ptime` AS `n`
+LEFT JOIN fluss_nation FOR SYSTEM_TIME AS OF `o`.`ptime` AS `n`
ON c.nation_key = n.nation_key;
```
## Run Ad-hoc Queries on Fluss Tables
-You can now perform real-time analytics directly on Fluss tables.
+You can now perform real-time analytics directly on Fluss tables.
For instance, to calculate the number of orders placed by a specific customer, you can execute the following SQL query to obtain instant, real-time results.
```sql title="Flink SQL"
@@ -299,7 +320,7 @@ SELECT * FROM enriched_orders LIMIT 2;
|  10715776 |        2 |      924.43 | 2024-11-04 | medium         | Clerk3 | Rita Booke | (925) 775-0717 |       172.39 | FURNITURE       | UNITED      |
+-----------+----------+-------------+------------+----------------+--------+------------+----------------+--------------+-----------------+-------------+
```
-If you are interested in a specific customer, you can retrieve their details by performing a lookup on the `cust_key`. 
+If you are interested in a specific customer, you can retrieve their details by performing a lookup on the `cust_key`.
```sql title="Flink SQL"
-- lookup by primary key
@@ -346,26 +367,26 @@ The following SQL query should return an empty result.
SELECT * FROM fluss_customer WHERE `cust_key` = 1;
```
-## Integrate with Paimon
+## Integrate with Iceberg
### Start the Lakehouse Tiering Service
-To integrate with [Apache Paimon](https://paimon.apache.org/), you need to start the `Lakehouse Tiering Service`.
-Open a new terminal, navigate to the `fluss-quickstart-flink` directory, and execute the following command within this directory to start the service:
+To integrate with [Apache Iceberg](https://iceberg.apache.org/), you need to start the `Lakehouse Tiering Service`.
+Open a new terminal, navigate to the `fluss-quickstart-flink-iceberg` directory, and execute the following command within this directory to start the service:
```shell
docker compose exec jobmanager \
/opt/flink/bin/flink run \
/opt/flink/opt/fluss-flink-tiering-$FLUSS_VERSION$.jar \
--fluss.bootstrap.servers coordinator-server:9123 \
- --datalake.format paimon \
- --datalake.paimon.metastore filesystem \
- --datalake.paimon.warehouse /tmp/paimon
+ --datalake.format iceberg \
+ --datalake.iceberg.type hadoop \
+ --datalake.iceberg.warehouse /tmp/iceberg
```
-You should see a Flink Job to tier data from Fluss to Paimon running in the [Flink Web UI](http://localhost:8083/).
+You should see a Flink Job to tier data from Fluss to Iceberg running in the [Flink Web UI](http://localhost:8083/).
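+
+If you prefer the command line over the Web UI, you can also list running jobs from inside the JobManager container (a quick sanity check; the exact output depends on your Flink version):
+
+```shell
+docker compose exec jobmanager /opt/flink/bin/flink list
+```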
### Streaming into Fluss datalake-enabled tables
By default, tables are created with data lake integration disabled, meaning the Lakehouse Tiering Service will not tier the table's data to the data lake.
-To enable lakehouse functionality as a tiered storage solution for a table, you must create the table with the configuration option `table.datalake.enabled = true`. 
+To enable lakehouse functionality as a tiered storage solution for a table, you must create the table with the configuration option `table.datalake.enabled = true`.
Return to the `SQL client` and execute the following SQL statement to create a table with data lake integration enabled:
```sql title="Flink SQL"
CREATE TABLE datalake_enriched_orders (
@@ -416,37 +437,38 @@ FROM fluss_order o
### Real-Time Analytics on Fluss datalake-enabled Tables
-The data for the `datalake_enriched_orders` table is stored in Fluss (for real-time data) and Paimon (for historical data).
+The data for the `datalake_enriched_orders` table is stored in Fluss (for real-time data) and Iceberg (for historical data).
+
+When querying the `datalake_enriched_orders` table, Fluss uses a union operation that combines data from both Fluss and Iceberg to provide a complete result set -- combining **real-time** and **historical** data.
-When querying the `datalake_enriched_orders` table, Fluss uses a union operation that combines data from both Fluss and Paimon to provide a complete result set -- combines **real-time** and **historical** data.
+If you wish to query only the data stored in Iceberg—offering high-performance access without the overhead of unioning data—you can use the `datalake_enriched_orders$lake` table by appending the `$lake` suffix.
+This approach also enables all the optimizations and features of a Flink Iceberg table source, including [system tables](https://iceberg.apache.org/docs/latest/flink-queries/#inspecting-tables) such as `datalake_enriched_orders$lake$snapshots`.
-If you wish to query only the data stored in Paimon—offering high-performance access without the overhead of unioning data—you can use the `datalake_enriched_orders$lake` table by appending the `$lake` suffix.
-This approach also enables all the optimizations and features of a Flink Paimon table source, including [system table](https://paimon.apache.org/docs/master/concepts/system-tables/) such as `datalake_enriched_orders$lake$snapshots`.
-To query the snapshots directly from Paimon, use the following SQL:
```sql title="Flink SQL"
-- switch to batch mode
SET 'execution.runtime-mode' = 'batch';
```
+
```sql title="Flink SQL"
--- query snapshots in paimon
-SELECT snapshot_id, total_record_count FROM datalake_enriched_orders$lake$snapshots;
+-- query snapshots in iceberg
+SELECT snapshot_id, operation FROM datalake_enriched_orders$lake$snapshots;
```
**Sample Output:**
```shell
+-------------+--------------------+
-| snapshot_id | total_record_count |
+| snapshot_id | operation          |
+-------------+--------------------+
-| 1           | 650                |
+| 1           | append             |
+-------------+--------------------+
```
-**Note:** Make sure to wait for the checkpoints (~30s) to complete before querying the snapshots, otherwise the result will be empty.
+**Note:** Make sure to wait for the configured `datalake.freshness` interval (~30s) to elapse before querying the snapshots; otherwise the result will be empty.
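+
+To double-check which options the table was created with (including the data lake settings), you can inspect its definition from the SQL client:
+
+```sql title="Flink SQL"
+-- show the table definition and its configured options
+SHOW CREATE TABLE datalake_enriched_orders;
+```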
-Run the following SQL to do analytics on Paimon data:
+Run the following SQL to do analytics on Iceberg data:
```sql title="Flink SQL"
--- to sum prices of all orders in paimon
+-- to sum prices of all orders in iceberg
SELECT sum(total_price) as sum_price FROM datalake_enriched_orders$lake;
```
**Sample Output:**
@@ -458,54 +480,47 @@ SELECT sum(total_price) as sum_price FROM datalake_enriched_orders$lake;
+------------+
```
-To achieve results with sub-second data freshness, you can query the table directly, which seamlessly unifies data from both Fluss and Paimon:
+To achieve results with sub-second data freshness, you can query the table directly, which seamlessly unifies data from both Fluss and Iceberg:
+
```sql title="Flink SQL"
--- to sum prices of all orders in fluss and paimon
+-- to sum prices of all orders (combining fluss and iceberg data)
SELECT sum(total_price) as sum_price FROM datalake_enriched_orders;
```
-The result looks like:
-```
+
+**Sample Output:**
+```shell
+------------+
| sum_price |
+------------+
| 1777908.36 |
+------------+
```
+
You can execute the real-time analytics query multiple times, and the results will vary with each run as new data is continuously written to Fluss in real-time.
-Finally, you can use the following command to view the files stored in Paimon:
+Finally, you can use the following command to view the files stored in Iceberg:
```shell
-docker compose exec taskmanager tree /tmp/paimon/fluss.db
+docker compose exec taskmanager tree /tmp/iceberg/fluss.db
```
**Sample Output:**
```shell
-/tmp/paimon/fluss.db
+/tmp/iceberg/fluss.db
└── datalake_enriched_orders
-    ├── bucket-0
-    │   ├── changelog-aef1810f-85b2-4eba-8eb8-9b136dec5bdb-0.orc
-    │   └── data-aef1810f-85b2-4eba-8eb8-9b136dec5bdb-1.orc
-    ├── manifest
-    │   ├── manifest-aaa007e1-81a2-40b3-ba1f-9df4528bc402-0
-    │   ├── manifest-aaa007e1-81a2-40b3-ba1f-9df4528bc402-1
-    │   ├── manifest-list-ceb77e1f-7d17-4160-9e1f-f334918c6e0d-0
-    │   ├── manifest-list-ceb77e1f-7d17-4160-9e1f-f334918c6e0d-1
-    │   └── manifest-list-ceb77e1f-7d17-4160-9e1f-f334918c6e0d-2
-    ├── schema
-    │   └── schema-0
-    └── snapshot
-        ├── EARLIEST
-        ├── LATEST
-        └── snapshot-1
-```
-The files adhere to Paimon's standard format, enabling seamless querying with other engines such as [StarRocks](https://docs.starrocks.io/docs/data_source/catalog/paimon_catalog/).
+    ├── data
+    │   └── 00000-0-abc123.parquet
+    └── metadata
+        ├── snap-1234567890123456789-1-abc123.avro
+        └── v1.metadata.json
+```
+The files adhere to Iceberg's standard format, enabling seamless querying with other engines such as [Spark](https://iceberg.apache.org/docs/latest/spark-queries/) and [Trino](https://trino.io/docs/current/connector/iceberg.html).
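+
+As an illustration, Spark could read the same warehouse by registering a Hadoop catalog over it (a sketch, not part of this guide — the runtime jar version and the catalog name `local` are assumptions; align them with your Spark and Iceberg versions):
+
+```shell
+# Illustrative: launch spark-sql with an Iceberg Hadoop catalog over /tmp/iceberg
+spark-sql \
+  --packages org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.6.1 \
+  --conf spark.sql.catalog.local=org.apache.iceberg.spark.SparkCatalog \
+  --conf spark.sql.catalog.local.type=hadoop \
+  --conf spark.sql.catalog.local.warehouse=/tmp/iceberg
+```
+
+Inside that session, a query such as `SELECT * FROM local.fluss.datalake_enriched_orders LIMIT 5;` would scan the tiered files directly.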
## Clean up
-After finishing the tutorial, run `exit` to exit Flink SQL CLI Container and then run 
+After finishing the tutorial, run `exit` to exit the Flink SQL CLI container and then run
```shell
docker compose down -v
```
to stop all containers.
## Learn more
-Now that you're up and running with Fluss and Flink, check out the [Apache Flink Engine](engine-flink/getting-started.md) docs to learn more features with Flink or [this guide](/maintenance/observability/quickstart.md) to learn how to set up an observability stack for Fluss and Flink.
+Now that you're up and running with Fluss, Flink, and Iceberg, check out the [Apache Flink Engine](engine-flink/getting-started.md) docs to explore more Flink features, or [this guide](/maintenance/observability/quickstart.md) to learn how to set up an observability stack for Fluss and Flink.
\ No newline at end of file
diff --git a/website/docs/quickstart/flink.md b/website/docs/quickstart/flink.md
index 1fc282c2c..ee8c47a50 100644
--- a/website/docs/quickstart/flink.md
+++ b/website/docs/quickstart/flink.md
@@ -1,9 +1,9 @@
---
-title: Real-Time Analytics with Flink
+title: Real-Time Analytics with Flink (Paimon)
sidebar_position: 1
---
-# Real-Time Analytics With Flink
+# Real-Time Analytics With Flink (Paimon)
This guide will get you up and running with Apache Flink to do real-time
analytics, covering some powerful features of Fluss,
including integrating with Paimon.
@@ -442,7 +442,7 @@ SELECT snapshot_id, total_record_count FROM datalake_enriched_orders$lake$snapshots;
| 1           | 650                |
+-------------+--------------------+
```
-**Note:** Make sure to wait for the checkpoints (~30s) to complete before querying the snapshots, otherwise the result will be empty.
+**Note:** Make sure to wait for the configured `datalake.freshness` interval (~30s) to elapse before querying the snapshots; otherwise the result will be empty.
Run the following SQL to do analytics on Paimon data:
```sql title="Flink SQL"