(flink) branch master updated: [FLINK-35246][test] Fix incorrect address construction in SqlClientSSLTest
This is an automated email from the ASF dual-hosted git repository. guoweijie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new 4e6dbe2d1a2 [FLINK-35246][test] Fix incorrect address construction in SqlClientSSLTest 4e6dbe2d1a2 is described below commit 4e6dbe2d1a225a0d0e48fd0997c1f11317402e42 Author: Weijie Guo AuthorDate: Fri Apr 26 16:59:49 2024 +0800 [FLINK-35246][test] Fix incorrect address construction in SqlClientSSLTest --- .../java/org/apache/flink/table/client/SqlClientSSLTest.java | 9 - 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/SqlClientSSLTest.java b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/SqlClientSSLTest.java index ccb4832f076..627361edaf9 100644 --- a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/SqlClientSSLTest.java +++ b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/SqlClientSSLTest.java @@ -28,7 +28,6 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; import java.io.IOException; -import java.net.InetSocketAddress; import java.net.URL; import java.nio.file.Files; import java.nio.file.Path; @@ -66,10 +65,10 @@ class SqlClientSSLTest extends SqlClientTestBase { new String[] { "gateway", "-e", -InetSocketAddress.createUnresolved( - SQL_GATEWAY_REST_ENDPOINT_EXTENSION.getTargetAddress(), - SQL_GATEWAY_REST_ENDPOINT_EXTENSION.getTargetPort()) -.toString() +String.format( +"%s:%d", + SQL_GATEWAY_REST_ENDPOINT_EXTENSION.getTargetAddress(), + SQL_GATEWAY_REST_ENDPOINT_EXTENSION.getTargetPort()) }; String actual = runSqlClient(args, String.join("\n", "SET;", "QUIT;"), false); assertThat(actual).contains(SecurityOptions.SSL_REST_ENABLED.key(), "true");
(flink) branch master updated (714d1cb2e0b -> 3ff2ba43720)
This is an automated email from the ASF dual-hosted git repository. leiyanfei pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git from 714d1cb2e0b [FLINK-35189][test-utils] Introduce test-filesystem connector and catalog based on filesystem to support materialized table add 713c30f76d5 [FLINK-35026][runtime][config] Introduce async execution configurations add 3ff2ba43720 [FLINK-35026][runtime] Implement buffer timeout of AEC No new revisions were added by this update. Summary of changes: .../apache/flink/api/common/ExecutionConfig.java | 38 + .../flink/configuration/ExecutionOptions.java | 59 +++ .../asyncprocessing/AsyncExecutionController.java | 48 +++--- .../asyncprocessing/StateRequestBuffer.java| 83 +- .../AsyncExecutionControllerTest.java | 182 + .../AbstractAsyncStateStreamOperator.java | 23 ++- .../AbstractAsyncStateStreamOperatorV2.java| 18 +- .../StreamExecutionEnvironmentTest.java| 30 .../InternalTimerServiceAsyncImplTest.java | 2 +- 9 files changed, 418 insertions(+), 65 deletions(-)
(flink) 03/03: [FLINK-35189][test-utils] Introduce test-filesystem connector and catalog based on filesystem to support materialized table
This is an automated email from the ASF dual-hosted git repository. ron pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit 714d1cb2e0bd0df03393492dc87cbd800af63e1b Author: fengli AuthorDate: Wed Apr 24 18:18:47 2024 +0800 [FLINK-35189][test-utils] Introduce test-filesystem connector and catalog based on filesystem to support materialized table --- .../flink-table-filesystem-test-utils/pom.xml | 103 +++ .../flink/connector/file/src/TestFileSource.java | 189 ++ .../file/table/TestFileSystemTableSource.java | 88 +++ .../file/testutils/TestFileSystemTableFactory.java | 54 ++ .../file/testutils/catalog/JsonSerdeUtil.java | 61 ++ .../testutils/catalog/TestFileSystemCatalog.java | 690 + .../catalog/TestFileSystemCatalogFactory.java | 78 +++ .../org.apache.flink.table.factories.Factory | 17 + .../testutils/TestFileSystemTableFactoryTest.java | 64 ++ .../catalog/TestFileSystemCatalogFactoryTest.java | 92 +++ .../catalog/TestFileSystemCatalogITCase.java | 89 +++ .../catalog/TestFileSystemCatalogTest.java | 379 +++ .../catalog/TestFileSystemCatalogTestBase.java | 56 ++ .../src/test/resources/log4j2-test.properties | 28 + flink-test-utils-parent/pom.xml| 1 + 15 files changed, 1989 insertions(+) diff --git a/flink-test-utils-parent/flink-table-filesystem-test-utils/pom.xml b/flink-test-utils-parent/flink-table-filesystem-test-utils/pom.xml new file mode 100644 index 000..e1c8fdce665 --- /dev/null +++ b/flink-test-utils-parent/flink-table-filesystem-test-utils/pom.xml @@ -0,0 +1,103 @@ + + + +http://maven.apache.org/POM/4.0.0; + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance; + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd;> + +4.0.0 + + +org.apache.flink +flink-test-utils-parent +1.20-SNAPSHOT + + +flink-table-filesystem-test-utils +Flink : Test utils : Table Filesystem + + + +org.apache.flink +flink-core +${project.version} +provided + + +org.apache.flink +flink-table-common +${project.version} +provided + + +org.apache.flink +flink-table-api-java-bridge +${project.version} +provided + + +org.apache.flink +flink-connector-files +${project.version} +provided + + + + +org.apache.flink +flink-table-common +${project.version} +tests +test + + +org.apache.flink +flink-table-api-java +${project.version} +tests +test + + +org.apache.flink + flink-table-planner_${scala.binary.version} +${project.version} +test + + +org.apache.flink + flink-table-planner_${scala.binary.version} +${project.version} +test +test-jar + + +org.apache.flink +flink-csv +${project.version} +test + + +org.apache.flink +flink-test-utils +${project.version} +test + + + + \ No newline at end of file diff --git a/flink-test-utils-parent/flink-table-filesystem-test-utils/src/main/java/org/apache/flink/connector/file/src/TestFileSource.java b/flink-test-utils-parent/flink-table-filesystem-test-utils/src/main/java/org/apache/flink/connector/file/src/TestFileSource.java new file mode 100644 index 000..5d0e9ace77f --- /dev/null +++ b/flink-test-utils-parent/flink-table-filesystem-test-utils/src/main/java/org/apache/flink/connector/file/src/TestFileSource.java @@ -0,0 +1,189 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.file.src; + +import
(flink) branch master updated (f64654e9749 -> 714d1cb2e0b)
This is an automated email from the ASF dual-hosted git repository. ron pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git from f64654e9749 add HTTP options to java-storage client new 00d69e53fe7 [FLINK-35189][table-common] CatalogPropertiesUtil support serialize and deserialize materialized table new e3cda01cac7 [FLINK-35189][connectors/filesystem] Modify the visibility of filesystem connector related methods to protected new 714d1cb2e0b [FLINK-35189][test-utils] Introduce test-filesystem connector and catalog based on filesystem to support materialized table The 3 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: .../connector/file/src/AbstractFileSource.java | 11 +- .../file/table/FileSystemTableFactory.java | 4 +- .../file/table/FileSystemTableSource.java | 4 +- .../flink/table/catalog/CatalogPropertiesUtil.java | 140 - .../flink-table-filesystem-test-utils/pom.xml | 103 +++ .../flink/connector/file/src/TestFileSource.java | 189 ++ .../file/table/TestFileSystemTableSource.java | 88 +++ .../file/testutils/TestFileSystemTableFactory.java | 54 ++ .../file/testutils/catalog/JsonSerdeUtil.java | 61 ++ .../testutils/catalog/TestFileSystemCatalog.java | 690 + .../catalog/TestFileSystemCatalogFactory.java | 78 +++ .../org.apache.flink.table.factories.Factory | 17 + .../testutils/TestFileSystemTableFactoryTest.java | 64 ++ .../catalog/TestFileSystemCatalogFactoryTest.java | 92 +++ .../catalog/TestFileSystemCatalogITCase.java | 89 +++ .../catalog/TestFileSystemCatalogTest.java | 379 +++ .../catalog/TestFileSystemCatalogTestBase.java | 56 ++ .../src/test/resources/log4j2-test.properties | 0 flink-test-utils-parent/pom.xml| 1 + 19 files changed, 2111 insertions(+), 9 deletions(-) create mode 100644 flink-test-utils-parent/flink-table-filesystem-test-utils/pom.xml create mode 100644 flink-test-utils-parent/flink-table-filesystem-test-utils/src/main/java/org/apache/flink/connector/file/src/TestFileSource.java create mode 100644 flink-test-utils-parent/flink-table-filesystem-test-utils/src/main/java/org/apache/flink/connector/file/table/TestFileSystemTableSource.java create mode 100644 flink-test-utils-parent/flink-table-filesystem-test-utils/src/main/java/org/apache/flink/table/file/testutils/TestFileSystemTableFactory.java create mode 100644 flink-test-utils-parent/flink-table-filesystem-test-utils/src/main/java/org/apache/flink/table/file/testutils/catalog/JsonSerdeUtil.java create mode 100644 flink-test-utils-parent/flink-table-filesystem-test-utils/src/main/java/org/apache/flink/table/file/testutils/catalog/TestFileSystemCatalog.java create mode 100644 flink-test-utils-parent/flink-table-filesystem-test-utils/src/main/java/org/apache/flink/table/file/testutils/catalog/TestFileSystemCatalogFactory.java create mode 100644 flink-test-utils-parent/flink-table-filesystem-test-utils/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory create mode 100644 flink-test-utils-parent/flink-table-filesystem-test-utils/src/test/java/org/apache/flink/table/file/testutils/TestFileSystemTableFactoryTest.java create mode 100644 flink-test-utils-parent/flink-table-filesystem-test-utils/src/test/java/org/apache/flink/table/file/testutils/catalog/TestFileSystemCatalogFactoryTest.java create mode 100644 flink-test-utils-parent/flink-table-filesystem-test-utils/src/test/java/org/apache/flink/table/file/testutils/catalog/TestFileSystemCatalogITCase.java create mode 100644 flink-test-utils-parent/flink-table-filesystem-test-utils/src/test/java/org/apache/flink/table/file/testutils/catalog/TestFileSystemCatalogTest.java create mode 100644 flink-test-utils-parent/flink-table-filesystem-test-utils/src/test/java/org/apache/flink/table/file/testutils/catalog/TestFileSystemCatalogTestBase.java copy {flink-table/flink-table-api-java => flink-test-utils-parent/flink-table-filesystem-test-utils}/src/test/resources/log4j2-test.properties (100%)
(flink) 02/03: [FLINK-35189][connectors/filesystem] Modify the visibility of filesystem connector related methods to protected
This is an automated email from the ASF dual-hosted git repository. ron pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit e3cda01cac737c97a104feae0e35da1eb91a8751 Author: fengli AuthorDate: Wed Apr 24 18:14:53 2024 +0800 [FLINK-35189][connectors/filesystem] Modify the visibility of filesystem connector related methods to protected --- .../apache/flink/connector/file/src/AbstractFileSource.java | 11 --- .../flink/connector/file/table/FileSystemTableFactory.java| 4 ++-- .../flink/connector/file/table/FileSystemTableSource.java | 4 ++-- 3 files changed, 12 insertions(+), 7 deletions(-) diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/AbstractFileSource.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/AbstractFileSource.java index f4fb463e10e..b14d46b3f9c 100644 --- a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/AbstractFileSource.java +++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/AbstractFileSource.java @@ -147,7 +147,7 @@ public abstract class AbstractFileSource throw new FlinkRuntimeException("Could not enumerate file splits", e); } -return createSplitEnumerator(enumContext, enumerator, splits, null); +return createSplitEnumerator(getBoundedness(), enumContext, enumerator, splits, null); } @Override @@ -164,7 +164,11 @@ public abstract class AbstractFileSource (Collection) checkpoint.getSplits(); return createSplitEnumerator( -enumContext, enumerator, splits, checkpoint.getAlreadyProcessedPaths()); +getBoundedness(), +enumContext, +enumerator, +splits, +checkpoint.getAlreadyProcessedPaths()); } @Override @@ -186,6 +190,7 @@ public abstract class AbstractFileSource // private SplitEnumerator> createSplitEnumerator( +Boundedness boundedness, SplitEnumeratorContext context, FileEnumerator enumerator, Collection splits, @@ -199,7 +204,7 @@ public abstract class AbstractFileSource final FileSplitAssigner splitAssigner = assignerFactory.create(splits); -if (continuousEnumerationSettings == null) { +if (Boundedness.BOUNDED == boundedness) { // bounded case return castGeneric(new StaticFileSplitEnumerator(fileSplitContext, splitAssigner)); } else { diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemTableFactory.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemTableFactory.java index 3f0f415b863..16593ffd3e7 100644 --- a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemTableFactory.java +++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemTableFactory.java @@ -149,7 +149,7 @@ public class FileSystemTableFactory implements DynamicTableSourceFactory, Dynami .collect(Collectors.toSet()); } -private void validate(FactoryUtil.TableFactoryHelper helper) { +protected void validate(FactoryUtil.TableFactoryHelper helper) { // Except format options, some formats like parquet and orc can not list all supported // options. helper.validateExcept(helper.getOptions().get(FactoryUtil.FORMAT) + "."); @@ -160,7 +160,7 @@ public class FileSystemTableFactory implements DynamicTableSourceFactory, Dynami .get(FileSystemConnectorOptions.SINK_PARTITION_COMMIT_WATERMARK_TIME_ZONE)); } -private > DecodingFormat discoverDecodingFormat( +protected > DecodingFormat discoverDecodingFormat( Context context, Class formatFactoryClass) { FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context); if (formatFactoryExists(context, formatFactoryClass)) { diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemTableSource.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemTableSource.java index 1e53eb53cfb..bbaa2a310d5 100644 --- a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemTableSource.java +++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemTableSource.java @@ -264,7 +264,7 @@ public class FileSystemTableSource extends AbstractFileSystemTable return
(flink) 01/03: [FLINK-35189][table-common] CatalogPropertiesUtil support serialize and deserialize materialized table
This is an automated email from the ASF dual-hosted git repository. ron pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit 00d69e53fe7b263293608f0a949525441b82e09e Author: fengli AuthorDate: Wed Apr 24 18:11:03 2024 +0800 [FLINK-35189][table-common] CatalogPropertiesUtil support serialize and deserialize materialized table --- .../flink/table/catalog/CatalogPropertiesUtil.java | 140 - 1 file changed, 138 insertions(+), 2 deletions(-) diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogPropertiesUtil.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogPropertiesUtil.java index 909e8f1c562..9bb6280cc7d 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogPropertiesUtil.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogPropertiesUtil.java @@ -28,9 +28,12 @@ import org.apache.flink.table.catalog.exceptions.CatalogException; import org.apache.flink.table.expressions.ResolvedExpression; import org.apache.flink.table.types.DataType; import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.util.StringUtils; import javax.annotation.Nullable; +import java.nio.charset.StandardCharsets; +import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -117,6 +120,56 @@ public final class CatalogPropertiesUtil { } } +/** + * Serializes the given {@link ResolvedCatalogMaterializedTable} into a map of string + * properties. + */ +public static Map serializeCatalogMaterializedTable( +ResolvedCatalogMaterializedTable resolvedMaterializedTable) { +try { +final Map properties = new HashMap<>(); + +serializeResolvedSchema(properties, resolvedMaterializedTable.getResolvedSchema()); + +final String comment = resolvedMaterializedTable.getComment(); +if (comment != null && comment.length() > 0) { +properties.put(COMMENT, comment); +} + +final Optional snapshot = resolvedMaterializedTable.getSnapshot(); +snapshot.ifPresent(snapshotId -> properties.put(SNAPSHOT, Long.toString(snapshotId))); + +serializePartitionKeys(properties, resolvedMaterializedTable.getPartitionKeys()); + +properties.putAll(resolvedMaterializedTable.getOptions()); + +properties.put(DEFINITION_QUERY, resolvedMaterializedTable.getDefinitionQuery()); +properties.put(FRESHNESS, resolvedMaterializedTable.getFreshness().toString()); + +properties.put( +LOGICAL_REFRESH_MODE, resolvedMaterializedTable.getLogicalRefreshMode().name()); +properties.put(REFRESH_MODE, resolvedMaterializedTable.getRefreshMode().name()); +properties.put(REFRESH_STATUS, resolvedMaterializedTable.getRefreshStatus().name()); + +resolvedMaterializedTable +.getRefreshHandlerDescription() +.ifPresent( +refreshHandlerDesc -> +properties.put(REFRESH_HANDLER_DESC, refreshHandlerDesc)); +if (resolvedMaterializedTable.getSerializedRefreshHandler() != null) { +properties.put( +REFRESH_HANDLER_BYTES, +new String( + resolvedMaterializedTable.getSerializedRefreshHandler(), +StandardCharsets.UTF_8)); +} + +return properties; +} catch (Exception e) { +throw new CatalogException("Error in serializing catalog materialized table.", e); +} +} + /** Deserializes the given map of string properties into an unresolved {@link CatalogTable}. */ public static CatalogTable deserializeCatalogTable(Map properties) { return deserializeCatalogTable(properties, null); @@ -128,7 +181,7 @@ public final class CatalogPropertiesUtil { * @param properties The properties to deserialize from * @param fallbackKey The fallback key to get the schema properties. This is meant to support * the old table (1.10) deserialization - * @return + * @return a catalog table instance. */ public static CatalogTable deserializeCatalogTable( Map properties, @Nullable String fallbackKey) { @@ -158,6 +211,64 @@ public final class CatalogPropertiesUtil { } } +/** + * Deserializes the given map of string properties into an unresolved {@link + * CatalogMaterializedTable}. + */ +public static CatalogMaterializedTable deserializeCatalogMaterializedTable( +Map properties) { +try { +final Schema schema =
(flink) branch master updated: add HTTP options to java-storage client
This is an automated email from the ASF dual-hosted git repository. dannycranmer pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new f64654e9749 add HTTP options to java-storage client f64654e9749 is described below commit f64654e97490963f2956d5a632e7a0303662be6a Author: Ravi Dutt Singh AuthorDate: Wed Aug 16 19:20:57 2023 +0530 add HTTP options to java-storage client --- docs/content.zh/docs/deployment/filesystems/gcs.md | 4 ++- docs/content/docs/deployment/filesystems/gcs.md| 8 +++--- .../apache/flink/fs/gs/GSFileSystemFactory.java| 12 + .../apache/flink/fs/gs/GSFileSystemOptions.java| 31 ++ 4 files changed, 51 insertions(+), 4 deletions(-) diff --git a/docs/content.zh/docs/deployment/filesystems/gcs.md b/docs/content.zh/docs/deployment/filesystems/gcs.md index ed889c5ec60..444c3d0d5a3 100644 --- a/docs/content.zh/docs/deployment/filesystems/gcs.md +++ b/docs/content.zh/docs/deployment/filesystems/gcs.md @@ -83,7 +83,9 @@ You can also set `gcs-connector` options directly in the Hadoop `core-site.xml` |-|- [...] | gs.writer.temporary.bucket.name | Set this property to choose a bucket to hold temporary blobs for in-progress writes via `RecoverableWriter`. If this property is not set, temporary blobs will be written to same bucket as the final file being written. In either case, temporary blobs are written with the prefix `.inprogress/`. It is recommended to choose a separate bucket in order to [assign it a TTL](https://cloud.google.com/storage/docs/lifecycle), to provide a mechanism to [...] | gs.writer.chunk.size| Set this property to [set the chunk size](https://cloud.google.com/java/docs/reference/google-cloud-core/latest/com.google.cloud.WriteChannel#com_google_cloud_WriteChannel_setChunkSize_int_) for writes via `RecoverableWriter`. If not set, a Google-determined default chunk size will be used. [...] -| gs.filesink.entropy.enabled | Set this property to improve performance due to hotspotting issues on GCS. This option defines whether to enable entropy injection in filesink gcs path. If this is enabled, entropy in the form of temporary object id will be injected in beginning of the gcs path of the temporary objects. The final object path remains unchanged. [...] +| gs.filesink.entropy.enabled | Set this property to improve performance due to hotspotting issues on GCS. This option defines whether to enable entropy injection in filesink gcs path. If this is enabled, entropy in the form of temporary object id will be injected in beginning of the gcs path of the temporary objects. The final object path remains unchanged. [...] +| gs.http.connect-timeout | Set this property to [set the connection timeout](https://cloud.google.com/java/docs/reference/google-cloud-core/latest/com.google.cloud.http.HttpTransportOptions.Builder#com_google_cloud_http_HttpTransportOptions_Builder_setConnectTimeout_int_) for java-storage client. [...] +| gs.http.read-timeout| Set this property to [set the content read timeout](https://cloud.google.com/java/docs/reference/google-cloud-core/latest/com.google.cloud.http.HttpTransportOptions.Builder#com_google_cloud_http_HttpTransportOptions_Builder_setReadTimeout_int_) from connection established via java-storage client. [...] ### Authentication to access GCS diff --git a/docs/content/docs/deployment/filesystems/gcs.md b/docs/content/docs/deployment/filesystems/gcs.md index 58670cb3355..8f0f69fafcb 100644 --- a/docs/content/docs/deployment/filesystems/gcs.md +++
(flink-cdc) branch master updated: [FLINK-35235][pipeline-connector][kafka] Fix missing dependencies in the uber jar of Kafka pipeline sink
This is an automated email from the ASF dual-hosted git repository. jiabaosun pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink-cdc.git The following commit(s) were added to refs/heads/master by this push: new ec643c9dd [FLINK-35235][pipeline-connector][kafka] Fix missing dependencies in the uber jar of Kafka pipeline sink ec643c9dd is described below commit ec643c9dd7365261f3cee620d4d6bd5d042917e0 Author: Kunni AuthorDate: Fri Apr 26 16:59:48 2024 +0800 [FLINK-35235][pipeline-connector][kafka] Fix missing dependencies in the uber jar of Kafka pipeline sink --- .../flink-cdc-pipeline-connector-kafka/pom.xml | 10 ++ 1 file changed, 10 insertions(+) diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/pom.xml b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/pom.xml index 0b0c6f687..5be032a76 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/pom.xml +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/pom.xml @@ -83,11 +83,21 @@ limitations under the License. false + + +org.apache.kafka:* + org.apache.flink:flink-connector-kafka + + org.apache.kafka org.apache.flink.cdc.connectors.kafka.shaded.org.apache.kafka + + org.apache.flink.connector.kafka + org.apache.flink.cdc.connectors.kafka.shaded.org.apache.flink.connector.kafka +
(flink-cdc) branch release-3.1 updated: [FLINK-35235][pipeline-connector][kafka] Fix missing dependencies in the uber jar of Kafka pipeline sink
This is an automated email from the ASF dual-hosted git repository. jiabaosun pushed a commit to branch release-3.1 in repository https://gitbox.apache.org/repos/asf/flink-cdc.git The following commit(s) were added to refs/heads/release-3.1 by this push: new b96ea11cc [FLINK-35235][pipeline-connector][kafka] Fix missing dependencies in the uber jar of Kafka pipeline sink b96ea11cc is described below commit b96ea11cc7df6c3d57a155573f29c18bf9d787ae Author: Kunni AuthorDate: Fri Apr 26 17:00:12 2024 +0800 [FLINK-35235][pipeline-connector][kafka] Fix missing dependencies in the uber jar of Kafka pipeline sink --- .../flink-cdc-pipeline-connector-kafka/pom.xml | 10 ++ 1 file changed, 10 insertions(+) diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/pom.xml b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/pom.xml index 0b0c6f687..5be032a76 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/pom.xml +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/pom.xml @@ -83,11 +83,21 @@ limitations under the License. false + + +org.apache.kafka:* + org.apache.flink:flink-connector-kafka + + org.apache.kafka org.apache.flink.cdc.connectors.kafka.shaded.org.apache.kafka + + org.apache.flink.connector.kafka + org.apache.flink.cdc.connectors.kafka.shaded.org.apache.flink.connector.kafka +
(flink-cdc) branch release-3.1 updated: [FLINK-35173][cdc][mysql] Debezium custom time serializer for MySQL connector (#3240) (#3264)
This is an automated email from the ASF dual-hosted git repository. renqs pushed a commit to branch release-3.1 in repository https://gitbox.apache.org/repos/asf/flink-cdc.git The following commit(s) were added to refs/heads/release-3.1 by this push: new 90511b3a6 [FLINK-35173][cdc][mysql] Debezium custom time serializer for MySQL connector (#3240) (#3264) 90511b3a6 is described below commit 90511b3a65f5a3646f70cfca73e54df363e2d119 Author: ConradJam AuthorDate: Fri Apr 26 16:47:43 2024 +0800 [FLINK-35173][cdc][mysql] Debezium custom time serializer for MySQL connector (#3240) (#3264) --- .../cdc/debezium/utils/ConvertTimeBceUtil.java | 58 .../converters/MysqlDebeziumTimeConverter.java | 326 + .../MysqlDebeziumTimeConverterITCase.java | 298 +++ .../src/test/resources/ddl/date_convert_test.sql | 36 +++ 4 files changed, 718 insertions(+) diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/org/apache/flink/cdc/debezium/utils/ConvertTimeBceUtil.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/org/apache/flink/cdc/debezium/utils/ConvertTimeBceUtil.java new file mode 100644 index 0..b6ac0b226 --- /dev/null +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/org/apache/flink/cdc/debezium/utils/ConvertTimeBceUtil.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.debezium.utils; + +import java.sql.Date; +import java.sql.Timestamp; +import java.time.LocalDate; +import java.time.chrono.IsoEra; + +/** Convert And Check TimeBce Util. */ +public class ConvertTimeBceUtil { + +private static final Date ONE_CE = Date.valueOf("0001-01-01"); + +public static String resolveEra(boolean isBce, String value) { +String mangledValue = value; +if (isBce) { +if (mangledValue.startsWith("-")) { +mangledValue = mangledValue.substring(1); +} +if (!mangledValue.endsWith(" BC")) { +mangledValue += " BC"; +} +} +return mangledValue; +} + +public static boolean isBce(LocalDate date) { +return date.getEra() == IsoEra.BCE; +} + +public static String resolveEra(LocalDate date, String value) { +return resolveEra(isBce(date), value); +} + +public static String resolveEra(Date date, String value) { +return resolveEra(date.before(ONE_CE), value); +} + +public static String resolveEra(Timestamp timestamp, String value) { +return resolveEra(timestamp.before(ONE_CE), value); +} +} diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/converters/MysqlDebeziumTimeConverter.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/converters/MysqlDebeziumTimeConverter.java new file mode 100644 index 0..493fd682c --- /dev/null +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/converters/MysqlDebeziumTimeConverter.java @@ -0,0 +1,326 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.debezium.connector.mysql.converters; + +import
(flink-cdc) branch release-3.1 updated: [minor][cdc][docs] Add user guide about providing extra jar package in quickstart docs
This is an automated email from the ASF dual-hosted git repository. jiabaosun pushed a commit to branch release-3.1 in repository https://gitbox.apache.org/repos/asf/flink-cdc.git The following commit(s) were added to refs/heads/release-3.1 by this push: new 34e1388a7 [minor][cdc][docs] Add user guide about providing extra jar package in quickstart docs 34e1388a7 is described below commit 34e1388a74246103acbcdc1798992c96081e88a8 Author: yux <34335406+yuxiq...@users.noreply.github.com> AuthorDate: Fri Apr 26 16:34:54 2024 +0800 [minor][cdc][docs] Add user guide about providing extra jar package in quickstart docs --- docs/content.zh/docs/get-started/quickstart/mysql-to-doris.md | 3 ++- docs/content.zh/docs/get-started/quickstart/mysql-to-starrocks.md | 3 ++- docs/content/docs/get-started/quickstart/mysql-to-doris.md| 3 ++- docs/content/docs/get-started/quickstart/mysql-to-starrocks.md| 3 ++- 4 files changed, 8 insertions(+), 4 deletions(-) diff --git a/docs/content.zh/docs/get-started/quickstart/mysql-to-doris.md b/docs/content.zh/docs/get-started/quickstart/mysql-to-doris.md index 86adab6d6..03ddcd3d8 100644 --- a/docs/content.zh/docs/get-started/quickstart/mysql-to-doris.md +++ b/docs/content.zh/docs/get-started/quickstart/mysql-to-doris.md @@ -188,6 +188,7 @@ MacOS 由于内部实现容器的方式不同,在部署时宿主机直接修 **下载链接只对已发布的版本有效, SNAPSHOT 版本需要本地基于 master 或 release- 分支编译.** - [MySQL pipeline connector 3.0.0](https://repo1.maven.org/maven2/com/ververica/flink-cdc-pipeline-connector-mysql/3.0.0/flink-cdc-pipeline-connector-mysql-3.0.0.jar) - [Apache Doris pipeline connector 3.0.0](https://repo1.maven.org/maven2/com/ververica/flink-cdc-pipeline-connector-doris/3.0.0/flink-cdc-pipeline-connector-doris-3.0.0.jar) +- [MySQL Connector Java](https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.27/mysql-connector-java-8.0.27.jar) 3.编写任务配置 yaml 文件 下面给出了一个整库同步的示例文件 `mysql-to-doris.yaml`: @@ -226,7 +227,7 @@ sink 添加 `table.create.properties.replication_num` 参数是由于 Docker 镜 4. 最后,通过命令行提交任务到 Flink Standalone cluster ```shell - bash bin/flink-cdc.sh mysql-to-doris.yaml + bash bin/flink-cdc.sh mysql-to-doris.yaml --jar lib/mysql-connector-java-8.0.27.jar ``` 提交成功后,返回信息如: ```shell diff --git a/docs/content.zh/docs/get-started/quickstart/mysql-to-starrocks.md b/docs/content.zh/docs/get-started/quickstart/mysql-to-starrocks.md index a52895a40..c13773766 100644 --- a/docs/content.zh/docs/get-started/quickstart/mysql-to-starrocks.md +++ b/docs/content.zh/docs/get-started/quickstart/mysql-to-starrocks.md @@ -149,6 +149,7 @@ under the License. **下载链接只对已发布的版本有效, SNAPSHOT 版本需要本地基于 master 或 release- 分支编译** - [MySQL pipeline connector 3.0.0](https://repo1.maven.org/maven2/com/ververica/flink-cdc-pipeline-connector-mysql/3.0.0/flink-cdc-pipeline-connector-mysql-3.0.0.jar) - [StarRocks pipeline connector 3.0.0](https://repo1.maven.org/maven2/com/ververica/flink-cdc-pipeline-connector-starrocks/3.0.0/flink-cdc-pipeline-connector-starrocks-3.0.0.jar) + - [MySQL Connector Java](https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.27/mysql-connector-java-8.0.27.jar) 3. 编写任务配置 yaml 文件 下面给出了一个整库同步的示例文件 mysql-to-starrocks.yaml: @@ -189,7 +190,7 @@ under the License. 4. 最后,通过命令行提交任务到 Flink Standalone cluster ```shell - bash bin/flink-cdc.sh mysql-to-starrocks.yaml + bash bin/flink-cdc.sh mysql-to-starrocks.yaml --jar lib/mysql-connector-java-8.0.27.jar ``` 提交成功后,返回信息如: diff --git a/docs/content/docs/get-started/quickstart/mysql-to-doris.md b/docs/content/docs/get-started/quickstart/mysql-to-doris.md index 0aa080b2f..8e5a31359 100644 --- a/docs/content/docs/get-started/quickstart/mysql-to-doris.md +++ b/docs/content/docs/get-started/quickstart/mysql-to-doris.md @@ -190,6 +190,7 @@ This command automatically starts all the containers defined in the Docker Compo **Download links are available only for stable releases, SNAPSHOT dependencies need to be built based on master or release branches by yourself.** - [MySQL pipeline connector 3.0.0](https://repo1.maven.org/maven2/com/ververica/flink-cdc-pipeline-connector-mysql/3.0.0/flink-cdc-pipeline-connector-mysql-3.0.0.jar) - [Apache Doris pipeline connector 3.0.0](https://repo1.maven.org/maven2/com/ververica/flink-cdc-pipeline-connector-doris/3.0.0/flink-cdc-pipeline-connector-doris-3.0.0.jar) +- [MySQL Connector Java](https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.27/mysql-connector-java-8.0.27.jar) 3. Write task configuration yaml file Here is an example file for synchronizing the entire database `mysql-to-doris.yaml`: @@ -228,7 +229,7 @@ Notice that: 4. Finally, submit job to Flink Standalone cluster using Cli. ```shell - bash bin/flink-cdc.sh mysql-to-doris.yaml + bash bin/flink-cdc.sh mysql-to-doris.yaml --jar lib/mysql-connector-java-8.0.27.jar ``` After successful submission, the return information is
(flink-cdc) branch master updated: [minor][cdc][docs] Add user guide about providing extra jar package in quickstart docs
This is an automated email from the ASF dual-hosted git repository. jiabaosun pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink-cdc.git The following commit(s) were added to refs/heads/master by this push: new 185f71978 [minor][cdc][docs] Add user guide about providing extra jar package in quickstart docs 185f71978 is described below commit 185f71978d1e3d0ae689d26092e73443344a0391 Author: yux <34335406+yuxiq...@users.noreply.github.com> AuthorDate: Fri Apr 26 16:30:48 2024 +0800 [minor][cdc][docs] Add user guide about providing extra jar package in quickstart docs --- docs/content.zh/docs/get-started/quickstart/mysql-to-doris.md | 3 ++- docs/content.zh/docs/get-started/quickstart/mysql-to-starrocks.md | 3 ++- docs/content/docs/get-started/quickstart/mysql-to-doris.md| 3 ++- docs/content/docs/get-started/quickstart/mysql-to-starrocks.md| 3 ++- 4 files changed, 8 insertions(+), 4 deletions(-) diff --git a/docs/content.zh/docs/get-started/quickstart/mysql-to-doris.md b/docs/content.zh/docs/get-started/quickstart/mysql-to-doris.md index 9874db579..d584a8b28 100644 --- a/docs/content.zh/docs/get-started/quickstart/mysql-to-doris.md +++ b/docs/content.zh/docs/get-started/quickstart/mysql-to-doris.md @@ -188,6 +188,7 @@ MacOS 由于内部实现容器的方式不同,在部署时宿主机直接修 **下载链接只对已发布的版本有效, SNAPSHOT 版本需要本地基于 master 或 release- 分支编译.** - [MySQL pipeline connector 3.0.0](https://repo1.maven.org/maven2/com/ververica/flink-cdc-pipeline-connector-mysql/3.0.0/flink-cdc-pipeline-connector-mysql-3.0.0.jar) - [Apache Doris pipeline connector 3.0.0](https://repo1.maven.org/maven2/com/ververica/flink-cdc-pipeline-connector-doris/3.0.0/flink-cdc-pipeline-connector-doris-3.0.0.jar) +- [MySQL Connector Java](https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.27/mysql-connector-java-8.0.27.jar) 3.编写任务配置 yaml 文件 下面给出了一个整库同步的示例文件 `mysql-to-doris.yaml`: @@ -226,7 +227,7 @@ sink 添加 `table.create.properties.replication_num` 参数是由于 Docker 镜 4. 最后,通过命令行提交任务到 Flink Standalone cluster ```shell - bash bin/flink-cdc.sh mysql-to-doris.yaml + bash bin/flink-cdc.sh mysql-to-doris.yaml --jar lib/mysql-connector-java-8.0.27.jar ``` 提交成功后,返回信息如: ```shell diff --git a/docs/content.zh/docs/get-started/quickstart/mysql-to-starrocks.md b/docs/content.zh/docs/get-started/quickstart/mysql-to-starrocks.md index a52895a40..c13773766 100644 --- a/docs/content.zh/docs/get-started/quickstart/mysql-to-starrocks.md +++ b/docs/content.zh/docs/get-started/quickstart/mysql-to-starrocks.md @@ -149,6 +149,7 @@ under the License. **下载链接只对已发布的版本有效, SNAPSHOT 版本需要本地基于 master 或 release- 分支编译** - [MySQL pipeline connector 3.0.0](https://repo1.maven.org/maven2/com/ververica/flink-cdc-pipeline-connector-mysql/3.0.0/flink-cdc-pipeline-connector-mysql-3.0.0.jar) - [StarRocks pipeline connector 3.0.0](https://repo1.maven.org/maven2/com/ververica/flink-cdc-pipeline-connector-starrocks/3.0.0/flink-cdc-pipeline-connector-starrocks-3.0.0.jar) + - [MySQL Connector Java](https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.27/mysql-connector-java-8.0.27.jar) 3. 编写任务配置 yaml 文件 下面给出了一个整库同步的示例文件 mysql-to-starrocks.yaml: @@ -189,7 +190,7 @@ under the License. 4. 最后,通过命令行提交任务到 Flink Standalone cluster ```shell - bash bin/flink-cdc.sh mysql-to-starrocks.yaml + bash bin/flink-cdc.sh mysql-to-starrocks.yaml --jar lib/mysql-connector-java-8.0.27.jar ``` 提交成功后,返回信息如: diff --git a/docs/content/docs/get-started/quickstart/mysql-to-doris.md b/docs/content/docs/get-started/quickstart/mysql-to-doris.md index 0aa080b2f..8e5a31359 100644 --- a/docs/content/docs/get-started/quickstart/mysql-to-doris.md +++ b/docs/content/docs/get-started/quickstart/mysql-to-doris.md @@ -190,6 +190,7 @@ This command automatically starts all the containers defined in the Docker Compo **Download links are available only for stable releases, SNAPSHOT dependencies need to be built based on master or release branches by yourself.** - [MySQL pipeline connector 3.0.0](https://repo1.maven.org/maven2/com/ververica/flink-cdc-pipeline-connector-mysql/3.0.0/flink-cdc-pipeline-connector-mysql-3.0.0.jar) - [Apache Doris pipeline connector 3.0.0](https://repo1.maven.org/maven2/com/ververica/flink-cdc-pipeline-connector-doris/3.0.0/flink-cdc-pipeline-connector-doris-3.0.0.jar) +- [MySQL Connector Java](https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.27/mysql-connector-java-8.0.27.jar) 3. Write task configuration yaml file Here is an example file for synchronizing the entire database `mysql-to-doris.yaml`: @@ -228,7 +229,7 @@ Notice that: 4. Finally, submit job to Flink Standalone cluster using Cli. ```shell - bash bin/flink-cdc.sh mysql-to-doris.yaml + bash bin/flink-cdc.sh mysql-to-doris.yaml --jar lib/mysql-connector-java-8.0.27.jar ``` After successful submission, the return information is as follows:
(flink-cdc) branch master updated: [FLINK-35173][cdc][mysql] Debezium custom time serializer for MySQL connector (#3240)
This is an automated email from the ASF dual-hosted git repository. renqs pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink-cdc.git The following commit(s) were added to refs/heads/master by this push: new 6232d8405 [FLINK-35173][cdc][mysql] Debezium custom time serializer for MySQL connector (#3240) 6232d8405 is described below commit 6232d84052422aa88299f28074a8437e91db2988 Author: ConradJam AuthorDate: Fri Apr 26 14:10:34 2024 +0800 [FLINK-35173][cdc][mysql] Debezium custom time serializer for MySQL connector (#3240) --- .../cdc/debezium/utils/ConvertTimeBceUtil.java | 58 .../converters/MysqlDebeziumTimeConverter.java | 326 + .../MysqlDebeziumTimeConverterITCase.java | 298 +++ .../src/test/resources/ddl/date_convert_test.sql | 36 +++ 4 files changed, 718 insertions(+) diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/org/apache/flink/cdc/debezium/utils/ConvertTimeBceUtil.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/org/apache/flink/cdc/debezium/utils/ConvertTimeBceUtil.java new file mode 100644 index 0..b6ac0b226 --- /dev/null +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/org/apache/flink/cdc/debezium/utils/ConvertTimeBceUtil.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.debezium.utils; + +import java.sql.Date; +import java.sql.Timestamp; +import java.time.LocalDate; +import java.time.chrono.IsoEra; + +/** Convert And Check TimeBce Util. */ +public class ConvertTimeBceUtil { + +private static final Date ONE_CE = Date.valueOf("0001-01-01"); + +public static String resolveEra(boolean isBce, String value) { +String mangledValue = value; +if (isBce) { +if (mangledValue.startsWith("-")) { +mangledValue = mangledValue.substring(1); +} +if (!mangledValue.endsWith(" BC")) { +mangledValue += " BC"; +} +} +return mangledValue; +} + +public static boolean isBce(LocalDate date) { +return date.getEra() == IsoEra.BCE; +} + +public static String resolveEra(LocalDate date, String value) { +return resolveEra(isBce(date), value); +} + +public static String resolveEra(Date date, String value) { +return resolveEra(date.before(ONE_CE), value); +} + +public static String resolveEra(Timestamp timestamp, String value) { +return resolveEra(timestamp.before(ONE_CE), value); +} +} diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/converters/MysqlDebeziumTimeConverter.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/converters/MysqlDebeziumTimeConverter.java new file mode 100644 index 0..493fd682c --- /dev/null +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/converters/MysqlDebeziumTimeConverter.java @@ -0,0 +1,326 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.debezium.connector.mysql.converters; + +import org.apache.flink.cdc.debezium.utils.ConvertTimeBceUtil;