This is an automated email from the ASF dual-hosted git repository. yux pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink-cdc.git
commit a40b870f318d85a6df97eb766fdb4ee5dbc2fff3 Author: yuxiqian <[email protected]> AuthorDate: Mon Jan 5 19:44:14 2026 +0800 [FLINK-38838] Update spotless style version to AOSP 1.24.0 --- .../flink/cdc/common/data/binary/BinaryFormat.java | 2 ++ .../flink/cdc/common/text/ParsingException.java | 4 ++- .../apache/flink/cdc/common/text/TokenStream.java | 1 + .../flink/cdc/common/utils/Preconditions.java | 1 + .../flink/cdc/common/utils/TypeCheckUtils.java | 2 +- .../config/ElasticsearchSinkOptions.java | 32 ++++++++++++----- .../v2/Elasticsearch8AsyncWriter.java | 2 ++ .../coordinator/SessionManageOperator.java | 1 + .../utils/SessionCommitCoordinateHelper.java | 1 + .../catalog/OceanBaseCatalogException.java | 8 +++-- .../paimon/sink/SchemaChangeProvider.java | 1 + .../v2/bucket/BucketWrapperEventSerializer.java | 1 + .../paimon/sink/PaimonMetadataApplierTest.java | 42 ++++++++++++++-------- .../paimon/sink/v2/PaimonSinkITCase.java | 35 ++++++++++++------ .../source/meta/events/StreamSplitMetaEvent.java | 1 + .../base/source/meta/split/SnapshotSplit.java | 1 + .../base/source/metrics/SourceReaderMetrics.java | 1 + .../io/debezium/connector/db2/Db2Connection.java | 4 ++- .../cdc/connectors/db2/table/Db2TableSource.java | 1 + .../flink/cdc/debezium/DebeziumSourceFunction.java | 1 + .../connectors/mongodb/source/utils/BsonUtils.java | 2 +- .../mysql/source/events/BinlogSplitMetaEvent.java | 1 + .../mysql/source/split/MySqlSnapshotSplit.java | 1 + .../connectors/mysql/testutils/UniqueDatabase.java | 4 ++- .../OracleDeserializationConverterFactory.java | 4 +-- .../testcontainers/containers/OracleContainer.java | 4 ++- .../connector/postgresql/connection/Lsn.java | 8 +++-- .../postgresql/connection/PostgresConnection.java | 4 +-- .../operators/sink/DataSinkFunctionOperator.java | 1 + .../serializer/data/MapDataSerializerTest.java | 12 +++++-- pom.xml | 2 +- 31 files changed, 135 insertions(+), 50 deletions(-) diff --git a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/data/binary/BinaryFormat.java b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/data/binary/BinaryFormat.java index ff52f4a2c..8bed6c4a5 100644 --- a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/data/binary/BinaryFormat.java +++ b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/data/binary/BinaryFormat.java @@ -35,6 +35,7 @@ public interface BinaryFormat { * to the data, and 4-bytes length of data. Data is stored in variable-length part. */ int MAX_FIX_PART_DATA_SIZE = 7; + /** * To get the mark in highest bit of long. Form: 10000000 00000000 ... (8 bytes) * @@ -42,6 +43,7 @@ public interface BinaryFormat { * part. see {@link #MAX_FIX_PART_DATA_SIZE} for more information. */ long HIGHEST_FIRST_BIT = 0x80L << 56; + /** * To get the 7 bits length in second bit to eighth bit out of a long. Form: 01111111 00000000 * ... (8 bytes) diff --git a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/text/ParsingException.java b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/text/ParsingException.java index ac38f6db2..ecf079d44 100644 --- a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/text/ParsingException.java +++ b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/text/ParsingException.java @@ -24,7 +24,9 @@ public class ParsingException extends RuntimeException { private final Position position; - /** @param position the position of the error; never null */ + /** + * @param position the position of the error; never null + */ public ParsingException(Position position) { super(); this.position = position; diff --git a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/text/TokenStream.java b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/text/TokenStream.java index 0fa138e94..d1baf60ad 100644 --- a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/text/TokenStream.java +++ b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/text/TokenStream.java @@ -76,6 +76,7 @@ public class TokenStream { private final boolean caseSensitive; private final Tokenizer tokenizer; private List<Token> tokens; + /** * This class navigates the Token objects using this iterator. However, because it very often * needs to access the "current token" in the "consume(...)" and "canConsume(...)" and diff --git a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/Preconditions.java b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/Preconditions.java index 4ab278486..c91c8dbb3 100644 --- a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/Preconditions.java +++ b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/Preconditions.java @@ -129,6 +129,7 @@ public final class Preconditions { throw new IllegalArgumentException(format(errorMessageTemplate, errorMessageArgs)); } } + // ------------------------------------------------------------------------ // Boolean Condition Checking (State) // ------------------------------------------------------------------------ diff --git a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/TypeCheckUtils.java b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/TypeCheckUtils.java index 27e2a23e3..68cb2d298 100644 --- a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/TypeCheckUtils.java +++ b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/TypeCheckUtils.java @@ -103,7 +103,7 @@ public class TypeCheckUtils { switch (type.getTypeRoot()) { case CHAR: case VARCHAR: // The internal representation of String is BinaryString which is - // mutable + // mutable case ARRAY: case MAP: case ROW: diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/config/ElasticsearchSinkOptions.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/config/ElasticsearchSinkOptions.java index 4ea58ab35..cbb44176b 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/config/ElasticsearchSinkOptions.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/config/ElasticsearchSinkOptions.java @@ -99,42 +99,58 @@ public class ElasticsearchSinkOptions implements Serializable { this.shardingSeparator = shardingSeparator; } - /** @return the maximum batch size */ + /** + * @return the maximum batch size + */ public int getMaxBatchSize() { return maxBatchSize; } - /** @return the maximum number of in-flight requests */ + /** + * @return the maximum number of in-flight requests + */ public int getMaxInFlightRequests() { return maxInFlightRequests; } - /** @return the maximum number of buffered requests */ + /** + * @return the maximum number of buffered requests + */ public int getMaxBufferedRequests() { return maxBufferedRequests; } - /** @return the maximum batch size in bytes */ + /** + * @return the maximum batch size in bytes + */ public long getMaxBatchSizeInBytes() { return maxBatchSizeInBytes; } - /** @return the maximum time in buffer in milliseconds */ + /** + * @return the maximum time in buffer in milliseconds + */ public long getMaxTimeInBufferMS() { return maxTimeInBufferMS; } - /** @return the maximum record size in bytes */ + /** + * @return the maximum record size in bytes + */ public long getMaxRecordSizeInBytes() { return maxRecordSizeInBytes; } - /** @return the network configuration */ + /** + * @return the network configuration + */ public NetworkConfig getNetworkConfig() { return networkConfig; } - /** @return the list of Elasticsearch hosts */ + /** + * @return the list of Elasticsearch hosts + */ public List<HttpHost> getHosts() { return networkConfig.getHosts(); } diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/v2/Elasticsearch8AsyncWriter.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/v2/Elasticsearch8AsyncWriter.java index 50a606c87..c5fe2f656 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/v2/Elasticsearch8AsyncWriter.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/v2/Elasticsearch8AsyncWriter.java @@ -61,11 +61,13 @@ public class Elasticsearch8AsyncWriter<InputT> extends AsyncSinkWriter<InputT, O private boolean close = false; private final Counter numRecordsOutErrorsCounter; + /** * A counter to track number of records that are returned by Elasticsearch as failed and then * retried by this writer. */ private final Counter numRecordsSendPartialFailureCounter; + /** A counter to track the number of bulk requests that are sent to Elasticsearch. */ private final Counter numRequestSubmittedCounter; diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-maxcompute/src/main/java/org/apache/flink/cdc/connectors/maxcompute/coordinator/SessionManageOperator.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-maxcompute/src/main/java/org/apache/flink/cdc/connectors/maxcompute/coordinator/SessionManageOperator.java index d8ef141b7..ed9d3ecf2 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-maxcompute/src/main/java/org/apache/flink/cdc/connectors/maxcompute/coordinator/SessionManageOperator.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-maxcompute/src/main/java/org/apache/flink/cdc/connectors/maxcompute/coordinator/SessionManageOperator.java @@ -94,6 +94,7 @@ public class SessionManageOperator extends AbstractStreamOperator<Event> private transient Future<CoordinationResponse> snapshotFlushSuccess; private transient int indexOfThisSubtask; + /** * trigger endOfInput is ahead of prepareSnapshotPreBarrier, so we need this flag to handle when * endOfInput, send WaitForSuccessRequest in advance. diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-maxcompute/src/main/java/org/apache/flink/cdc/connectors/maxcompute/utils/SessionCommitCoordinateHelper.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-maxcompute/src/main/java/org/apache/flink/cdc/connectors/maxcompute/utils/SessionCommitCoordinateHelper.java index e6b1cb42b..ee2de0cc7 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-maxcompute/src/main/java/org/apache/flink/cdc/connectors/maxcompute/utils/SessionCommitCoordinateHelper.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-maxcompute/src/main/java/org/apache/flink/cdc/connectors/maxcompute/utils/SessionCommitCoordinateHelper.java @@ -69,6 +69,7 @@ public class SessionCommitCoordinateHelper { private static final Logger LOG = LoggerFactory.getLogger(SessionCommitCoordinateHelper.class); private final Queue<String>[] toCommitSessionIds; private final Map<String, CompletableFuture<CoordinationResponse>> toCommitFutures; + /** * If any string is {@link Constant#END_OF_SESSION}, it should be considered larger than any * other non-{@link Constant#END_OF_SESSION} string. diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oceanbase/src/main/java/org/apache/flink/cdc/connectors/oceanbase/catalog/OceanBaseCatalogException.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oceanbase/src/main/java/org/apache/flink/cdc/connectors/oceanbase/catalog/OceanBaseCatalogException.java index 6d87a59f6..8ab83dcde 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oceanbase/src/main/java/org/apache/flink/cdc/connectors/oceanbase/catalog/OceanBaseCatalogException.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oceanbase/src/main/java/org/apache/flink/cdc/connectors/oceanbase/catalog/OceanBaseCatalogException.java @@ -19,12 +19,16 @@ package org.apache.flink.cdc.connectors.oceanbase.catalog; /** A catalog-related, runtime exception. */ public class OceanBaseCatalogException extends RuntimeException { - /** @param message the detail message. */ + /** + * @param message the detail message. + */ public OceanBaseCatalogException(String message) { super(message); } - /** @param cause the cause. */ + /** + * @param cause the cause. + */ public OceanBaseCatalogException(Throwable cause) { super(cause); } diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/SchemaChangeProvider.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/SchemaChangeProvider.java index 9e486693b..f6e0df907 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/SchemaChangeProvider.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/SchemaChangeProvider.java @@ -41,6 +41,7 @@ public class SchemaChangeProvider { public static final String DEFAULT_DATETIME = "1970-01-01 00:00:00"; public static final String INVALID_OR_MISSING_DATATIME = "0000-00-00 00:00:00"; + /** * Creates a SchemaChange object for adding a column without specifying its position. * diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/bucket/BucketWrapperEventSerializer.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/bucket/BucketWrapperEventSerializer.java index 827c23d88..267500988 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/bucket/BucketWrapperEventSerializer.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/bucket/BucketWrapperEventSerializer.java @@ -47,6 +47,7 @@ public class BucketWrapperEventSerializer extends TypeSerializerSingleton<Event> new ListSerializer<>(TableIdSerializer.INSTANCE); private final EnumSerializer<SchemaChangeEventType> schemaChangeEventTypeEnumSerializer = new EnumSerializer<>(SchemaChangeEventType.class); + /** Sharable instance of the TableIdSerializer. */ public static final BucketWrapperEventSerializer INSTANCE = new BucketWrapperEventSerializer(); diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplierTest.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplierTest.java index 1dd55673b..0ddcacd8d 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplierTest.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplierTest.java @@ -96,8 +96,10 @@ class PaimonMetadataApplierTest { @ParameterizedTest @ValueSource(strings = {"filesystem", "hive"}) void testApplySchemaChange(String metastore) - throws Catalog.TableNotExistException, Catalog.DatabaseNotEmptyException, - Catalog.DatabaseNotExistException, SchemaEvolveException { + throws Catalog.TableNotExistException, + Catalog.DatabaseNotEmptyException, + Catalog.DatabaseNotExistException, + SchemaEvolveException { initialize(metastore); MetadataApplier metadataApplier = new PaimonMetadataApplier(catalogOptions); CreateTableEvent createTableEvent = @@ -246,8 +248,10 @@ class PaimonMetadataApplierTest { @ParameterizedTest @ValueSource(strings = {"filesystem", "hive"}) public void testCreateTableWithoutPrimaryKey(String metastore) - throws Catalog.TableNotExistException, Catalog.DatabaseNotEmptyException, - Catalog.DatabaseNotExistException, SchemaEvolveException { + throws Catalog.TableNotExistException, + Catalog.DatabaseNotEmptyException, + Catalog.DatabaseNotExistException, + SchemaEvolveException { initialize(metastore); Map<String, String> tableOptions = new HashMap<>(); tableOptions.put("bucket", "-1"); @@ -289,8 +293,10 @@ class PaimonMetadataApplierTest { @ParameterizedTest @ValueSource(strings = {"filesystem", "hive"}) void testCreateTableWithOptions(String metastore) - throws Catalog.TableNotExistException, Catalog.DatabaseNotEmptyException, - Catalog.DatabaseNotExistException, SchemaEvolveException { + throws Catalog.TableNotExistException, + Catalog.DatabaseNotEmptyException, + Catalog.DatabaseNotExistException, + SchemaEvolveException { initialize(metastore); Map<String, String> tableOptions = new HashMap<>(); tableOptions.put("bucket", "-1"); @@ -335,8 +341,10 @@ class PaimonMetadataApplierTest { @ParameterizedTest @ValueSource(strings = {"filesystem", "hive"}) void testCreateTableWithAllDataTypes(String metastore) - throws Catalog.TableNotExistException, Catalog.DatabaseNotEmptyException, - Catalog.DatabaseNotExistException, SchemaEvolveException { + throws Catalog.TableNotExistException, + Catalog.DatabaseNotEmptyException, + Catalog.DatabaseNotExistException, + SchemaEvolveException { initialize(metastore); MetadataApplier metadataApplier = new PaimonMetadataApplier(catalogOptions); CreateTableEvent createTableEvent = @@ -445,8 +453,10 @@ class PaimonMetadataApplierTest { @ParameterizedTest @ValueSource(strings = {"filesystem", "hive"}) void testAddColumnWithPosition(String metastore) - throws Catalog.DatabaseNotEmptyException, Catalog.DatabaseNotExistException, - Catalog.TableNotExistException, SchemaEvolveException { + throws Catalog.DatabaseNotEmptyException, + Catalog.DatabaseNotExistException, + Catalog.TableNotExistException, + SchemaEvolveException { initialize(metastore); MetadataApplier metadataApplier = new PaimonMetadataApplier(catalogOptions); @@ -536,8 +546,10 @@ class PaimonMetadataApplierTest { @ParameterizedTest @ValueSource(strings = {"filesystem", "hive"}) public void testCreateTableWithComment(String metastore) - throws Catalog.TableNotExistException, Catalog.DatabaseNotEmptyException, - Catalog.DatabaseNotExistException, SchemaEvolveException { + throws Catalog.TableNotExistException, + Catalog.DatabaseNotEmptyException, + Catalog.DatabaseNotExistException, + SchemaEvolveException { initialize(metastore); Map<String, String> tableOptions = new HashMap<>(); tableOptions.put("bucket", "-1"); @@ -585,8 +597,10 @@ class PaimonMetadataApplierTest { @Test public void testMysqlDefaultTimestampValueConversionInAddColumn() - throws SchemaEvolveException, Catalog.TableNotExistException, - Catalog.DatabaseNotEmptyException, Catalog.DatabaseNotExistException { + throws SchemaEvolveException, + Catalog.TableNotExistException, + Catalog.DatabaseNotEmptyException, + Catalog.DatabaseNotExistException { initialize("filesystem"); Map<String, String> tableOptions = new HashMap<>(); tableOptions.put("bucket", "-1"); diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonSinkITCase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonSinkITCase.java index aa447ae04..80b21674a 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonSinkITCase.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonSinkITCase.java @@ -268,8 +268,11 @@ public class PaimonSinkITCase { @ParameterizedTest @CsvSource({"filesystem, true", "filesystem, false", "hive, true", "hive, false"}) public void testSinkWithDataChange(String metastore, boolean enableDeleteVector) - throws IOException, InterruptedException, Catalog.DatabaseNotEmptyException, - Catalog.DatabaseNotExistException, SchemaEvolveException { + throws IOException, + InterruptedException, + Catalog.DatabaseNotEmptyException, + Catalog.DatabaseNotExistException, + SchemaEvolveException { initialize(metastore); PaimonSink<Event> paimonSink = new PaimonSink<>( @@ -326,8 +329,11 @@ public class PaimonSinkITCase { @ParameterizedTest @CsvSource({"filesystem, true", "hive, true", "filesystem, false", "hive, false"}) public void testSinkWithDataChangeForAppendOnlyTable(String metastore, boolean enabledBucketKey) - throws IOException, InterruptedException, Catalog.DatabaseNotEmptyException, - Catalog.DatabaseNotExistException, SchemaEvolveException { + throws IOException, + InterruptedException, + Catalog.DatabaseNotEmptyException, + Catalog.DatabaseNotExistException, + SchemaEvolveException { initialize(metastore); PaimonSink<Event> paimonSink = new PaimonSink<>( @@ -388,8 +394,11 @@ public class PaimonSinkITCase { @ParameterizedTest @CsvSource({"filesystem, true", "filesystem, false", "hive, true", "hive, false"}) public void testSinkWithSchemaChange(String metastore, boolean enableDeleteVector) - throws IOException, InterruptedException, Catalog.DatabaseNotEmptyException, - Catalog.DatabaseNotExistException, SchemaEvolveException { + throws IOException, + InterruptedException, + Catalog.DatabaseNotEmptyException, + Catalog.DatabaseNotExistException, + SchemaEvolveException { initialize(metastore); PaimonSink<Event> paimonSink = new PaimonSink<>( @@ -711,8 +720,11 @@ public class PaimonSinkITCase { @ParameterizedTest @CsvSource({"filesystem, true", "filesystem, false", "hive, true", "hive, false"}) public void testSinkWithMultiTables(String metastore, boolean enableDeleteVector) - throws IOException, InterruptedException, Catalog.DatabaseNotEmptyException, - Catalog.DatabaseNotExistException, SchemaEvolveException { + throws IOException, + InterruptedException, + Catalog.DatabaseNotEmptyException, + Catalog.DatabaseNotExistException, + SchemaEvolveException { initialize(metastore); PaimonSink<Event> paimonSink = new PaimonSink<>( @@ -839,8 +851,11 @@ public class PaimonSinkITCase { @ParameterizedTest @CsvSource({"filesystem, true", "filesystem, false", "hive, true", "hive, false"}) public void testDuplicateCommitAfterRestore(String metastore, boolean enableDeleteVector) - throws IOException, InterruptedException, Catalog.DatabaseNotEmptyException, - Catalog.DatabaseNotExistException, SchemaEvolveException { + throws IOException, + InterruptedException, + Catalog.DatabaseNotEmptyException, + Catalog.DatabaseNotExistException, + SchemaEvolveException { initialize(metastore); PaimonSink<Event> paimonSink = new PaimonSink<>( diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/meta/events/StreamSplitMetaEvent.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/meta/events/StreamSplitMetaEvent.java index 8f9f580e9..ece8511d7 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/meta/events/StreamSplitMetaEvent.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/meta/events/StreamSplitMetaEvent.java @@ -38,6 +38,7 @@ public class StreamSplitMetaEvent implements SourceEvent { /** The metadata of stream split is divided to multiple groups. */ private final int metaGroupId; + /** * The serialized metadata of stream split, it's serialized/deserialized by {@link * FinishedSnapshotSplitInfo#serialize()} and {@link diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/meta/split/SnapshotSplit.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/meta/split/SnapshotSplit.java index 714213985..7e8b5e2b5 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/meta/split/SnapshotSplit.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/meta/split/SnapshotSplit.java @@ -40,6 +40,7 @@ public class SnapshotSplit extends SourceSplitBase { @Nullable private final Object[] splitStart; @Nullable private final Object[] splitEnd; + /** The high watermark is not null when the split read finished. */ @Nullable private final Offset highWatermark; diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/metrics/SourceReaderMetrics.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/metrics/SourceReaderMetrics.java index c597e6a0a..6f0ec3978 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/metrics/SourceReaderMetrics.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/metrics/SourceReaderMetrics.java @@ -78,6 +78,7 @@ public class SourceReaderMetrics { /** The total number of record that failed to consume, process or emit. */ private final Counter numRecordsInErrorsCounter; + /** The timestamp of the last record received. */ private volatile long lastReceivedEventTime = UNDEFINED; diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-db2-cdc/src/main/java/io/debezium/connector/db2/Db2Connection.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-db2-cdc/src/main/java/io/debezium/connector/db2/Db2Connection.java index 8fe77ede9..63da894ec 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-db2-cdc/src/main/java/io/debezium/connector/db2/Db2Connection.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-db2-cdc/src/main/java/io/debezium/connector/db2/Db2Connection.java @@ -134,7 +134,9 @@ public class Db2Connection extends JdbcConnection { realDatabaseName = retrieveRealDatabaseName(); } - /** @return the current largest log sequence number */ + /** + * @return the current largest log sequence number + */ public Lsn getMaxLsn() throws SQLException { return queryAndMap( GET_MAX_LSN, diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-db2-cdc/src/main/java/org/apache/flink/cdc/connectors/db2/table/Db2TableSource.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-db2-cdc/src/main/java/org/apache/flink/cdc/connectors/db2/table/Db2TableSource.java index eebe109e0..12c8d27e0 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-db2-cdc/src/main/java/org/apache/flink/cdc/connectors/db2/table/Db2TableSource.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-db2-cdc/src/main/java/org/apache/flink/cdc/connectors/db2/table/Db2TableSource.java @@ -53,6 +53,7 @@ import java.util.stream.Stream; public class Db2TableSource implements ScanTableSource, SupportsReadingMetadata { private final ResolvedSchema physicalSchema; + /** Data type that describes the final output of the source. */ protected DataType producedDataType; diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/org/apache/flink/cdc/debezium/DebeziumSourceFunction.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/org/apache/flink/cdc/debezium/DebeziumSourceFunction.java index bdc985b71..a78f81129 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/org/apache/flink/cdc/debezium/DebeziumSourceFunction.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/org/apache/flink/cdc/debezium/DebeziumSourceFunction.java @@ -183,6 +183,7 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T> private transient ExecutorService executor; private transient DebeziumEngine<?> engine; + /** * Unique name of this Debezium Engine instance across all the jobs. Currently we randomly * generate a UUID for it. This is used for {@link FlinkDatabaseHistory}. diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/main/java/org/apache/flink/cdc/connectors/mongodb/source/utils/BsonUtils.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/main/java/org/apache/flink/cdc/connectors/mongodb/source/utils/BsonUtils.java index 2586d79bc..a0eabf554 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/main/java/org/apache/flink/cdc/connectors/mongodb/source/utils/BsonUtils.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/main/java/org/apache/flink/cdc/connectors/mongodb/source/utils/BsonUtils.java @@ -284,7 +284,7 @@ public class BsonUtils { } switch (bsonValue.getBsonType()) { - // MinKey < Undefined == [] < Null, Missing Key + // MinKey < Undefined == [] < Null, Missing Key case MIN_KEY: return 1; case UNDEFINED: diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/events/BinlogSplitMetaEvent.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/events/BinlogSplitMetaEvent.java index 2c8fe0b1e..bdd553a73 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/events/BinlogSplitMetaEvent.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/events/BinlogSplitMetaEvent.java @@ -38,6 +38,7 @@ public class BinlogSplitMetaEvent implements SourceEvent { /** The metadata of binlog split is divided to multiple groups. */ private final int metaGroupId; + /** * The serialized metadata of binlog split, it's serialized/deserialize by {@link * FinishedSnapshotSplitInfo#serialize(FinishedSnapshotSplitInfo)} and {@link diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/split/MySqlSnapshotSplit.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/split/MySqlSnapshotSplit.java index c32a306e3..bbda1ad4a 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/split/MySqlSnapshotSplit.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/split/MySqlSnapshotSplit.java @@ -40,6 +40,7 @@ public class MySqlSnapshotSplit extends MySqlSplit { @Nullable private final Object[] splitStart; @Nullable private final Object[] splitEnd; + /** The high watermark is not null when the split read finished. */ @Nullable private final BinlogOffset highWatermark; diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/testutils/UniqueDatabase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/testutils/UniqueDatabase.java index a749bca53..4a4b631f3 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/testutils/UniqueDatabase.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/testutils/UniqueDatabase.java @@ -99,7 +99,9 @@ public class UniqueDatabase { return password; } - /** @return Fully qualified table name <code><databaseName>.<tableName></code> */ + /** + * @return Fully qualified table name <code><databaseName>.<tableName></code> + */ public String qualifiedTableName(final String tableName) { return String.format("%s.%s", databaseName, tableName); } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/table/OracleDeserializationConverterFactory.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/table/OracleDeserializationConverterFactory.java index a5374316b..5c19a6762 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/table/OracleDeserializationConverterFactory.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/table/OracleDeserializationConverterFactory.java @@ -66,8 +66,8 @@ public class OracleDeserializationConverterFactory { return createFloatConverter(); case DOUBLE: return createDoubleConverter(); - // Debezium use io.debezium.time.ZonedTimestamp to map Oracle TIMESTAMP WITH LOCAL - // TIME ZONE type, the value is a string representation of a timestamp in UTC. + // Debezium use io.debezium.time.ZonedTimestamp to map Oracle TIMESTAMP WITH LOCAL + // TIME ZONE type, the value is a string representation of a timestamp in UTC. case TIMESTAMP_WITH_LOCAL_TIME_ZONE: return convertToLocalTimeZoneTimestamp(); default: diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/test/java/org/testcontainers/containers/OracleContainer.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/test/java/org/testcontainers/containers/OracleContainer.java index 8df09b6f7..8d3eb7859 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/test/java/org/testcontainers/containers/OracleContainer.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/test/java/org/testcontainers/containers/OracleContainer.java @@ -76,7 +76,9 @@ public class OracleContainer extends JdbcDatabaseContainer<OracleContainer> { private boolean usingSid = false; - /** @deprecated use @link OracleContainer(DockerImageName) instead */ + /** + * @deprecated use @link OracleContainer(DockerImageName) instead + */ @Deprecated public OracleContainer() { this(DEFAULT_IMAGE_NAME.withTag(DEFAULT_TAG)); diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/io/debezium/connector/postgresql/connection/Lsn.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/io/debezium/connector/postgresql/connection/Lsn.java index 17119e4f6..cc08c5196 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/io/debezium/connector/postgresql/connection/Lsn.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/io/debezium/connector/postgresql/connection/Lsn.java @@ -91,12 +91,16 @@ public class Lsn implements Comparable<Lsn> { return Lsn.valueOf(value); } - /** @return Long represent position in the write-ahead log stream */ + /** + * @return Long represent position in the write-ahead log stream + */ public long asLong() { return value; } - /** @return PostgreSQL JDBC driver representation of position in the write-ahead log stream */ + /** + * @return PostgreSQL JDBC driver representation of position in the write-ahead log stream + */ public LogSequenceNumber asLogSequenceNumber() { return LogSequenceNumber.valueOf(value); } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/io/debezium/connector/postgresql/connection/PostgresConnection.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/io/debezium/connector/postgresql/connection/PostgresConnection.java index 69ae4c454..687dcc9ae 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/io/debezium/connector/postgresql/connection/PostgresConnection.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/io/debezium/connector/postgresql/connection/PostgresConnection.java @@ -806,8 +806,8 @@ public class PostgresConnection extends JdbcConnection { ? value.get() : new SpecialValueDecimal(rs.getBigDecimal(columnIndex)); case PgOid.TIME: - // To handle time 24:00:00 supported by TIME columns, read the column as a - // string. + // To handle time 24:00:00 supported by TIME columns, read the column as a + // string. case PgOid.TIMETZ: // In order to guarantee that we resolve TIMETZ columns with proper microsecond // precision, diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/sink/DataSinkFunctionOperator.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/sink/DataSinkFunctionOperator.java index 9992c8942..df0e7802b 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/sink/DataSinkFunctionOperator.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/sink/DataSinkFunctionOperator.java @@ -54,6 +54,7 @@ public class DataSinkFunctionOperator extends StreamSink<Event> { private SchemaEvolutionClient schemaEvolutionClient; private final OperatorID schemaOperatorID; + /** A set of {@link TableId} that already processed {@link CreateTableEvent}. */ private final Set<TableId> processedTableIds; diff --git a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/serializer/data/MapDataSerializerTest.java b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/serializer/data/MapDataSerializerTest.java index ed6f1a847..a8d2c9c76 100644 --- a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/serializer/data/MapDataSerializerTest.java +++ b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/serializer/data/MapDataSerializerTest.java @@ -64,7 +64,9 @@ public class MapDataSerializerTest extends SerializerTestBase<MapData> { (MapData) o2, INT, STRING)))); } - /** @return MapDataSerializer */ + /** + * @return MapDataSerializer + */ @Override protected TypeSerializer<MapData> createSerializer() { return new MapDataSerializer(INT, STRING); @@ -81,13 +83,17 @@ public class MapDataSerializerTest extends SerializerTestBase<MapData> { return -1; } - /** @return MapData clazz */ + /** + * @return MapData clazz + */ @Override protected Class<MapData> getTypeClass() { return MapData.class; } - /** @return MapData[] */ + /** + * @return MapData[] + */ @Override protected MapData[] getTestData() { Map<Object, Object> first = new HashMap<>(); diff --git a/pom.xml b/pom.xml index 8bb4824c0..61d657a91 100644 --- a/pom.xml +++ b/pom.xml @@ -473,7 +473,7 @@ limitations under the License. <configuration> <java> <googleJavaFormat> - <version>1.8</version> + <version>1.24.0</version> <style>AOSP</style> </googleJavaFormat>
