[ 
https://issues.apache.org/jira/browse/FLINK-38881?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

tchivs updated FLINK-38881:
---------------------------
    Description: 
h1. PostgreSQL Partition Table Routing Support

h2. Background

When using Flink CDC to synchronize PostgreSQL partition tables, we encountered 
severe performance issues and functional defects.

h2. Problems Encountered

h3. Problem 1: Frequent Schema Refresh

*Symptom*: Schema was refreshed on every table access, causing massive 
unnecessary database queries.

*Root Cause*: {{PostgresSchema.tableFor()}} calls {{refresh()}} whenever the 
requested table is not found, and {{refresh()}} reloads the schema for ALL tables.

*Impact*:
* High database connection pressure
* High CPU and memory consumption
* Increased synchronization latency
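
The cost of this pattern can be illustrated with a minimal, self-contained sketch. {{RefreshOnMissSketch}} and its fields are hypothetical stand-ins, not the actual {{PostgresSchema}} code; schemas are plain strings and the counter only makes the repeated full reload visible.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical model of a schema cache that reloads everything on a miss. */
final class RefreshOnMissSketch {
    private final Map<String, String> cached = new HashMap<>();
    private final List<String> allTables;
    // Counts how many table schemas were (re)loaded in total.
    int tablesLoaded = 0;

    RefreshOnMissSketch(List<String> allTables) {
        this.allTables = allTables;
    }

    /** Looks up one table; a miss triggers a reload of every table. */
    String tableFor(String name) {
        String schema = cached.get(name);
        if (schema == null) {
            refresh();
            schema = cached.get(name);
        }
        return schema;
    }

    /** Reloads the schema of ALL tables, regardless of which one was requested. */
    private void refresh() {
        cached.clear();
        for (String table : allTables) {
            cached.put(table, "schema-of-" + table);
            tablesLoaded++;
        }
    }
}
```

In this model, every lookup of a table that is not in the cache costs one load per known table, so repeated misses multiply the work by the total table count.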

h3. Problem 2: Full Schema Load for Single Table Query

*Symptom*: Even when requesting schema for a single table, all tables matching 
{{table.include.list}} were loaded.

*Root Cause*: {{PostgresConnection.readSchema()}} filters with {{TableFilter}}, 
but the underlying SQL query returns all tables and the filter is applied only 
afterwards, in memory.

*Impact*:
* With hundreds of partition tables, each query processes the metadata of every table
* Memory usage increases dramatically

h3. Problem 3: Partition Parent Table Missing Primary Key (PostgreSQL 10)

*Symptom*: On PostgreSQL 10, partition parent tables have no primary key 
definition, so users must match each child partition with a regex pattern.

*Root Cause*: PostgreSQL 10 allows primary keys only on child partitions; the 
parent table itself cannot have PK constraints. Users therefore cannot monitor 
the parent table directly and must use a regex to match all child partitions.

*Note*: On PostgreSQL 11+, the {{partition.discovery.enabled}} option can 
discover partitions automatically. On PostgreSQL 10, this limitation leads to 
Problem 4 below.

h3. Problem 4: CreateTableEvent Storm (Caused by Problem 3)

*Symptom*: When using regex to match child partitions, each partition generates 
an independent {{CreateTableEvent}}, flooding downstream systems with table 
creation events.

*Root Cause*: Due to Problem 3, users must use regex patterns like 
{{public\.orders_\d+}} to match all child partitions. Flink CDC treats each 
matched child partition as an independent table.

*Impact*:

# *Downstream System Overload*:
#* Each child partition generates its own {{CreateTableEvent}}
#* Hundreds of partitions = hundreds of schema change events
#* Downstream systems overwhelmed with table creation events
#* Data scattered across multiple tables, cannot be queried uniformly

# *Massive Schema Loading*:
#* Each single-table query triggers a full schema load for ALL tables
#* Initialization takes extremely long because the schema is fetched for every child
#* High pressure on the PostgreSQL database

# *Checkpoint Timeout Risk*:
#* Schema loading takes too long
#* Database connections may time out
#* Flink checkpoint timeout failures are easily triggered

# *Excessive Resource Consumption*:
#* {{EventSerializer}} with {{ListSerializer}} caches massive amounts of data
#* Each child table requires separate {{SchemaChangeEvent}} publishing
#* Memory pressure from storing schemas for all partitions

h2. Optimization Approach

h3. Approach 1: Partition Table Routing Mechanism

*Solution*: Introduce a {{partition.tables}} option that routes child partition 
events to the parent table. The value pairs a parent table with a regex for its 
child partitions:

{code}
partition.tables: "public.orders:public\.orders_\d{6}"
{code}

*Implementation*:
* {{PostgresPartitionRules}}: Parse config, extract parent table and child 
regex patterns
* {{PostgresPartitionRouter}}: Match child table ID to parent table ID via regex
* During WAL event processing, automatically replace child partition TableId 
with parent
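
The parsing and routing steps above can be sketched as follows. This is a minimal illustration, not the proposed {{PostgresPartitionRules}} / {{PostgresPartitionRouter}} code: the class name is hypothetical, table IDs are plain strings, and using {{;}} to separate multiple rules is an assumption that the issue does not specify.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Pattern;

/** Hypothetical sketch: routes child partition names to their parent table. */
final class PartitionRouterSketch {
    // Parent table name -> compiled regex for its child partitions.
    private final Map<String, Pattern> rules = new LinkedHashMap<>();

    /** Parses "parent:childRegex" entries; ';' as entry separator is an assumption. */
    PartitionRouterSketch(String config) {
        for (String entry : config.split(";")) {
            String trimmed = entry.trim();
            if (trimmed.isEmpty()) {
                continue;
            }
            // The first ':' separates the parent name from the child regex.
            int sep = trimmed.indexOf(':');
            if (sep <= 0 || sep == trimmed.length() - 1) {
                throw new IllegalArgumentException(
                        "Expected parent:childRegex, got: " + trimmed);
            }
            rules.put(trimmed.substring(0, sep),
                    Pattern.compile(trimmed.substring(sep + 1)));
        }
    }

    /** Returns the parent if the whole name matches a child pattern, else the name itself. */
    String route(String tableName) {
        for (Map.Entry<String, Pattern> rule : rules.entrySet()) {
            if (rule.getValue().matcher(tableName).matches()) {
                return rule.getKey();
            }
        }
        return tableName;
    }
}
```

With the rule from the example config, {{public.orders_202401}} would be routed to {{public.orders}}, while a name such as {{public.customers}} passes through unchanged.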

h3. Approach 2: Partition-Aware Table Filter

*Solution*: Filter out child partition tables at the Debezium level and keep 
only the parent tables.

*Implementation*:
* {{PostgresPartitionConnectorConfig}}: Extend {{PostgresConnectorConfig}}
* {{PartitionAwareTableFilters}}: Wrap original filter, exclude tables matching 
child patterns
* Child partitions won't appear in {{table.include.list}} results
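
The wrapping idea can be sketched with plain Java predicates. This is an assumption-laden illustration: {{PartitionAwareFilterSketch}} is a hypothetical name, and {{Predicate<String>}} stands in for the Debezium table filter type, whose real interface is not reproduced here.

```java
import java.util.List;
import java.util.function.Predicate;
import java.util.regex.Pattern;

/** Hypothetical sketch: wraps an existing table filter and drops child partitions. */
final class PartitionAwareFilterSketch {
    private PartitionAwareFilterSketch() {
    }

    /**
     * Returns a filter that accepts a table only if the original filter accepts it
     * AND its name does not fully match any child partition pattern.
     */
    static Predicate<String> excludeChildren(
            Predicate<String> original, List<Pattern> childPatterns) {
        Predicate<String> isChild = name ->
                childPatterns.stream().anyMatch(p -> p.matcher(name).matches());
        return original.and(isChild.negate());
    }
}
```

Because the wrapper only narrows the original filter, tables that were excluded before stay excluded; the only change is that matching child partitions are removed as well.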

h3. Approach 3: Schema Routing and Caching

*Solution*: Create a dedicated schema class that supports partition table 
routing and caching.

*Implementation*:
* {{PostgresPartitionRoutingSchema}}: Extend {{PostgresSchema}}
* Override {{tableFor()}} method to route child table requests to parent
* Parent table schema loaded once, reused by all child partitions
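
A minimal sketch of route-then-cache lookup, under stated assumptions: {{RoutingSchemaCacheSketch}} is a hypothetical class, table IDs are strings, and the {{router}} and {{loader}} functions are placeholders for the routing rules and the single-table schema read.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import java.util.function.UnaryOperator;

/** Hypothetical sketch: resolves child tables to the parent and caches its schema. */
final class RoutingSchemaCacheSketch<S> {
    private final UnaryOperator<String> router; // child name -> parent name
    private final Function<String, S> loader;   // loads the schema of ONE table
    private final Map<String, S> cache = new ConcurrentHashMap<>();

    RoutingSchemaCacheSketch(UnaryOperator<String> router, Function<String, S> loader) {
        this.router = router;
        this.loader = loader;
    }

    /** Routes the request to the parent, then loads that schema at most once. */
    S tableFor(String tableName) {
        String target = router.apply(tableName);
        return cache.computeIfAbsent(target, loader);
    }
}
```

In this sketch, requests for any number of child partitions share one cache entry keyed by the parent, so the loader runs once per parent rather than once per partition.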

h3. Approach 4: Primary Key Inheritance

*Solution*: Let the parent table inherit the primary key of a representative 
child partition.

*Implementation*:
* Query {{pg_inherits}} to get parent-child relationships
* Select one child partition as representative
* Read child partition's PK definition, apply to parent table schema
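
The three steps above can be sketched as follows. Everything here is illustrative: the class and method names are hypothetical, the SQL strings are one possible way to query {{pg_inherits}} and {{pg_index}} (the issue does not give the actual queries, and the second one does not guarantee key column order), and choosing the first child by name as the representative is an assumption.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical sketch: applies a representative child's primary key to the parent. */
final class PrimaryKeyInheritanceSketch {
    private PrimaryKeyInheritanceSketch() {
    }

    // Assumed query: lists the child partitions of the parent bound to '?'.
    static final String CHILDREN_SQL =
            "SELECT c.oid::regclass::text AS child "
                    + "FROM pg_inherits i JOIN pg_class c ON c.oid = i.inhrelid "
                    + "WHERE i.inhparent = ?::regclass ORDER BY c.relname";

    // Assumed query: lists the primary key columns of the table bound to '?'.
    static final String PRIMARY_KEY_SQL =
            "SELECT a.attname FROM pg_index i "
                    + "JOIN pg_attribute a ON a.attrelid = i.indrelid "
                    + "AND a.attnum = ANY(i.indkey) "
                    + "WHERE i.indrelid = ?::regclass AND i.indisprimary";

    /**
     * Keeps the parent's own key if it has one; otherwise returns the key of the
     * first child (in name order) that defines one, or an empty list if none does.
     */
    static List<String> inheritPrimaryKey(
            List<String> parentPk, Map<String, List<String>> childPks) {
        if (!parentPk.isEmpty()) {
            return parentPk;
        }
        for (List<String> pk : new TreeMap<>(childPks).values()) {
            if (!pk.isEmpty()) {
                return pk;
            }
        }
        return List.of();
    }
}
```

The sketch assumes all child partitions share the same key definition; if they could differ, picking a single representative would need an explicit consistency check.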

h2. Core Classes

||Class||Responsibility||
|{{PostgresPartitionRules}}|Parse {{partition.tables}} config, extract parent 
and child regex|
|{{PostgresPartitionRouter}}|Route child TableId to parent via regex matching|
|{{PostgresPartitionConnectorConfig}}|Extend Debezium config with 
partition-aware table filters|
|{{PostgresPartitionRoutingSchema}}|Extend PostgresSchema with routing and PK 
inheritance|
|{{PatternCache}}|Regex compilation cache to avoid repeated compilation|
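
As an illustration of the last row, a regex compilation cache can be as small as the sketch below. {{PatternCacheSketch}} is a hypothetical name and the unbounded map is an assumption; the proposed {{PatternCache}} may differ in both API and eviction behavior.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.regex.Pattern;

/** Hypothetical sketch: compiles each distinct regex string only once. */
final class PatternCacheSketch {
    private static final Map<String, Pattern> CACHE = new ConcurrentHashMap<>();

    private PatternCacheSketch() {
    }

    /** Returns the cached Pattern for the regex, compiling it on first use. */
    static Pattern get(String regex) {
        return CACHE.computeIfAbsent(regex, Pattern::compile);
    }
}
```

Since {{Pattern}} instances are immutable and safe to share between threads, the same compiled object can be reused for every WAL event that needs to be matched.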

h2. Performance Comparison

||Metric||Before||After||
|Schema refresh count|Every access|First load only|
|Single table query|Load all tables|Load requested table only|
|CreateTableEvent|N (N = partition count)|1 (parent table)|
|Memory usage|High (all partition schemas)|Low (parent schema only)|
|DB query pressure|High|Low|


  was:
h1. PostgreSQL Partition Table Routing Support

h2. Background

When using Flink CDC to synchronize PostgreSQL partition tables, we encountered 
severe performance issues and functional defects.

h2. Problems Encountered

h3. Problem 1: Frequent Schema Refresh

*Symptom*: Schema was refreshed on every table access, causing massive 
unnecessary database queries.

*Root Cause*: {{PostgresSchema.tableFor()}} method calls {{refresh()}} when 
table doesn't exist, and {{refresh()}} reloads schema for ALL tables.

*Impact*:
* High database connection pressure
* High CPU and memory consumption
* Increased synchronization latency

h3. Problem 2: Full Schema Load for Single Table Query

*Symptom*: Even when requesting schema for a single table, all tables matching 
{{table.include.list}} were loaded.

*Root Cause*: {{PostgresConnection.readSchema()}} uses {{TableFilter}} for 
filtering, but the underlying SQL query returns all tables, then filters in 
memory.

*Impact*:
* With hundreds of partition tables, each query processes massive data
* Memory usage increases dramatically

h3. Problem 3: CreateTableEvent Storm

*Symptom*: Each partition child table generates an independent 
{{CreateTableEvent}}, flooding downstream systems with table creation events.

*Root Cause*: Flink CDC treats each partition child table as an independent 
table.

*Impact*:
* Downstream systems (Kafka, Doris, etc.) must handle hundreds of create table 
events
* Data scattered across multiple tables, cannot be queried uniformly
* Doesn't match business expectations (users expect to see parent table, not 
child partitions)

h3. Problem 4: Partition Parent Table Missing Primary Key (PostgreSQL 10)

*Symptom*: PostgreSQL 10 partition parent tables have no primary key 
definition, forcing users to use regex patterns to match child partitions 
individually.

*Root Cause*: PostgreSQL 10 design defines primary keys on child partitions 
only, parent table itself cannot have PK constraints. This forces CDC to treat 
each child partition as a separate table.

*Impact*:

# *CreateTableEvent Storm*:
#* Each child partition generates its own {{CreateTableEvent}}
#* Hundreds of partitions = hundreds of schema change events
#* Downstream systems overwhelmed with table creation events

# *Massive Schema Loading*:
#* Each single table query triggers full schema load for ALL tables
#* Initialization takes extremely long time fetching schema for every child
#* High pressure on PostgreSQL database

# *Checkpoint Timeout Risk*:
#* Schema loading time too long
#* Database connections may timeout
#* Easily triggers Flink checkpoint timeout failures

# *Excessive Resource Consumption*:
#* {{EventSerializer}} with {{ListSerializer}} caches massive amounts of data
#* Each child table requires separate {{SchemaChangeEvent}} publishing
#* Memory pressure from storing schemas for all partitions

h2. Optimization Approach

h3. Approach 1: Partition Table Routing Mechanism

*Solution*: Introduce {{partition.tables}} config to route child partition 
events to parent table.

{code}
partition.tables: "public.orders:public\.orders_\d{6}"
{code}

*Implementation*:
* {{PostgresPartitionRules}}: Parse config, extract parent table and child 
regex patterns
* {{PostgresPartitionRouter}}: Match child table ID to parent table ID via regex
* During WAL event processing, automatically replace child partition TableId 
with parent

h3. Approach 2: Partition-Aware Table Filter

*Solution*: Filter out child partition tables at Debezium level, keep only 
parent tables.

*Implementation*:
* {{PostgresPartitionConnectorConfig}}: Extend {{PostgresConnectorConfig}}
* {{PartitionAwareTableFilters}}: Wrap original filter, exclude tables matching 
child patterns
* Child partitions won't appear in {{table.include.list}} results

h3. Approach 3: Schema Routing and Caching

*Solution*: Create dedicated Schema class supporting partition table routing 
and caching.

*Implementation*:
* {{PostgresPartitionRoutingSchema}}: Extend {{PostgresSchema}}
* Override {{tableFor()}} method to route child table requests to parent
* Parent table schema loaded once, reused by all child partitions

h3. Approach 4: Primary Key Inheritance

*Solution*: Inherit primary key from representative child partition to parent 
table.

*Implementation*:
* Query {{pg_inherits}} to get parent-child relationships
* Select one child partition as representative
* Read child partition's PK definition, apply to parent table schema

h2. Core Classes

||Class||Responsibility||
|{{PostgresPartitionRules}}|Parse {{partition.tables}} config, extract parent 
and child regex|
|{{PostgresPartitionRouter}}|Route child TableId to parent via regex matching|
|{{PostgresPartitionConnectorConfig}}|Extend Debezium config with 
partition-aware table filters|
|{{PostgresPartitionRoutingSchema}}|Extend PostgresSchema with routing and PK 
inheritance|
|{{PatternCache}}|Regex compilation cache to avoid repeated compilation|

h2. Performance Comparison

||Metric||Before||After||
|Schema refresh count|Every access|First load only|
|Single table query|Load all tables|Load requested table only|
|CreateTableEvent|N (N = partition count)|1 (parent table)|
|Memory usage|High (all partition schemas)|Low (parent schema only)|
|DB query pressure|High|Low|



> PostgreSQL Partition Table Routing Support
> ------------------------------------------
>
>                 Key: FLINK-38881
>                 URL: https://issues.apache.org/jira/browse/FLINK-38881
>             Project: Flink
>          Issue Type: Improvement
>          Components: Flink CDC
>            Reporter: tchivs
>            Priority: Major


