Yang Guo created FLINK-38647:
--------------------------------
Summary: NullPointerException in Postgres CDC pipeline when update
data in table
Key: FLINK-38647
URL: https://issues.apache.org/jira/browse/FLINK-38647
Project: Flink
Issue Type: Bug
Components: Flink CDC
Affects Versions: cdc-3.5.0
Reporter: Yang Guo
I am building a CDC pipeline that captures data changes from a Postgres table into
Fluss. The pipeline fails whenever I update a record in the Postgres table, with
the following error:
{code:java}
java.lang.NullPointerException
    at org.apache.flink.cdc.debezium.event.DebeziumSchemaDataTypeInference.lambda$inferStruct$1(DebeziumSchemaDataTypeInference.java:209)
    at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
    at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1384)
    at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
    at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
    at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:546)
    at java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260)
    at java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:505)
    at org.apache.flink.cdc.debezium.event.DebeziumSchemaDataTypeInference.inferStruct(DebeziumSchemaDataTypeInference.java:210)
    at org.apache.flink.cdc.connectors.postgres.source.PostgresSchemaDataTypeInference.inferStruct(PostgresSchemaDataTypeInference.java:44)
    at org.apache.flink.cdc.debezium.event.DebeziumSchemaDataTypeInference.infer(DebeziumSchemaDataTypeInference.java:83)
    at org.apache.flink.cdc.debezium.event.DebeziumSchemaDataTypeInference.infer(DebeziumSchemaDataTypeInference.java:58)
    at org.apache.flink.cdc.debezium.event.DebeziumEventDeserializationSchema.extractDataRecord(DebeziumEventDeserializationSchema.java:156)
    at org.apache.flink.cdc.debezium.event.DebeziumEventDeserializationSchema.extractBeforeDataRecord(DebeziumEventDeserializationSchema.java:146)
    at org.apache.flink.cdc.debezium.event.DebeziumEventDeserializationSchema.deserializeDataChangeRecord(DebeziumEventDeserializationSchema.java:126)
    at org.apache.flink.cdc.debezium.event.SourceRecordEventDeserializer.deserialize(SourceRecordEventDeserializer.java:49)
    at org.apache.flink.cdc.debezium.event.DebeziumEventDeserializationSchema.deserialize(DebeziumEventDeserializationSchema.java:105)
    at org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceRecordEmitter.emitElement(IncrementalSourceRecordEmitter.java:160)
    at org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceRecordEmitter.processElement(IncrementalSourceRecordEmitter.java:118)
    at org.apache.flink.cdc.connectors.postgres.source.reader.PostgresPipelineRecordEmitter.processElement(PostgresPipelineRecordEmitter.java:114)
    at org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceRecordEmitter.emitRecord(IncrementalSourceRecordEmitter.java:88)
    at org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceRecordEmitter.emitRecord(IncrementalSourceRecordEmitter.java:57)
    at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:203)
    at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:443)
    at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:638)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:973)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:917)
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:970)
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:949)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:763)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
    at java.lang.Thread.run(Thread.java:750)
{code}
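The top of the trace has a recognizable shape: the NPE is thrown from a lambda (`lambda$inferStruct$1`) running inside a stream stage (`ReferencePipeline$3$1.accept`) whose results are collected with `toArray(...)`, all called from `inferStruct`. The minimal sketch below reproduces that shape outside Flink CDC, assuming the lambda dereferences a per-field value that turns out to be null. The `Map`-based "struct" and the methods `inferUnsafe` and `inferNullSafe` are hypothetical and are not the actual `DebeziumSchemaDataTypeInference` code; which value is null in the real pipeline cannot be determined from the trace alone.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class NullInLambdaSketch {

    // Hypothetical stand-in for a struct: field name -> value (values may be null).
    // This is NOT the Flink CDC or Kafka Connect Struct API.
    public static String[] inferUnsafe(List<String> fieldNames, Map<String, Object> struct) {
        // Calling getClass() on a null value inside map() throws a NullPointerException
        // from the lambda, so the trace shows lambda$... -> ReferencePipeline -> toArray.
        return fieldNames.stream()
                .map(name -> struct.get(name).getClass().getSimpleName())
                .toArray(String[]::new);
    }

    public static String[] inferNullSafe(List<String> fieldNames, Map<String, Object> struct) {
        // Hypothetical guard: a missing or null value maps to a placeholder type name.
        return fieldNames.stream()
                .map(name -> {
                    Object value = struct.get(name);
                    return value == null ? "UNKNOWN" : value.getClass().getSimpleName();
                })
                .toArray(String[]::new);
    }

    public static void main(String[] args) {
        List<String> fields = Arrays.asList("order_id", "customer_name");
        Map<String, Object> before = new LinkedHashMap<>();
        before.put("order_id", 1005);
        before.put("customer_name", null); // hypothetical: value absent from the row image

        System.out.println(Arrays.toString(inferNullSafe(fields, before)));
        try {
            inferUnsafe(fields, before);
        } catch (NullPointerException e) {
            System.out.println("NPE thrown inside the stream lambda");
        }
    }
}
```

The null-safe variant only marks where a guard could sit in such a pipeline; it is not a proposed fix for the connector.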
The Postgres table DDL is:
{code:sql}
CREATE TABLE orders (
order_id INT PRIMARY KEY,
customer_name VARCHAR(100) NOT NULL,
order_date DATE NOT NULL DEFAULT CURRENT_DATE,
total_amount DECIMAL(10,2) NOT NULL
);
{code}
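Data operations:
INSERT and DELETE work: the corresponding change records (+I and -D) show up in
Fluss. An UPDATE, however, makes the pipeline fail with the NPE above. Judging by
the trace, the failure happens in `inferStruct` while the update's before image is
being deserialized (`extractBeforeDataRecord` -> `extractDataRecord` -> `infer` ->
`inferStruct`).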
{code:sql}
INSERT INTO orders (order_id, customer_name, order_date, total_amount) VALUES
(1001, 'John Smith', '2024-01-15', 299.99),
(1002, 'Emma Johnson', '2024-01-16', 150.50),
(1003, 'Michael Brown', '2024-01-17', 89.99),
(1004, 'Sarah Davis', '2024-01-18', 450.00),
(1005, 'David Wilson', '2024-01-19', 199.99);
DELETE FROM orders WHERE order_id = 1004;
-- This UPDATE makes the CDC pipeline fail with the NPE above
UPDATE orders SET customer_name = 'James Taylor' WHERE order_id = 1005;
{code}
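In the +I/-D notation used above, an UPDATE is usually expressed as a pair of records: -U for the old row image and +U for the new one. That is why an update, unlike an insert, needs the before image, which is consistent with the `extractBeforeDataRecord` frame in the trace. The sketch below lists the records one would expect from the statements above under that assumption. The `Kind` enum is a self-contained stand-in that mirrors Flink's row-kind short strings, not Flink's own `RowKind` class, and what the Fluss sink finally stores may differ.

```java
import java.util.ArrayList;
import java.util.List;

public class ExpectedChangelogSketch {

    // Stand-in mirroring Flink's changelog short strings (+I, -U, +U, -D).
    // Illustrative copy only; this is not org.apache.flink.types.RowKind.
    public enum Kind {
        INSERT("+I"), UPDATE_BEFORE("-U"), UPDATE_AFTER("+U"), DELETE("-D");

        final String shortString;

        Kind(String shortString) {
            this.shortString = shortString;
        }
    }

    // Records one row-level statement is expected to yield (assumption, see lead-in).
    // An UPDATE needs both the old row image (-U) and the new one (+U).
    public static List<String> expectedRecords(String statement) {
        List<String> out = new ArrayList<>();
        switch (statement) {
            case "INSERT":
                out.add(Kind.INSERT.shortString);
                break;
            case "DELETE":
                out.add(Kind.DELETE.shortString);
                break;
            case "UPDATE":
                out.add(Kind.UPDATE_BEFORE.shortString);
                out.add(Kind.UPDATE_AFTER.shortString);
                break;
            default:
                throw new IllegalArgumentException("unsupported statement: " + statement);
        }
        return out;
    }

    public static void main(String[] args) {
        // The statements above, per affected row: 5 inserts, 1 delete, 1 update.
        String[] perRow = {"INSERT", "INSERT", "INSERT", "INSERT", "INSERT", "DELETE", "UPDATE"};
        List<String> all = new ArrayList<>();
        for (String s : perRow) {
            all.addAll(expectedRecords(s));
        }
        System.out.println(all);
    }
}
```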
*More Information:*
* Pipeline: Postgres to Fluss
* Versions: PostgreSQL 14, Flink 1.20.1, Flink CDC 3.5.0, Fluss 0.7

CDC configuration YAML file:
{code:yaml}
source:
  type: postgres
  name: Postgres Source
  hostname: 127.0.0.1
  port: 5432
  username: postgres
  password: postgres
  tables: postgres.public.orders
  decoding.plugin.name: pgoutput
  slot.name: pgtest

sink:
  type: fluss
  name: Fluss Sink
  bootstrap.servers: localhost:9123
  # Security-related properties for the Fluss client
  properties.client.security.protocol: PLAINTEXT

pipeline:
  name: Postgres to Fluss Pipeline
  parallelism: 1
{code}