Tao Wang created FLINK-38965:
--------------------------------

             Summary: Postgres cdc source encounted  
"java.lang.IllegalStateException: Duplicate key Optional"
                 Key: FLINK-38965
                 URL: https://issues.apache.org/jira/browse/FLINK-38965
             Project: Flink
          Issue Type: Bug
          Components: Flink CDC
    Affects Versions: cdc-3.5.0
            Reporter: Tao Wang
         Attachments: image-2026-01-23-18-17-50-692.png

connector: Postgress

description:

1、we got two tables with similar name.  

ndi_pg_user_sink_1

ndi_pg_userbsink_1

2、they have same schema.

3、pg as cdc source, we encounted the exception
{code:java}

java.lang.RuntimeException: SplitFetcher thread 1 received unexpected exception 
while polling the records       at 
org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:150)
 ~[flink-connector-base-ne-flink-1.14.0-1.0.13.jar:ne-flink-1.14.0-1.0.13]  at 
org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:105)
 ~[flink-connector-base-ne-flink-1.14.0-1.0.13.jar:ne-flink-1.14.0-1.0.13]      
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
~[?:1.8.0_421]       at 
java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266) 
~[?:1.8.0_421]    at java.util.concurrent.FutureTask.run(FutureTask.java) 
~[?:1.8.0_421]  at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 
~[?:1.8.0_421]       at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 
~[?:1.8.0_421]       at java.lang.Thread.run(Thread.java:750) 
~[?:1.8.0_421]Caused by: org.apache.kafka.connect.errors.ConnectException: An 
exception occurred in the change event producer. This connector will be 
stopped. at 
io.debezium.pipeline.ErrorHandler.setProducerThrowable(ErrorHandler.java:50) 
~[debezium-core-1.9.7.Final.jar:1.9.7.Final]    at 
io.debezium.connector.postgresql.PostgresStreamingChangeEventSource.execute(PostgresStreamingChangeEventSource.java:214)
 ~[flink-connector-postgres-cdc-.jar:1.9.7.Final]    at 
com.ververica.cdc.connectors.postgres.source.fetch.PostgresStreamFetchTask$StreamSplitReadTask.execute(PostgresStreamFetchTask.java:211)
 ~[flink-connector-postgres-cdc-.jar:]       at 
com.ververica.cdc.connectors.postgres.source.fetch.PostgresStreamFetchTask.execute(PostgresStreamFetchTask.java:95)
 ~[flink-connector-postgres-cdc-.jar:]    at 
com.ververica.cdc.connectors.base.source.reader.external.IncrementalSourceStreamFetcher.lambda$submitTask$0(IncrementalSourceStreamFetcher.java:89)
 ~[flink-cdc-base-.jar:]  ... 6 moreCaused by: java.lang.IllegalStateException: 
Duplicate key Optional.empty      at 
java.util.stream.Collectors.lambda$throwingMerger$0(Collectors.java:133) 
~[?:1.8.0_421]      at java.util.HashMap.merge(HashMap.java:1255) 
~[?:1.8.0_421]    at 
java.util.stream.Collectors.lambda$toMap$58(Collectors.java:1320) 
~[?:1.8.0_421]     at 
java.util.stream.ReduceOps$3ReducingSink.accept(ReduceOps.java:169) 
~[?:1.8.0_421]   at 
java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193) 
~[?:1.8.0_421]     at 
java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:175) 
~[?:1.8.0_421]     at 
java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1384) 
~[?:1.8.0_421]        at 
java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482) 
~[?:1.8.0_421] at 
java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472) 
~[?:1.8.0_421]  at 
java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708) 
~[?:1.8.0_421]    at 
java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) 
~[?:1.8.0_421] at 
java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499) 
~[?:1.8.0_421]        at 
io.debezium.connector.postgresql.connection.pgoutput.PgOutputMessageDecoder.handleRelationMessage(PgOutputMessageDecoder.java:319)
 ~[flink-connector-postgres-cdc-.jar:]     at 
io.debezium.connector.postgresql.connection.pgoutput.PgOutputMessageDecoder.processNotEmptyMessage(PgOutputMessageDecoder.java:178)
 ~[flink-connector-postgres-cdc-.jar:]    at 
io.debezium.connector.postgresql.connection.AbstractMessageDecoder.processMessage(AbstractMessageDecoder.java:33)
 ~[debezium-connector-postgres-1.9.7.Final.jar:]    at 
io.debezium.connector.postgresql.connection.PostgresReplicationConnection$1.deserializeMessages(PostgresReplicationConnection.java:604)
 ~[flink-connector-postgres-cdc-.jar:]        at 
io.debezium.connector.postgresql.connection.PostgresReplicationConnection$1.readPending(PostgresReplicationConnection.java:594)
 ~[flink-connector-postgres-cdc-.jar:]        at 
io.debezium.connector.postgresql.PostgresStreamingChangeEventSource.processMessages(PostgresStreamingChangeEventSource.java:257)
 ~[flink-connector-postgres-cdc-.jar:1.9.7.Final]    at 
io.debezium.connector.postgresql.PostgresStreamingChangeEventSource.execute(PostgresStreamingChangeEventSource.java:212)
 ~[flink-connector-postgres-cdc-.jar:1.9.7.Final]    at 
com.ververica.cdc.connectors.postgres.source.fetch.PostgresStreamFetchTask$StreamSplitReadTask.execute(PostgresStreamFetchTask.java:211)
 ~[flink-connector-postgres-cdc-.jar:]       at 
com.ververica.cdc.connectors.postgres.source.fetch.PostgresStreamFetchTask.execute(PostgresStreamFetchTask.java:95)
 ~[flink-connector-postgres-cdc-.jar:]    at 
com.ververica.cdc.connectors.base.source.reader.external.IncrementalSourceStreamFetcher.lambda$submitTask$0(IncrementalSourceStreamFetcher.java:89)
 ~[flink-cdc-base-.jar:]  ... 6 more {code}
 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to