This is an automated email from the ASF dual-hosted git repository.

hongshun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git


The following commit(s) were added to refs/heads/master by this push:
     new b169f7844 [FLINK-39294][docs][postgres][fluss] Add document about 
schema evolution from postgres to fluss. (#4337)
b169f7844 is described below

commit b169f784453b6d57fa16eb101a84b51785da88b6
Author: Hongshun Wang <[email protected]>
AuthorDate: Thu Mar 26 15:17:10 2026 +0800

    [FLINK-39294][docs][postgres][fluss] Add document about schema evolution 
from postgres to fluss. (#4337)
---
 .../docs/connectors/pipeline-connectors/fluss.md   |  25 +-
 .../connectors/pipeline-connectors/postgres.md     |  16 +-
 .../get-started/quickstart/postgres-to-fluss.md    | 406 ++++++++++++++++++++
 .../docs/connectors/pipeline-connectors/fluss.md   |  25 +-
 .../connectors/pipeline-connectors/postgres.md     |  14 +-
 .../get-started/quickstart/postgres-to-fluss.md    | 408 +++++++++++++++++++++
 6 files changed, 889 insertions(+), 5 deletions(-)

diff --git a/docs/content.zh/docs/connectors/pipeline-connectors/fluss.md 
b/docs/content.zh/docs/connectors/pipeline-connectors/fluss.md
index bca232b98..1ac06c19c 100644
--- a/docs/content.zh/docs/connectors/pipeline-connectors/fluss.md
+++ b/docs/content.zh/docs/connectors/pipeline-connectors/fluss.md
@@ -30,6 +30,7 @@ Fluss Pipeline 连接器可用作 Pipeline 的 *Data Sink*,将数据写入 [Fl
 ## What can the connector do?
 * 自动创建不存在的表 
 * 数据同步
+* Schema 变更同步(lenient 模式)
 
 How to create Pipeline
 ----------------
@@ -60,6 +61,7 @@ sink:
 pipeline:
   name: MySQL to Fluss Pipeline
   parallelism: 2
+  schema.change.behavior: LENIENT
 ```
 
 Pipeline Connector Options
@@ -140,7 +142,13 @@ Pipeline Connector Options
   * 桶数量由 `bucket.num` 选项控制
   * 数据分布由 `bucket.key` 
选项控制。对于主键表,若未指定分桶键,则分桶键默认为主键(不含分区键);对于无主键的日志表,若未指定分桶键,则数据将随机分配到各个桶中。 
 
-* 不支持 schema 变更同步。如果需要忽略 schema 变更,可使用 `schema.change.behavior: IGNORE`。
+* 支持在 `lenient` 模式下进行 Schema 变更同步,通过 `schema.change.behavior: lenient` 配置。支持以下 
Schema 变更事件:
+  * **新增列** — 新列会追加到 Fluss 表中。
+  * **删除列** — 在 lenient 模式下不会真正删除列,而是忽略该删除操作,后续写入时将该列的值设为 null。
+  * **重命名列** — 在 lenient 模式下,此操作会被转换为新增列 + 将旧列类型修改为可空的序列。
+  * **修改列类型** — 不支持。
+
+  要启用 Schema 变更同步,请在 pipeline 中配置 `schema.change.behavior: lenient`。如果想要忽略所有 
Schema 变更,使用 `schema.change.behavior: IGNORE`。
 
 * 关于数据同步, Pipeline 连接器使用 [Fluss Java 
Client](https://fluss.apache.org/docs/apis/java-client/) 向 Fluss 写入数据.
 
@@ -236,6 +244,21 @@ Data Type Mapping
       <td>BYTES</td>
       <td></td>
     </tr>
+    <tr>
+      <td>ARRAY</td>
+      <td>ARRAY</td>
+      <td>元素类型递归映射。</td>
+    </tr>
+    <tr>
+      <td>MAP</td>
+      <td>MAP</td>
+      <td>键和值类型递归映射。</td>
+    </tr>
+    <tr>
+      <td>ROW</td>
+      <td>ROW</td>
+      <td>字段类型递归映射。</td>
+    </tr>
     </tbody>
 </table>
 </div>
diff --git a/docs/content.zh/docs/connectors/pipeline-connectors/postgres.md 
b/docs/content.zh/docs/connectors/pipeline-connectors/postgres.md
index c3614401f..02fb272cf 100644
--- a/docs/content.zh/docs/connectors/pipeline-connectors/postgres.md
+++ b/docs/content.zh/docs/connectors/pipeline-connectors/postgres.md
@@ -27,7 +27,6 @@ under the License.
 # Postgres Connector
 
 Postgres CDC Pipeline 连接器允许从 Postgres 数据库读取快照数据和增量数据,并提供端到端的整库数据同步能力。 
本文描述了如何设置 Postgres CDC Pipeline 连接器。
-注意:因为Postgres的wal log日志中展示没有办法解析表结构变更记录,因此Postgres CDC Pipeline 
Source暂时不支持同步表结构变更。
 
 ## 示例
 
@@ -35,7 +34,7 @@ Postgres CDC Pipeline 连接器允许从 Postgres 数据库读取快照数据和
 
 ```yaml
 source:
-   type: posgtres
+   type: postgres
    name: Postgres Source
    hostname: 127.0.0.1
    port: 5432
@@ -45,6 +44,7 @@ source:
    tables: adb.\.*.\.*
    decoding.plugin.name:  pgoutput
    slot.name: pgtest
+   schema-change.enabled: true
 
 sink:
   type: fluss
@@ -59,6 +59,7 @@ sink:
 pipeline:
    name: Postgres to Fluss Pipeline
    parallelism: 4
+   schema.change.behavior: lenient
 ```
 
 ## 连接器配置项
@@ -282,6 +283,17 @@ pipeline:
         默认值为 false。
       </td>
     </tr>
+    <tr>
+      <td>schema-change.enabled</td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">false</td>
+      <td>Boolean</td>
+      <td>
+        是否开启 Postgres 源的 Schema 变更推导。开启后,连接器会通过对比 pgoutput Relation 消息与缓存的 
Schema 来推导 Schema 变更事件(新增列、删除列、重命名列、修改列类型)。<br>
+        需要将 <code>decoding.plugin.name</code> 设置为 <code>pgoutput</code>。<br>
+        默认值为 false。
+      </td>
+    </tr>
     </tbody>
 </table>
 </div>
diff --git a/docs/content.zh/docs/get-started/quickstart/postgres-to-fluss.md 
b/docs/content.zh/docs/get-started/quickstart/postgres-to-fluss.md
new file mode 100644
index 000000000..7ffb7852f
--- /dev/null
+++ b/docs/content.zh/docs/get-started/quickstart/postgres-to-fluss.md
@@ -0,0 +1,406 @@
+---
+title: "Postgres 同步到 Fluss"
+weight: 5
+type: docs
+aliases:
+- /get-started/quickstart/postgres-to-fluss
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Streaming ELT 同步 Postgres 到 Fluss
+
+这篇教程将展示如何基于 Flink CDC 快速构建 PostgreSQL 到 Fluss 的 Streaming ELT 
作业,包含整库同步和表结构变更同步的功能。
+本教程的演示都将在 Flink CDC CLI 中进行,无需一行 Java/Scala 代码,也无需安装 IDE。
+
+## 准备阶段
+准备一台已经安装了 Docker 的 Linux 或者 MacOS 电脑。
+
+### 准备 Flink Standalone 集群
+1. 下载 [Flink 
1.20.3](https://archive.apache.org/dist/flink/flink-1.20.3/flink-1.20.3-bin-scala_2.12.tgz),解压后得到
 flink-1.20.3 目录。
+   使用下面的命令跳转至 Flink 目录下,并且设置 FLINK_HOME 为 flink-1.20.3 所在目录。
+
+   ```shell
+   cd flink-1.20.3
+   ```
+
+2. 通过在 conf/config.yaml 配置文件追加下列参数开启 checkpoint,每隔 3 秒做一次 checkpoint。
+
+   ```yaml
+   execution:
+     checkpointing:
+       interval: 3s
+   ```
+
+3. 使用下面的命令启动 Flink 集群。
+
+   ```shell
+   ./bin/start-cluster.sh
+   ```  
+
+启动成功的话,可以在 [http://localhost:8081/](http://localhost:8081/) 访问到 Flink Web UI。
+
+多次执行 `start-cluster.sh` 可以拉起多个 TaskManager。
+
+### 准备 Docker 环境
+接下来的教程将以 `docker-compose` 的方式准备所需要的组件。
+
+使用下面的内容创建一个 `docker-compose.yml` 文件:
+
+   ```yaml
+   services:
+     # Fluss 集群
+     coordinator-server:
+       image: apache/fluss:0.9.0-incubating
+       command: coordinatorServer
+       depends_on:
+         - zookeeper
+       environment:
+         - |
+           FLUSS_PROPERTIES=
+           zookeeper.address: zookeeper:2181
+           bind.listeners: INTERNAL://coordinator-server:0, 
CLIENT://coordinator-server:9123
+           advertised.listeners: CLIENT://localhost:9123
+           internal.listener.name: INTERNAL
+           remote.data.dir: /tmp/fluss/remote-data
+           security.protocol.map: CLIENT:SASL, INTERNAL:PLAINTEXT
+           security.sasl.enabled.mechanisms: PLAIN
+           security.sasl.plain.jaas.config: 
org.apache.fluss.security.auth.sasl.plain.PlainLoginModule required 
user_admin="admin-pass" user_developer="developer-pass" ;
+           super.users: User:admin
+       ports:
+         - "9123:9123"
+     tablet-server:
+       image: apache/fluss:0.9.0-incubating
+       command: tabletServer
+       depends_on:
+         - coordinator-server
+       environment:
+         - |
+           FLUSS_PROPERTIES=
+           zookeeper.address: zookeeper:2181
+           bind.listeners: INTERNAL://tablet-server:0, 
CLIENT://tablet-server:9123
+           advertised.listeners: CLIENT://localhost:9124
+           internal.listener.name: INTERNAL
+           tablet-server.id: 0
+           kv.snapshot.interval: 0s
+           data.dir: /tmp/fluss/data
+           remote.data.dir: /tmp/fluss/remote-data
+           security.protocol.map: CLIENT:SASL, INTERNAL:PLAINTEXT
+           security.sasl.enabled.mechanisms: PLAIN
+           security.sasl.plain.jaas.config: 
org.apache.fluss.security.auth.sasl.plain.PlainLoginModule required 
user_admin="admin-pass" user_developer="developer-pass" ;
+           super.users: User:admin
+       ports:
+         - "9124:9123"
+     zookeeper:
+       restart: always
+       image: zookeeper:3.9.2
+     # PostgreSQL
+     postgres:
+       image: postgres:14.5
+       environment:
+         POSTGRES_USER: root
+         POSTGRES_PASSWORD: password
+         POSTGRES_DB: postgres
+       ports:
+         - "5432:5432"
+       volumes:
+         - postgres_data:/var/lib/postgresql/data
+       command:
+         - "postgres"
+         - "-c"
+         - "wal_level=logical"
+         - "-c"
+         - "max_replication_slots=5"
+         - "-c"
+         - "max_wal_senders=5"
+         - "-c"
+         - "hot_standby=on"
+   volumes:
+     postgres_data:
+   ```
+
+该 Docker Compose 中包含的容器有:
+- **Fluss**(coordinator-server, tablet-server, zookeeper):目标数据湖仓
+- **PostgreSQL**:源数据库,已开启逻辑复制(`wal_level=logical`)
+
+在 `docker-compose.yml` 所在目录下执行下面的命令来启动本教程需要的组件:
+
+   ```shell
+   docker-compose up -d
+   ```
+
+该命令将以 detached 模式自动启动 Docker Compose 配置中定义的所有容器。你可以通过 `docker ps` 
来观察上述的容器是否正常启动了。
+
+#### 在 PostgreSQL 数据库中准备数据
+1. 连接 PostgreSQL 数据库
+
+   ```shell
+   psql -h localhost -p 5432 -U root postgres
+   ```
+   密码为:`password`
+
+2. 创建 `adb` 数据库并切换
+
+   ```sql
+   CREATE DATABASE adb;
+   \c adb
+   ```
+
+3. 创建 Schema 和表,并插入数据
+
+    ```sql
+    -- 创建 Schema
+    CREATE SCHEMA hr;
+    CREATE SCHEMA sales;
+   
+    -- 创建表
+    CREATE TABLE hr.employees(
+       ID INT PRIMARY KEY NOT NULL,
+       NAME TEXT NOT NULL,
+       AGE INT NOT NULL,
+       ADDRESS CHAR(50),
+       SALARY REAL
+    );
+   
+    CREATE TABLE sales.orders(
+       ID INT PRIMARY KEY NOT NULL,
+       PRODUCT TEXT NOT NULL,
+       QUANTITY INT NOT NULL,
+       REGION CHAR(50),
+       AMOUNT REAL
+    );
+   
+    -- 插入数据
+    INSERT INTO hr.employees (ID, NAME, AGE, ADDRESS, SALARY)
+    VALUES (1, 'Paul', 32, 'California', 20000.00);
+   
+    INSERT INTO sales.orders (ID, PRODUCT, QUANTITY, REGION, AMOUNT)
+    VALUES (1, 'Laptop', 5, 'East', 49999.50);
+    ```
+
+## 通过 Flink CDC CLI 提交任务
+1. 下载下面列出的二进制压缩包,并解压得到目录 `flink-cdc-{{< param Version >}}`;
+   [flink-cdc-{{< param Version 
>}}-bin.tar.gz](https://www.apache.org/dyn/closer.lua/flink/flink-cdc-{{< param 
Version >}}/flink-cdc-{{< param Version >}}-bin.tar.gz)
+   flink-cdc-{{< param Version >}} 下会包含 `bin`、`lib`、`log`、`conf` 四个目录。
+
+2. 下载下面列出的 connector 包,并且移动到 `lib` 目录下;
+   **下载链接只对已发布的版本有效, SNAPSHOT 版本需要本地基于 master 或 release- 分支编译。**
+   **请注意,您需要将 jar 移动到 Flink CDC Home 的 lib 目录,而非 Flink Home 的 lib 目录下。**
+   - [Postgres pipeline connector {{< param Version 
>}}](https://repo1.maven.org/maven2/org/apache/flink/flink-cdc-pipeline-connector-postgres/{{<
 param Version >}}/flink-cdc-pipeline-connector-postgres-{{< param Version 
>}}.jar)
+   - [Fluss pipeline connector {{< param Version 
>}}](https://repo1.maven.org/maven2/org/apache/flink/flink-cdc-pipeline-connector-fluss/{{<
 param Version >}}/flink-cdc-pipeline-connector-fluss-{{< param Version >}}.jar)
+
+3. 编写任务配置 yaml 文件。
+   下面给出了一个整库同步的示例文件 `postgres-to-fluss.yaml`:
+
+   ```yaml
+   
################################################################################
+   # Description: Sync Postgres all tables to Fluss
+   
################################################################################
+   source:
+     type: postgres
+     hostname: localhost
+     port: 5432
+     username: root
+     password: password
+     tables: adb.\.*.\.*
+     decoding.plugin.name: pgoutput
+     slot.name: pgtest
+     schema-change.enabled: true
+   
+   sink:
+     type: fluss
+     bootstrap.servers: localhost:9123
+     properties.client.security.protocol: sasl
+     properties.client.security.sasl.mechanism: PLAIN
+     properties.client.security.sasl.username: developer
+     properties.client.security.sasl.password: developer-pass
+   
+   pipeline:
+     name: Postgres to Fluss Pipeline
+     parallelism: 2
+     schema.change.behavior: LENIENT
+   
+   ```
+
+   其中:
+   - source 中的 `tables: adb.\.*.\.*` 通过正则匹配同步 `adb` 数据库下所有 Schema 
的所有表。所有表必须属于同一个数据库。
+   - `schema-change.enabled: true` 开启基于 pgoutput Relation 消息的 Schema 变更推导。需要 
`decoding.plugin.name` 设置为 `pgoutput`。
+   - `schema.change.behavior: LENIENT` 必须显式设置,否则可能会被默认的 `conf.yaml` 配置覆盖。
+
+4. 最后,通过命令行提交任务到 Flink Standalone cluster
+   ```shell
+   bash bin/flink-cdc.sh postgres-to-fluss.yaml
+   ```
+   提交成功后,返回信息如:
+   ```shell
+   Pipeline has been submitted to cluster.
+   Job ID: ae30f4580f1918bebf16752d4963dc54
+   Job Description: Postgres to Fluss Pipeline
+   ```
+   在 Flink Web UI,可以看到一个名为 `Postgres to Fluss Pipeline` 的任务正在运行。
+
+### 在 Fluss 中查询数据
+要查询已同步到 Fluss 的数据,需要配置 Flink SQL Client。
+
+1. 下载 
[fluss-flink-1.20-0.9.0-incubating.jar](https://repo1.maven.org/maven2/org/apache/fluss/fluss-flink-1.20/0.9.0-incubating/fluss-flink-1.20-0.9.0-incubating.jar)
 并放入 Flink 的 `lib` 目录。
+
+2. 启动 Flink SQL Client:
+   ```shell
+   bin/sql-client.sh
+   ```
+
+3. 创建 Fluss Catalog 并查询数据:
+   ```sql
+   SET 'execution.runtime-mode' = 'batch';
+   SET 'sql-client.execution.result-mode' = 'tableau';
+   
+   CREATE CATALOG developer_catalog WITH (
+       'type' = 'fluss',
+       'bootstrap.servers' = 'localhost:9123',
+       'client.security.protocol' = 'SASL',
+       'client.security.sasl.mechanism' = 'PLAIN',
+       'client.security.sasl.username' = 'developer',
+       'client.security.sasl.password' = 'developer-pass'
+   );
+   
+   USE CATALOG developer_catalog;
+   SHOW DATABASES;
+   +---------------+
+   | database name |
+   +---------------+
+   |         fluss |
+   |            hr |
+   |         sales |
+   +---------------+
+   ```
+
+4. 查询已同步的表:
+   ```sql
+   SELECT * FROM `developer_catalog`.`hr`.`employees` LIMIT 20;
+    +----+------+-----+--------------------------------+---------+
+    | id | name | age |                        address |  salary |
+    +----+------+-----+--------------------------------+---------+
+    |  1 | Paul |  32 | California                 ... | 20000.0 |
+    +----+------+-----+--------------------------------+---------+
+   ```
+
+### 同步表结构变更
+PostgreSQL 的表结构变更是**数据驱动**的 — DDL 变更不会立即被捕获,只有当下一个 DML 消息触发 pgoutput 插件发送 
Relation 消息时才会被识别。
+
+连接 PostgreSQL 数据库:
+
+   ```shell
+   psql -h localhost -p 5432 -U root adb
+   ```
+
+#### 新增列
+在 PostgreSQL 侧新增列并插入数据:
+
+   ```sql
+   ALTER TABLE hr.employees
+   ADD COLUMN EMAIL TEXT,
+   ADD COLUMN DEPARTMENT TEXT;
+   
+   INSERT INTO hr.employees (ID, NAME, AGE, ADDRESS, SALARY, EMAIL, 
DEPARTMENT) VALUES
+   (4, 'David', 32, 'Guangzhou', 8000.0, '[email protected]', 'IT'),
+   (5, 'Eva', 27, 'Hangzhou', 7100.0, '[email protected]', 'HR');
+   ```
+
+查询 Fluss,可以发现新的列已经创建了,而且存量数据对应列为 NULL:
+
+   ```sql
+   SELECT * FROM `developer_catalog`.`hr`.`employees` LIMIT 20;
+    
+----+-------+-----+--------------------------------+---------+-------------------+------------+
+    | id |  name | age |                        address |  salary |            
 email | department |
+    
+----+-------+-----+--------------------------------+---------+-------------------+------------+
+    |  1 |  Paul |  32 | California                 ... | 20000.0 |            
<NULL> |     <NULL> |
+    |  4 | David |  32 | Guangzhou                  ... |  8000.0 | 
[email protected] |         IT |
+    |  5 |   Eva |  27 | Hangzhou                   ... |  7100.0 |   
[email protected] |         HR |
+    
+----+-------+-----+--------------------------------+---------+-------------------+------------+
+   ```
+
+#### 删除列
+在 PostgreSQL 侧删除列并插入数据:
+
+   ```sql
+   ALTER TABLE hr.employees
+   DROP COLUMN ADDRESS;
+   
+   INSERT INTO hr.employees (ID, NAME, AGE, SALARY, EMAIL, DEPARTMENT) VALUES
+   (6, 'Frank', 35, 9000.0, '[email protected]', 'Finance'),
+   (7, 'Grace', 29, 7600.0, '[email protected]', 'Marketing');
+   ```
+
+查询 Fluss:
+
+   ```sql
+   SELECT * FROM `developer_catalog`.`hr`.`employees` LIMIT 20;
+    
+----+-------+-----+--------------------------------+---------+-------------------+------------+
+    | id |  name | age |                        address |  salary |            
 email | department |
+    
+----+-------+-----+--------------------------------+---------+-------------------+------------+
+    |  1 |  Paul |  32 | California                 ... | 20000.0 |            
<NULL> |     <NULL> |
+    |  4 | David |  32 | Guangzhou                  ... |  8000.0 | 
[email protected] |         IT |
+    |  5 |   Eva |  27 | Hangzhou                   ... |  7100.0 |   
[email protected] |         HR |
+    |  6 | Frank |  35 |                         <NULL> |  9000.0 | 
[email protected] |    Finance |
+    |  7 | Grace |  29 |                         <NULL> |  7600.0 | 
[email protected] |  Marketing |
+    
+----+-------+-----+--------------------------------+---------+-------------------+------------+
+   ```
+
+#### 重命名列
+在 PostgreSQL 侧重命名列并插入数据:
+
+   ```sql
+   ALTER TABLE hr.employees
+   RENAME COLUMN EMAIL TO WORK_EMAIL;
+   
+   INSERT INTO hr.employees (ID, NAME, AGE, SALARY, WORK_EMAIL, DEPARTMENT) 
VALUES
+   (8, 'Henry', 31, 8800.0, '[email protected]', 'Sales'),
+   (9, 'Ivy', 26, 6900.0, '[email protected]', 'Support');
+   ```
+
+查询 Fluss 查看重命名后的列:
+
+   ```sql
+   SELECT * FROM `developer_catalog`.`hr`.`employees` LIMIT 20;
+    
+----+-------+-----+--------------------------------+---------+-------------------+------------+-------------------+
+    | id |  name | age |                        address |  salary |            
 email | department |        work_email |
+    
+----+-------+-----+--------------------------------+---------+-------------------+------------+-------------------+
+    |  1 |  Paul |  32 | California                 ... | 20000.0 |            
<NULL> |     <NULL> |            <NULL> |
+    |  4 | David |  32 | Guangzhou                  ... |  8000.0 | 
[email protected] |         IT |            <NULL> |
+    |  5 |   Eva |  27 | Hangzhou                   ... |  7100.0 |   
[email protected] |         HR |            <NULL> |
+    |  6 | Frank |  35 |                         <NULL> |  9000.0 | 
[email protected] |    Finance |            <NULL> |
+    |  7 | Grace |  29 |                         <NULL> |  7600.0 | 
[email protected] |  Marketing |            <NULL> |
+    |  8 | Henry |  31 |                         <NULL> |  8800.0 |            
<NULL> |      Sales | [email protected] |
+    |  9 |   Ivy |  26 |                         <NULL> |  6900.0 |            
<NULL> |    Support |   [email protected] |
+    
+----+-------+-----+--------------------------------+---------+-------------------+------------+-------------------+
+   ```
+
+## 环境清理
+本教程结束后,在 `docker-compose.yml` 文件所在的目录下执行如下命令停止所有容器:
+
+   ```shell
+   docker-compose down -v
+   ```
+在 Flink 所在目录 `flink-1.20.3` 下执行如下命令停止 Flink 集群:
+
+   ```shell
+   ./bin/stop-cluster.sh
+   ```
+
+{{< top >}}
diff --git a/docs/content/docs/connectors/pipeline-connectors/fluss.md 
b/docs/content/docs/connectors/pipeline-connectors/fluss.md
index 99902283b..a2d7c5324 100644
--- a/docs/content/docs/connectors/pipeline-connectors/fluss.md
+++ b/docs/content/docs/connectors/pipeline-connectors/fluss.md
@@ -31,6 +31,7 @@ The Fluss Pipeline connector can be used as the *Data Sink* 
of the pipeline, and
 ## What can the connector do?
 * Create table automatically if not exist
 * Data synchronization
+* Schema change synchronization (lenient mode)
 
 How to create Pipeline
 ----------------
@@ -61,6 +62,7 @@ sink:
 pipeline:
   name: MySQL to Fluss Pipeline
   parallelism: 2
+  schema.change.behavior: LENIENT
 ```
 
 Pipeline Connector Options
@@ -142,7 +144,13 @@ Pipeline Connector Options
   * The number of buckets is controlled by `bucket.num`
   * The distribution keys are controlled by option `bucket.key`. For primary 
key table and a bucket key is not specified, the bucket key will be used as 
primary key(excluding the partition key). For log table has no primary key and 
the bucket key is not specified, the data will be distributed to each bucket 
randomly. 
 
-* Not support schema change synchronization.If you want to ignore schema 
change, use `schema.change.behavior: IGNORE`.
+* Supports schema change synchronization in `lenient` mode with 
`schema.change.behavior: lenient`. The following schema change events are 
supported:
+  * **Add column** — new columns are appended to the Fluss table.
+  * **Drop column** — the column is not physically removed in lenient mode. 
The drop operation is ignored, and subsequent writes will set the column value 
to null.
+  * **Rename column** — in lenient mode, this is translated into an 
add-new-column + alter-old-column-type-to-nullable sequence.
+  * **Alter column type** — not supported. 
+
+  To enable schema change synchronization, configure the pipeline with 
`schema.change.behavior: lenient`. If you want to ignore all schema changes, 
use `schema.change.behavior: IGNORE`.
 
 * For data synchronization, the pipeline connector uses [Fluss Java 
Client](https://fluss.apache.org/docs/apis/java-client/)
   to write data to Fluss.
@@ -239,6 +247,21 @@ Data Type Mapping
       <td>BYTES</td>
       <td></td>
     </tr>
+    <tr>
+      <td>ARRAY</td>
+      <td>ARRAY</td>
+      <td>Element type is mapped recursively.</td>
+    </tr>
+    <tr>
+      <td>MAP</td>
+      <td>MAP</td>
+      <td>Key and value types are mapped recursively.</td>
+    </tr>
+    <tr>
+      <td>ROW</td>
+      <td>ROW</td>
+      <td>Field types are mapped recursively.</td>
+    </tr>
     </tbody>
 </table>
 </div>
diff --git a/docs/content/docs/connectors/pipeline-connectors/postgres.md 
b/docs/content/docs/connectors/pipeline-connectors/postgres.md
index b660b991e..1deb246f5 100644
--- a/docs/content/docs/connectors/pipeline-connectors/postgres.md
+++ b/docs/content/docs/connectors/pipeline-connectors/postgres.md
@@ -28,7 +28,6 @@ under the License.
 
 Postgres connector allows reading snapshot data and incremental data from 
Postgres database and provides end-to-end full-database data synchronization 
capabilities.
 This document describes how to setup the Postgres connector.
-Note: Since the Postgres WAL log cannot parse table structure change records, 
Postgres CDC Pipeline Source does not support synchronizing table structure 
changes currently.
 
 ## Example
 
@@ -46,6 +45,7 @@ source:
    tables: adb.\.*.\.*
    decoding.plugin.name:  pgoutput
    slot.name: pgtest
+   schema-change.enabled: true
 
 sink:
   type: fluss
@@ -60,6 +60,7 @@ sink:
 pipeline:
    name: Postgres to Fluss Pipeline
    parallelism: 4
+   schema.change.behavior: lenient
 ```
 
 ## Connector Options
@@ -274,6 +275,17 @@ pipeline:
         Defaults to false.
       </td>
     </tr>
+    <tr>
+      <td>schema-change.enabled</td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">false</td>
+      <td>Boolean</td>
+      <td>
+        Whether to enable schema change inference for the Postgres source. 
When enabled, the connector infers schema change events (add column, drop 
column, rename column, alter column type) by comparing pgoutput Relation 
messages against the cached schema.<br>
+        Requires <code>decoding.plugin.name</code> to be set to 
<code>pgoutput</code>.<br>
+        Defaults to false.
+      </td>
+    </tr>
     </tbody>
 </table>
 </div>
diff --git a/docs/content/docs/get-started/quickstart/postgres-to-fluss.md 
b/docs/content/docs/get-started/quickstart/postgres-to-fluss.md
new file mode 100644
index 000000000..36c66a0ff
--- /dev/null
+++ b/docs/content/docs/get-started/quickstart/postgres-to-fluss.md
@@ -0,0 +1,408 @@
+---
+title: "Postgres to Fluss"
+weight: 5
+type: docs
+aliases:
+- /get-started/quickstart/postgres-to-fluss
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Streaming ELT from Postgres to Fluss
+
+This tutorial shows how to quickly build a Streaming ELT job from PostgreSQL 
to Fluss using Flink CDC, including
+full-database synchronization and schema change evolution.
+All exercises in this tutorial are performed in the Flink CDC CLI, and the 
entire process uses standard SQL syntax,
+without a single line of Java/Scala code or IDE installation.
+
+## Preparation
+Prepare a Linux or MacOS computer with Docker installed.
+
+### Prepare Flink Standalone cluster
+1. Download [Flink 
1.20.3](https://archive.apache.org/dist/flink/flink-1.20.3/flink-1.20.3-bin-scala_2.12.tgz),
 unzip and get flink-1.20.3 directory.
+   Use the following command to navigate to the Flink directory and set 
FLINK_HOME to the directory where flink-1.20.3 is located.
+
+   ```shell
+   cd flink-1.20.3
+   ```
+
+2. Enable checkpointing by appending the following parameters to the 
conf/config.yaml configuration file to perform a checkpoint every 3 seconds.
+
+   ```yaml
+   execution:
+     checkpointing:
+       interval: 3s
+   ```
+
+3. Start the Flink cluster using the following command.
+
+   ```shell
+   ./bin/start-cluster.sh
+   ```  
+
+If successfully started, you can access the Flink Web UI at 
[http://localhost:8081/](http://localhost:8081/).
+
+Executing `start-cluster.sh` multiple times can start multiple `TaskManager`s.
+
+### Prepare docker compose
+The following tutorial will prepare the required components using 
`docker-compose`.
+
+Create a `docker-compose.yml` file using the content provided below:
+
+   ```yaml
+   services:
+     # Fluss cluster
+     coordinator-server:
+       image: apache/fluss:0.9.0-incubating
+       command: coordinatorServer
+       depends_on:
+         - zookeeper
+       environment:
+         - |
+           FLUSS_PROPERTIES=
+           zookeeper.address: zookeeper:2181
+           bind.listeners: INTERNAL://coordinator-server:0, 
CLIENT://coordinator-server:9123
+           advertised.listeners: CLIENT://localhost:9123
+           internal.listener.name: INTERNAL
+           remote.data.dir: /tmp/fluss/remote-data
+           security.protocol.map: CLIENT:SASL, INTERNAL:PLAINTEXT
+           security.sasl.enabled.mechanisms: PLAIN
+           security.sasl.plain.jaas.config: 
org.apache.fluss.security.auth.sasl.plain.PlainLoginModule required 
user_admin="admin-pass" user_developer="developer-pass" ;
+           super.users: User:admin
+       ports:
+         - "9123:9123"
+     tablet-server:
+       image: apache/fluss:0.9.0-incubating
+       command: tabletServer
+       depends_on:
+         - coordinator-server
+       environment:
+         - |
+           FLUSS_PROPERTIES=
+           zookeeper.address: zookeeper:2181
+           bind.listeners: INTERNAL://tablet-server:0, 
CLIENT://tablet-server:9123
+           advertised.listeners: CLIENT://localhost:9124
+           internal.listener.name: INTERNAL
+           tablet-server.id: 0
+           kv.snapshot.interval: 0s
+           data.dir: /tmp/fluss/data
+           remote.data.dir: /tmp/fluss/remote-data
+           security.protocol.map: CLIENT:SASL, INTERNAL:PLAINTEXT
+           security.sasl.enabled.mechanisms: PLAIN
+           security.sasl.plain.jaas.config: 
org.apache.fluss.security.auth.sasl.plain.PlainLoginModule required 
user_admin="admin-pass" user_developer="developer-pass" ;
+           super.users: User:admin
+       ports:
+         - "9124:9123"
+     zookeeper:
+       restart: always
+       image: zookeeper:3.9.2
+     # PostgreSQL
+     postgres:
+       image: postgres:14.5
+       environment:
+         POSTGRES_USER: root
+         POSTGRES_PASSWORD: password
+         POSTGRES_DB: postgres
+       ports:
+         - "5432:5432"
+       volumes:
+         - postgres_data:/var/lib/postgresql/data
+       command:
+         - "postgres"
+         - "-c"
+         - "wal_level=logical"
+         - "-c"
+         - "max_replication_slots=5"
+         - "-c"
+         - "max_wal_senders=5"
+         - "-c"
+         - "hot_standby=on"
+   volumes:
+     postgres_data:
+   ```
+
+The Docker Compose includes the following services:
+- **Fluss** (coordinator-server, tablet-server, zookeeper): the target data 
lakehouse
+- **PostgreSQL**: the source database with logical replication enabled 
(`wal_level=logical`)
+
+To start all containers, run the following command in the directory that 
contains the `docker-compose.yml` file.
+
+   ```shell
+   docker-compose up -d
+   ```
+
+This command automatically starts all the containers defined in the Docker 
Compose configuration in a detached mode. Run `docker ps` to check whether 
these containers are running properly.
+
+#### Prepare records for PostgreSQL
+1. Connect to the PostgreSQL database
+
+   ```shell
+   psql -h localhost -p 5432 -U root postgres
+   ```
+   The password is: `password`
+
+2. Create the `adb` database and switch to it
+
+   ```sql
+   CREATE DATABASE adb;
+   \c adb
+   ```
+
+3. Create schemas and tables, then insert data
+
+    ```sql
+    -- Create schemas
+    CREATE SCHEMA hr;
+    CREATE SCHEMA sales;
+   
+    -- Create tables
+    CREATE TABLE hr.employees(
+       ID INT PRIMARY KEY NOT NULL,
+       NAME TEXT NOT NULL,
+       AGE INT NOT NULL,
+       ADDRESS CHAR(50),
+       SALARY REAL
+    );
+   
+    CREATE TABLE sales.orders(
+       ID INT PRIMARY KEY NOT NULL,
+       PRODUCT TEXT NOT NULL,
+       QUANTITY INT NOT NULL,
+       REGION CHAR(50),
+       AMOUNT REAL
+    );
+   
+    -- Insert data
+    INSERT INTO hr.employees (ID, NAME, AGE, ADDRESS, SALARY)
+    VALUES (1, 'Paul', 32, 'California', 20000.00);
+   
+    INSERT INTO sales.orders (ID, PRODUCT, QUANTITY, REGION, AMOUNT)
+    VALUES (1, 'Laptop', 5, 'East', 49999.50);
+    ```
+
+## Submit job with Flink CDC CLI
+1. Download the binary compressed packages listed below and extract them to 
the directory `flink-cdc-{{< param Version >}}`:    
+   [flink-cdc-{{< param Version 
>}}-bin.tar.gz](https://www.apache.org/dyn/closer.lua/flink/flink-cdc-{{< param 
Version >}}/flink-cdc-{{< param Version >}}-bin.tar.gz)
+   flink-cdc-{{< param Version >}} directory will contain four directories: 
`bin`, `lib`, `log`, and `conf`.
+
+2. Download the connector packages listed below and move them to the `lib` 
directory    
+   **Download links are available only for stable releases, SNAPSHOT 
dependencies need to be built based on master or release branches by yourself.**
+   **Please note that you need to move the jar to the lib directory of Flink 
CDC Home, not to the lib directory of Flink Home.**
+    - [Postgres pipeline connector {{< param Version 
>}}](https://repo1.maven.org/maven2/org/apache/flink/flink-cdc-pipeline-connector-postgres/{{<
 param Version >}}/flink-cdc-pipeline-connector-postgres-{{< param Version 
>}}.jar)
+    - [Fluss pipeline connector {{< param Version 
>}}](https://repo1.maven.org/maven2/org/apache/flink/flink-cdc-pipeline-connector-fluss/{{<
 param Version >}}/flink-cdc-pipeline-connector-fluss-{{< param Version >}}.jar)
+
+3. Write task configuration yaml file.
+   Here is an example file for synchronizing the entire database 
`postgres-to-fluss.yaml`:
+
+   ```yaml
+   
################################################################################
+   # Description: Sync Postgres all tables to Fluss
+   
################################################################################
+   source:
+     type: postgres
+     hostname: localhost
+     port: 5432
+     username: root
+     password: password
+     tables: adb.\.*.\.*
+     decoding.plugin.name: pgoutput
+     slot.name: pgtest
+     schema-change.enabled: true
+   
+   sink:
+     type: fluss
+     bootstrap.servers: localhost:9123
+     properties.client.security.protocol: sasl
+     properties.client.security.sasl.mechanism: PLAIN
+     properties.client.security.sasl.username: developer
+     properties.client.security.sasl.password: developer-pass
+   
+   pipeline:
+     name: Postgres to Fluss Pipeline
+     parallelism: 2
+     schema.change.behavior: LENIENT
+   
+   ```
+   
+   Notice that:
+   - `tables: adb.\.*.\.*` in source synchronizes all tables across all 
schemas in the `adb` database through regular expression matching. All tables 
must belong to the same database.
+   - `schema-change.enabled: true` enables schema change inference based on 
pgoutput Relation messages. Requires `decoding.plugin.name` to be set to 
`pgoutput`.
+   - `schema.change.behavior: LENIENT` must be explicitly set, otherwise it 
may be overridden by the default `conf.yaml` configuration.
+
+4. Finally, submit the job to the Flink Standalone cluster using CLI.
+   ```shell
+   bash bin/flink-cdc.sh postgres-to-fluss.yaml
+   ```
+   After successful submission, the return information is as follows:
+   ```shell
+   Pipeline has been submitted to cluster.
+   Job ID: ae30f4580f1918bebf16752d4963dc54
+   Job Description: Postgres to Fluss Pipeline
+   ```
+   You can find a job named `Postgres to Fluss Pipeline` running through the 
Flink Web UI.
+
+### Query data in Fluss
+To query the synchronized data in Fluss, you need to set up the Flink SQL 
Client.
+
+1. Download 
[fluss-flink-1.20-0.9.0-incubating.jar](https://repo1.maven.org/maven2/org/apache/fluss/fluss-flink-1.20/0.9.0-incubating/fluss-flink-1.20-0.9.0-incubating.jar)
 and place it in the Flink `lib` directory.
+
+2. Start the Flink SQL Client:
+   ```shell
+   bin/sql-client.sh
+   ```
+
+3. Create a Fluss catalog and query data:
+```sql
+   SET 'execution.runtime-mode' = 'batch';
+   SET 'sql-client.execution.result-mode' = 'tableau';
+   
+   CREATE CATALOG developer_catalog WITH (
+       'type' = 'fluss',
+       'bootstrap.servers' = 'localhost:9123',
+       'client.security.protocol' = 'SASL',
+       'client.security.sasl.mechanism' = 'PLAIN',
+       'client.security.sasl.username' = 'developer',
+       'client.security.sasl.password' = 'developer-pass'
+   );
+   
+   USE CATALOG developer_catalog;
+   SHOW DATABASES;
+   +---------------+
+   | database name |
+   +---------------+
+   |         fluss |
+   |            hr |
+   |         sales |
+   +---------------+
+```
+
+4. Query the synchronized table:
+   ```sql
+   SELECT * FROM `developer_catalog`.`hr`.`employees` LIMIT 20;
+    +----+------+-----+--------------------------------+---------+
+    | id | name | age |                        address |  salary |
+    +----+------+-----+--------------------------------+---------+
+    |  1 | Paul |  32 | California                 ... | 20000.0 |
+    +----+------+-----+--------------------------------+---------+
+   ```
+
+### Synchronize Schema Changes
+PostgreSQL schema changes are **data-driven** — a DDL change will not be 
captured until the next DML message triggers a Relation message from the 
pgoutput plugin.
+
+Connect to the PostgreSQL database:
+
+   ```shell
+   psql -h localhost -p 5432 -U root adb
+   ```
+
+#### Add columns
+Add new columns and insert data on the PostgreSQL side:
+
+   ```sql
+   ALTER TABLE hr.employees
+   ADD COLUMN EMAIL TEXT,
+   ADD COLUMN DEPARTMENT TEXT;
+   
+   INSERT INTO hr.employees (ID, NAME, AGE, ADDRESS, SALARY, EMAIL, 
DEPARTMENT) VALUES
+   (4, 'David', 32, 'Guangzhou', 8000.0, '[email protected]', 'IT'),
+   (5, 'Eva', 27, 'Hangzhou', 7100.0, '[email protected]', 'HR');
+   ```
+
+Query Fluss — the new columns have been created, and existing rows show `NULL` 
for the new columns:
+
+   ```sql
+   SELECT * FROM `developer_catalog`.`hr`.`employees` LIMIT 20;
+    
+----+-------+-----+--------------------------------+---------+-------------------+------------+
+    | id |  name | age |                        address |  salary |            
 email | department |
+    
+----+-------+-----+--------------------------------+---------+-------------------+------------+
+    |  1 |  Paul |  32 | California                 ... | 20000.0 |            
<NULL> |     <NULL> |
+    |  4 | David |  32 | Guangzhou                  ... |  8000.0 | 
[email protected] |         IT |
+    |  5 |   Eva |  27 | Hangzhou                   ... |  7100.0 |   
[email protected] |         HR |
+    
+----+-------+-----+--------------------------------+---------+-------------------+------------+
+   ```
+
+#### Drop column
+Drop a column and insert data on the PostgreSQL side:
+
+   ```sql
+   ALTER TABLE hr.employees
+   DROP COLUMN ADDRESS;
+   
+   INSERT INTO hr.employees (ID, NAME, AGE, SALARY, EMAIL, DEPARTMENT) VALUES
+   (6, 'Frank', 35, 9000.0, '[email protected]', 'Finance'),
+   (7, 'Grace', 29, 7600.0, '[email protected]', 'Marketing');
+   ```
+
+Query Fluss:
+
+   ```sql
+   SELECT * FROM `developer_catalog`.`hr`.`employees` LIMIT 20;
+    
+----+-------+-----+--------------------------------+---------+-------------------+------------+
+    | id |  name | age |                        address |  salary |            
 email | department |
+    
+----+-------+-----+--------------------------------+---------+-------------------+------------+
+    |  1 |  Paul |  32 | California                 ... | 20000.0 |            
<NULL> |     <NULL> |
+    |  4 | David |  32 | Guangzhou                  ... |  8000.0 | 
[email protected] |         IT |
+    |  5 |   Eva |  27 | Hangzhou                   ... |  7100.0 |   
[email protected] |         HR |
+    |  6 | Frank |  35 |                         <NULL> |  9000.0 | 
[email protected] |    Finance |
+    |  7 | Grace |  29 |                         <NULL> |  7600.0 | 
[email protected] |  Marketing |
+    
+----+-------+-----+--------------------------------+---------+-------------------+------------+
+   ```
+
+#### Rename column
+Rename a column and insert data on the PostgreSQL side:
+
+   ```sql
+   ALTER TABLE hr.employees
+   RENAME COLUMN EMAIL TO WORK_EMAIL;
+   
+   INSERT INTO hr.employees (ID, NAME, AGE, SALARY, WORK_EMAIL, DEPARTMENT) 
VALUES
+   (8, 'Henry', 31, 8800.0, '[email protected]', 'Sales'),
+   (9, 'Ivy', 26, 6900.0, '[email protected]', 'Support');
+   ```
+
+Query Fluss to see the renamed column:
+
+   ```sql
+   SELECT * FROM `developer_catalog`.`hr`.`employees` LIMIT 20;
+    
+----+-------+-----+--------------------------------+---------+-------------------+------------+-------------------+
+    | id |  name | age |                        address |  salary |            
 email | department |        work_email |
+    
+----+-------+-----+--------------------------------+---------+-------------------+------------+-------------------+
+    |  1 |  Paul |  32 | California                 ... | 20000.0 |            
<NULL> |     <NULL> |            <NULL> |
+    |  4 | David |  32 | Guangzhou                  ... |  8000.0 | 
[email protected] |         IT |            <NULL> |
+    |  5 |   Eva |  27 | Hangzhou                   ... |  7100.0 |   
[email protected] |         HR |            <NULL> |
+    |  6 | Frank |  35 |                         <NULL> |  9000.0 | 
[email protected] |    Finance |            <NULL> |
+    |  7 | Grace |  29 |                         <NULL> |  7600.0 | 
[email protected] |  Marketing |            <NULL> |
+    |  8 | Henry |  31 |                         <NULL> |  8800.0 |            
<NULL> |      Sales | [email protected] |
+    |  9 |   Ivy |  26 |                         <NULL> |  6900.0 |            
<NULL> |    Support |   [email protected] |
+    
+----+-------+-----+--------------------------------+---------+-------------------+------------+-------------------+
+   ```
+
+## Clean up
+After finishing the tutorial, run the following command to stop all containers 
in the directory of `docker-compose.yml`:
+
+   ```shell
+   docker-compose down -v
+   ```
+Run the following command to stop the Flink cluster in the directory of Flink 
`flink-1.20.3`:
+
+   ```shell
+   ./bin/stop-cluster.sh
+   ```
+
+{{< top >}}


Reply via email to