lvyanquan commented on code in PR #3968:
URL: https://github.com/apache/flink-cdc/pull/3968#discussion_r2241443779
########## docs/content.zh/docs/connectors/pipeline-connectors/postgres.md: ##########
@@ -0,0 +1,403 @@
+---
+title: "Postgres"
+weight: 2
+type: docs
+aliases:
+- /connectors/pipeline-connectors/Postgres
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Postgres Connector
+
+Postgres CDC Pipeline 连接器允许从 Postgres 数据库读取快照数据和增量数据,并提供端到端的整库数据同步能力。本文描述了如何设置 Postgres CDC Pipeline 连接器。
+注意:因为 Postgres 的 WAL 日志中暂时没有办法解析表结构变更记录,因此 Postgres CDC Pipeline 不支持表结构变更。

Review Comment:
   > 因此Postgres CDC Pipeline 不支持表结构变更。

   因此Postgres CDC Pipeline Source暂时不支持同步表结构变更。

########## docs/content.zh/docs/connectors/pipeline-connectors/postgres.md: ##########
@@ -0,0 +1,403 @@
+---
+title: "Postgres"
+weight: 2
+type: docs
+aliases:
+- /connectors/pipeline-connectors/Postgres
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Postgres Connector
+
+Postgres CDC Pipeline 连接器允许从 Postgres 数据库读取快照数据和增量数据,并提供端到端的整库数据同步能力。本文描述了如何设置 Postgres CDC Pipeline 连接器。
+注意:因为 Postgres 的 WAL 日志中暂时没有办法解析表结构变更记录,因此 Postgres CDC Pipeline 不支持表结构变更。
+
+## 示例
+
+从 Postgres 读取数据同步到 Doris 的 Pipeline 可以定义如下:
+
+```yaml
+source:
+  type: postgres
+  name: Postgres Source
+  hostname: 127.0.0.1
+  port: 5432
+  username: admin
+  password: pass
+  tables: adb.\.*.\.*, bdb.user_schema_[0-9].user_table_[0-9]+, [app|web].schema_\.*.order_\.*
+  decoding.plugin.name: pgoutput
+  slot.name: pgtest
+
+sink:
+  type: doris
+  name: Doris Sink
+  fenodes: 127.0.0.1:8030
+  username: root
+  password: pass
+
+pipeline:
+  name: Postgres to Doris Pipeline
+  parallelism: 4
+```
+
+## 连接器配置项
+
+<div class="highlight">
+<table class="colwidths-auto docutils">
+  <thead>
+    <tr>
+      <th class="text-left" style="width: 10%">Option</th>
+      <th class="text-left" style="width: 8%">Required</th>
+      <th class="text-left" style="width: 7%">Default</th>
+      <th class="text-left" style="width: 10%">Type</th>
+      <th class="text-left" style="width: 65%">Description</th>
+    </tr>
+  </thead>
+  <tbody>
+    <tr>
+      <td>hostname</td>
+      <td>required</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>Postgres 数据库服务器的 IP 地址或主机名。</td>
+    </tr>
+    <tr>
+      <td>port</td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">5432</td>
+      <td>Integer</td>
+      <td>Postgres 数据库服务器的整数端口号。</td>
+    </tr>
+    <tr>
+      <td>username</td>
+      <td>required</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>连接到 Postgres 数据库服务器时要使用的 Postgres 用户的名称。</td>
+    </tr>
+    <tr>
+      <td>password</td>
+      <td>required</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>连接 Postgres 数据库服务器时使用的密码。</td>
+    </tr>
+    <tr>
+      <td>tables</td>
+      <td>required</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>需要监视的 Postgres 数据库的表名。表名支持正则表达式,以监视满足正则表达式的多个表。<br>
+          需要注意的是,点号(.)被视为数据库和表名的分隔符。如果需要在正则表达式中使用点(.)来匹配任何字符,必须使用反斜杠对点进行转义。<br>
+          例如,adb.\.*.\.*, bdb.user_schema_[0-9].user_table_[0-9]+, [app|web].schema_\.*.order_\.*</td>
+    </tr>
+    <tr>
+      <td>slot.name</td>
+      <td>required</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>为从特定插件以流式传输方式获取某个数据库/模式的变更数据,所创建的 Postgres 逻辑解码槽(logical decoding slot)的名称。服务器使用这个槽(slot)将事件流式传输给你要配置的连接器(connector)。
+          <br/>复制槽名称必须符合 <a href="https://www.postgresql.org/docs/current/static/warm-standby.html#STREAMING-REPLICATION-SLOTS-MANIPULATION">PostgreSQL 复制插槽的命名规则</a>,其规则如下: "Each replication slot has a name, which can contain lower-case letters, numbers, and the underscore character."</td>
+    </tr>
+    <tr>
+      <td>tables.exclude</td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>需要排除的 Postgres 数据库的表名,该参数会在 tables 参数之后起排除作用。表名支持正则表达式,以排除满足正则表达式的多个表。<br>
+          用法和 tables 参数相同。</td>
+    </tr>
+    <tr>
+      <td>server-time-zone</td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>数据库服务器中的会话时区,例如: "Asia/Shanghai"。
+          它控制 Postgres 中的时间戳类型如何转换为字符串。
+          更多请参考 <a href="https://debezium.io/documentation/reference/1.9/connectors/postgresql.html#postgresql-data-types">这里</a>。
+          如果没有设置,则使用 ZoneId.systemDefault() 来确定服务器时区。
+      </td>
+    </tr>
+    <tr>
+      <td>scan.incremental.snapshot.chunk.size</td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">8096</td>
+      <td>Integer</td>
+      <td>表快照的块大小(行数),读取表的快照时,捕获的表被拆分为多个块。</td>
+    </tr>
+    <tr>
+      <td>scan.snapshot.fetch.size</td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">1024</td>
+      <td>Integer</td>
+      <td>读取表快照时每次读取数据的最大条数。</td>
+    </tr>
+    <tr>
+      <td>scan.startup.mode</td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">initial</td>
+      <td>String</td>
+      <td>Postgres CDC 消费者可选的启动模式,合法的模式为 "initial"、"latest-offset"、"committed-offset" 和 "snapshot"。</td>
+    </tr>
+    <tr>
+      <td>scan.incremental.snapshot.backfill.skip</td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">false</td>
+      <td>Boolean</td>
+      <td>
+        是否在快照读取阶段跳过 backfill。<br>
+        如果跳过 backfill,快照阶段捕获表的更改将在稍后的 WAL 读取阶段被回放,而不是合并到快照中。<br>
+        警告:跳过 backfill 可能会导致数据不一致,因为快照阶段发生的某些 WAL 事件可能会被重放(仅保证 at-least-once)。
+        例如,更新快照阶段已更新的值,或删除快照阶段已删除的数据。这些重放的 WAL 事件应进行特殊处理。
+      </td>
+    </tr>
+    <tr>
+      <td>scan.lsn-commit.checkpoints-num-delay</td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">3</td>
+      <td>Integer</td>
+      <td>在开始提交 LSN 偏移量之前,允许的检查点延迟次数。<br>
+          检查点的 LSN 偏移量将以滚动方式提交,最早的那个检查点标识符将首先从延迟的检查点中提交。
+      </td>
+    </tr>
+    <tr>
+      <td>connect.timeout</td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">30s</td>
+      <td>Duration</td>
+      <td>连接器在尝试连接到 Postgres 数据库服务器后超时前应等待的最长时间。该时长不能少于 250 毫秒。</td>
+    </tr>
+    <tr>
+      <td>connect.max-retries</td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">3</td>
+      <td>Integer</td>
+      <td>连接器应重试以建立 Postgres 数据库服务器连接的最大重试次数。</td>
+    </tr>
+    <tr>
+      <td>connection.pool.size</td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">20</td>
+      <td>Integer</td>
+      <td>连接池大小。</td>
+    </tr>
+    <tr>
+      <td>jdbc.properties.*</td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>传递自定义 JDBC URL 属性的选项。用户可以传递自定义属性,如 'jdbc.properties.useSSL' = 'false'。</td>
+    </tr>
+    <tr>
+      <td>heartbeat.interval</td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">30s</td>
+      <td>Duration</td>
+      <td>发送心跳事件的间隔,用于跟踪最新可用的 WAL committed offset。</td>
+    </tr>
+    <tr>
+      <td>debezium.*</td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>将 Debezium 的属性传递给 Debezium 嵌入式引擎,该引擎用于从 Postgres 服务器捕获数据更改。
+          例如: <code>'debezium.snapshot.mode' = 'never'</code>。
+          查看更多关于 <a href="https://debezium.io/documentation/reference/1.9/connectors/postgresql.html">Debezium 的 Postgres 连接器属性</a></td>
+    </tr>
+    <tr>
+      <td>scan.incremental.close-idle-reader.enabled</td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">false</td>
+      <td>Boolean</td>
+      <td>是否在快照结束后关闭空闲的 Reader。此特性需要 flink 版本大于等于 1.14 并且 'execution.checkpointing.checkpoints-after-tasks-finish.enabled' 需要设置为 true。<br>
+          若 flink 版本大于等于 1.15,'execution.checkpointing.checkpoints-after-tasks-finish.enabled' 默认值变更为 true,可以不用显式配置 'execution.checkpointing.checkpoints-after-tasks-finish.enabled' = true。</td>
+    </tr>
+    <tr>
+      <td>chunk-meta.group.size</td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">1000</td>
+      <td>Integer</td>
+      <td>
+        分块元数据的组大小,如果元数据大小超过该组大小,则元数据将被划分为多个组。
+      </td>
+    </tr>
+    <tr>
+      <td>metadata.list</td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>
+        源记录中可读取的元数据列表,将传递给下游并在转换模块中使用,各字段以逗号分隔。可用的可读元数据包括:op_ts。
+      </td>
+    </tr>
+  </tbody>
+</table>
+</div>
+
+注意:
+1. 配置选项`tables`指定 Postgres CDC 需要采集的表,格式为`db.schema1.table1,db.schema2.table2`,其中所有的 db 需要为同一个 db,这是因为 Postgres 连接 URL 中需要指定 dbname,目前 CDC 只支持连接一个 db。
+
+## 启动模式
+
+配置选项`scan.startup.mode`指定 Postgres CDC 使用者的启动模式。有效枚举包括:
+- `initial` (默认):在连接器首次启动时,会对被监控的数据库表执行一次初始快照(snapshot),然后继续读取复制插槽中的变更事件。
+- `latest-offset`:在首次启动时不执行快照操作,而是直接从复制插槽的末尾开始读取,即只获取连接器启动之后发生的变更。
+- `committed-offset`:跳过快照阶段,从复制插槽中已确认的 confirmed_flush_lsn 偏移量开始读取事件,即从上次提交的位置继续读取变更。
+- `snapshot`: 只进行快照阶段,跳过增量阶段,快照阶段读取结束后退出。
+
+### 可用的指标
+
+指标系统能够帮助了解分片分发的进展,下面列举出了支持的 Flink 指标 [Flink metrics](https://nightlies.apache.org/flink/flink-docs-master/docs/ops/metrics/):
+
+| Group                  | Name                       | Type  | Description    |
+|------------------------|----------------------------|-------|----------------|
+| namespace.schema.table | isSnapshotting             | Gauge | 表是否在快照读取阶段 |
+| namespace.schema.table | isStreamReading            | Gauge | 表是否在增量读取阶段 |
+| namespace.schema.table | numTablesSnapshotted       | Gauge | 已经被快照读取完成的表的数量 |
+| namespace.schema.table | numTablesRemaining         | Gauge | 还没有被快照读取的表的数量 |
+| namespace.schema.table | numSnapshotSplitsProcessed | Gauge | 正在处理的分片的数量 |
+| namespace.schema.table | numSnapshotSplitsRemaining | Gauge | 还没有被处理的分片的数量 |
+| namespace.schema.table | numSnapshotSplitsFinished  | Gauge | 已经处理完成的分片的数量 |
+| namespace.schema.table | snapshotStartTime          | Gauge | 快照读取阶段开始的时间 |
+| namespace.schema.table | snapshotEndTime            | Gauge | 快照读取阶段结束的时间 |
+
+注意:
+1. Group 名称是 `namespace.schema.table`,这里的 `namespace` 是实际的数据库名称,`schema` 是实际的 schema 名称,`table` 是实际的表名称。
+
+## 数据类型映射
+
+<div class="wy-table-responsive">
+<table class="colwidths-auto docutils">
+  <thead>
+    <tr>
+      <th class="text-left">PostgreSQL type<a href="https://www.postgresql.org/docs/12/datatype.html"></a></th>
+      <th class="text-left">CDC type<a href="{% link dev/table/types.md %}"></a></th>
+    </tr>
+  </thead>
+  <tbody>
+    <tr>
+      <td>
+        SMALLINT<br>
+        INT2<br>
+        SMALLSERIAL<br>
+        SERIAL2</td>
+      <td>SMALLINT</td>
+    </tr>
+    <tr>
+      <td>
+        INTEGER<br>
+        SERIAL</td>
+      <td>INT</td>
+    </tr>
+    <tr>
+      <td>
+        BIGINT<br>
+        BIGSERIAL</td>
+      <td>BIGINT</td>
+    </tr>
+    <tr>
+      <td></td>

Review Comment:
   Something missed.

########## flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresMetadataAccessor.java: ##########
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.postgres.source;
+
+import org.apache.flink.cdc.common.annotation.Internal;
+import org.apache.flink.cdc.common.event.TableId;
+import org.apache.flink.cdc.common.schema.Schema;
+import org.apache.flink.cdc.common.source.MetadataAccessor;
+import org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceConfig;
+import org.apache.flink.cdc.connectors.postgres.utils.PostgresSchemaUtils;
+
+import io.debezium.connector.postgresql.PostgresPartition;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+
+/** {@link MetadataAccessor} for {@link PostgresDataSource}. */
+@Internal
+public class PostgresMetadataAccessor implements MetadataAccessor {
+
+    private final PostgresSourceConfig sourceConfig;
+
+    private final PostgresPartition partition;

Review Comment:
   This variable doesn't seem to be actually used.

########## docs/content/docs/connectors/pipeline-connectors/postgres.md: ##########
@@ -0,0 +1,398 @@
+---
+title: "Postgres"
+weight: 2
+type: docs
+aliases:
+- /connectors/pipeline-connectors/Postgres
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Postgres Connector
+
+The Postgres connector allows reading snapshot data and incremental data from the Postgres database and provides end-to-end full-database data synchronization capabilities.
+This document describes how to set up the Postgres connector.
+Note: Since table structure change records cannot be parsed from the Postgres WAL log, Postgres CDC Pipeline does not support table structure changes.

Review Comment:
   > Postgres CDC Pipeline does not support table structure changes.

   Postgres CDC Pipeline Source does not support synchronizing table structure changes currently.

########## flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/PostgreSQLSource.java: ##########
@@ -44,7 +44,7 @@ public static <T> Builder<T> builder() {
     /** Builder class of {@link PostgreSQLSource}. */
     public static class Builder<T> {
 
-        private String pluginName = "decoderbufs";
+        private String pluginName = "pgoutput";

Review Comment:
   Is this change necessary?

########## flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceConfigFactory.java: ##########
@@ -42,7 +42,7 @@ public class PostgresSourceConfigFactory extends JdbcSourceConfigFactory {
 
     private static final String JDBC_DRIVER = "org.postgresql.Driver";
 
-    private String pluginName = "decoderbufs";
+    private String pluginName = "pgoutput";

Review Comment:
   Is this change necessary?
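For context, a pipeline is not tied to whichever builder default is picked here: the plugin can be set per pipeline through the `decoding.plugin.name` option used in the docs above. A minimal sketch (hostname, slot name, and table name are placeholders, not from the PR):

```yaml
source:
  type: postgres
  hostname: 127.0.0.1
  port: 5432
  username: admin
  password: pass
  # Hypothetical table; the pattern format is db.schema.table.
  tables: adb.public.orders
  slot.name: pgtest
  # Explicitly pin the logical decoding plugin instead of relying on the
  # builder default ("decoderbufs" before this change, "pgoutput" after).
  decoding.plugin.name: pgoutput
```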
########## docs/content/docs/connectors/pipeline-connectors/postgres.md: ##########
@@ -0,0 +1,398 @@
+---
+title: "Postgres"
+weight: 2
+type: docs
+aliases:
+- /connectors/pipeline-connectors/Postgres
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Postgres Connector
+
+The Postgres connector allows reading snapshot data and incremental data from the Postgres database and provides end-to-end full-database data synchronization capabilities.
+This document describes how to set up the Postgres connector.
+Note: Since table structure change records cannot be parsed from the Postgres WAL log, Postgres CDC Pipeline does not support table structure changes.
+
+## Example
+
+An example of the pipeline for reading data from Postgres and sink to Doris can be defined as follows:
+
+```yaml
+source:
+  type: postgres
+  name: Postgres Source
+  hostname: 127.0.0.1
+  port: 5432
+  username: admin
+  password: pass
+  tables: adb.\.*.\.*, bdb.user_schema_[0-9].user_table_[0-9]+, [app|web].schema_\.*.order_\.*
+  decoding.plugin.name: pgoutput
+  slot.name: pgtest
+
+sink:
+  type: doris
+  name: Doris Sink
+  fenodes: 127.0.0.1:8030
+  username: root
+  password: pass
+
+pipeline:
+  name: Postgres to Doris Pipeline
+  parallelism: 4
+```
+
+## Connector Options
+
+<div class="highlight">
+<table class="colwidths-auto docutils">
+  <thead>
+    <tr>
+      <th class="text-left" style="width: 10%">Option</th>
+      <th class="text-left" style="width: 8%">Required</th>
+      <th class="text-left" style="width: 7%">Default</th>
+      <th class="text-left" style="width: 10%">Type</th>
+      <th class="text-left" style="width: 65%">Description</th>
+    </tr>
+  </thead>
+  <tbody>
+    <tr>
+      <td>hostname</td>
+      <td>required</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>IP address or hostname of the Postgres database server.</td>
+    </tr>
+    <tr>
+      <td>port</td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">5432</td>
+      <td>Integer</td>
+      <td>Integer port number of the Postgres database server.</td>
+    </tr>
+    <tr>
+      <td>username</td>
+      <td>required</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>Name of the Postgres user to use when connecting to the Postgres database server.</td>
+    </tr>
+    <tr>
+      <td>password</td>
+      <td>required</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>Password to use when connecting to the Postgres database server.</td>
+    </tr>
+    <tr>
+      <td>tables</td>
+      <td>required</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>Table name of the Postgres database to monitor. The table-name also supports regular expressions to monitor multiple tables that satisfy the regular expressions. <br>
+          It is important to note that the dot (.) is treated as a delimiter for database and table names.
+          If there is a need to use a dot (.) in a regular expression to match any character, it is necessary to escape the dot with a backslash.<br>
+          For example, adb.\.*.\.*, bdb.user_schema_[0-9].user_table_[0-9]+, [app|web].schema_\.*.order_\.*</td>
+    </tr>
+    <tr>
+      <td>slot.name</td>
+      <td>required</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>The name of the PostgreSQL logical decoding slot that was created for streaming changes from a particular plug-in
+          for a particular database/schema. The server uses this slot to stream events to the connector that you are configuring.
+          <br/>Slot names must conform to <a href="https://www.postgresql.org/docs/current/static/warm-standby.html#STREAMING-REPLICATION-SLOTS-MANIPULATION">PostgreSQL replication slot naming rules</a>, which state: "Each replication slot has a name, which can contain lower-case letters, numbers, and the underscore character."</td>
+    </tr>
+    <tr>
+      <td>tables.exclude</td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>Table name of the Postgres database to exclude; the parameter will have an exclusion effect after the tables parameter. The table-name also supports regular expressions to exclude multiple tables that satisfy the regular expressions. <br>
+          The usage is the same as the tables parameter.</td>
+    </tr>
+    <tr>
+      <td>server-time-zone</td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>The session time zone in the database server, for example: "Asia/Shanghai". It controls how TIMESTAMP values in Postgres are converted to strings. For more information, please refer to <a href="https://debezium.io/documentation/reference/1.9/connectors/postgresql.html#postgresql-data-types">here</a>. If not set, the system default time zone (ZoneId.systemDefault()) will be used to determine the server time zone.
+      </td>
+    </tr>
+    <tr>
+      <td>scan.incremental.snapshot.chunk.size</td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">8096</td>
+      <td>Integer</td>
+      <td>The chunk size (number of rows) of table snapshot; captured tables are split into multiple chunks when reading the snapshot of the table.</td>
+    </tr>
+    <tr>
+      <td>scan.snapshot.fetch.size</td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">1024</td>
+      <td>Integer</td>
+      <td>The maximum fetch size per poll when reading the table snapshot.</td>
+    </tr>
+    <tr>
+      <td>scan.startup.mode</td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">initial</td>
+      <td>String</td>
+      <td>Optional startup mode for the Postgres CDC consumer; valid enumerations are "initial", "latest-offset", "committed-offset" or "snapshot".</td>
+    </tr>
+    <tr>
+      <td>scan.incremental.close-idle-reader.enabled</td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">false</td>
+      <td>Boolean</td>
+      <td>Whether to close idle readers at the end of the snapshot phase. <br>
+          This feature requires the Flink version to be greater than or equal to 1.14, and 'execution.checkpointing.checkpoints-after-tasks-finish.enabled' to be set to true.<br>
+          If the Flink version is greater than or equal to 1.15, the default value of 'execution.checkpointing.checkpoints-after-tasks-finish.enabled' has been changed to true,
+          so it does not need to be explicitly configured with 'execution.checkpointing.checkpoints-after-tasks-finish.enabled' = 'true'.
+      </td>
+    </tr>
+    <tr>
+      <td>scan.lsn-commit.checkpoints-num-delay</td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">3</td>
+      <td>Integer</td>
+      <td>The number of checkpoint delays before starting to commit the LSN offsets. <br>
+          The checkpoint LSN offsets will be committed in rolling fashion; the earliest checkpoint identifier will be committed first from the delayed checkpoints.
+      </td>
+    </tr>
+    <tr>
+      <td>connect.timeout</td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">30s</td>
+      <td>Duration</td>
+      <td>The maximum time that the connector should wait after trying to connect to the Postgres database server before timing out.
+          This value cannot be less than 250ms.</td>
+    </tr>
+    <tr>
+      <td>connect.max-retries</td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">3</td>
+      <td>Integer</td>
+      <td>The max retry times that the connector should retry to build a Postgres database server connection.</td>
+    </tr>
+    <tr>
+      <td>connection.pool.size</td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">20</td>
+      <td>Integer</td>
+      <td>The connection pool size.</td>
+    </tr>
+    <tr>
+      <td>jdbc.properties.*</td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>Option to pass custom JDBC URL properties. User can pass custom properties like 'jdbc.properties.useSSL' = 'false'.</td>
+    </tr>
+    <tr>
+      <td>heartbeat.interval</td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">30s</td>
+      <td>Duration</td>
+      <td>The interval of sending heartbeat events for tracing the latest available WAL log offsets.</td>
+    </tr>
+    <tr>
+      <td>debezium.*</td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>Pass-through Debezium's properties to the Debezium Embedded Engine, which is used to capture data changes from the Postgres server.
+          For example: <code>'debezium.snapshot.mode' = 'never'</code>.
+          See more about the <a href="https://debezium.io/documentation/reference/1.9/connectors/postgresql.html">Debezium's Postgres Connector properties</a></td>
+    </tr>
+    <tr>
+      <td>chunk-meta.group.size</td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">1000</td>
+      <td>Integer</td>
+      <td>
+        The group size of chunk meta; if the meta size exceeds the group size, the meta will be divided into multiple groups.
+      </td>
+    </tr>
+    <tr>
+      <td>metadata.list</td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>
+        List of readable metadata from SourceRecord to be passed to downstream and could be used in the transform module, split by `,`. Available readable metadata are: op_ts.
+      </td>
+    </tr>
+  </tbody>
+</table>
+</div>
+
+Note:
+1. The configuration option tables specifies the tables to be captured by Postgres CDC, in the format db.schema1.table1,db.schema2.table2. All db values must be the same, as the PostgreSQL connection URL requires a single database name. Currently, CDC only supports connecting to one database.
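To illustrate the single-database constraint stated in the note above (database, schema, and table names here are hypothetical):

```yaml
# Valid: every pattern references the same database "adb".
tables: adb.schema_a.table_[0-9]+, adb.schema_b.orders

# Invalid: mixes "adb" and "bdb"; the connection URL can carry only one
# database name, so a single pipeline cannot capture both.
# tables: adb.schema_a.table1, bdb.schema_b.table2
```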
+
+## Startup Reading Position
+
+The config option `scan.startup.mode` specifies the startup mode for the PostgreSQL CDC consumer. The valid enumerations are:
+
+- `initial` (default): Performs an initial snapshot on the monitored database tables upon first startup, and then continues to read change events from the replication slot.
+- `latest-offset`: Never performs a snapshot on the monitored database tables upon first startup; it just reads from the end of the replication slot, which means it only captures changes made after the connector was started.
+- `committed-offset`: Skips the snapshot phase and starts reading events from the `confirmed_flush_lsn` offset of the replication slot.
+- `snapshot`: Only the snapshot phase is performed, and the connector exits after the snapshot phase reading is completed.
+
+## Available Source metrics
+
+Metrics can help understand the progress of assignments, and the following are the supported [Flink metrics](https://nightlies.apache.org/flink/flink-docs-master/docs/ops/metrics/):
+
+| Group                  | Name                       | Type  | Description                                         |
+|------------------------|----------------------------|-------|-----------------------------------------------------|
+| namespace.schema.table | isSnapshotting             | Gauge | Whether the table is snapshotting or not            |
+| namespace.schema.table | isStreamReading            | Gauge | Whether the table is stream reading or not          |
+| namespace.schema.table | numTablesSnapshotted       | Gauge | The number of tables that have been snapshotted     |
+| namespace.schema.table | numTablesRemaining         | Gauge | The number of tables that have not been snapshotted |
+| namespace.schema.table | numSnapshotSplitsProcessed | Gauge | The number of splits that are being processed       |
+| namespace.schema.table | numSnapshotSplitsRemaining | Gauge | The number of splits that have not been processed   |
+| namespace.schema.table | numSnapshotSplitsFinished  | Gauge | The number of splits that have been processed       |
+| namespace.schema.table | snapshotStartTime          | Gauge | The time when the snapshot started                  |
+| namespace.schema.table | snapshotEndTime            | Gauge | The time when the snapshot ended                    |
+
+Notice:
+1. The group name is `namespace.schema.table`, where `namespace` is the actual database name, `schema` is the actual schema name, and `table` is the actual table name.
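Relatedly, the `metadata.list` option from the table above exposes `op_ts` to the transform module. A hypothetical sketch of how the two could be combined (the transform block follows the syntax of other pipeline connectors, and the table name is invented):

```yaml
source:
  type: postgres
  hostname: 127.0.0.1
  port: 5432
  username: admin
  password: pass
  tables: adb.public.orders
  slot.name: pgtest
  # Expose the operation timestamp so transform rules can reference it.
  metadata.list: op_ts

transform:
  - source-table: adb.public.orders
    projection: \*, op_ts AS ingestion_ts
    description: append the change timestamp as an extra column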
+
+## Data Type Mapping
+
+<div class="wy-table-responsive">
+<table class="colwidths-auto docutils">
+  <thead>
+    <tr>
+      <th class="text-left">PostgreSQL type<a href="https://www.postgresql.org/docs/12/datatype.html"></a></th>
+      <th class="text-left">CDC type<a href="{% link dev/table/types.md %}"></a></th>
+    </tr>
+  </thead>
+  <tbody>
+    <tr>
+      <td>
+        SMALLINT<br>
+        INT2<br>
+        SMALLSERIAL<br>
+        SERIAL2</td>
+      <td>SMALLINT</td>
+    </tr>
+    <tr>
+      <td>
+        INTEGER<br>
+        SERIAL</td>
+      <td>INT</td>
+    </tr>
+    <tr>
+      <td>
+        BIGINT<br>
+        BIGSERIAL</td>
+      <td>BIGINT</td>
+    </tr>
+    <tr>
+      <td></td>
+      <td>DECIMAL(20, 0)</td>
+    </tr>
+    <tr>
+      <td>BIGINT</td>
+      <td>BIGINT</td>
+    </tr>
+    <tr>
+      <td>
+        REAL<br>
+        FLOAT4</td>
+      <td>FLOAT</td>
+    </tr>
+    <tr>
+      <td>
+        FLOAT8<br>
+        DOUBLE PRECISION</td>
+      <td>DOUBLE</td>
+    </tr>
+    <tr>
+      <td>
+        NUMERIC(p, s)<br>
+        DECIMAL(p, s)</td>
+      <td>DECIMAL(p, s)</td>
+    </tr>
+    <tr>
+      <td>BOOLEAN</td>
+      <td>BOOLEAN</td>
+    </tr>
+    <tr>
+      <td>DATE</td>
+      <td>DATE</td>
+    </tr>
+    <tr>
+      <td>TIME [(p)] [WITHOUT TIMEZONE]</td>
+      <td>TIME [(p)] [WITHOUT TIMEZONE]</td>
+    </tr>
+    <tr>
+      <td>TIMESTAMP [(p)] [WITHOUT TIMEZONE]</td>
+      <td>TIMESTAMP [(p)] [WITHOUT TIMEZONE]</td>
+    </tr>
+    <tr>
+      <td>
+        CHAR(n)<br>
+        CHARACTER(n)<br>
+        VARCHAR(n)<br>
+        CHARACTER VARYING(n)<br>
+        TEXT</td>
+      <td>STRING</td>

Review Comment:
   Precision was lost? This is not consistent with PostgresTypeUtils.

########## flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/reader/PostgresPipelineRecordEmitter.java: ##########
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.postgres.source.reader;
+
+import org.apache.flink.api.connector.source.SourceOutput;
+import org.apache.flink.cdc.common.event.CreateTableEvent;
+import org.apache.flink.cdc.common.event.Event;
+import org.apache.flink.cdc.common.schema.Schema;
+import org.apache.flink.cdc.connectors.base.options.StartupOptions;
+import org.apache.flink.cdc.connectors.base.source.meta.offset.OffsetFactory;
+import org.apache.flink.cdc.connectors.base.source.meta.split.SourceSplitState;
+import org.apache.flink.cdc.connectors.base.source.metrics.SourceReaderMetrics;
+import org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceRecordEmitter;
+import org.apache.flink.cdc.connectors.postgres.source.PostgresDialect;
+import org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceConfig;
+import org.apache.flink.cdc.connectors.postgres.source.utils.TableDiscoveryUtils;
+import org.apache.flink.cdc.connectors.postgres.utils.PostgresSchemaUtils;
+import org.apache.flink.cdc.debezium.DebeziumDeserializationSchema;
+import org.apache.flink.connector.base.source.reader.RecordEmitter;
+
+import io.debezium.connector.postgresql.connection.PostgresConnection;
+import io.debezium.relational.TableId;
+import org.apache.kafka.connect.source.SourceRecord;
+
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import static org.apache.flink.cdc.connectors.base.source.meta.wartermark.WatermarkEvent.isLowWatermarkEvent;
+import static org.apache.flink.cdc.connectors.base.utils.SourceRecordUtils.getTableId;
+import static org.apache.flink.cdc.connectors.base.utils.SourceRecordUtils.isDataChangeRecord;
+import static org.apache.flink.cdc.connectors.base.utils.SourceRecordUtils.isSchemaChangeEvent;
+
+/** The {@link RecordEmitter} implementation for pipeline oracle connector. */

Review Comment:
   > pipeline oracle connector

   PostgreSQL pipeline connector

########## docs/content/docs/connectors/pipeline-connectors/postgres.md: ##########
@@ -0,0 +1,398 @@
+---
+title: "Postgres"
+weight: 2
+type: docs
+aliases:
+- /connectors/pipeline-connectors/Postgres
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Postgres Connector
+
+The Postgres connector allows reading snapshot data and incremental data from the Postgres database and provides end-to-end full-database data synchronization capabilities.
+This document describes how to set up the Postgres connector.
+Note: Since table structure change records cannot be parsed from the Postgres WAL log, Postgres CDC Pipeline does not support table structure changes.
+
+## Example
+
+An example of the pipeline for reading data from Postgres and sink to Doris can be defined as follows:
+
+```yaml
+source:
+  type: postgres
+  name: Postgres Source
+  hostname: 127.0.0.1
+  port: 5432
+  username: admin
+  password: pass
+  tables: adb.\.*.\.*, bdb.user_schema_[0-9].user_table_[0-9]+, [app|web].schema_\.*.order_\.*
+  decoding.plugin.name: pgoutput
+  slot.name: pgtest
+
+sink:
+  type: doris
+  name: Doris Sink
+  fenodes: 127.0.0.1:8030
+  username: root
+  password: pass
+
+pipeline:
+  name: Postgres to Doris Pipeline
+  parallelism: 4
+```
+
+## Connector Options
+
+<div class="highlight">
+<table class="colwidths-auto docutils">
+  <thead>
+    <tr>
+      <th class="text-left" style="width: 10%">Option</th>
+      <th class="text-left" style="width: 8%">Required</th>
+      <th class="text-left" style="width: 7%">Default</th>
+      <th class="text-left" style="width: 10%">Type</th>
+      <th class="text-left" style="width: 65%">Description</th>
+    </tr>
+  </thead>
+  <tbody>
+    <tr>
+      <td>hostname</td>
+      <td>required</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>IP address or hostname of the Postgres database server.</td>
+    </tr>
+    <tr>
+      <td>port</td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">5432</td>
+      <td>Integer</td>
+      <td>Integer port number of the Postgres database server.</td>
+    </tr>
+    <tr>
+      <td>username</td>
+      <td>required</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>Name of the Postgres user to use when connecting to the Postgres database server.</td>
+    </tr>
+    <tr>
+      <td>password</td>
+      <td>required</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>Password to use when connecting to the Postgres database server.</td>
+    </tr>
+    <tr>
+      <td>tables</td>
+      <td>required</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>Table name of the Postgres database to monitor. The table-name also supports regular expressions to monitor multiple tables that satisfy the regular expressions. <br>
+          It is important to note that the dot (.) is treated as a delimiter for database and table names.
+          If there is a need to use a dot (.) in a regular expression to match any character, it is necessary to escape the dot with a backslash.<br>
+          For example, adb.\.*.\.*, bdb.user_schema_[0-9].user_table_[0-9]+, [app|web].schema_\.*.order_\.*</td>
+    </tr>
+    <tr>
+      <td>slot.name</td>
+      <td>required</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>The name of the PostgreSQL logical decoding slot that was created for streaming changes from a particular plug-in
+          for a particular database/schema. The server uses this slot to stream events to the connector that you are configuring.
+          <br/>Slot names must conform to <a href="https://www.postgresql.org/docs/current/static/warm-standby.html#STREAMING-REPLICATION-SLOTS-MANIPULATION">PostgreSQL replication slot naming rules</a>, which state: "Each replication slot has a name, which can contain lower-case letters, numbers, and the underscore character."</td>
+    </tr>
+    <tr>
+      <td>tables.exclude</td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>Table name of the Postgres database to exclude; the parameter will have an exclusion effect after the tables parameter. The table-name also supports regular expressions to exclude multiple tables that satisfy the regular expressions. <br>
+          The usage is the same as the tables parameter.</td>
+    </tr>
+    <tr>
+      <td>server-time-zone</td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>The session time zone in the database server, for example: "Asia/Shanghai". It controls how TIMESTAMP values in Postgres are converted to strings. For more information, please refer to <a href="https://debezium.io/documentation/reference/1.9/connectors/postgresql.html#postgresql-data-types">here</a>. If not set, the system default time zone (ZoneId.systemDefault()) will be used to determine the server time zone.
+      </td>
+    </tr>
+    <tr>
+      <td>scan.incremental.snapshot.chunk.size</td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">8096</td>
+      <td>Integer</td>
+      <td>The chunk size (number of rows) of table snapshot; captured tables are split into multiple chunks when reading the snapshot of the table.</td>
+    </tr>
+    <tr>
+      <td>scan.snapshot.fetch.size</td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">1024</td>
+      <td>Integer</td>
+      <td>The maximum fetch size per poll when reading the table snapshot.</td>
+    </tr>
+    <tr>
+      <td>scan.startup.mode</td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">initial</td>
+      <td>String</td>
+      <td>Optional startup mode for the Postgres CDC consumer; valid enumerations are "initial", "latest-offset", "committed-offset" or "snapshot".</td>
+    </tr>
+    <tr>
+      <td>scan.incremental.close-idle-reader.enabled</td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">false</td>
+      <td>Boolean</td>
+      <td>Whether to close idle readers at the end of the snapshot phase. <br>
+          This feature requires the Flink version to be greater than or equal to 1.14, and 'execution.checkpointing.checkpoints-after-tasks-finish.enabled' to be set to true.<br>
+          If the Flink version is greater than or equal to 1.15, the default value of 'execution.checkpointing.checkpoints-after-tasks-finish.enabled' has been changed to true,
+          so it does not need to be explicitly configured with 'execution.checkpointing.checkpoints-after-tasks-finish.enabled' = 'true'.
+      </td>
+    </tr>
+    <tr>
+      <td>scan.lsn-commit.checkpoints-num-delay</td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">3</td>
+      <td>Integer</td>
+      <td>The number of checkpoint delays before starting to commit the LSN offsets. <br>
+          The checkpoint LSN offsets will be committed in rolling fashion; the earliest checkpoint identifier will be committed first from the delayed checkpoints.
+      </td>
+    </tr>
+    <tr>
+      <td>connect.timeout</td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">30s</td>
+      <td>Duration</td>
+      <td>The maximum time that the connector should wait after trying to connect to the Postgres database server before timing out.
+          This value cannot be less than 250ms.</td>
+    </tr>
+    <tr>
+      <td>connect.max-retries</td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">3</td>
+      <td>Integer</td>
+      <td>The max retry times that the connector should retry to build a Postgres database server connection.</td>
+    </tr>
+    <tr>
+      <td>connection.pool.size</td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">20</td>
+      <td>Integer</td>
+      <td>The connection pool size.</td>
+    </tr>
+    <tr>
+      <td>jdbc.properties.*</td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>Option to pass custom JDBC URL properties. User can pass custom properties like 'jdbc.properties.useSSL' = 'false'.</td>
+    </tr>
+    <tr>
+      <td>heartbeat.interval</td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">30s</td>
+      <td>Duration</td>
+      <td>The interval of sending heartbeat events for tracing the latest available WAL log offsets.</td>
+    </tr>
+    <tr>
+      <td>debezium.*</td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>Pass-through Debezium's properties to the Debezium Embedded Engine, which is used to capture data changes from the Postgres server.
+          For example: <code>'debezium.snapshot.mode' = 'never'</code>.
+          See more about the <a href="https://debezium.io/documentation/reference/1.9/connectors/postgresql.html">Debezium's Postgres Connector properties</a></td>
+    </tr>
+    <tr>
+      <td>chunk-meta.group.size</td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">1000</td>
+      <td>Integer</td>
+      <td>
+        The group size of chunk meta; if the meta size exceeds the group size, the meta will be divided into multiple groups.
+      </td>
+    </tr>
+    <tr>
+      <td>metadata.list</td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>
+        List of readable metadata from SourceRecord to be passed to downstream and could be used in the transform module, split by `,`. Available readable metadata are: op_ts.
+      </td>
+    </tr>
+  </tbody>
+</table>
+</div>
+
+Note:
+1. The configuration option tables specifies the tables to be captured by Postgres CDC, in the format db.schema1.table1,db.schema2.table2. All db values must be the same, as the PostgreSQL connection URL requires a single database name. Currently, CDC only supports connecting to one database.
+
+## Startup Reading Position
+
+The config option `scan.startup.mode` specifies the startup mode for the PostgreSQL CDC consumer. The valid enumerations are:
+
+- `initial` (default): Performs an initial snapshot on the monitored database tables upon first startup, and then continues to read change events from the replication slot.
+- `latest-offset`: Never performs a snapshot on the monitored database tables upon first startup; it just reads from the end of the replication slot, which means it only captures changes made after the connector was started.
+- `committed-offset`: Skips the snapshot phase and starts reading events from the `confirmed_flush_lsn` offset of the replication slot.
+- `snapshot`: Only the snapshot phase is performed, and the connector exits after the snapshot phase reading is completed.
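For example, a source that skips the snapshot phase and resumes from the slot's confirmed position might be configured as follows; this is a minimal sketch, and the connection details, table pattern, and slot name are placeholders:

```yaml
source:
  type: postgres
  hostname: 127.0.0.1
  port: 5432
  username: admin
  password: pass
  tables: adb.public.\.*
  slot.name: pgtest
  # Resume from the replication slot's confirmed_flush_lsn; no snapshot phase.
  scan.startup.mode: committed-offset
```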
+
+## Available Source metrics
+
+Metrics can help understand the progress of assignments, and the following are the supported [Flink metrics](https://nightlies.apache.org/flink/flink-docs-master/docs/ops/metrics/):
+
+| Group                  | Name                       | Type  | Description                                         |
+|------------------------|----------------------------|-------|-----------------------------------------------------|
+| namespace.schema.table | isSnapshotting             | Gauge | Whether the table is snapshotting or not            |
+| namespace.schema.table | isStreamReading            | Gauge | Whether the table is stream reading or not          |
+| namespace.schema.table | numTablesSnapshotted       | Gauge | The number of tables that have been snapshotted     |
+| namespace.schema.table | numTablesRemaining         | Gauge | The number of tables that have not been snapshotted |
+| namespace.schema.table | numSnapshotSplitsProcessed | Gauge | The number of splits that are being processed       |
+| namespace.schema.table | numSnapshotSplitsRemaining | Gauge | The number of splits that have not been processed   |
+| namespace.schema.table | numSnapshotSplitsFinished  | Gauge | The number of splits that have been processed       |
+| namespace.schema.table | snapshotStartTime          | Gauge | The time when the snapshot started                  |
+| namespace.schema.table | snapshotEndTime            | Gauge | The time when the snapshot ended                    |
+
+Notice:
+1. The group name is `namespace.schema.table`, where `namespace` is the actual database name, `schema` is the actual schema name, and `table` is the actual table name.
+
+## Data Type Mapping
+
+<div class="wy-table-responsive">
+<table class="colwidths-auto docutils">
+  <thead>
+    <tr>
+      <th class="text-left">PostgreSQL type<a href="https://www.postgresql.org/docs/12/datatype.html"></a></th>
+      <th class="text-left">CDC type<a href="{% link dev/table/types.md %}"></a></th>
+    </tr>
+  </thead>
+  <tbody>
+    <tr>
+      <td>
+        SMALLINT<br>
+        INT2<br>
+        SMALLSERIAL<br>
+        SERIAL2</td>
+      <td>SMALLINT</td>
+    </tr>
+    <tr>
+      <td>
+        INTEGER<br>
+        SERIAL</td>
+      <td>INT</td>
+    </tr>
+    <tr>
+      <td>
+        BIGINT<br>
+        BIGSERIAL</td>
+      <td>BIGINT</td>
+    </tr>
+    <tr>
+      <td></td>

Review Comment:
   Something missed.

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
