Di Wu created FLINK-39633:
-----------------------------

             Summary: PostgreSQL CDC backfill throws NullPointerException when 
WAL stream carries records for other captured tables
                 Key: FLINK-39633
                 URL: https://issues.apache.org/jira/browse/FLINK-39633
             Project: Flink
          Issue Type: Improvement
          Components: Flink CDC
    Affects Versions: cdc-3.6.0
            Reporter: Di Wu


During the snapshot backfill phase of PostgreSQL CDC (incremental snapshot), 
the WAL

  stream from the logical replication slot may carry change records for 
captured tables

  other than the one whose chunk is currently being snapshotted.                
                                              

  {\{IncrementalSourceScanFetcher#isChangeRecordInChunkRange}} does not filter 
records by

  tableId before delegating to \{{JdbcSourceFetchTaskContext#isRecordBetween}}, 
which calls                                    

  {\{getDatabaseSchema().tableFor(record.tableId)}}. If that table's schema is 
not yet                                         

  present in \{{RelationAwarePostgresSchema}}'s cache, the lookup returns null 
and                                             

  {\{ChunkUtils.getSplitColumn}} invokes \{{primaryKeyColumns()}} on null, 
throwing NPE and                                     

  aborting the snapshot split.                                                  
                                              

                                                                                
                                              

  h2. Stack trace                                                               
                                              

                  

  {noformat}

  java.lang.NullPointerException: Cannot invoke 
"io.debezium.relational.Table.primaryKeyColumns()" because "table" is null

      at 
org.apache.flink.cdc.connectors.postgres.source.utils.ChunkUtils.getSplitColumn(ChunkUtils.java:45)
                  

      at 
org.apache.flink.cdc.connectors.postgres.source.fetch.PostgresSourceFetchTaskContext.getSplitType(PostgresSourceFetch

  TaskContext.java:291)                                                         
                                              

      at 
org.apache.flink.cdc.connectors.base.source.reader.external.JdbcSourceFetchTaskContext.isRecordBetween(JdbcSourceFetc

  hTaskContext.java:76)                                                         
                                              

      at 
org.apache.flink.cdc.connectors.base.source.reader.external.IncrementalSourceScanFetcher.isChangeRecordInChunkRange(I

  ncrementalSourceScanFetcher.java:265)                                         
                                              

      at 
org.apache.flink.cdc.connectors.base.source.reader.external.IncrementalSourceScanFetcher.pollWithBuffer(IncrementalSo

  urceScanFetcher.java:182)                                                     
                                              

      at 
org.apache.flink.cdc.connectors.base.source.reader.external.IncrementalSourceScanFetcher.pollSplitRecords(Incremental

  SourceScanFetcher.java:122)                                                   
                                              

  {noformat}      

                                                                                
                                              

  h2. Root cause 

 

  The PostgreSQL logical replication slot streams WAL changes for *all* tables 
in the

  publication, regardless of which snapshot split is currently being processed. 
During

  backfill (between low and high watermark), \{{pollWithBuffer}} iterates 
change events and                                    

  calls \{{isChangeRecordInChunkRange}} on each. The current implementation in

  {\{flink-cdc-base}} is:                                                       
                                               

                  

  {code:java}                                                                   
                                              

  private boolean isChangeRecordInChunkRange(SourceRecord record) {

      if (taskContext.isDataChangeRecord(record)) {                             
                                              

          return taskContext.isRecordBetween(

                  record,                                                       
                                              

                  currentSnapshotSplit.getSplitStart(),

                  currentSnapshotSplit.getSplitEnd());                          
                                              

      }

      return false;                                                             
                                              

  }               

  {code}

 

  It does not verify that the record's tableId matches                          
                                              

  {\{currentSnapshotSplit.getTableId()}}. When a record for another captured 
table flows

  through:                                                                      
                                              

                  

  * \{{JdbcSourceFetchTaskContext#getSplitKey}} calls                           
                                               

    {\{getDatabaseSchema().tableFor(record.tableId)}};

  * \{{RelationAwarePostgresSchema}} loads relations lazily from \{{Relation}} 
messages, so                                     

    for a table whose schema has not yet been observed the lookup returns null; 
                                              

  * \{{ChunkUtils.getSplitColumn(null, ...)}} calls 
\{{null.primaryKeyColumns()}} -> NPE.                                       

                                                                                
                                              

  Even when the schema happens to be cached, comparing a record from table B 
against chunk                                    

  bounds that were derived from table A's primary key is semantically incorrect 
and would                                     

  silently pollute the output buffer.                                           
                                              

                  

  For contrast, the streaming reader                                            
                                              

  (\{{IncrementalSourceStreamFetcher#shouldEmit}}) already defends against this 
case via

  {\{finishedSplitsInfo.containsKey(tableId)}} before invoking 
\{{isRecordBetween}}. The scan                                   

  fetcher is missing the symmetric tableId guard.                               
                                              

  

  h2. Reproduction                                                              
                                              

                  

  * PostgreSQL source with incremental snapshot enabled (default) and

    {\{scan.incremental.snapshot.backfill.skip = false}}.

  * Two or more tables included in the publication, e.g. \{{public.table_a}} 
and                                               

    {\{public.table_b}}.

  * While Flink CDC is snapshotting \{{table_a}}, generate INSERT/UPDATE 
traffic on                                            

    {\{table_b}} so its WAL records arrive within \{{table_a}}'s backfill 
window.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to