This is an automated email from the ASF dual-hosted git repository. leonard pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink-cdc.git
commit 8deddab6ebdca7ad1add74071994c2ce763065f0 Author: Leonard Xu <@users.noreply.github.com> AuthorDate: Tue Apr 23 19:12:21 2024 +0800 [minor][cdc-connector][db2] Fix typos and improve the code style --- .../base/relational/handler/SchemaChangeEventHandler.java | 1 + .../apache/flink/cdc/connectors/db2/source/Db2SourceBuilder.java | 2 ++ .../connectors/db2/source/handler/Db2SchemaChangeEventHandler.java | 2 +- .../flink/cdc/connectors/db2/source/utils/Db2ConnectionUtils.java | 1 + .../apache/flink/cdc/connectors/db2/source/utils/Db2TypeUtils.java | 2 +- .../org/apache/flink/cdc/connectors/db2/table/StartupOptions.java | 1 + .../org/apache/flink/cdc/connectors/db2/source/Db2SourceITCase.java | 6 ++---- .../java/org/apache/flink/cdc/connectors/tests/Db2E2eITCase.java | 2 ++ 8 files changed, 11 insertions(+), 6 deletions(-) diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/relational/handler/SchemaChangeEventHandler.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/relational/handler/SchemaChangeEventHandler.java index bdc9f39ff..ede51c8c3 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/relational/handler/SchemaChangeEventHandler.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/relational/handler/SchemaChangeEventHandler.java @@ -26,5 +26,6 @@ import java.util.Map; /** This handler helps to parse the source struct in SchemaChangeEvent and generate source info. */ @Experimental public interface SchemaChangeEventHandler { + Map<String, Object> parseSource(SchemaChangeEvent event); } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-db2-cdc/src/main/java/org/apache/flink/cdc/connectors/db2/source/Db2SourceBuilder.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-db2-cdc/src/main/java/org/apache/flink/cdc/connectors/db2/source/Db2SourceBuilder.java index d75647089..884aaabdb 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-db2-cdc/src/main/java/org/apache/flink/cdc/connectors/db2/source/Db2SourceBuilder.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-db2-cdc/src/main/java/org/apache/flink/cdc/connectors/db2/source/Db2SourceBuilder.java @@ -17,6 +17,7 @@ package org.apache.flink.cdc.connectors.db2.source; +import org.apache.flink.cdc.common.annotation.PublicEvolving; import org.apache.flink.cdc.connectors.base.options.StartupOptions; import org.apache.flink.cdc.connectors.base.source.jdbc.JdbcIncrementalSource; import org.apache.flink.cdc.connectors.db2.source.config.Db2SourceConfigFactory; @@ -36,6 +37,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull; * <p>Check the Java docs of each individual method to learn more about the settings to build a * {@link Db2IncrementalSource}. */ +@PublicEvolving public class Db2SourceBuilder<T> { private final Db2SourceConfigFactory configFactory = new Db2SourceConfigFactory(); diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-db2-cdc/src/main/java/org/apache/flink/cdc/connectors/db2/source/handler/Db2SchemaChangeEventHandler.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-db2-cdc/src/main/java/org/apache/flink/cdc/connectors/db2/source/handler/Db2SchemaChangeEventHandler.java index c2608c5fb..968aac567 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-db2-cdc/src/main/java/org/apache/flink/cdc/connectors/db2/source/handler/Db2SchemaChangeEventHandler.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-db2-cdc/src/main/java/org/apache/flink/cdc/connectors/db2/source/handler/Db2SchemaChangeEventHandler.java @@ -29,7 +29,7 @@ import static io.debezium.connector.db2.SourceInfo.CHANGE_LSN_KEY; import static io.debezium.connector.db2.SourceInfo.COMMIT_LSN_KEY; /** - * This SqlServerSchemaChangeEventHandler helps to parse the source struct in SchemaChangeEvent and + * This Db2SchemaChangeEventHandler helps to parse the source struct in SchemaChangeEvent and * generate source info. */ public class Db2SchemaChangeEventHandler implements SchemaChangeEventHandler { diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-db2-cdc/src/main/java/org/apache/flink/cdc/connectors/db2/source/utils/Db2ConnectionUtils.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-db2-cdc/src/main/java/org/apache/flink/cdc/connectors/db2/source/utils/Db2ConnectionUtils.java index d6bb935ac..58bffa63c 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-db2-cdc/src/main/java/org/apache/flink/cdc/connectors/db2/source/utils/Db2ConnectionUtils.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-db2-cdc/src/main/java/org/apache/flink/cdc/connectors/db2/source/utils/Db2ConnectionUtils.java @@ -35,6 +35,7 @@ import java.util.Set; /** Utils for Db2 connection. */ public class Db2ConnectionUtils { + private static final Logger LOG = LoggerFactory.getLogger(Db2ConnectionUtils.class); public static Db2Connection createDb2Connection( diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-db2-cdc/src/main/java/org/apache/flink/cdc/connectors/db2/source/utils/Db2TypeUtils.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-db2-cdc/src/main/java/org/apache/flink/cdc/connectors/db2/source/utils/Db2TypeUtils.java index 9ffda19a2..2c0f6a104 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-db2-cdc/src/main/java/org/apache/flink/cdc/connectors/db2/source/utils/Db2TypeUtils.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-db2-cdc/src/main/java/org/apache/flink/cdc/connectors/db2/source/utils/Db2TypeUtils.java @@ -24,7 +24,7 @@ import io.debezium.relational.Column; import java.sql.Types; -/** Utilities for converting from Db2 types to Flink types. */ +/** Utilities for converting from Db2 types to Flink SQL types. */ public class Db2TypeUtils { /** Returns a corresponding Flink data type from a debezium {@link Column}. */ diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-db2-cdc/src/main/java/org/apache/flink/cdc/connectors/db2/table/StartupOptions.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-db2-cdc/src/main/java/org/apache/flink/cdc/connectors/db2/table/StartupOptions.java index 21a8ec649..bac47c6b1 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-db2-cdc/src/main/java/org/apache/flink/cdc/connectors/db2/table/StartupOptions.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-db2-cdc/src/main/java/org/apache/flink/cdc/connectors/db2/table/StartupOptions.java @@ -23,6 +23,7 @@ import java.util.Objects; /** Debezium startup options. */ public final class StartupOptions { + public final StartupMode startupMode; /** diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-db2-cdc/src/test/java/org/apache/flink/cdc/connectors/db2/source/Db2SourceITCase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-db2-cdc/src/test/java/org/apache/flink/cdc/connectors/db2/source/Db2SourceITCase.java index f28254e9c..c3221aed8 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-db2-cdc/src/test/java/org/apache/flink/cdc/connectors/db2/source/Db2SourceITCase.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-db2-cdc/src/test/java/org/apache/flink/cdc/connectors/db2/source/Db2SourceITCase.java @@ -20,6 +20,7 @@ package org.apache.flink.cdc.connectors.db2.source; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.restartstrategy.RestartStrategies; import org.apache.flink.cdc.connectors.db2.Db2TestBase; +import org.apache.flink.cdc.connectors.db2.source.Db2SourceBuilder.Db2IncrementalSource; import org.apache.flink.runtime.highavailability.nonha.embedded.HaLeadershipControl; import org.apache.flink.runtime.minicluster.MiniCluster; import org.apache.flink.runtime.minicluster.RpcServiceSharing; @@ -43,10 +44,7 @@ import java.util.List; import static java.lang.String.format; import static org.testcontainers.containers.Db2Container.DB2_PORT; -/** - * IT tests for {@link - * org.apache.flink.cdc.connectors.db2.source.Db2SourceBuilder.Db2IncrementalSource}. - */ +/** IT tests for {@link Db2IncrementalSource}. */ public class Db2SourceITCase extends Db2TestBase { @Rule public final Timeout timeoutPerTest = Timeout.seconds(300); diff --git a/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/java/org/apache/flink/cdc/connectors/tests/Db2E2eITCase.java b/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/java/org/apache/flink/cdc/connectors/tests/Db2E2eITCase.java index c958d60e5..29ecd1e59 100644 --- a/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/java/org/apache/flink/cdc/connectors/tests/Db2E2eITCase.java +++ b/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/java/org/apache/flink/cdc/connectors/tests/Db2E2eITCase.java @@ -22,6 +22,7 @@ import org.apache.flink.cdc.common.test.utils.TestUtils; import org.apache.flink.cdc.connectors.db2.Db2TestBase; import org.apache.flink.cdc.connectors.tests.utils.FlinkContainerTestEnvironment; +import org.awaitility.Awaitility; import org.junit.After; import org.junit.Before; import org.junit.BeforeClass; @@ -33,6 +34,7 @@ import org.testcontainers.containers.Db2Container; import org.testcontainers.containers.output.Slf4jLogConsumer; import org.testcontainers.images.builder.ImageFromDockerfile; import org.testcontainers.lifecycle.Startables; +import org.testcontainers.shaded.org.awaitility.core.ConditionTimeoutException; import org.testcontainers.utility.DockerImageName; import java.net.URISyntaxException;