JNSimba opened a new pull request, #4390:
URL: https://github.com/apache/flink-cdc/pull/4390
This closes https://issues.apache.org/jira/browse/FLINK-39633.

## Problem

PostgreSQL CDC runs incremental snapshots with a snapshot backfill phase. During that phase, the WAL stream from the logical replication slot may carry change records for captured tables other than the one whose chunk is currently being snapshotted.

`IncrementalSourceScanFetcher#isChangeRecordInChunkRange` does not filter records by tableId before it delegates to `JdbcSourceFetchTaskContext#isRecordBetween`. That method calls `getDatabaseSchema().tableFor(record.tableId)`. The foreign table's schema may not yet be in the lazy cache of `RelationAwarePostgresSchema`. In that case the lookup returns null, and `ChunkUtils.getSplitColumn` throws a `NullPointerException` when it calls `primaryKeyColumns()` on the null table:

```
java.lang.NullPointerException: Cannot invoke "io.debezium.relational.Table.primaryKeyColumns()" because "table" is null
    at org.apache.flink.cdc.connectors.postgres.source.utils.ChunkUtils.getSplitColumn(ChunkUtils.java:45)
    at org.apache.flink.cdc.connectors.postgres.source.fetch.PostgresSourceFetchTaskContext.getSplitType(PostgresSourceFetchTaskContext.java:291)
    at org.apache.flink.cdc.connectors.base.source.reader.external.JdbcSourceFetchTaskContext.isRecordBetween(JdbcSourceFetchTaskContext.java:76)
    at org.apache.flink.cdc.connectors.base.source.reader.external.IncrementalSourceScanFetcher.isChangeRecordInChunkRange(IncrementalSourceScanFetcher.java:265)
```

The streaming reader already guards against this case. `IncrementalSourceStreamFetcher#shouldEmit` checks `finishedSplitsInfo.containsKey(tableId)` before it invokes `isRecordBetween`. The scan fetcher was missing the equivalent guard.

## Fix

Filter change records by tableId in `IncrementalSourceScanFetcher#isChangeRecordInChunkRange` before delegating to `isRecordBetween`. Records whose tableId does not equal `currentSnapshotSplit.getTableId()` are skipped.
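The guard described in the Fix section can be illustrated with the minimal, self-contained sketch below. It assumes Java 16+ for records. `ChangeRecord`, `ChunkSplit`, `TaskContext`, and `FakeContext` are hypothetical simplified stand-ins. They are not Flink CDC's `SourceRecord`, `SnapshotSplit`, or fetch-task context classes, and the actual patch may differ in detail. The map inside `FakeContext` plays the role of the lazy schema cache: for an uncached table the lookup returns null and is then dereferenced, which mirrors the NPE path in the stack trace.

```java
import java.util.Map;
import java.util.Objects;

public class ScanFetcherGuardSketch {

    /** Hypothetical change record: owning table, primary-key value, data-change flag. */
    public record ChangeRecord(String tableId, long key, boolean isDataChange) {}

    /** Hypothetical chunk of one table covering keys in [splitStart, splitEnd). */
    public record ChunkSplit(String tableId, long splitStart, long splitEnd) {}

    /** Hypothetical, simplified fetch-task context. */
    public interface TaskContext {
        boolean isDataChangeRecord(ChangeRecord record);

        String getTableId(ChangeRecord record);

        boolean isRecordBetween(ChangeRecord record, long start, long end);
    }

    /** Guarded range check: records of other tables never reach isRecordBetween. */
    public static boolean isChangeRecordInChunkRange(
            TaskContext ctx, ChunkSplit split, ChangeRecord record) {
        if (!ctx.isDataChangeRecord(record)) {
            // Watermark / signal records short-circuit before the table id is read.
            return false;
        }
        if (!Objects.equals(split.tableId(), ctx.getTableId(record))) {
            // Change for another captured table: skip it instead of resolving
            // that table's schema, which may not be cached yet.
            return false;
        }
        return ctx.isRecordBetween(record, split.splitStart(), split.splitEnd());
    }

    /** Fake context whose map mimics a lazily populated schema cache. */
    public static final class FakeContext implements TaskContext {
        private final Map<String, String> splitColumnByTable;
        public int isRecordBetweenCalls = 0; // how often the range check ran

        public FakeContext(Map<String, String> splitColumnByTable) {
            this.splitColumnByTable = splitColumnByTable;
        }

        @Override
        public boolean isDataChangeRecord(ChangeRecord record) {
            return record.isDataChange();
        }

        @Override
        public String getTableId(ChangeRecord record) {
            return record.tableId();
        }

        @Override
        public boolean isRecordBetween(ChangeRecord record, long start, long end) {
            isRecordBetweenCalls++;
            // An uncached table yields null here, so requireNonNull throws an NPE,
            // analogous to getSplitColumn dereferencing a null Table.
            Objects.requireNonNull(splitColumnByTable.get(record.tableId()), "table is null");
            return record.key() >= start && record.key() < end;
        }
    }

    public static void main(String[] args) {
        FakeContext ctx = new FakeContext(Map.of("public.orders", "id"));
        ChunkSplit split = new ChunkSplit("public.orders", 0, 100);

        System.out.println(isChangeRecordInChunkRange(ctx, split, new ChangeRecord("public.orders", 42, true)));
        System.out.println(isChangeRecordInChunkRange(ctx, split, new ChangeRecord("public.customers", 42, true)));
        System.out.println(isChangeRecordInChunkRange(ctx, split, new ChangeRecord(null, 0, false)));
        System.out.println(ctx.isRecordBetweenCalls);
    }
}
```

Running `main` prints `true`, `false`, `false`, and `1`, because only the same-table record reaches `isRecordBetween`. Calling `ctx.isRecordBetween` directly with the `public.customers` record would instead throw `NullPointerException`, which is the unguarded path shown in the stack trace.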
## Test plan

- [x] Added `IncrementalSourceScanFetcherTest` with three cases:
  - a foreign-table change record is filtered out (no NPE, and `isRecordBetween` is never invoked)
  - a same-table change record is still delegated to `isRecordBetween`
  - non-data change records (watermark / signal) short-circuit without calling `getTableId`
- [x] All existing `flink-cdc-base` unit tests still pass (16/16)
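The three test cases differ only in which context methods the guard ends up calling. One way to verify that without JUnit or Mockito is to record every call on a fake context and compare the resulting call sequence for each case. The sketch below does this in plain Java (16+). `Rec`, `Split`, `RecordingContext`, and `run` are hypothetical names. The real `IncrementalSourceScanFetcherTest` may use mocks and actual Flink CDC types instead. The guard is restated here so the block compiles on its own.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

public class ScanFetcherGuardTestSketch {

    /** Hypothetical record: owning table and whether it is a data change. */
    public record Rec(String tableId, boolean isDataChange) {}

    /** Hypothetical split: only the table id matters for these cases. */
    public record Split(String tableId) {}

    /** Fake context that logs each method the guard calls, in order. */
    public static final class RecordingContext {
        final List<String> calls = new ArrayList<>();

        boolean isDataChangeRecord(Rec r) {
            calls.add("isDataChangeRecord");
            return r.isDataChange();
        }

        String getTableId(Rec r) {
            calls.add("getTableId");
            return r.tableId();
        }

        boolean isRecordBetween(Rec r) {
            calls.add("isRecordBetween");
            return true; // range logic is irrelevant to these cases
        }
    }

    /** The guard under test, restated so this block compiles on its own. */
    static boolean isChangeRecordInChunkRange(RecordingContext ctx, Split split, Rec r) {
        if (!ctx.isDataChangeRecord(r)) {
            return false; // watermark / signal: stop before getTableId
        }
        if (!Objects.equals(split.tableId(), ctx.getTableId(r))) {
            return false; // foreign table: never reach isRecordBetween
        }
        return ctx.isRecordBetween(r);
    }

    /** Runs the guard once on a fresh context and returns the call log. */
    public static List<String> run(Split split, Rec r) {
        RecordingContext ctx = new RecordingContext();
        isChangeRecordInChunkRange(ctx, split, r);
        return ctx.calls;
    }

    public static void main(String[] args) {
        Split split = new Split("public.orders");
        // 1. Foreign-table record: filtered out, isRecordBetween never invoked.
        System.out.println(run(split, new Rec("public.customers", true)));
        // 2. Same-table record: still delegated to isRecordBetween.
        System.out.println(run(split, new Rec("public.orders", true)));
        // 3. Watermark / signal record: short-circuits before getTableId.
        System.out.println(run(split, new Rec(null, false)));
    }
}
```

Running `main` prints one call log per case: `[isDataChangeRecord, getTableId]`, `[isDataChangeRecord, getTableId, isRecordBetween]`, and `[isDataChangeRecord]`.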
