[ https://issues.apache.org/jira/browse/FLINK-39622?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

ASF GitHub Bot updated FLINK-39622:
-----------------------------------
    Labels: pull-request-available  (was: )

> [postgres] CustomPostgresSchema re-reads JDBC metadata for every split, causing O(N²) snapshot startup
> ------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-39622
>                 URL: https://issues.apache.org/jira/browse/FLINK-39622
>             Project: Flink
>          Issue Type: Bug
>          Components: Flink CDC
>    Affects Versions: 1.20.2
>            Reporter: Tomás Miguez
>            Priority: Major
>              Labels: pull-request-available
>
> Description
> CustomPostgresSchema#readTableSchema
> (`flink-connector-postgres-cdc/.../utils/CustomPostgresSchema.java`) calls
> jdbcConnection.readSchema(...) with the full set of captured table IDs, so a
> single call already populates Tables with every captured table. However, the
> subsequent loop iterates only over the tableIds argument (the subset requested
> for the current split) and caches only the entries from that subset in
> schemasByTableId.
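The caching pattern described above can be modeled in a few lines. This is a hypothetical, simplified sketch, not the Flink CDC source: String IDs and String "schemas" stand in for the real Debezium TableId / TableChange types, and CurrentSchemaCache, schemasFor and readAllCapturedSchemas are illustrative names. It also assumes, for illustration, that already-cached tables are served without a new read. A counter records how many per-table metadata lookups the simulated readSchema performs.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical model of the current pattern; not the real CustomPostgresSchema.
class CurrentSchemaCache {
    private final Set<String> capturedTableIds;
    private final Map<String, String> schemasByTableId = new HashMap<>();
    // Number of per-table metadata lookups performed by the simulated read.
    int tableMetadataReads = 0;

    CurrentSchemaCache(Set<String> capturedTableIds) {
        // Defensive copy that preserves iteration order.
        this.capturedTableIds = new LinkedHashSet<>(capturedTableIds);
    }

    // Stands in for jdbcConnection.readSchema(...): one call reads metadata
    // for every captured table, not just the ones the caller asked for.
    private Map<String, String> readAllCapturedSchemas() {
        Map<String, String> tables = new HashMap<>();
        for (String id : capturedTableIds) {
            tableMetadataReads++;
            tables.put(id, "schema-of-" + id);
        }
        return tables;
    }

    // Returns schemas for the requested subset (e.g. the tables of one split).
    List<String> schemasFor(List<String> tableIds) {
        List<String> missing = new ArrayList<>();
        for (String id : tableIds) {
            if (!schemasByTableId.containsKey(id)) {
                missing.add(id);
            }
        }
        if (!missing.isEmpty()) {
            Map<String, String> tables = readAllCapturedSchemas();
            // Problematic pattern: only the requested subset is cached, so the
            // metadata just read for every other captured table is discarded.
            for (String id : missing) {
                schemasByTableId.put(id, tables.get(id));
            }
        }
        List<String> result = new ArrayList<>();
        for (String id : tableIds) {
            result.add(schemasByTableId.get(id));
        }
        return result;
    }
}
```

Requesting each of N captured tables one at a time through this model drives tableMetadataReads to N × N, because every request misses the cache and triggers a full read.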
> As a result, when the snapshot phase requests schemas one split at a time,
> each call re-reads JDBC metadata for all captured tables but discards most of
> the result. With N captured tables this amounts to O(N²) table-metadata
> lookups during snapshot startup, which is very visible on Postgres instances
> with many captured tables (pg_catalog query load spikes). For example, it
> makes Flink CDC unusable for applications that base their multitenancy on
> schema separation, which is our case.
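A rough count of the lookups, under the simplifying assumption that each split requests the schema of exactly one not-yet-cached table and that every such request triggers a full read of all N captured tables:

```latex
\text{reads}_{\text{current}} = \sum_{i=1}^{N} N = N^2,
\qquad
\text{reads}_{\text{fixed}} = N
\quad \text{(one full read, then cache hits)}
```

For N = 200 this is 200² = 40,000 per-table metadata lookups with the current behavior versus 200 if every discovered table were cached after the first read.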
> Steps to reproduce
> 1. Configure the Postgres CDC source to capture a large number of tables (e.g. 200+).
> 2. Start a fresh job (snapshot phase).
> 3. Observe that snapshot startup latency and pg_catalog query volume grow with N².
> Proposed fix
> Iterate over every table that readSchema discovered (tables.tableIds()) and
> cache all of them in schemasByTableId, while adding only the originally
> requested subset to the returned tableChanges. Subsequent splits can then be
> served from the cache without another full metadata scan.
> A patch is available; happy to open a PR.
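The proposed fix can be sketched with the same kind of simplified model. Again this is hypothetical and not the actual patch: FixedSchemaCache, schemasFor and readAllCapturedSchemas are illustrative names, and String values stand in for Debezium's TableId / TableChange types. The only behavioral change is that everything returned by the full read is cached, while the caller still receives only the tables it asked for.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical model of the proposed fix; not the real CustomPostgresSchema.
class FixedSchemaCache {
    private final Set<String> capturedTableIds;
    private final Map<String, String> schemasByTableId = new HashMap<>();
    // Number of per-table metadata lookups performed by the simulated read.
    int tableMetadataReads = 0;

    FixedSchemaCache(Set<String> capturedTableIds) {
        this.capturedTableIds = new LinkedHashSet<>(capturedTableIds);
    }

    // Stands in for jdbcConnection.readSchema(...) over all captured tables.
    private Map<String, String> readAllCapturedSchemas() {
        Map<String, String> tables = new HashMap<>();
        for (String id : capturedTableIds) {
            tableMetadataReads++;
            tables.put(id, "schema-of-" + id);
        }
        return tables;
    }

    List<String> schemasFor(List<String> tableIds) {
        boolean anyMissing = false;
        for (String id : tableIds) {
            if (!schemasByTableId.containsKey(id)) {
                anyMissing = true;
                break;
            }
        }
        if (anyMissing) {
            // Fix: cache every table the read discovered (the analogue of
            // iterating tables.tableIds()), not only the requested subset.
            schemasByTableId.putAll(readAllCapturedSchemas());
        }
        // Return only the originally requested tables, mirroring how only
        // that subset is added to tableChanges.
        List<String> result = new ArrayList<>();
        for (String id : tableIds) {
            result.add(schemasByTableId.get(id));
        }
        return result;
    }
}
```

In this model the first uncached request pays for one full read and every later split is a cache hit, so the lookup count stays at N instead of N². The cost is holding schemas for all captured tables in memory, which the current code already reads on every call anyway.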



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
