Yang Guo created FLINK-38647:
--------------------------------

             Summary: NullPointerException in Postgres CDC pipeline when update data in table
                 Key: FLINK-38647
                 URL: https://issues.apache.org/jira/browse/FLINK-38647
             Project: Flink
          Issue Type: Bug
          Components: Flink CDC
    Affects Versions: cdc-3.5.0
            Reporter: Yang Guo


I am building a CDC pipeline to capture data changes from a Postgres table into 
Fluss. The pipeline fails whenever I update a record in the Postgres table. Here 
is the error:

```
java.lang.NullPointerException
at org.apache.flink.cdc.debezium.event.DebeziumSchemaDataTypeInference.lambda$inferStruct$1(DebeziumSchemaDataTypeInference.java:209)
at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1384)
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:546)
at java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260)
at java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:505)
at org.apache.flink.cdc.debezium.event.DebeziumSchemaDataTypeInference.inferStruct(DebeziumSchemaDataTypeInference.java:210)
at org.apache.flink.cdc.connectors.postgres.source.PostgresSchemaDataTypeInference.inferStruct(PostgresSchemaDataTypeInference.java:44)
at org.apache.flink.cdc.debezium.event.DebeziumSchemaDataTypeInference.infer(DebeziumSchemaDataTypeInference.java:83)
at org.apache.flink.cdc.debezium.event.DebeziumSchemaDataTypeInference.infer(DebeziumSchemaDataTypeInference.java:58)
at org.apache.flink.cdc.debezium.event.DebeziumEventDeserializationSchema.extractDataRecord(DebeziumEventDeserializationSchema.java:156)
at org.apache.flink.cdc.debezium.event.DebeziumEventDeserializationSchema.extractBeforeDataRecord(DebeziumEventDeserializationSchema.java:146)
at org.apache.flink.cdc.debezium.event.DebeziumEventDeserializationSchema.deserializeDataChangeRecord(DebeziumEventDeserializationSchema.java:126)
at org.apache.flink.cdc.debezium.event.SourceRecordEventDeserializer.deserialize(SourceRecordEventDeserializer.java:49)
at org.apache.flink.cdc.debezium.event.DebeziumEventDeserializationSchema.deserialize(DebeziumEventDeserializationSchema.java:105)
at org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceRecordEmitter.emitElement(IncrementalSourceRecordEmitter.java:160)
at org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceRecordEmitter.processElement(IncrementalSourceRecordEmitter.java:118)
at org.apache.flink.cdc.connectors.postgres.source.reader.PostgresPipelineRecordEmitter.processElement(PostgresPipelineRecordEmitter.java:114)
at org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceRecordEmitter.emitRecord(IncrementalSourceRecordEmitter.java:88)
at org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceRecordEmitter.emitRecord(IncrementalSourceRecordEmitter.java:57)
at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:203)
at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:443)
at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:638)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:973)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:917)
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:970)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:949)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:763)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
at java.lang.Thread.run(Thread.java:750)
```

The Postgres table DDL is:
{code:sql}
CREATE TABLE orders (
    order_id INT PRIMARY KEY,
    customer_name VARCHAR(100) NOT NULL,
    order_date DATE NOT NULL DEFAULT CURRENT_DATE,
    total_amount DECIMAL(10,2) NOT NULL
); {code}
Data operations:

INSERT and DELETE produce the expected change records (+I, -D) in Fluss. An 
UPDATE, however, makes the CDC pipeline fail with the NPE listed above. 
According to the stack trace, the failure occurs in `inferStruct`, reached via 
`extractBeforeDataRecord`.
{code:sql}
INSERT INTO orders (order_id, customer_name, order_date, total_amount) VALUES
(1001, 'John Smith', '2024-01-15', 299.99),
(1002, 'Emma Johnson', '2024-01-16', 150.50),
(1003, 'Michael Brown', '2024-01-17', 89.99),
(1004, 'Sarah Davis', '2024-01-18', 450.00),
(1005, 'David Wilson', '2024-01-19', 199.99);

DELETE FROM orders WHERE order_id = 1004;

-- this makes the CDC pipeline fail with the NPE
UPDATE orders SET customer_name = 'James Taylor' WHERE order_id = 1005;{code}
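
The root cause is not identified in this report. One possible reading of the trace, stated as an assumption and not checked against the Flink CDC source, is that the "before" image of the UPDATE event is null or incomplete (which can happen in Postgres when the table's REPLICA IDENTITY is not FULL) and is dereferenced inside the stream lambda that the trace names `lambda$inferStruct$1`. The sketch below only illustrates that failure pattern in plain Java. The class `NullBeforeImageSketch` and its methods are hypothetical and are not Flink CDC or Debezium APIs.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration only: not the Flink CDC implementation.
public class NullBeforeImageSketch {

    // Stand-in for per-field type inference over a row image.
    // The row is dereferenced inside the lambda, so a null row
    // raises the NPE while the stream is being evaluated by toArray.
    static String[] inferUnsafe(List<String> fieldNames, Map<String, Object> row) {
        return fieldNames.stream()
                .map(name -> name + ":" + describe(row.get(name)))
                .toArray(String[]::new);
    }

    // Null-tolerant variant: a missing row image yields no inferred fields.
    static String[] inferSafe(List<String> fieldNames, Map<String, Object> row) {
        if (row == null) {
            return new String[0];
        }
        return inferUnsafe(fieldNames, row);
    }

    // Describes a single value by its runtime type, tolerating null values.
    static String describe(Object value) {
        return value == null ? "NULL" : value.getClass().getSimpleName();
    }

    public static void main(String[] args) {
        List<String> fields = Arrays.asList("order_id", "customer_name");

        // "After" image of the UPDATE from the report.
        Map<String, Object> after = new LinkedHashMap<>();
        after.put("order_id", 1005);
        after.put("customer_name", "James Taylor");

        System.out.println(Arrays.toString(inferUnsafe(fields, after)));
        System.out.println(Arrays.toString(inferSafe(fields, null)));

        try {
            // Assumed scenario: no "before" image is available.
            inferUnsafe(fields, null);
        } catch (NullPointerException e) {
            System.out.println("NPE raised inside the stream lambda");
        }
    }
}
```

If this reading applies, the current replica identity of the table would be worth checking before re-running the UPDATE.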
 



*More Information:*

The CDC pipeline runs from Postgres to Fluss.

Versions: PostgreSQL 14, Flink 1.20.1, Flink CDC 3.5.0, Fluss 0.7

CDC configuration YAML file:
{code:yaml}
source:
  type: postgres
  name: Postgres Source
  hostname: 127.0.0.1
  port: 5432
  username: postgres
  password: postgres
  tables: postgres.public.orders
  decoding.plugin.name: pgoutput
  slot.name: pgtest

sink:
  type: fluss
  name: Fluss Sink
  bootstrap.servers: localhost:9123
  # Security-related properties for the Fluss client
  properties.client.security.protocol: PLAINTEXT

pipeline:
  name: Postgres to Fluss Pipeline
  parallelism: 1
{code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
