loserwang1024 commented on code in PR #4113:
URL: https://github.com/apache/flink-cdc/pull/4113#discussion_r2329390546
##########
flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/reader/PostgresSourceReader.java:
##########
@@ -133,6 +135,17 @@ public void notifyCheckpointComplete(long checkpointId) throws Exception {
}
}
+ @Override
+ protected void onSplitFinished(Map<String, SourceSplitState> finishedSplitIds) {
+ super.onSplitFinished(finishedSplitIds);
+
+ if (this.sourceConfig.getStartupOptions().isSnapshotOnly()) {
+ PostgresDialect dialect = (PostgresDialect) this.dialect;
+ boolean removed = dialect.removeSlot(dialect.getSlotName());
+ LOG.info("Remove slot '{}' result is {}.", dialect.getSlotName(), removed);
+ }
+ }
+
Review Comment:
```suggestion
    @Override
    protected void onSplitFinished(Map<String, SourceSplitState> finishedSplitIds) {
        super.onSplitFinished(finishedSplitIds);
        for (SourceSplitState splitState : finishedSplitIds.values()) {
            SourceSplitBase sourceSplit = splitState.toSourceSplit();
            if (sourceSplit.isStreamSplit()) {
                StreamSplit streamSplit = sourceSplit.asStreamSplit();
                if (streamSplit.getStartingOffset().isAtOrAfter(streamSplit.getEndingOffset())) {
                    PostgresDialect dialect = (PostgresDialect) this.dialect;
                    boolean removed = dialect.removeSlot(dialect.getSlotName());
                    LOG.info("Remove slot '{}' result is {}.", dialect.getSlotName(), removed);
                }
            }
        }
    }
```
With the current change, it seems that the global slot will be removed as soon
as any split finishes, even when that split is only a snapshot split.
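
To make the concern concrete, here is a minimal, self-contained sketch. The
`SlotCleanupSketch` class, the `FinishedSplit` type, and the plain `long`
offsets are hypothetical stand-ins invented for illustration; they are not
Flink CDC classes and do not reproduce the real offset comparison. The sketch
only contrasts a check based on the startup mode alone with a check that also
requires a finished stream split.

```java
import java.util.Map;

/** Hypothetical stand-ins for illustration only; none of these are Flink CDC types. */
class SlotCleanupSketch {

    /** Simplified model of a finished split: a snapshot chunk or the stream split. */
    static final class FinishedSplit {
        final boolean streamSplit;
        final long startingOffset; // assumption: offsets reduced to plain longs
        final long endingOffset;

        private FinishedSplit(boolean streamSplit, long startingOffset, long endingOffset) {
            this.streamSplit = streamSplit;
            this.startingOffset = startingOffset;
            this.endingOffset = endingOffset;
        }

        static FinishedSplit snapshot() {
            return new FinishedSplit(false, 0L, 0L);
        }

        static FinishedSplit stream(long startingOffset, long endingOffset) {
            return new FinishedSplit(true, startingOffset, endingOffset);
        }
    }

    /** Startup-mode check only: every callback in snapshot-only mode drops the slot. */
    static boolean dropsSlotUnguarded(boolean snapshotOnly, Map<String, FinishedSplit> finished) {
        return snapshotOnly;
    }

    /** Guarded check: drop only when a finished stream split has reached its ending offset. */
    static boolean dropsSlotGuarded(Map<String, FinishedSplit> finished) {
        for (FinishedSplit split : finished.values()) {
            if (split.streamSplit && split.startingOffset >= split.endingOffset) {
                return true;
            }
        }
        return false;
    }
}
```

For a batch that contains only a snapshot chunk, `dropsSlotUnguarded(true, batch)`
returns `true` while `dropsSlotGuarded(batch)` returns `false`, which is the
difference the suggestion is meant to capture.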
##########
flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/fetch/PostgresScanFetchTask.java:
##########
@@ -193,16 +191,16 @@ private void maybeCreateSlotForBackFillReadTask(
/** Drop slot for backfill task and close replication connection. */
private void maybeDropSlotForBackFillReadTask(
- PostgresReplicationConnection replicationConnection, boolean skipSnapshotBackfill) {
+ PostgresConnection connection, String slotName, boolean skipSnapshotBackfill) {
// if skip backfill, no need to create slot here
if (skipSnapshotBackfill) {
return;
}
try {
- replicationConnection.close(true);
+ connection.dropReplicationSlot(slotName);
Review Comment:
Why is this change needed here?