loserwang1024 commented on PR #3950:
URL: https://github.com/apache/flink-cdc/pull/3950#issuecomment-2826726826
This test have too much sleep logical which is unstable, thus I adjust this
test. Please rebase and add it.
```java
@Test
public void testStartupFromCommittedOffset() throws Exception {
setup(true);
initializePostgresTable(POSTGRES_CONTAINER, "inventory");
try (Connection connection = getJdbcConnection(POSTGRES_CONTAINER);
Statement statement = connection.createStatement()) {
statement.execute(
"INSERT INTO inventory.products VALUES
(default,'first','first description',0.1);");
statement.execute(
"INSERT INTO inventory.products VALUES
(default,'second','second description',0.2);");
}
// newly create slot's confirmed lsn is latest. We will test whether
committed mode starts from here.
String slotName = getSlotName();
try (Connection connection = getJdbcConnection(POSTGRES_CONTAINER);
Statement statement = connection.createStatement()) {
// TODO: Remove it after adding publication to an existing
replication slot.
statement.execute("CREATE PUBLICATION dbz_publication FOR TABLE
inventory.products");
statement.execute(String.format("select
pg_create_logical_replication_slot('%s','pgoutput');", slotName));
}
try (Connection connection = getJdbcConnection(POSTGRES_CONTAINER);
Statement statement = connection.createStatement()) {
statement.execute(
"INSERT INTO inventory.products VALUES
(default,'thirth','thirth description',0.1);");
statement.execute(
"INSERT INTO inventory.products VALUES
(default,'forth','forth description',0.2);");
}
String sourceDDL =
String.format(
"CREATE TABLE debezium_source ("
+ " id INT NOT NULL,"
+ " name STRING,"
+ " description STRING,"
+ " weight DECIMAL(10,3),"
+ " PRIMARY KEY (id) NOT ENFORCED"
+ ") WITH ("
+ " 'connector' = 'postgres-cdc',"
+ " 'hostname' = '%s',"
+ " 'port' = '%s',"
+ " 'username' = '%s',"
+ " 'password' = '%s',"
+ " 'database-name' = '%s',"
+ " 'schema-name' = '%s',"
+ " 'table-name' = '%s',"
+ " 'scan.incremental.snapshot.enabled' =
'true',"
+ " 'decoding.plugin.name' = 'pgoutput', "
+ " 'slot.name' = '%s',"
+ " 'scan.lsn-commit.checkpoints-num-delay'
= '0',"
+ " 'scan.startup.mode' = 'committed-offset'"
+ ")",
POSTGRES_CONTAINER.getHost(),
POSTGRES_CONTAINER.getMappedPort(POSTGRESQL_PORT),
POSTGRES_CONTAINER.getUsername(),
POSTGRES_CONTAINER.getPassword(),
POSTGRES_CONTAINER.getDatabaseName(),
"inventory",
"products",
slotName);
String sinkDDL =
"CREATE TABLE sink "
+ " WITH ("
+ " 'connector' = 'values',"
+ " 'sink-insert-only' = 'false'"
+ ") LIKE debezium_source (EXCLUDING OPTIONS)";
tEnv.executeSql(sourceDDL);
tEnv.executeSql(sinkDDL);
TableResult result =
tEnv.executeSql("INSERT INTO sink SELECT * FROM
debezium_source");
waitForSinkSize("sink", 2);
String[] expected =
new String[] {
"112,thirth,thirth description,0.100",
"113,forth,forth description,0.200"
};
List<String> actual =
TestValuesTableFactory.getResultsAsStrings("sink");
Assertions.assertThat(actual).containsExactlyInAnyOrder(expected);
result.getJobClient().get().cancel().get();
}
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]