loserwang1024 commented on code in PR #3950:
URL: https://github.com/apache/flink-cdc/pull/3950#discussion_r2052058018
##########
flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLConnectorITCase.java:
##########
@@ -261,6 +261,110 @@ void testStartupFromLatestOffset(boolean parallelismSnapshot) throws Exception {
result.getJobClient().get().cancel().get();
}
+ @ParameterizedTest
+ @ValueSource(booleans = {true})
+ public void testStartupFromCommittedOffset(boolean parallelismSnapshot) throws Exception {
+ setup(parallelismSnapshot);
+ initializePostgresTable(POSTGRES_CONTAINER, "inventory");
+
Review Comment:
How does this test show that the job starts from the committed offset? Maybe we can:
1. Create a slot in advance.
2. Insert 2 rows into the database.
3. Commit the latest offset using PostgresStreamFetchTask#commitCurrentOffset.
4. Insert another 2 rows into the database.
Then start the Flink job with the existing slot in committed-offset mode. The job should then read only the 2 rows inserted after the commit.
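
The expected outcome of those steps can be sketched with a toy in-memory model. This is only an illustration of the scenario, not Flink CDC or Debezium code: `ToySlot`, `commitLatest`, `readFromCommitted`, and the row names are all hypothetical, and `commitLatest` merely stands in for the role that PostgresStreamFetchTask#commitCurrentOffset would play in the real test.

```java
import java.util.ArrayList;
import java.util.List;

// Toy stand-in for the proposed test scenario. Nothing here talks to
// PostgreSQL, Debezium, or Flink; every name is hypothetical.
class CommittedOffsetSketch {

    // Models a replication slot as an append-only change log plus the
    // position that has been confirmed (committed) so far.
    static final class ToySlot {
        private final List<String> log = new ArrayList<>();
        private int confirmed = 0; // index of the first unconfirmed change

        void insert(String row) {
            log.add(row);
        }

        // Assumption: stands in for committing the latest offset on the slot.
        void commitLatest() {
            confirmed = log.size();
        }

        // What a reader that starts from the committed offset would receive.
        List<String> readFromCommitted() {
            return new ArrayList<>(log.subList(confirmed, log.size()));
        }
    }

    static List<String> runScenario() {
        ToySlot slot = new ToySlot(); // 1. the slot exists before the job starts
        slot.insert("row-1");         // 2. two rows before the commit
        slot.insert("row-2");
        slot.commitLatest();          // 3. commit the latest offset
        slot.insert("row-3");         // 4. two rows after the commit
        slot.insert("row-4");
        return slot.readFromCommitted(); // job starts from the committed offset
    }

    public static void main(String[] args) {
        System.out.println(runScenario()); // prints [row-3, row-4]
    }
}
```

In the real integration test, the equivalent assertion would be that the sink contains only the rows inserted in step 4, which distinguishes committed-offset startup from both a full snapshot and a start from the latest offset.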
##########
flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/io/debezium/connector/postgresql/connection/PostgresConnectionUtils.java:
##########
@@ -0,0 +1,55 @@
+package io.debezium.connector.postgresql.connection;
+
+import org.apache.flink.cdc.connectors.postgres.source.offset.PostgresOffset;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import io.debezium.connector.postgresql.SourceInfo;
+import io.debezium.connector.postgresql.Utils;
+import io.debezium.time.Conversions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.SQLException;
+import java.time.Instant;
+import java.util.HashMap;
+import java.util.Map;
+
+public class PostgresConnectionUtils {
Review Comment:
@phamvinh1712 Could this move into io.debezium.connector.postgresql.Utils? It already
has io.debezium.connector.postgresql.Utils#currentOffset, and some of the logic is the same.