(flink) branch master updated: [FLINK-35246][test] Fix incorrect address construction in SqlClientSSLTest

2024-04-26 Thread guoweijie
This is an automated email from the ASF dual-hosted git repository.

guoweijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new 4e6dbe2d1a2 [FLINK-35246][test] Fix incorrect address construction in SqlClientSSLTest
4e6dbe2d1a2 is described below

commit 4e6dbe2d1a225a0d0e48fd0997c1f11317402e42
Author: Weijie Guo 
AuthorDate: Fri Apr 26 16:59:49 2024 +0800

[FLINK-35246][test] Fix incorrect address construction in SqlClientSSLTest
---
 .../java/org/apache/flink/table/client/SqlClientSSLTest.java | 9 ++++-----
 1 file changed, 4 insertions(+), 5 deletions(-)

diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/SqlClientSSLTest.java b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/SqlClientSSLTest.java
index ccb4832f076..627361edaf9 100644
--- a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/SqlClientSSLTest.java
+++ b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/SqlClientSSLTest.java
@@ -28,7 +28,6 @@ import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.RegisterExtension;
 
 import java.io.IOException;
-import java.net.InetSocketAddress;
 import java.net.URL;
 import java.nio.file.Files;
 import java.nio.file.Path;
@@ -66,10 +65,10 @@ class SqlClientSSLTest extends SqlClientTestBase {
                 new String[] {
                     "gateway",
                     "-e",
-                    InetSocketAddress.createUnresolved(
-                                    SQL_GATEWAY_REST_ENDPOINT_EXTENSION.getTargetAddress(),
-                                    SQL_GATEWAY_REST_ENDPOINT_EXTENSION.getTargetPort())
-                            .toString()
+                    String.format(
+                            "%s:%d",
+                            SQL_GATEWAY_REST_ENDPOINT_EXTENSION.getTargetAddress(),
+                            SQL_GATEWAY_REST_ENDPOINT_EXTENSION.getTargetPort())
                 };
         String actual = runSqlClient(args, String.join("\n", "SET;", "QUIT;"), false);
         assertThat(actual).contains(SecurityOptions.SSL_REST_ENABLED.key(), "true");



(flink) branch master updated (714d1cb2e0b -> 3ff2ba43720)

2024-04-26 Thread leiyanfei
This is an automated email from the ASF dual-hosted git repository.

leiyanfei pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


from 714d1cb2e0b [FLINK-35189][test-utils] Introduce test-filesystem connector and catalog based on filesystem to support materialized table
 add 713c30f76d5 [FLINK-35026][runtime][config] Introduce async execution configurations
 add 3ff2ba43720 [FLINK-35026][runtime] Implement buffer timeout of AEC

No new revisions were added by this update.

Summary of changes:
 .../apache/flink/api/common/ExecutionConfig.java   |  38 +
 .../flink/configuration/ExecutionOptions.java  |  59 +++
 .../asyncprocessing/AsyncExecutionController.java  |  48 +++---
 .../asyncprocessing/StateRequestBuffer.java|  83 +-
 .../AsyncExecutionControllerTest.java  | 182 +
 .../AbstractAsyncStateStreamOperator.java  |  23 ++-
 .../AbstractAsyncStateStreamOperatorV2.java|  18 +-
 .../StreamExecutionEnvironmentTest.java|  30 
 .../InternalTimerServiceAsyncImplTest.java |   2 +-
 9 files changed, 418 insertions(+), 65 deletions(-)
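
A note on the second commit for context: "buffer timeout" here means buffered async state requests are flushed either when the batch reaches its size threshold or when a timer fires, whichever comes first, which bounds latency when traffic is light. The sketch below illustrates that generic pattern under made-up names; it is not Flink's actual AsyncExecutionController/StateRequestBuffer code.

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/** Illustrative sketch: flush a batch on size threshold OR timeout, whichever comes first. */
final class TimedBatchBuffer<T> {
    private final int batchSize;
    private final long timeoutMs;
    private final Consumer<List<T>> flusher;
    private final ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
    private final List<T> buffer = new ArrayList<>();
    private ScheduledFuture<?> pendingTimeout;

    TimedBatchBuffer(int batchSize, long timeoutMs, Consumer<List<T>> flusher) {
        this.batchSize = batchSize;
        this.timeoutMs = timeoutMs;
        this.flusher = flusher;
    }

    synchronized void add(T element) {
        buffer.add(element);
        if (buffer.size() >= batchSize) {
            flush();
        } else if (pendingTimeout == null) {
            // Arm the timeout when the first element of a new batch arrives.
            pendingTimeout = timer.schedule(this::flush, timeoutMs, TimeUnit.MILLISECONDS);
        }
    }

    synchronized void flush() {
        if (pendingTimeout != null) {
            pendingTimeout.cancel(false);
            pendingTimeout = null;
        }
        if (!buffer.isEmpty()) {
            flusher.accept(new ArrayList<>(buffer));
            buffer.clear();
        }
    }
}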



(flink) 03/03: [FLINK-35189][test-utils] Introduce test-filesystem connector and catalog based on filesystem to support materialized table

2024-04-26 Thread ron
This is an automated email from the ASF dual-hosted git repository.

ron pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 714d1cb2e0bd0df03393492dc87cbd800af63e1b
Author: fengli 
AuthorDate: Wed Apr 24 18:18:47 2024 +0800

[FLINK-35189][test-utils] Introduce test-filesystem connector and catalog based on filesystem to support materialized table
---
 .../flink-table-filesystem-test-utils/pom.xml  | 103 +++
 .../flink/connector/file/src/TestFileSource.java   | 189 ++
 .../file/table/TestFileSystemTableSource.java  |  88 +++
 .../file/testutils/TestFileSystemTableFactory.java |  54 ++
 .../file/testutils/catalog/JsonSerdeUtil.java  |  61 ++
 .../testutils/catalog/TestFileSystemCatalog.java   | 690 +
 .../catalog/TestFileSystemCatalogFactory.java  |  78 +++
 .../org.apache.flink.table.factories.Factory   |  17 +
 .../testutils/TestFileSystemTableFactoryTest.java  |  64 ++
 .../catalog/TestFileSystemCatalogFactoryTest.java  |  92 +++
 .../catalog/TestFileSystemCatalogITCase.java   |  89 +++
 .../catalog/TestFileSystemCatalogTest.java | 379 +++
 .../catalog/TestFileSystemCatalogTestBase.java |  56 ++
 .../src/test/resources/log4j2-test.properties  |  28 +
 flink-test-utils-parent/pom.xml|   1 +
 15 files changed, 1989 insertions(+)

diff --git a/flink-test-utils-parent/flink-table-filesystem-test-utils/pom.xml b/flink-test-utils-parent/flink-table-filesystem-test-utils/pom.xml
new file mode 100644
index 00000000000..e1c8fdce665
--- /dev/null
+++ b/flink-test-utils-parent/flink-table-filesystem-test-utils/pom.xml
@@ -0,0 +1,103 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.flink</groupId>
+        <artifactId>flink-test-utils-parent</artifactId>
+        <version>1.20-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>flink-table-filesystem-test-utils</artifactId>
+    <name>Flink : Test utils : Table Filesystem</name>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-core</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-common</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-api-java-bridge</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-connector-files</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-common</artifactId>
+            <version>${project.version}</version>
+            <classifier>tests</classifier>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-api-java</artifactId>
+            <version>${project.version}</version>
+            <classifier>tests</classifier>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+            <type>test-jar</type>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-csv</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-test-utils</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+</project>
\ No newline at end of file
diff --git a/flink-test-utils-parent/flink-table-filesystem-test-utils/src/main/java/org/apache/flink/connector/file/src/TestFileSource.java b/flink-test-utils-parent/flink-table-filesystem-test-utils/src/main/java/org/apache/flink/connector/file/src/TestFileSource.java
new file mode 100644
index 00000000000..5d0e9ace77f
--- /dev/null
+++ b/flink-test-utils-parent/flink-table-filesystem-test-utils/src/main/java/org/apache/flink/connector/file/src/TestFileSource.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.src;
+
+import 

(flink) branch master updated (f64654e9749 -> 714d1cb2e0b)

2024-04-26 Thread ron
This is an automated email from the ASF dual-hosted git repository.

ron pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


from f64654e9749 add HTTP options to java-storage client
 new 00d69e53fe7 [FLINK-35189][table-common] CatalogPropertiesUtil support serialize and deserialize materialized table
 new e3cda01cac7 [FLINK-35189][connectors/filesystem] Modify the visibility of filesystem connector related methods to protected
 new 714d1cb2e0b [FLINK-35189][test-utils] Introduce test-filesystem connector and catalog based on filesystem to support materialized table

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../connector/file/src/AbstractFileSource.java |  11 +-
 .../file/table/FileSystemTableFactory.java |   4 +-
 .../file/table/FileSystemTableSource.java  |   4 +-
 .../flink/table/catalog/CatalogPropertiesUtil.java | 140 -
 .../flink-table-filesystem-test-utils/pom.xml  | 103 +++
 .../flink/connector/file/src/TestFileSource.java   | 189 ++
 .../file/table/TestFileSystemTableSource.java  |  88 +++
 .../file/testutils/TestFileSystemTableFactory.java |  54 ++
 .../file/testutils/catalog/JsonSerdeUtil.java  |  61 ++
 .../testutils/catalog/TestFileSystemCatalog.java   | 690 +
 .../catalog/TestFileSystemCatalogFactory.java  |  78 +++
 .../org.apache.flink.table.factories.Factory   |  17 +
 .../testutils/TestFileSystemTableFactoryTest.java  |  64 ++
 .../catalog/TestFileSystemCatalogFactoryTest.java  |  92 +++
 .../catalog/TestFileSystemCatalogITCase.java   |  89 +++
 .../catalog/TestFileSystemCatalogTest.java | 379 +++
 .../catalog/TestFileSystemCatalogTestBase.java |  56 ++
 .../src/test/resources/log4j2-test.properties  |   0
 flink-test-utils-parent/pom.xml|   1 +
 19 files changed, 2111 insertions(+), 9 deletions(-)
 create mode 100644 flink-test-utils-parent/flink-table-filesystem-test-utils/pom.xml
 create mode 100644 flink-test-utils-parent/flink-table-filesystem-test-utils/src/main/java/org/apache/flink/connector/file/src/TestFileSource.java
 create mode 100644 flink-test-utils-parent/flink-table-filesystem-test-utils/src/main/java/org/apache/flink/connector/file/table/TestFileSystemTableSource.java
 create mode 100644 flink-test-utils-parent/flink-table-filesystem-test-utils/src/main/java/org/apache/flink/table/file/testutils/TestFileSystemTableFactory.java
 create mode 100644 flink-test-utils-parent/flink-table-filesystem-test-utils/src/main/java/org/apache/flink/table/file/testutils/catalog/JsonSerdeUtil.java
 create mode 100644 flink-test-utils-parent/flink-table-filesystem-test-utils/src/main/java/org/apache/flink/table/file/testutils/catalog/TestFileSystemCatalog.java
 create mode 100644 flink-test-utils-parent/flink-table-filesystem-test-utils/src/main/java/org/apache/flink/table/file/testutils/catalog/TestFileSystemCatalogFactory.java
 create mode 100644 flink-test-utils-parent/flink-table-filesystem-test-utils/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
 create mode 100644 flink-test-utils-parent/flink-table-filesystem-test-utils/src/test/java/org/apache/flink/table/file/testutils/TestFileSystemTableFactoryTest.java
 create mode 100644 flink-test-utils-parent/flink-table-filesystem-test-utils/src/test/java/org/apache/flink/table/file/testutils/catalog/TestFileSystemCatalogFactoryTest.java
 create mode 100644 flink-test-utils-parent/flink-table-filesystem-test-utils/src/test/java/org/apache/flink/table/file/testutils/catalog/TestFileSystemCatalogITCase.java
 create mode 100644 flink-test-utils-parent/flink-table-filesystem-test-utils/src/test/java/org/apache/flink/table/file/testutils/catalog/TestFileSystemCatalogTest.java
 create mode 100644 flink-test-utils-parent/flink-table-filesystem-test-utils/src/test/java/org/apache/flink/table/file/testutils/catalog/TestFileSystemCatalogTestBase.java
 copy {flink-table/flink-table-api-java => flink-test-utils-parent/flink-table-filesystem-test-utils}/src/test/resources/log4j2-test.properties (100%)



(flink) 02/03: [FLINK-35189][connectors/filesystem] Modify the visibility of filesystem connector related methods to protected

2024-04-26 Thread ron
This is an automated email from the ASF dual-hosted git repository.

ron pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit e3cda01cac737c97a104feae0e35da1eb91a8751
Author: fengli 
AuthorDate: Wed Apr 24 18:14:53 2024 +0800

[FLINK-35189][connectors/filesystem] Modify the visibility of filesystem connector related methods to protected
---
 .../apache/flink/connector/file/src/AbstractFileSource.java   | 11 ++++++++---
 .../flink/connector/file/table/FileSystemTableFactory.java    |  4 ++--
 .../flink/connector/file/table/FileSystemTableSource.java     |  4 ++--
 3 files changed, 12 insertions(+), 7 deletions(-)

diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/AbstractFileSource.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/AbstractFileSource.java
index f4fb463e10e..b14d46b3f9c 100644
--- a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/AbstractFileSource.java
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/AbstractFileSource.java
@@ -147,7 +147,7 @@ public abstract class AbstractFileSource<T, SplitT extends FileSourceSplit>
             throw new FlinkRuntimeException("Could not enumerate file splits", e);
         }
 
-        return createSplitEnumerator(enumContext, enumerator, splits, null);
+        return createSplitEnumerator(getBoundedness(), enumContext, enumerator, splits, null);
     }
 
 @Override
@@ -164,7 +164,11 @@ public abstract class AbstractFileSource<T, SplitT extends FileSourceSplit>
                 (Collection<FileSourceSplit>) checkpoint.getSplits();
 
         return createSplitEnumerator(
-                enumContext, enumerator, splits, checkpoint.getAlreadyProcessedPaths());
+                getBoundedness(),
+                enumContext,
+                enumerator,
+                splits,
+                checkpoint.getAlreadyProcessedPaths());
     }
 
 @Override
@@ -186,6 +190,7 @@ public abstract class AbstractFileSource<T, SplitT extends FileSourceSplit>
     // ------------------------------------------------------------------------
 
     private SplitEnumerator<SplitT, PendingSplitsCheckpoint<SplitT>> createSplitEnumerator(
+            Boundedness boundedness,
             SplitEnumeratorContext<SplitT> context,
             FileEnumerator enumerator,
             Collection<FileSourceSplit> splits,
@@ -199,7 +204,7 @@ public abstract class AbstractFileSource<T, SplitT extends FileSourceSplit>
 
         final FileSplitAssigner splitAssigner = assignerFactory.create(splits);
 
-        if (continuousEnumerationSettings == null) {
+        if (Boundedness.BOUNDED == boundedness) {
             // bounded case
             return castGeneric(new StaticFileSplitEnumerator(fileSplitContext, splitAssigner));
         } else {
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemTableFactory.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemTableFactory.java
index 3f0f415b863..16593ffd3e7 100644
--- a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemTableFactory.java
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemTableFactory.java
@@ -149,7 +149,7 @@ public class FileSystemTableFactory implements DynamicTableSourceFactory, Dynami
                 .collect(Collectors.toSet());
     }
 
-    private void validate(FactoryUtil.TableFactoryHelper helper) {
+    protected void validate(FactoryUtil.TableFactoryHelper helper) {
         // Except format options, some formats like parquet and orc can not list all supported
         // options.
         helper.validateExcept(helper.getOptions().get(FactoryUtil.FORMAT) + ".");
@@ -160,7 +160,7 @@ public class FileSystemTableFactory implements DynamicTableSourceFactory, Dynami
                         .get(FileSystemConnectorOptions.SINK_PARTITION_COMMIT_WATERMARK_TIME_ZONE));
     }
 
-    private <I, F extends DecodingFormatFactory<I>> DecodingFormat<I> discoverDecodingFormat(
+    protected <I, F extends DecodingFormatFactory<I>> DecodingFormat<I> discoverDecodingFormat(
             Context context, Class<F> formatFactoryClass) {
         FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
         if (formatFactoryExists(context, formatFactoryClass)) {
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemTableSource.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemTableSource.java
index 1e53eb53cfb..bbaa2a310d5 100644
--- a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemTableSource.java
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemTableSource.java
@@ -264,7 +264,7 @@ public class FileSystemTableSource extends AbstractFileSystemTable
 return 

(flink) 01/03: [FLINK-35189][table-common] CatalogPropertiesUtil support serialize and deserialize materialized table

2024-04-26 Thread ron
This is an automated email from the ASF dual-hosted git repository.

ron pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 00d69e53fe7b263293608f0a949525441b82e09e
Author: fengli 
AuthorDate: Wed Apr 24 18:11:03 2024 +0800

[FLINK-35189][table-common] CatalogPropertiesUtil support serialize and deserialize materialized table
---
 .../flink/table/catalog/CatalogPropertiesUtil.java | 140 -
 1 file changed, 138 insertions(+), 2 deletions(-)

diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogPropertiesUtil.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogPropertiesUtil.java
index 909e8f1c562..9bb6280cc7d 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogPropertiesUtil.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogPropertiesUtil.java
@@ -28,9 +28,12 @@ import org.apache.flink.table.catalog.exceptions.CatalogException;
 import org.apache.flink.table.expressions.ResolvedExpression;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.util.StringUtils;
 
 import javax.annotation.Nullable;
 
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -117,6 +120,56 @@ public final class CatalogPropertiesUtil {
 }
 }
 
+    /**
+     * Serializes the given {@link ResolvedCatalogMaterializedTable} into a map of string
+     * properties.
+     */
+    public static Map<String, String> serializeCatalogMaterializedTable(
+            ResolvedCatalogMaterializedTable resolvedMaterializedTable) {
+        try {
+            final Map<String, String> properties = new HashMap<>();
+
+            serializeResolvedSchema(properties, resolvedMaterializedTable.getResolvedSchema());
+
+            final String comment = resolvedMaterializedTable.getComment();
+            if (comment != null && comment.length() > 0) {
+                properties.put(COMMENT, comment);
+            }
+
+            final Optional<Long> snapshot = resolvedMaterializedTable.getSnapshot();
+            snapshot.ifPresent(snapshotId -> properties.put(SNAPSHOT, Long.toString(snapshotId)));
+
+            serializePartitionKeys(properties, resolvedMaterializedTable.getPartitionKeys());
+
+            properties.putAll(resolvedMaterializedTable.getOptions());
+
+            properties.put(DEFINITION_QUERY, resolvedMaterializedTable.getDefinitionQuery());
+            properties.put(FRESHNESS, resolvedMaterializedTable.getFreshness().toString());
+
+            properties.put(
+                    LOGICAL_REFRESH_MODE, resolvedMaterializedTable.getLogicalRefreshMode().name());
+            properties.put(REFRESH_MODE, resolvedMaterializedTable.getRefreshMode().name());
+            properties.put(REFRESH_STATUS, resolvedMaterializedTable.getRefreshStatus().name());
+
+            resolvedMaterializedTable
+                    .getRefreshHandlerDescription()
+                    .ifPresent(
+                            refreshHandlerDesc ->
+                                    properties.put(REFRESH_HANDLER_DESC, refreshHandlerDesc));
+            if (resolvedMaterializedTable.getSerializedRefreshHandler() != null) {
+                properties.put(
+                        REFRESH_HANDLER_BYTES,
+                        new String(
+                                resolvedMaterializedTable.getSerializedRefreshHandler(),
+                                StandardCharsets.UTF_8));
+            }
+
+            return properties;
+        } catch (Exception e) {
+            throw new CatalogException("Error in serializing catalog materialized table.", e);
+        }
+    }
+
     /** Deserializes the given map of string properties into an unresolved {@link CatalogTable}. */
     public static CatalogTable deserializeCatalogTable(Map<String, String> properties) {
         return deserializeCatalogTable(properties, null);
@@ -128,7 +181,7 @@ public final class CatalogPropertiesUtil {
      * @param properties The properties to deserialize from
      * @param fallbackKey The fallback key to get the schema properties. This is meant to support
      *     the old table (1.10) deserialization
-     * @return
+     * @return a catalog table instance.
      */
     public static CatalogTable deserializeCatalogTable(
             Map<String, String> properties, @Nullable String fallbackKey) {
@@ -158,6 +211,64 @@ public final class CatalogPropertiesUtil {
 }
 }
 
+    /**
+     * Deserializes the given map of string properties into an unresolved {@link
+     * CatalogMaterializedTable}.
+     */
+    public static CatalogMaterializedTable deserializeCatalogMaterializedTable(
+            Map<String, String> properties) {
+        try {
+            final Schema schema = 
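
The serializer added earlier in this commit and the deserializer above are intended as inverses. A hedged sketch of the round trip (the two method names come from this commit; the wrapper class is illustrative):

import java.util.Map;

import org.apache.flink.table.catalog.CatalogMaterializedTable;
import org.apache.flink.table.catalog.CatalogPropertiesUtil;
import org.apache.flink.table.catalog.ResolvedCatalogMaterializedTable;

/** Sketch: persist a materialized table as flat string properties and read it back. */
final class MaterializedTableRoundTrip {
    static CatalogMaterializedTable roundTrip(ResolvedCatalogMaterializedTable table) {
        // Serialize to the key/value form a catalog implementation would store.
        Map<String, String> props = CatalogPropertiesUtil.serializeCatalogMaterializedTable(table);
        // Restore an unresolved CatalogMaterializedTable from those properties.
        return CatalogPropertiesUtil.deserializeCatalogMaterializedTable(props);
    }
}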

(flink) branch master updated: add HTTP options to java-storage client

2024-04-26 Thread dannycranmer
This is an automated email from the ASF dual-hosted git repository.

dannycranmer pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new f64654e9749 add HTTP options to java-storage client
f64654e9749 is described below

commit f64654e97490963f2956d5a632e7a0303662be6a
Author: Ravi Dutt Singh 
AuthorDate: Wed Aug 16 19:20:57 2023 +0530

add HTTP options to java-storage client
---
 docs/content.zh/docs/deployment/filesystems/gcs.md |  4 +++-
 docs/content/docs/deployment/filesystems/gcs.md    |  8 +++++---
 .../apache/flink/fs/gs/GSFileSystemFactory.java    | 12 ++++++++++++
 .../apache/flink/fs/gs/GSFileSystemOptions.java    | 31 +++++++++++++++++++++++
 4 files changed, 51 insertions(+), 4 deletions(-)

diff --git a/docs/content.zh/docs/deployment/filesystems/gcs.md b/docs/content.zh/docs/deployment/filesystems/gcs.md
index ed889c5ec60..444c3d0d5a3 100644
--- a/docs/content.zh/docs/deployment/filesystems/gcs.md
+++ b/docs/content.zh/docs/deployment/filesystems/gcs.md
@@ -83,7 +83,9 @@ You can also set `gcs-connector` options directly in the Hadoop `core-site.xml`
 
 |-----|----- [...]
 | gs.writer.temporary.bucket.name | Set this property to choose a bucket to hold temporary blobs for in-progress writes via `RecoverableWriter`. If this property is not set, temporary blobs will be written to same bucket as the final file being written. In either case, temporary blobs are written with the prefix `.inprogress/`. It is recommended to choose a separate bucket in order to [assign it a TTL](https://cloud.google.com/storage/docs/lifecycle), to provide a mechanism to [...]
 | gs.writer.chunk.size | Set this property to [set the chunk size](https://cloud.google.com/java/docs/reference/google-cloud-core/latest/com.google.cloud.WriteChannel#com_google_cloud_WriteChannel_setChunkSize_int_) for writes via `RecoverableWriter`. If not set, a Google-determined default chunk size will be used. [...]
-| gs.filesink.entropy.enabled | Set this property to improve performance due to hotspotting issues on GCS. This option defines whether to enable entropy injection in filesink gcs path. If this is enabled, entropy in the form of temporary object id will be injected in beginning of the gcs path of the temporary objects. The final object path remains unchanged. [...]
+| gs.filesink.entropy.enabled | Set this property to improve performance due to hotspotting issues on GCS. This option defines whether to enable entropy injection in filesink gcs path. If this is enabled, entropy in the form of temporary object id will be injected in beginning of the gcs path of the temporary objects. The final object path remains unchanged. [...]
+| gs.http.connect-timeout | Set this property to [set the connection timeout](https://cloud.google.com/java/docs/reference/google-cloud-core/latest/com.google.cloud.http.HttpTransportOptions.Builder#com_google_cloud_http_HttpTransportOptions_Builder_setConnectTimeout_int_) for java-storage client. [...]
+| gs.http.read-timeout | Set this property to [set the content read timeout](https://cloud.google.com/java/docs/reference/google-cloud-core/latest/com.google.cloud.http.HttpTransportOptions.Builder#com_google_cloud_http_HttpTransportOptions_Builder_setReadTimeout_int_) from connection established via java-storage client. [...]
 
 ### Authentication to access GCS
 
diff --git a/docs/content/docs/deployment/filesystems/gcs.md b/docs/content/docs/deployment/filesystems/gcs.md
index 58670cb3355..8f0f69fafcb 100644
--- a/docs/content/docs/deployment/filesystems/gcs.md
+++ 
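
For orientation, the two new options correspond to the java-storage client's HttpTransportOptions, which the docs above link to. A sketch of the equivalent direct client setup (timeout values are illustrative, in milliseconds):

import com.google.cloud.http.HttpTransportOptions;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;

/** Sketch: direct java-storage client equivalent of the new gs.http.* options. */
final class GcsClientTimeouts {
    static Storage buildStorage(int connectTimeoutMs, int readTimeoutMs) {
        HttpTransportOptions transport =
                HttpTransportOptions.newBuilder()
                        .setConnectTimeout(connectTimeoutMs) // what gs.http.connect-timeout feeds
                        .setReadTimeout(readTimeoutMs) // what gs.http.read-timeout feeds
                        .build();
        return StorageOptions.newBuilder()
                .setTransportOptions(transport)
                .build()
                .getService();
    }
}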

(flink-cdc) branch master updated: [FLINK-35235][pipeline-connector][kafka] Fix missing dependencies in the uber jar of Kafka pipeline sink

2024-04-26 Thread jiabaosun
This is an automated email from the ASF dual-hosted git repository.

jiabaosun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git


The following commit(s) were added to refs/heads/master by this push:
 new ec643c9dd [FLINK-35235][pipeline-connector][kafka] Fix missing dependencies in the uber jar of Kafka pipeline sink
ec643c9dd is described below

commit ec643c9dd7365261f3cee620d4d6bd5d042917e0
Author: Kunni 
AuthorDate: Fri Apr 26 16:59:48 2024 +0800

[FLINK-35235][pipeline-connector][kafka] Fix missing dependencies in the uber jar of Kafka pipeline sink
---
 .../flink-cdc-pipeline-connector-kafka/pom.xml | 10 ++++++++++
 1 file changed, 10 insertions(+)

diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/pom.xml b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/pom.xml
index 0b0c6f687..5be032a76 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/pom.xml
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/pom.xml
@@ -83,11 +83,21 @@ limitations under the License.
                         <configuration>
                             <shadeTestJar>false</shadeTestJar>
+                            <artifactSet>
+                                <includes>
+                                    <include>org.apache.kafka:*</include>
+                                    <include>org.apache.flink:flink-connector-kafka</include>
+                                </includes>
+                            </artifactSet>
                             <relocations>
                                 <relocation>
                                     <pattern>org.apache.kafka</pattern>
                                     <shadedPattern>org.apache.flink.cdc.connectors.kafka.shaded.org.apache.kafka</shadedPattern>
                                 </relocation>
+                                <relocation>
+                                    <pattern>org.apache.flink.connector.kafka</pattern>
+                                    <shadedPattern>org.apache.flink.cdc.connectors.kafka.shaded.org.apache.flink.connector.kafka</shadedPattern>
+                                </relocation>
                             </relocations>
                         </configuration>



(flink-cdc) branch release-3.1 updated: [FLINK-35235][pipeline-connector][kafka] Fix missing dependencies in the uber jar of Kafka pipeline sink

2024-04-26 Thread jiabaosun
This is an automated email from the ASF dual-hosted git repository.

jiabaosun pushed a commit to branch release-3.1
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git


The following commit(s) were added to refs/heads/release-3.1 by this push:
 new b96ea11cc [FLINK-35235][pipeline-connector][kafka] Fix missing dependencies in the uber jar of Kafka pipeline sink
b96ea11cc is described below

commit b96ea11cc7df6c3d57a155573f29c18bf9d787ae
Author: Kunni 
AuthorDate: Fri Apr 26 17:00:12 2024 +0800

[FLINK-35235][pipeline-connector][kafka] Fix missing dependencies in the uber jar of Kafka pipeline sink
---
 .../flink-cdc-pipeline-connector-kafka/pom.xml | 10 ++++++++++
 1 file changed, 10 insertions(+)

diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/pom.xml b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/pom.xml
index 0b0c6f687..5be032a76 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/pom.xml
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/pom.xml
@@ -83,11 +83,21 @@ limitations under the License.
                         <configuration>
                             <shadeTestJar>false</shadeTestJar>
+                            <artifactSet>
+                                <includes>
+                                    <include>org.apache.kafka:*</include>
+                                    <include>org.apache.flink:flink-connector-kafka</include>
+                                </includes>
+                            </artifactSet>
                             <relocations>
                                 <relocation>
                                     <pattern>org.apache.kafka</pattern>
                                     <shadedPattern>org.apache.flink.cdc.connectors.kafka.shaded.org.apache.kafka</shadedPattern>
                                 </relocation>
+                                <relocation>
+                                    <pattern>org.apache.flink.connector.kafka</pattern>
+                                    <shadedPattern>org.apache.flink.cdc.connectors.kafka.shaded.org.apache.flink.connector.kafka</shadedPattern>
+                                </relocation>
                             </relocations>
                         </configuration>



(flink-cdc) branch release-3.1 updated: [FLINK-35173][cdc][mysql] Debezium custom time serializer for MySQL connector (#3240) (#3264)

2024-04-26 Thread renqs
This is an automated email from the ASF dual-hosted git repository.

renqs pushed a commit to branch release-3.1
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git


The following commit(s) were added to refs/heads/release-3.1 by this push:
 new 90511b3a6 [FLINK-35173][cdc][mysql] Debezium custom time serializer for MySQL connector (#3240) (#3264)
90511b3a6 is described below

commit 90511b3a65f5a3646f70cfca73e54df363e2d119
Author: ConradJam 
AuthorDate: Fri Apr 26 16:47:43 2024 +0800

[FLINK-35173][cdc][mysql] Debezium custom time serializer for MySQL connector (#3240) (#3264)
---
 .../cdc/debezium/utils/ConvertTimeBceUtil.java |  58 
 .../converters/MysqlDebeziumTimeConverter.java | 326 +
 .../MysqlDebeziumTimeConverterITCase.java  | 298 +++
 .../src/test/resources/ddl/date_convert_test.sql   |  36 +++
 4 files changed, 718 insertions(+)

diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/org/apache/flink/cdc/debezium/utils/ConvertTimeBceUtil.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/org/apache/flink/cdc/debezium/utils/ConvertTimeBceUtil.java
new file mode 100644
index 000000000..b6ac0b226
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/org/apache/flink/cdc/debezium/utils/ConvertTimeBceUtil.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.debezium.utils;
+
+import java.sql.Date;
+import java.sql.Timestamp;
+import java.time.LocalDate;
+import java.time.chrono.IsoEra;
+
+/** Convert And Check TimeBce Util. */
+public class ConvertTimeBceUtil {
+
+    private static final Date ONE_CE = Date.valueOf("0001-01-01");
+
+    public static String resolveEra(boolean isBce, String value) {
+        String mangledValue = value;
+        if (isBce) {
+            if (mangledValue.startsWith("-")) {
+                mangledValue = mangledValue.substring(1);
+            }
+            if (!mangledValue.endsWith(" BC")) {
+                mangledValue += " BC";
+            }
+        }
+        return mangledValue;
+    }
+
+    public static boolean isBce(LocalDate date) {
+        return date.getEra() == IsoEra.BCE;
+    }
+
+    public static String resolveEra(LocalDate date, String value) {
+        return resolveEra(isBce(date), value);
+    }
+
+    public static String resolveEra(Date date, String value) {
+        return resolveEra(date.before(ONE_CE), value);
+    }
+
+    public static String resolveEra(Timestamp timestamp, String value) {
+        return resolveEra(timestamp.before(ONE_CE), value);
+    }
+}
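
A standalone sketch of what resolveEra does with BCE values (illustrative usage, not part of the commit):

import java.time.LocalDate;

import org.apache.flink.cdc.debezium.utils.ConvertTimeBceUtil;

public class ResolveEraDemo {
    public static void main(String[] args) {
        // ISO proleptic year -43 is 44 BC, i.e. the BCE era: the leading minus
        // sign is stripped and a " BC" suffix is appended.
        System.out.println(ConvertTimeBceUtil.resolveEra(LocalDate.of(-43, 3, 15), "-0044-03-15"));
        // -> 0044-03-15 BC

        // CE dates pass through unchanged.
        System.out.println(ConvertTimeBceUtil.resolveEra(LocalDate.of(2024, 4, 26), "2024-04-26"));
        // -> 2024-04-26
    }
}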
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/converters/MysqlDebeziumTimeConverter.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/converters/MysqlDebeziumTimeConverter.java
new file mode 100644
index 000000000..493fd682c
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/converters/MysqlDebeziumTimeConverter.java
@@ -0,0 +1,326 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.debezium.connector.mysql.converters;
+
+import 

(flink-cdc) branch release-3.1 updated: [minor][cdc][docs] Add user guide about providing extra jar package in quickstart docs

2024-04-26 Thread jiabaosun
This is an automated email from the ASF dual-hosted git repository.

jiabaosun pushed a commit to branch release-3.1
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git


The following commit(s) were added to refs/heads/release-3.1 by this push:
 new 34e1388a7 [minor][cdc][docs] Add user guide about providing extra jar package in quickstart docs
34e1388a7 is described below

commit 34e1388a74246103acbcdc1798992c96081e88a8
Author: yux <34335406+yuxiq...@users.noreply.github.com>
AuthorDate: Fri Apr 26 16:34:54 2024 +0800

[minor][cdc][docs] Add user guide about providing extra jar package in quickstart docs
---
 docs/content.zh/docs/get-started/quickstart/mysql-to-doris.md | 3 ++-
 docs/content.zh/docs/get-started/quickstart/mysql-to-starrocks.md | 3 ++-
 docs/content/docs/get-started/quickstart/mysql-to-doris.md| 3 ++-
 docs/content/docs/get-started/quickstart/mysql-to-starrocks.md| 3 ++-
 4 files changed, 8 insertions(+), 4 deletions(-)

diff --git a/docs/content.zh/docs/get-started/quickstart/mysql-to-doris.md b/docs/content.zh/docs/get-started/quickstart/mysql-to-doris.md
index 86adab6d6..03ddcd3d8 100644
--- a/docs/content.zh/docs/get-started/quickstart/mysql-to-doris.md
+++ b/docs/content.zh/docs/get-started/quickstart/mysql-to-doris.md
@@ -188,6 +188,7 @@ MacOS 由于内部实现容器的方式不同,在部署时宿主机直接修
**下载链接只对已发布的版本有效, SNAPSHOT 版本需要本地基于 master 或 release- 分支编译.**
 - [MySQL pipeline connector 3.0.0](https://repo1.maven.org/maven2/com/ververica/flink-cdc-pipeline-connector-mysql/3.0.0/flink-cdc-pipeline-connector-mysql-3.0.0.jar)
 - [Apache Doris pipeline connector 3.0.0](https://repo1.maven.org/maven2/com/ververica/flink-cdc-pipeline-connector-doris/3.0.0/flink-cdc-pipeline-connector-doris-3.0.0.jar)
+- [MySQL Connector Java](https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.27/mysql-connector-java-8.0.27.jar)
 
 3.编写任务配置 yaml 文件
 下面给出了一个整库同步的示例文件 `mysql-to-doris.yaml`:
@@ -226,7 +227,7 @@ sink 添加 `table.create.properties.replication_num` 参数是由于 Docker 镜
 
 4. 最后,通过命令行提交任务到 Flink Standalone cluster
```shell
-   bash bin/flink-cdc.sh mysql-to-doris.yaml
+   bash bin/flink-cdc.sh mysql-to-doris.yaml --jar lib/mysql-connector-java-8.0.27.jar
```
 提交成功后,返回信息如:
```shell
diff --git a/docs/content.zh/docs/get-started/quickstart/mysql-to-starrocks.md b/docs/content.zh/docs/get-started/quickstart/mysql-to-starrocks.md
index a52895a40..c13773766 100644
--- a/docs/content.zh/docs/get-started/quickstart/mysql-to-starrocks.md
+++ b/docs/content.zh/docs/get-started/quickstart/mysql-to-starrocks.md
@@ -149,6 +149,7 @@ under the License.
**下载链接只对已发布的版本有效, SNAPSHOT 版本需要本地基于 master 或 release- 分支编译**
 - [MySQL pipeline connector 3.0.0](https://repo1.maven.org/maven2/com/ververica/flink-cdc-pipeline-connector-mysql/3.0.0/flink-cdc-pipeline-connector-mysql-3.0.0.jar)
 - [StarRocks pipeline connector 3.0.0](https://repo1.maven.org/maven2/com/ververica/flink-cdc-pipeline-connector-starrocks/3.0.0/flink-cdc-pipeline-connector-starrocks-3.0.0.jar)
+   - [MySQL Connector Java](https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.27/mysql-connector-java-8.0.27.jar)
 
 3. 编写任务配置 yaml 文件  
下面给出了一个整库同步的示例文件 mysql-to-starrocks.yaml:
@@ -189,7 +190,7 @@ under the License.
 4. 最后,通过命令行提交任务到 Flink Standalone cluster
 
```shell
-   bash bin/flink-cdc.sh mysql-to-starrocks.yaml
+   bash bin/flink-cdc.sh mysql-to-starrocks.yaml --jar lib/mysql-connector-java-8.0.27.jar
```
 
 提交成功后,返回信息如:
diff --git a/docs/content/docs/get-started/quickstart/mysql-to-doris.md b/docs/content/docs/get-started/quickstart/mysql-to-doris.md
index 0aa080b2f..8e5a31359 100644
--- a/docs/content/docs/get-started/quickstart/mysql-to-doris.md
+++ b/docs/content/docs/get-started/quickstart/mysql-to-doris.md
@@ -190,6 +190,7 @@ This command automatically starts all the containers defined in the Docker Compo
    **Download links are available only for stable releases, SNAPSHOT dependencies need to be built based on master or release branches by yourself.**
 - [MySQL pipeline connector 3.0.0](https://repo1.maven.org/maven2/com/ververica/flink-cdc-pipeline-connector-mysql/3.0.0/flink-cdc-pipeline-connector-mysql-3.0.0.jar)
 - [Apache Doris pipeline connector 3.0.0](https://repo1.maven.org/maven2/com/ververica/flink-cdc-pipeline-connector-doris/3.0.0/flink-cdc-pipeline-connector-doris-3.0.0.jar)
+- [MySQL Connector Java](https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.27/mysql-connector-java-8.0.27.jar)
 
 3. Write task configuration yaml file 
   Here is an example file for synchronizing the entire database `mysql-to-doris.yaml`:
@@ -228,7 +229,7 @@ Notice that:
 
 4. Finally, submit job to Flink Standalone cluster using Cli.
```shell
-   bash bin/flink-cdc.sh mysql-to-doris.yaml
+   bash bin/flink-cdc.sh mysql-to-doris.yaml --jar lib/mysql-connector-java-8.0.27.jar
```
 After successful submission, the return information is 

(flink-cdc) branch master updated: [minor][cdc][docs] Add user guide about providing extra jar package in quickstart docs

2024-04-26 Thread jiabaosun
This is an automated email from the ASF dual-hosted git repository.

jiabaosun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git


The following commit(s) were added to refs/heads/master by this push:
 new 185f71978 [minor][cdc][docs] Add user guide about providing extra jar package in quickstart docs
185f71978 is described below

commit 185f71978d1e3d0ae689d26092e73443344a0391
Author: yux <34335406+yuxiq...@users.noreply.github.com>
AuthorDate: Fri Apr 26 16:30:48 2024 +0800

[minor][cdc][docs] Add user guide about providing extra jar package in quickstart docs
---
 docs/content.zh/docs/get-started/quickstart/mysql-to-doris.md | 3 ++-
 docs/content.zh/docs/get-started/quickstart/mysql-to-starrocks.md | 3 ++-
 docs/content/docs/get-started/quickstart/mysql-to-doris.md| 3 ++-
 docs/content/docs/get-started/quickstart/mysql-to-starrocks.md| 3 ++-
 4 files changed, 8 insertions(+), 4 deletions(-)

diff --git a/docs/content.zh/docs/get-started/quickstart/mysql-to-doris.md b/docs/content.zh/docs/get-started/quickstart/mysql-to-doris.md
index 9874db579..d584a8b28 100644
--- a/docs/content.zh/docs/get-started/quickstart/mysql-to-doris.md
+++ b/docs/content.zh/docs/get-started/quickstart/mysql-to-doris.md
@@ -188,6 +188,7 @@ MacOS 由于内部实现容器的方式不同,在部署时宿主机直接修
**下载链接只对已发布的版本有效, SNAPSHOT 版本需要本地基于 master 或 release- 分支编译.**
 - [MySQL pipeline connector 3.0.0](https://repo1.maven.org/maven2/com/ververica/flink-cdc-pipeline-connector-mysql/3.0.0/flink-cdc-pipeline-connector-mysql-3.0.0.jar)
 - [Apache Doris pipeline connector 3.0.0](https://repo1.maven.org/maven2/com/ververica/flink-cdc-pipeline-connector-doris/3.0.0/flink-cdc-pipeline-connector-doris-3.0.0.jar)
+- [MySQL Connector Java](https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.27/mysql-connector-java-8.0.27.jar)
 
 3.编写任务配置 yaml 文件
 下面给出了一个整库同步的示例文件 `mysql-to-doris.yaml`:
@@ -226,7 +227,7 @@ sink 添加 `table.create.properties.replication_num` 参数是由于 Docker 镜
 
 4. 最后,通过命令行提交任务到 Flink Standalone cluster
```shell
-   bash bin/flink-cdc.sh mysql-to-doris.yaml
+   bash bin/flink-cdc.sh mysql-to-doris.yaml --jar lib/mysql-connector-java-8.0.27.jar
```
 提交成功后,返回信息如:
```shell
diff --git a/docs/content.zh/docs/get-started/quickstart/mysql-to-starrocks.md b/docs/content.zh/docs/get-started/quickstart/mysql-to-starrocks.md
index a52895a40..c13773766 100644
--- a/docs/content.zh/docs/get-started/quickstart/mysql-to-starrocks.md
+++ b/docs/content.zh/docs/get-started/quickstart/mysql-to-starrocks.md
@@ -149,6 +149,7 @@ under the License.
**下载链接只对已发布的版本有效, SNAPSHOT 版本需要本地基于 master 或 release- 分支编译**
 - [MySQL pipeline connector 3.0.0](https://repo1.maven.org/maven2/com/ververica/flink-cdc-pipeline-connector-mysql/3.0.0/flink-cdc-pipeline-connector-mysql-3.0.0.jar)
 - [StarRocks pipeline connector 3.0.0](https://repo1.maven.org/maven2/com/ververica/flink-cdc-pipeline-connector-starrocks/3.0.0/flink-cdc-pipeline-connector-starrocks-3.0.0.jar)
+   - [MySQL Connector Java](https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.27/mysql-connector-java-8.0.27.jar)
 
 3. 编写任务配置 yaml 文件  
下面给出了一个整库同步的示例文件 mysql-to-starrocks.yaml:
@@ -189,7 +190,7 @@ under the License.
 4. 最后,通过命令行提交任务到 Flink Standalone cluster
 
```shell
-   bash bin/flink-cdc.sh mysql-to-starrocks.yaml
+   bash bin/flink-cdc.sh mysql-to-starrocks.yaml --jar lib/mysql-connector-java-8.0.27.jar
```
 
 提交成功后,返回信息如:
diff --git a/docs/content/docs/get-started/quickstart/mysql-to-doris.md b/docs/content/docs/get-started/quickstart/mysql-to-doris.md
index 0aa080b2f..8e5a31359 100644
--- a/docs/content/docs/get-started/quickstart/mysql-to-doris.md
+++ b/docs/content/docs/get-started/quickstart/mysql-to-doris.md
@@ -190,6 +190,7 @@ This command automatically starts all the containers defined in the Docker Compo
    **Download links are available only for stable releases, SNAPSHOT dependencies need to be built based on master or release branches by yourself.**
 - [MySQL pipeline connector 3.0.0](https://repo1.maven.org/maven2/com/ververica/flink-cdc-pipeline-connector-mysql/3.0.0/flink-cdc-pipeline-connector-mysql-3.0.0.jar)
 - [Apache Doris pipeline connector 3.0.0](https://repo1.maven.org/maven2/com/ververica/flink-cdc-pipeline-connector-doris/3.0.0/flink-cdc-pipeline-connector-doris-3.0.0.jar)
+- [MySQL Connector Java](https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.27/mysql-connector-java-8.0.27.jar)
 
 3. Write task configuration yaml file 
   Here is an example file for synchronizing the entire database `mysql-to-doris.yaml`:
@@ -228,7 +229,7 @@ Notice that:
 
 4. Finally, submit job to Flink Standalone cluster using Cli.
```shell
-   bash bin/flink-cdc.sh mysql-to-doris.yaml
+   bash bin/flink-cdc.sh mysql-to-doris.yaml --jar lib/mysql-connector-java-8.0.27.jar
```
 After successful submission, the return information is as follows:

(flink-cdc) branch master updated: [FLINK-35173][cdc][mysql] Debezium custom time serializer for MySQL connector (#3240)

2024-04-26 Thread renqs
This is an automated email from the ASF dual-hosted git repository.

renqs pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git


The following commit(s) were added to refs/heads/master by this push:
 new 6232d8405 [FLINK-35173][cdc][mysql] Debezium custom time serializer for MySQL connector (#3240)
6232d8405 is described below

commit 6232d84052422aa88299f28074a8437e91db2988
Author: ConradJam 
AuthorDate: Fri Apr 26 14:10:34 2024 +0800

[FLINK-35173][cdc][mysql] Debezium custom time serializer for MySQL connector (#3240)
---
 .../cdc/debezium/utils/ConvertTimeBceUtil.java |  58 
 .../converters/MysqlDebeziumTimeConverter.java | 326 +
 .../MysqlDebeziumTimeConverterITCase.java  | 298 +++
 .../src/test/resources/ddl/date_convert_test.sql   |  36 +++
 4 files changed, 718 insertions(+)

diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/org/apache/flink/cdc/debezium/utils/ConvertTimeBceUtil.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/org/apache/flink/cdc/debezium/utils/ConvertTimeBceUtil.java
new file mode 100644
index 000000000..b6ac0b226
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/org/apache/flink/cdc/debezium/utils/ConvertTimeBceUtil.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.debezium.utils;
+
+import java.sql.Date;
+import java.sql.Timestamp;
+import java.time.LocalDate;
+import java.time.chrono.IsoEra;
+
+/** Convert And Check TimeBce Util. */
+public class ConvertTimeBceUtil {
+
+    private static final Date ONE_CE = Date.valueOf("0001-01-01");
+
+    public static String resolveEra(boolean isBce, String value) {
+        String mangledValue = value;
+        if (isBce) {
+            if (mangledValue.startsWith("-")) {
+                mangledValue = mangledValue.substring(1);
+            }
+            if (!mangledValue.endsWith(" BC")) {
+                mangledValue += " BC";
+            }
+        }
+        return mangledValue;
+    }
+
+    public static boolean isBce(LocalDate date) {
+        return date.getEra() == IsoEra.BCE;
+    }
+
+    public static String resolveEra(LocalDate date, String value) {
+        return resolveEra(isBce(date), value);
+    }
+
+    public static String resolveEra(Date date, String value) {
+        return resolveEra(date.before(ONE_CE), value);
+    }
+
+    public static String resolveEra(Timestamp timestamp, String value) {
+        return resolveEra(timestamp.before(ONE_CE), value);
+    }
+}
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/converters/MysqlDebeziumTimeConverter.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/converters/MysqlDebeziumTimeConverter.java
new file mode 100644
index 000000000..493fd682c
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/converters/MysqlDebeziumTimeConverter.java
@@ -0,0 +1,326 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.debezium.connector.mysql.converters;
+
+import org.apache.flink.cdc.debezium.utils.ConvertTimeBceUtil;