Di Wu created FLINK-39633:
-----------------------------
Summary: PostgreSQL CDC backfill throws NullPointerException when WAL stream carries records for other captured tables
Key: FLINK-39633
URL: https://issues.apache.org/jira/browse/FLINK-39633
Project: Flink
Issue Type: Improvement
Components: Flink CDC
Affects Versions: cdc-3.6.0
Reporter: Di Wu
During the snapshot backfill phase of PostgreSQL CDC (incremental snapshot), the WAL stream from the logical replication slot may carry change records for captured tables other than the one whose chunk is currently being snapshotted. {{IncrementalSourceScanFetcher#isChangeRecordInChunkRange}} does not filter records by tableId before delegating to {{JdbcSourceFetchTaskContext#isRecordBetween}}, which calls {{getDatabaseSchema().tableFor(record.tableId)}}. If that table's schema is not yet present in {{RelationAwarePostgresSchema}}'s cache, the lookup returns null and {{ChunkUtils.getSplitColumn}} invokes {{primaryKeyColumns()}} on null, throwing an NPE and aborting the snapshot split.
h2. Stack trace
{noformat}
java.lang.NullPointerException: Cannot invoke "io.debezium.relational.Table.primaryKeyColumns()" because "table" is null
    at org.apache.flink.cdc.connectors.postgres.source.utils.ChunkUtils.getSplitColumn(ChunkUtils.java:45)
    at org.apache.flink.cdc.connectors.postgres.source.fetch.PostgresSourceFetchTaskContext.getSplitType(PostgresSourceFetchTaskContext.java:291)
    at org.apache.flink.cdc.connectors.base.source.reader.external.JdbcSourceFetchTaskContext.isRecordBetween(JdbcSourceFetchTaskContext.java:76)
    at org.apache.flink.cdc.connectors.base.source.reader.external.IncrementalSourceScanFetcher.isChangeRecordInChunkRange(IncrementalSourceScanFetcher.java:265)
    at org.apache.flink.cdc.connectors.base.source.reader.external.IncrementalSourceScanFetcher.pollWithBuffer(IncrementalSourceScanFetcher.java:182)
    at org.apache.flink.cdc.connectors.base.source.reader.external.IncrementalSourceScanFetcher.pollSplitRecords(IncrementalSourceScanFetcher.java:122)
{noformat}
h2. Root cause
The PostgreSQL logical replication slot streams WAL changes for *all* tables in the publication, regardless of which snapshot split is currently being processed. During backfill (between the low and high watermark), {{pollWithBuffer}} iterates the change events and calls {{isChangeRecordInChunkRange}} on each. The current implementation in {{flink-cdc-base}} is:
{code:java}
private boolean isChangeRecordInChunkRange(SourceRecord record) {
    if (taskContext.isDataChangeRecord(record)) {
        return taskContext.isRecordBetween(
                record,
                currentSnapshotSplit.getSplitStart(),
                currentSnapshotSplit.getSplitEnd());
    }
    return false;
}
{code}
It does not verify that the record's tableId matches {{currentSnapshotSplit.getTableId()}}. When a record for another captured table flows through:
* {{JdbcSourceFetchTaskContext#getSplitKey}} calls {{getDatabaseSchema().tableFor(record.tableId)}};
* {{RelationAwarePostgresSchema}} loads relations lazily from {{Relation}} messages, so for a table whose schema has not yet been observed the lookup returns null;
* {{ChunkUtils.getSplitColumn(null, ...)}} calls {{null.primaryKeyColumns()}} -> NPE (condensed in the fragment after this list).
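The failing chain, condensed (a hypothetical fragment for illustration only; {{schema}}, {{record}}, and {{chunkKeyColumn}} stand in for the fields used at the real call sites in the stack trace):
{code:java}
// Condensed from the stack trace above; not the literal source.
TableId tableId = SourceRecordUtils.getTableId(record);   // e.g. public.table_b, not the split's table
Table table = schema.tableFor(tableId);                   // null: no Relation message observed yet for table_b
Column splitColumn = ChunkUtils.getSplitColumn(table, chunkKeyColumn); // table.primaryKeyColumns() -> NPE
{code}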
Even when the schema happens to be cached, comparing a record from table B against chunk bounds that were derived from table A's primary key is semantically incorrect and would silently pollute the output buffer.
For contrast, the streaming reader ({{IncrementalSourceStreamFetcher#shouldEmit}}) already defends against this case via {{finishedSplitsInfo.containsKey(tableId)}} before invoking {{isRecordBetween}}. The scan fetcher is missing the symmetric tableId guard.
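A minimal sketch of that guard applied to the scan fetcher (an illustration, not a finished patch; it assumes {{SourceRecordUtils.getTableId}} from flink-cdc-base is the appropriate way to read the record's tableId):
{code:java}
private boolean isChangeRecordInChunkRange(SourceRecord record) {
    if (!taskContext.isDataChangeRecord(record)) {
        return false;
    }
    // Drop WAL records that belong to other captured tables: their schemas may
    // not be cached yet, and their keys are meaningless against this chunk's bounds.
    TableId tableId = SourceRecordUtils.getTableId(record);
    if (!currentSnapshotSplit.getTableId().equals(tableId)) {
        return false;
    }
    return taskContext.isRecordBetween(
            record,
            currentSnapshotSplit.getSplitStart(),
            currentSnapshotSplit.getSplitEnd());
}
{code}
Besides preventing the NPE, the guard would keep foreign-table records out of the chunk's output buffer, addressing the silent pollution noted above.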
h2. Reproduction
* PostgreSQL source with incremental snapshot enabled (the default) and {{scan.incremental.snapshot.backfill.skip = false}}.
* Two or more tables included in the publication, e.g. {{public.table_a}} and {{public.table_b}}.
* While Flink CDC is snapshotting {{table_a}}, generate INSERT/UPDATE traffic on {{table_b}} so its WAL records arrive within {{table_a}}'s backfill window (a job sketch follows this list).
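A DataStream job skeleton for this scenario (connection settings, slot name, and the {{skipSnapshotBackfill}} builder call are illustrative assumptions; adjust to the actual environment):
{code:java}
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.cdc.connectors.postgres.source.PostgresSourceBuilder;
import org.apache.flink.cdc.connectors.postgres.source.PostgresSourceBuilder.PostgresIncrementalSource;
import org.apache.flink.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class Flink39633Repro {
    public static void main(String[] args) throws Exception {
        PostgresIncrementalSource<String> source =
                PostgresSourceBuilder.PostgresIncrementalSource.<String>builder()
                        .hostname("localhost")          // illustrative connection settings
                        .port(5432)
                        .database("postgres")
                        .schemaList("public")
                        .tableList("public.table_a", "public.table_b") // both tables in the publication
                        .username("postgres")
                        .password("postgres")
                        .slotName("repro_slot")
                        .decodingPluginName("pgoutput")
                        .skipSnapshotBackfill(false)    // keep the backfill phase (the default)
                        .deserializer(new JsonDebeziumDeserializationSchema())
                        .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(3000); // incremental snapshot requires checkpointing
        env.fromSource(source, WatermarkStrategy.noWatermarks(), "postgres-cdc-repro").print();
        // While table_a's chunks are being snapshotted, run INSERT/UPDATE traffic
        // against table_b so its WAL records land inside table_a's backfill window.
        env.execute("FLINK-39633 repro");
    }
}
{code}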
--
This message was sent by Atlassian Jira
(v8.20.10#820010)