(flink-cdc) branch master updated: [FLINK-35121][common] Adds validation for pipeline definition options

2024-06-13 Thread jiabaosun
This is an automated email from the ASF dual-hosted git repository.

jiabaosun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git


The following commit(s) were added to refs/heads/master by this push:
 new 2bd2e4ce2 [FLINK-35121][common] Adds validation for pipeline definition options
2bd2e4ce2 is described below

commit 2bd2e4ce24ec0cc6a11129e3e3b32af6a09dd977
Author: yux <34335406+yuxiq...@users.noreply.github.com>
AuthorDate: Fri Jun 14 12:17:38 2024 +0800

[FLINK-35121][common] Adds validation for pipeline definition options
---
 .../docs/connectors/pipeline-connectors/mysql.md   |  15 ++
 .../docs/connectors/pipeline-connectors/mysql.md   |  15 ++
 .../org/apache/flink/cdc/cli/CliFrontendTest.java  |   3 +-
 .../parser/YamlPipelineDefinitionParserTest.java   |   8 +-
 .../definitions/pipeline-definition-full.yaml  |   2 +-
 .../definitions/pipeline-definition-minimized.yaml |   3 +
 .../resources/global-config/global-config.yaml |   2 +-
 .../cdc/common/configuration/Configuration.java|  17 ++
 .../flink/cdc/common/factories/FactoryHelper.java  | 121 ++
 .../cdc/common/factories/FactoryHelperTests.java   | 174 +
 .../flink/cdc/composer/definition/PipelineDef.java |  45 ++
 .../definition/PipelineValidationTest.java |  85 ++
 .../doris/factory/DorisDataSinkFactory.java|   9 +-
 .../kafka/sink/KafkaDataSinkFactory.java   |  15 +-
 .../kafka/sink/KafkaDataSinkFactoryTest.java   |  54 ++-
 .../mysql/factory/MySqlDataSourceFactory.java  |  26 +--
 .../mysql/source/MySqlDataSourceFactoryTest.java   |  74 +
 .../paimon/sink/PaimonDataSinkFactory.java |  17 +-
 .../paimon/sink/PaimonDataSinkOptions.java |   2 +-
 .../paimon/sink/PaimonDataSinkFactoryTest.java | 112 -
 .../starrocks/sink/StarRocksDataSinkFactory.java   |   9 +-
 .../sink/StarRocksDataSinkFactoryTest.java | 108 -
 .../values/factory/ValuesDataFactory.java  |   5 +
 23 files changed, 876 insertions(+), 45 deletions(-)

diff --git a/docs/content.zh/docs/connectors/pipeline-connectors/mysql.md b/docs/content.zh/docs/connectors/pipeline-connectors/mysql.md
index b1f4a945e..9f9465ed7 100644
--- a/docs/content.zh/docs/connectors/pipeline-connectors/mysql.md
+++ b/docs/content.zh/docs/connectors/pipeline-connectors/mysql.md
@@ -275,6 +275,21 @@ pipeline:
 - `specific-offset`:跳过快照阶段,从指定的 binlog 位点开始读取。位点可通过 binlog 文件名和位置指定,或者在 GTID 在集群上启用时通过 GTID 集合指定。
 - `timestamp`:跳过快照阶段,从指定的时间戳开始读取 binlog 事件。
 
+例如,可以在 YAML 配置文件中这样指定启动模式:
+
+```yaml
+source:
+  type: mysql
+  scan.startup.mode: earliest-offset                 # Start from earliest offset
+  scan.startup.mode: latest-offset                   # Start from latest offset
+  scan.startup.mode: specific-offset                 # Start from specific offset
+  scan.startup.mode: timestamp                       # Start from timestamp
+  scan.startup.specific-offset.file: 'mysql-bin.03'  # Binlog filename under specific offset startup mode
+  scan.startup.specific-offset.pos: 4                # Binlog position under specific offset mode
+  scan.startup.specific-offset.gtid-set: 24DA167-... # GTID set under specific offset startup mode
+  scan.startup.timestamp-millis: 166723200           # Timestamp under timestamp startup mode
+  # ...
+```
 
 ## 数据类型映射
 
diff --git a/docs/content/docs/connectors/pipeline-connectors/mysql.md b/docs/content/docs/connectors/pipeline-connectors/mysql.md
index b450c4017..30feb47f7 100644
--- a/docs/content/docs/connectors/pipeline-connectors/mysql.md
+++ b/docs/content/docs/connectors/pipeline-connectors/mysql.md
@@ -284,6 +284,21 @@ The config option `scan.startup.mode` specifies the startup mode for MySQL CDC c
   specified with binlog filename and position, or a GTID set if GTID is enabled on server.
 - `timestamp`: Skip snapshot phase and start reading binlog events from a specific timestamp.
 
+For example, in a YAML definition:
+
+```yaml
+source:
+  type: mysql
+  scan.startup.mode: earliest-offset                 # Start from earliest offset
+  scan.startup.mode: latest-offset                   # Start from latest offset
+  scan.startup.mode: specific-offset                 # Start from specific offset
+  scan.startup.mode: timestamp                       # Start from timestamp
+  scan.startup.specific-offset.file: 'mysql-bin.03'  # Binlog filename under specific offset startup mode
+  scan.startup.specific-offset.pos: 4                # Binlog position under specific offset mode
+  scan.startup.specific-offset.gtid-set: 24DA167-... # GTID set under specific offset startup mode
+  scan.startup.timestamp-millis: 166723200           # Timestamp under timestamp startup mode
+  # ...
+```
 
 ## Data Type Mapping
 
diff --git a/flink-cdc-cli/src/test/java/org/apache/flink/cdc/cli/CliFrontendTest.jav

(flink) branch master updated: [FLINK-35543][HIVE] Upgrade Hive 2.3 connector to version 2.3.10 (#24905)

2024-06-13 Thread fanrui
This is an automated email from the ASF dual-hosted git repository.

fanrui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new 6c05981948e [FLINK-35543][HIVE] Upgrade Hive 2.3 connector to version 2.3.10 (#24905)
6c05981948e is described below

commit 6c05981948ea72f79fef94282d6e8c648951092b
Author: Cheng Pan 
AuthorDate: Fri Jun 14 11:40:52 2024 +0800

[FLINK-35543][HIVE] Upgrade Hive 2.3 connector to version 2.3.10 (#24905)
---
 .../docs/connectors/table/hive/overview.md |  9 ++---
 .../content/docs/connectors/table/hive/overview.md |  9 ++---
 .../table/catalog/hive/client/HiveShimLoader.java  |  4 +++
 .../table/catalog/hive/client/HiveShimV2310.java   | 22 +
 .../table/endpoint/hive/HiveServer2Endpoint.java   |  5 ---
 .../connectors/hive/HiveRunnerShimLoader.java  |  1 +
 .../pom.xml| 10 ++
 .../src/main/resources/META-INF/NOTICE | 38 +-
 .../main/resources/META-INF/licenses/LICENSE.antlr |  0
 .../resources/META-INF/licenses/LICENSE.javolution |  0
 .../main/resources/META-INF/licenses/LICENSE.kryo  |  0
 .../resources/META-INF/licenses/LICENSE.minlog |  0
 .../resources/META-INF/licenses/LICENSE.protobuf   |  0
 .../resources/META-INF/licenses/LICENSE.reflectasm |  0
 .../resources/META-INF/licenses/LICENSE.slf4j-api  |  0
 .../main/resources/META-INF/licenses/LICENSE.jodd  | 25 --
 flink-connectors/pom.xml   |  2 +-
 pom.xml|  2 +-
 ...modules-defining-excess-dependencies.modulelist |  2 +-
 19 files changed, 58 insertions(+), 71 deletions(-)

diff --git a/docs/content.zh/docs/connectors/table/hive/overview.md b/docs/content.zh/docs/connectors/table/hive/overview.md
index ea2a72056ed..49dcdd96292 100644
--- a/docs/content.zh/docs/connectors/table/hive/overview.md
+++ b/docs/content.zh/docs/connectors/table/hive/overview.md
@@ -54,6 +54,7 @@ Flink 支持以下的 Hive 版本。
 - 2.3.7
 - 2.3.8
 - 2.3.9
+- 2.3.10
 - 3.1
 - 3.1.0
 - 3.1.1
@@ -87,10 +88,10 @@ export HADOOP_CLASSPATH=`hadoop classpath`
 
 下表列出了所有可用的 Hive jar。您可以选择一个并放在 Flink 发行版的`/lib/` 目录中。
 
-| Metastore version | Maven dependency | SQL Client JAR |
-|:--|:-|:-|
-| 2.3.0 - 2.3.9 | `flink-sql-connector-hive-2.3.9` | {{< stable >}}[Download](https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-hive-2.3.9{{< scala_version >}}/{{< version >}}/flink-sql-connector-hive-2.3.9{{< scala_version >}}-{{< version >}}.jar) {{< /stable >}}{{< unstable >}} Only available for stable releases {{< /unstable >}} |
-| 3.0.0 - 3.1.3 | `flink-sql-connector-hive-3.1.3` | {{< stable >}}[Download](https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-hive-3.1.3{{< scala_version >}}/{{< version >}}/flink-sql-connector-hive-3.1.3{{< scala_version >}}-{{< version >}}.jar) {{< /stable >}}{{< unstable >}} Only available for stable releases {{< /unstable >}} |
+| Metastore version | Maven dependency  | SQL Client JAR |
+|:--|:--|:---|
+| 2.3.0 - 2.3.10| `flink-sql-connector-hive-2.3.10` | {{< stable >}}[Download](https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-hive-2.3.10{{< scala_version >}}/{{< version >}}/flink-sql-connector-hive-2.3.10{{< scala_version >}}-{{< version >}}.jar) {{< /stable >}}{{< unstable >}} Only available for stable releases {{< /unstable >}} |
+| 3.0.0 - 3.1.3 | `flink-sql-connector-hive-3.1.3`  | {{< stable >}}[Download](https://

(flink-kubernetes-operator) branch main updated: [FLINK-35448] Translate pod templates documentation into Chinese

2024-06-13 Thread fanrui
This is an automated email from the ASF dual-hosted git repository.

fanrui pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
 new d920a235 [FLINK-35448] Translate pod templates documentation into Chinese
d920a235 is described below

commit d920a235a3357311b96e3c087d5918d83da27853
Author: caicancai <2356672...@qq.com>
AuthorDate: Fri May 24 21:17:39 2024 +0800

[FLINK-35448] Translate pod templates documentation into Chinese

Co-authored-by: Yuepeng Pan 
Co-authored-by: 1996fanrui <1996fan...@gmail.com>
---
 .../docs/custom-resource/pod-template.md   | 42 +++---
 docs/content/docs/custom-resource/pod-template.md  |  2 +-
 2 files changed, 22 insertions(+), 22 deletions(-)

diff --git a/docs/content.zh/docs/custom-resource/pod-template.md 
b/docs/content.zh/docs/custom-resource/pod-template.md
index bf7097e5..a10efc79 100644
--- a/docs/content.zh/docs/custom-resource/pod-template.md
+++ b/docs/content.zh/docs/custom-resource/pod-template.md
@@ -26,21 +26,18 @@ under the License.
 
 # Pod template
 
-The operator CRD is designed to have a minimal set of direct, short-hand CRD settings to express the most
-basic attributes of a deployment. For all other settings the CRD provides the `flinkConfiguration` and
-`podTemplate` fields.
+
 
-Pod templates permit customization of the Flink job and task manager pods, for example to specify
-volume mounts, ephemeral storage, sidecar containers etc.
+Operator CRD 被设计为一组直接、简短的 CRD 设置,以表达 deployment 的最基本属性。对于所有其他设置,CRD 提供了 `flinkConfiguration` 和 `podTemplate` 字段。
 
-Pod templates can be layered, as shown in the example below.
-A common pod template may hold the settings that apply to both job and task manager,
-like `volumeMounts`. Another template under job or task manager can define additional settings that supplement or override those
-in the common template, such as a task manager sidecar.
+Pod templates 允许自定义 Flink Job 和 Task Manager 的 pod,例如指定卷挂载、临时存储、sidecar 容器等。
 
-The operator is going to merge the common and specific templates for job and task manager respectively.
+Pod template 可以被分层,如下面的示例所示。
+一个通用的 pod template 可以保存适用于作业和 task manager 的设置,比如 `volumeMounts`。作业或 task manager 下的另一个模板可以定义补充或覆盖通用模板中的其他设置,比如一个 task manager sidecar。
 
-Here the full example:
+Operator 将分别合并作业和 task manager 的通用和特定模板。
+
+下面是一个完整的示例:
 
 ```yaml
 apiVersion: flink.apache.org/v1beta1
@@ -93,18 +90,21 @@ spec:
 ```
 
 {{< hint info >}}
-When using the operator with Flink native Kubernetes integration, please refer to [pod template field precedence](
-https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/resource-providers/native_kubernetes/#fields-overwritten-by-flink).
+当使用与 Flink 原生 Kubernetes 集成的 operator 时,请参考 [pod template 字段优先级](
+https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/resource-providers/native_kubernetes/#fields-overwritten-by-flink)。
 {{< /hint >}}
 
+
 ## Array Merging Behaviour
 
-When layering pod templates (defining both a top level and jobmanager specific podtemplate for example) the corresponding yamls are merged together.
+
+
+当分层 pod templates(例如同时定义顶层和 jobmanager 特定的 pod 模板)时,相应的 yaml 会合并在一起。
 
-The default behaviour of the pod template mechanism is to merge arrays by merging the objects in the respective array positions.
-This requires that containers in the podTemplates are defined in the same order otherwise the results may be undefined.
+Pod 模板机制的默认行为是通过合并相应数组位置的对象来合并数组。
+这要求 podTemplates 中的容器以相同的顺序定义,否则结果可能未定义。
 
-Default behaviour (merge by position):
+默认行为(按位置合并):
 
 ```
 arr1: [{name: a, p1: v1}, {name: b, p1: v1}]
@@ -113,10 +113,10 @@ arr1: [{name: a, p2: v2}, {name: c, p2: v2}]
 merged: [{name: a, p1: v1, p2: v2}, {name: c, p1: v1, p2: v2}]
 ```
 
-The operator supports an alternative array merging mechanism that can be enabled by the `kubernetes.operator.pod-template.merge-arrays-by-name` flag.
-When true, instead of the default positional merging, object array elements that have a `name` property defined will be merged by their name and the resulting array will be a union of the two input arrays.
+Operator 支持另一种数组合并机制,可以通过 `kubernetes.operator.pod-template.merge-arrays-by-name` 标志启用。
+当为 true 时,不会进行默认的位置合并,而是根据名称合并定义了 `name` 属性的对象数组元素,并且生成的数组将是两个输入数组的并集。
 
-Merge by name:
+通过名称合并:
 
 ```
 arr1: [{name: a, p1: v1}, {name: b, p1: v1}]
@@ -125,4 +125,4 @@ arr1: [{name: a, p2: v2}, {name: c, p2: v2}]
 merged: [{name: a, p1: v1, p2: v2}, {name: b, p1: v1}, {name: c, p2: v2}]
 ```
 
-Merging by name can be very convenient when merging container specs or when the base and override templates are not defined together.
+当合并容器规格或者当基础模板和覆盖模板没有一起定义时,按名称合并可以非常方便。
diff --git a/docs/content/docs/custom-resource/pod-template.md b/docs/content/docs/custom-resource/pod-template.md
index bf7097e5..3695864c 100644
--- a/docs/content/doc

(flink-connector-jdbc) 01/02: [FLINK-33461][Connector/JDBC] Support streaming related semantics for the new JDBC source

2024-06-13 Thread fanrui
This is an automated email from the ASF dual-hosted git repository.

fanrui pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-jdbc.git

commit 190238a05bebe9a092e9cec84627127781d4d859
Author: Roc Marshal 
AuthorDate: Fri May 10 21:18:11 2024 +0800

[FLINK-33461][Connector/JDBC] Support streaming related semantics for the new JDBC source
---
 .../connections/SimpleJdbcConnectionProvider.java  |   1 +
 .../flink/connector/jdbc/source/JdbcSource.java|  40 +--
 .../connector/jdbc/source/JdbcSourceBuilder.java   |  87 -
 .../source/enumerator/JdbcSourceEnumerator.java|  88 -
 .../enumerator/JdbcSqlSplitEnumeratorBase.java |   6 +-
 .../enumerator/SqlTemplateSplitEnumerator.java |  36 +-
 .../jdbc/source/reader/JdbcSourceSplitReader.java  | 186 +++---
 .../jdbc/source/split/JdbcSourceSplit.java |  28 +-
 .../source/split/JdbcSourceSplitSerializer.java|   6 +-
 .../jdbc/source/split/JdbcSourceSplitState.java|   2 +-
 .../jdbc/split/JdbcParameterValuesProvider.java|   8 +
 .../split/JdbcSlideTimingParameterProvider.java|  94 ++
 .../jdbc/utils/ContinuousUnBoundingSettings.java   |  86 +
 .../jdbc/source/JdbcSourceBuilderTest.java |  43 ++-
 .../jdbc/source/JdbcSourceStreamRelatedITCase.java | 374 +
 .../JdbcSourceEnumStateSerializerTest.java |   5 +-
 .../enumerator/JdbcSourceEnumeratorTest.java   |   8 +-
 .../jdbc/source/reader/JdbcSourceReaderTest.java   |   2 +-
 .../source/reader/JdbcSourceSplitReaderTest.java   |   2 +-
 .../split/JdbcSourceSplitSerializerTest.java   |   3 +-
 20 files changed, 971 insertions(+), 134 deletions(-)

diff --git a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/datasource/connections/SimpleJdbcConnectionProvider.java b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/datasource/connections/SimpleJdbcConnectionProvider.java
index 811210af..4c48f799 100644
--- a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/datasource/connections/SimpleJdbcConnectionProvider.java
+++ b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/datasource/connections/SimpleJdbcConnectionProvider.java
@@ -79,6 +79,7 @@ public class SimpleJdbcConnectionProvider implements JdbcConnectionProvider, Ser
     @Override
     public boolean isConnectionValid() throws SQLException {
         return connection != null
+                && !connection.isClosed()
                 && connection.isValid(jdbcOptions.getConnectionCheckTimeoutSeconds());
     }
 
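For context, the resulting check can be read as the following self-contained sketch; `timeoutSeconds` stands in for `jdbcOptions.getConnectionCheckTimeoutSeconds()` and the wrapper class is purely illustrative:

```java
import java.sql.Connection;
import java.sql.SQLException;

final class ConnectionCheck {
    // Sketch of the check after this change: a null or locally closed
    // connection is rejected before Connection.isValid(), which does a
    // driver-level round trip and may block up to the given timeout.
    static boolean isConnectionValid(Connection connection, int timeoutSeconds)
            throws SQLException {
        return connection != null
                && !connection.isClosed()
                && connection.isValid(timeoutSeconds);
    }
}
```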
diff --git a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/source/JdbcSource.java b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/source/JdbcSource.java
index fc145feb..08e4a777 100644
--- a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/source/JdbcSource.java
+++ b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/source/JdbcSource.java
@@ -40,9 +40,12 @@ import org.apache.flink.connector.jdbc.source.reader.JdbcSourceSplitReader;
 import org.apache.flink.connector.jdbc.source.reader.extractor.ResultExtractor;
 import org.apache.flink.connector.jdbc.source.split.JdbcSourceSplit;
 import org.apache.flink.connector.jdbc.source.split.JdbcSourceSplitSerializer;
+import org.apache.flink.connector.jdbc.utils.ContinuousUnBoundingSettings;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
 import org.apache.flink.util.Preconditions;
 
+import javax.annotation.Nullable;
+
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Objects;
@@ -55,6 +58,7 @@ public class JdbcSource
 
     private final Boundedness boundedness;
     private final TypeInformation typeInformation;
+    private final @Nullable ContinuousUnBoundingSettings continuousUnBoundingSettings;
 
     private final Configuration configuration;
     private final JdbcSqlSplitEnumeratorBase.Provider sqlSplitEnumeratorProvider;
@@ -69,29 +73,20 @@ public class JdbcSource
             JdbcSqlSplitEnumeratorBase.Provider sqlSplitEnumeratorProvider,
             ResultExtractor resultExtractor,
             TypeInformation typeInformation,
-            DeliveryGuarantee deliveryGuarantee) {
+            @Nullable DeliveryGuarantee deliveryGuarantee,
+            @Nullable ContinuousUnBoundingSettings continuousUnBoundingSettings) {
         this.configuration = Preconditions.checkNotNull(configuration);
         this.connectionProvider = Preconditions.checkNotNull(connectionProvider);
         this.sqlSplitEnumeratorProvider = Preconditions.checkNotNull(sqlSplitEnumeratorProvider);
         this.resultExtractor = Preconditions.checkNotNull(resultExtractor);
-        this.deliveryGuarantee = Preconditions.checkNotNull(deliveryGuarantee);
+        this.deliveryGuarantee =
+                Objects.isNull(deliveryGuarantee) ? DeliveryGuarantee.NONE : deliveryGuarante

(flink-connector-jdbc) 02/02: [FLINK-33462][Connector/JDBC] Sort out the document page about the new Jdbc source.

2024-06-13 Thread fanrui
This is an automated email from the ASF dual-hosted git repository.

fanrui pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-jdbc.git

commit 1bab53304c65384b4fbe6a5fe71de71a344a78fe
Author: Roc Marshal 
AuthorDate: Tue May 28 23:53:27 2024 +0800

[FLINK-33462][Connector/JDBC] Sort out the document page about the new Jdbc source.
---
 docs/content.zh/docs/connectors/datastream/jdbc.md | 486 -
 docs/content/docs/connectors/datastream/jdbc.md| 200 -
 2 files changed, 660 insertions(+), 26 deletions(-)

diff --git a/docs/content.zh/docs/connectors/datastream/jdbc.md b/docs/content.zh/docs/connectors/datastream/jdbc.md
index a57cf0bb..fddaa9f6 100644
--- a/docs/content.zh/docs/connectors/datastream/jdbc.md
+++ b/docs/content.zh/docs/connectors/datastream/jdbc.md
@@ -26,38 +26,341 @@ under the License.
 
 # JDBC Connector
 
-该连接器可以向 JDBC 数据库写入数据。
+This connector provides a source that reads data from a JDBC database and
+provides a sink that writes data to a JDBC database.
 
-添加下面的依赖以便使用该连接器(同时添加 JDBC 驱动):
+To use it, add the following dependency to your project (along with your JDBC driver):
 
 {{< connector_artifact flink-connector-jdbc jdbc >}}
 
-注意该连接器目前还 __不是__ 二进制发行版的一部分,如何在集群中运行请参考 [这里]({{< ref "docs/dev/configuration/overview" >}})。
+Note that the streaming connectors are currently __NOT__ part of the binary distribution.
+See how to link with them for cluster execution [here]({{< ref "docs/dev/configuration/overview" >}}).
+A driver dependency is also required to connect to a specified database.
+Please consult your database documentation on how to add the corresponding driver.
 
-已创建的 JDBC Sink 能够保证至少一次的语义。
-更有效的精确执行一次可以通过 upsert 语句或幂等更新实现。
+## JDBC Source
 
-用法示例:
-{{< tabs "4ab65f13-608a-411a-8d24-e303f384ab5d" >}}
+Configuration goes as follows (see also {{< javadoc file="org/apache/flink/connector/jdbc/source/JdbcSource.html" name="JdbcSource javadoc" >}}
+and {{< javadoc file="org/apache/flink/connector/jdbc/source/JdbcSourceBuilder.html" name="JdbcSourceBuilder javadoc" >}}).
+
+### Usage
+
+{{< tabs "4ab65f13-607a-411a-8d24-e709f701cd41" >}}
 {{< tab "Java" >}}
 ```java
+JdbcSource source = JdbcSourceBuilder.builder()
+// Required
+.setSql(...)
+.setResultExtractor(...)
+.setUsername(...)
+.setPassword(...)
+.setDriverName(...)
+.setDBUrl(...)
+.setTypeInformation(...)
+
+ // Optional
+.setContinuousUnBoundingSettings(...)
+.setJdbcParameterValuesProvider(...)
+.setDeliveryGuarantee(...)
+.setConnectionCheckTimeoutSeconds(...)
+
+// The extended JDBC connection property passing
+.setConnectionProperty("key", "value")
+
+// other attributes
+.setSplitReaderFetchBatchSize(...)
+.setResultSetType(...)
+.setResultSetConcurrency(...)
+.setAutoCommit(...)
+.setResultSetFetchSize(...)
+.setConnectionProvider(...)
+.build();
+
+```
+{{< /tab >}}
+{{< tab "Python" >}}
+```python
+Still not supported in Python API.
+```
+{{< /tab >}}
+{{< /tabs >}}
+
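To make the template above concrete, here is a minimal sketch of one possible instantiation. The query, credentials, driver, and URL are hypothetical placeholders, the generic `<Book>` call shape and the lambda extractor are assumptions about the builder API (see the `JdbcSourceBuilder` javadoc linked above), and `Book` is the simple POJO defined in the ResultExtractor section below:

```java
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.connector.jdbc.source.JdbcSource;
import org.apache.flink.connector.jdbc.source.JdbcSourceBuilder;

// Hypothetical values throughout; only the builder methods come from the
// Usage template above.
JdbcSource<Book> source =
        JdbcSourceBuilder.<Book>builder()
                .setSql("SELECT id, title FROM book")
                .setResultExtractor(
                        rs -> new Book(rs.getLong("id"), rs.getString("title")))
                .setUsername("user")
                .setPassword("password")
                .setDriverName("com.mysql.cj.jdbc.Driver")
                .setDBUrl("jdbc:mysql://localhost:3306/library")
                .setTypeInformation(TypeInformation.of(Book.class))
                .build();
```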
+### Delivery guarantee
+
+The JDBC source provides `at-least-once` / `at-most-once` (default) / `exactly-once` guarantees.
+The `JdbcSource` bases these delivery-guarantee semantics on the `Concur` mode of the underlying `ResultSet`.
+
+**NOTE:** There is a caveat: these semantics only hold if the `ResultSet` produced by the SQL of a `JdbcSourceSplit`
+remains unchanged for the whole lifecycle of that split's processing.
+Unfortunately, this condition is not met in most databases and data scenarios.
+See [FLIP-239](https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=217386271) for more details.
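A hedged sketch of how a guarantee is selected on the builder; the enum is assumed to be Flink's `org.apache.flink.connector.base.DeliveryGuarantee`, and the rest of the builder chain is elided as in the Usage section:

```java
import org.apache.flink.connector.base.DeliveryGuarantee;

// Request at-least-once semantics explicitly; per the constructor change in
// the FLINK-33461 commit above, a null/unset guarantee falls back to
// DeliveryGuarantee.NONE (i.e. the at-most-once default).
JdbcSource<Book> source =
        JdbcSourceBuilder.<Book>builder()
                // ... required options as in the Usage section ...
                .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
                .build();
```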
+
+### ResultExtractor
+
+An extractor that builds a record from each row of the `ResultSet` returned by the SQL query.
+
+```java
+import org.apache.flink.connector.jdbc.source.reader.extractor.ResultExtractor;
+
+import java.sql.ResultSet;
+import java.sql.SQLException;
+
+class Book {
+    public Book(Long id, String title) {
+        this.id = id;
+        this.title = title;
+    }
+
+    final Long id;
+    final String title;
+}
+
+ResultExtractor resultExtractor = new ResultExtractor() {
+    @Override
+    public Object extract(ResultSet resultSet) throws SQLException {
+        return new Book(resultSet.getLong("id"), resultSet.getString("title"));
+    }
+};
+```
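Assuming `ResultExtractor` is a single-abstract-method interface, which the anonymous class above suggests but the commit does not state, the same extractor can be written more compactly as a lambda:

```java
// Equivalent lambda form (assumption: ResultExtractor has a single abstract
// extract(ResultSet) method).
ResultExtractor resultExtractor =
        rs -> new Book(rs.getLong("id"), rs.getString("title"));
```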
+
+### JdbcParameterValuesProvider
+
+A provider that supplies the actual parameter values for the placeholders in the SQL statement, in the form of a two-dimensional array.
+See {{< javadoc file="org/apache/flink/connector/jdbc/split/JdbcParameterValuesProvider.html" name="JdbcParameterValuesProvider javadoc" >}} for more details.
+
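Before the fuller example below, a minimal hedged sketch of such a provider, assuming the interface exposes a single `Serializable[][] getParameterValues()` method; each inner array supplies the values for one execution of a templated query such as `SELECT * FROM book WHERE id = ?`:

```java
import java.io.Serializable;

import org.apache.flink.connector.jdbc.split.JdbcParameterValuesProvider;

// Two parameter rows: the templated query is executed once per inner array,
// here with id = 1001 and id = 1002.
JdbcParameterValuesProvider provider =
        () -> new Serializable[][] {{1001L}, {1002L}};
```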
+```java
+
+class TestEntry {
+...
+};
+
+ResultExtractor extractor = ...;
+
 StreamExecutionEnvironment env = StreamExecutionEnvironment.g

(flink-connector-jdbc) branch main updated (3b063c9c -> 1bab5330)

2024-06-13 Thread fanrui
This is an automated email from the ASF dual-hosted git repository.

fanrui pushed a change to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-jdbc.git


from 3b063c9c [FLINK-35137][Connectors/JDBC] Update website data for 3.3-SNAPSHOT
 new 190238a0 [FLINK-33461][Connector/JDBC] Support streaming related semantics for the new JDBC source
 new 1bab5330 [FLINK-33462][Connector/JDBC] Sort out the document page about the new Jdbc source.

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 docs/content.zh/docs/connectors/datastream/jdbc.md | 486 -
 docs/content/docs/connectors/datastream/jdbc.md| 200 -
 .../connections/SimpleJdbcConnectionProvider.java  |   1 +
 .../flink/connector/jdbc/source/JdbcSource.java|  40 +-
 .../connector/jdbc/source/JdbcSourceBuilder.java   |  87 +++-
 .../source/enumerator/JdbcSourceEnumerator.java|  88 +++-
 .../enumerator/JdbcSqlSplitEnumeratorBase.java |   6 +-
 .../enumerator/SqlTemplateSplitEnumerator.java |  36 +-
 .../jdbc/source/reader/JdbcSourceSplitReader.java  | 186 +---
 .../jdbc/source/split/JdbcSourceSplit.java |  28 +-
 .../source/split/JdbcSourceSplitSerializer.java|   6 +-
 .../jdbc/source/split/JdbcSourceSplitState.java|   2 +-
 .../jdbc/split/JdbcParameterValuesProvider.java|   8 +
 .../split/JdbcSlideTimingParameterProvider.java|  94 
 .../jdbc/utils/ContinuousUnBoundingSettings.java   |  86 
 .../jdbc/source/JdbcSourceBuilderTest.java |  43 +-
 .../jdbc/source/JdbcSourceStreamRelatedITCase.java | 374 
 .../JdbcSourceEnumStateSerializerTest.java |   5 +-
 .../enumerator/JdbcSourceEnumeratorTest.java   |   8 +-
 .../jdbc/source/reader/JdbcSourceReaderTest.java   |   2 +-
 .../source/reader/JdbcSourceSplitReaderTest.java   |   2 +-
 .../split/JdbcSourceSplitSerializerTest.java   |   3 +-
 22 files changed, 1631 insertions(+), 160 deletions(-)
 create mode 100644 flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/split/JdbcSlideTimingParameterProvider.java
 create mode 100644 flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/utils/ContinuousUnBoundingSettings.java
 create mode 100644 flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/source/JdbcSourceStreamRelatedITCase.java



(flink) branch master updated: [FLINK-35157][runtime] Sources with watermark alignment get stuck once some subtasks finish (#24757)

2024-06-13 Thread fanrui
This is an automated email from the ASF dual-hosted git repository.

fanrui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new 93b9bdf798a [FLINK-35157][runtime] Sources with watermark alignment get stuck once some subtasks finish (#24757)
93b9bdf798a is described below

commit 93b9bdf798aec933122bdee6e9b70860eadd0864
Author: elon-X <34712973+elo...@users.noreply.github.com>
AuthorDate: Fri Jun 14 09:26:56 2024 +0800

[FLINK-35157][runtime] Sources with watermark alignment get stuck once some subtasks finish (#24757)
---
 .../source/coordinator/SourceCoordinator.java  | 14 +++--
 .../streaming/api/operators/SourceOperator.java|  8 ++-
 .../api/operators/SourceOperatorAlignmentTest.java | 31 ++
 .../api/datastream/WatermarkAlignmentITCase.java   | 72 ++
 4 files changed, 118 insertions(+), 7 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java
index e64eb0424ca..3133bbe7ce7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java
@@ -198,12 +198,16 @@ public class SourceCoordinator
                 subTaskIds,
                 operatorName);
 
-        // Subtask maybe during deploying or restarting, so we only send WatermarkAlignmentEvent
-        // to ready task to avoid period task fail (Java-ThreadPoolExecutor will not schedule
-        // the period task if it throws an exception).
         for (Integer subtaskId : subTaskIds) {
-            context.sendEventToSourceOperatorIfTaskReady(
-                    subtaskId, new WatermarkAlignmentEvent(maxAllowedWatermark));
+            // when subtask have been finished, do not send event.
+            if (!context.hasNoMoreSplits(subtaskId)) {
+                // Subtask maybe during deploying or restarting, so we only send
+                // WatermarkAlignmentEvent to ready task to avoid period task fail
+                // (Java-ThreadPoolExecutor will not schedule the period task if it throws an
+                // exception).
+                context.sendEventToSourceOperatorIfTaskReady(
+                        subtaskId, new WatermarkAlignmentEvent(maxAllowedWatermark));
+            }
         }
     }
 
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
index d49de8c622c..972415c1ff0 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
@@ -434,7 +434,7 @@ public class SourceOperator extends AbstractStr
                 // introduces a small performance regression (probably because of an extra
                 // virtual call)
                 processingTimeService.scheduleWithFixedDelay(
-                        this::emitLatestWatermark,
+                        time -> emitLatestWatermark(),
                         watermarkAlignmentParams.getUpdateInterval(),
                         watermarkAlignmentParams.getUpdateInterval());
             }
@@ -449,6 +449,10 @@
                 sourceMetricGroup.idlingStarted();
                 return DataInputStatus.END_OF_DATA;
             case DATA_FINISHED:
+                if (watermarkAlignmentParams.isEnabled()) {
+                    latestWatermark = Watermark.MAX_WATERMARK.getTimestamp();
+                    emitLatestWatermark();
+                }
                 sourceMetricGroup.idlingStarted();
                 return DataInputStatus.END_OF_INPUT;
             case WAITING_FOR_ALIGNMENT:
@@ -507,7 +511,7 @@
         }
     }
 
-    private void emitLatestWatermark(long time) {
+    private void emitLatestWatermark() {
         checkState(currentMainOutput != null);
         if (latestWatermark == Watermark.UNINITIALIZED.getTimestamp()) {
             return;
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorAlignmentTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorAlignmentTest.java
index db3f8f0ed1f..30b8fdd5063 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorAlignmentTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorAlignmentTest.java
@@ -27,

(flink-cdc) annotated tag release-3.1.1-rc0 updated (b163b8e15 -> 8df995069)

2024-06-13 Thread renqs
This is an automated email from the ASF dual-hosted git repository.

renqs pushed a change to annotated tag release-3.1.1-rc0
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git


*** WARNING: tag release-3.1.1-rc0 was modified! ***

from b163b8e15 (commit)
  to 8df995069 (tag)
 tagging b163b8e1589184bd7716cf6d002f3472766eb5db (commit)
  by Qingsheng Ren
  on Thu Jun 13 23:29:43 2024 +0800

- Log -
release-3.1.1-rc0
-----BEGIN PGP SIGNATURE-----

iQIzBAABCAAdFiEEob1Hf3nQNtLDDKfbyoruwvbrBAsFAmZrEGcACgkQyoruwvbr
BAukxA/+LOk/veVgrl9wyYYuYiqx5DwrWLjzxMTMAcgwt9bBo2Lr4G5vVov6tQA/
d+1zkcmHnlyHfayV3ac5Tn2+rJgQyprqj5L7V9/E1en+pW07g/ZF5d8wYvqFCe8x
XYGooyLzeGRt4Q+DgXEZ3HxFBcyMarA4/FCCiAgQjhQ8ph2EvUBJC1DQlqz/yPaY
5s03BnMlJ+BK/yNPOApBSv3X56ysDNsISabqO3qSq1z/1VyaJEW8ALf687rVj5v4
o8hedD4BDaeJSyPzJrhag//hPU7VIC5wXg8IQp8mP/bEjbQJ+JdkHrrs8HY6QEQt
ksB+HsL+hWLBZPOzutnfcYjfVqPe/Y4pAwUxRIdlSXo7vnlITq2LOC/gD9zaypzz
SHlc5NQnHVydDOt2U+4b/SsBfwImEWekJWIpSh6Cq1YrmyeQVcMV0+7xf/Y9rdby
4+BAAzZfG9q6g27mJmjOrb7Vjq8qSy1PRk6jgv0IsaiNt6NpUwBRTAxwNlDxgHqW
T/BCKmHTpVD6TYvz/yLKUydD9uJR+CoruWoYBXbvDJuvQw0udB+YOl34zpvkpYhF
KUfnA7DhjnODR3u3EmIke5xfNbCl47L0jYtkdwD6LG1NeYFAoIT0MPypguVo1FPW
NpVNObpThZR45G+yERSU/lfJfgFl1xsRs0Ktnype9OeybGM/nAQ=
=cQ+s
-----END PGP SIGNATURE-----
---


No new revisions were added by this update.

Summary of changes:



svn commit: r69697 - in /dev/flink/flink-cdc-3.1.1-rc0: ./ flink-cdc-3.1.1-bin.tar.gz flink-cdc-3.1.1-bin.tar.gz.asc flink-cdc-3.1.1-bin.tar.gz.sha512 flink-cdc-3.1.1-src.tgz flink-cdc-3.1.1-src.tgz.a

2024-06-13 Thread renqs
Author: renqs
Date: Thu Jun 13 15:26:50 2024
New Revision: 69697

Log:
Apache Flink CDC, version 3.1.1, release candidate 0

Added:
dev/flink/flink-cdc-3.1.1-rc0/
dev/flink/flink-cdc-3.1.1-rc0/flink-cdc-3.1.1-bin.tar.gz   (with props)
dev/flink/flink-cdc-3.1.1-rc0/flink-cdc-3.1.1-bin.tar.gz.asc
dev/flink/flink-cdc-3.1.1-rc0/flink-cdc-3.1.1-bin.tar.gz.sha512
dev/flink/flink-cdc-3.1.1-rc0/flink-cdc-3.1.1-src.tgz   (with props)
dev/flink/flink-cdc-3.1.1-rc0/flink-cdc-3.1.1-src.tgz.asc
dev/flink/flink-cdc-3.1.1-rc0/flink-cdc-3.1.1-src.tgz.sha512

Added: dev/flink/flink-cdc-3.1.1-rc0/flink-cdc-3.1.1-bin.tar.gz
==
Binary file - no diff available.

Propchange: dev/flink/flink-cdc-3.1.1-rc0/flink-cdc-3.1.1-bin.tar.gz
--
svn:mime-type = application/octet-stream

Added: dev/flink/flink-cdc-3.1.1-rc0/flink-cdc-3.1.1-bin.tar.gz.asc
==
--- dev/flink/flink-cdc-3.1.1-rc0/flink-cdc-3.1.1-bin.tar.gz.asc (added)
+++ dev/flink/flink-cdc-3.1.1-rc0/flink-cdc-3.1.1-bin.tar.gz.asc Thu Jun 13 15:26:50 2024
@@ -0,0 +1,16 @@
+-----BEGIN PGP SIGNATURE-----
+
+iQIzBAABCAAdFiEEob1Hf3nQNtLDDKfbyoruwvbrBAsFAmZrDwgACgkQyoruwvbr
+BAsIZxAAmS2cYYWKsaRbQg0AJ41ZODUrCL/8pedFx9moKdERxQbBnrYKvkiI9Num
+IPUdIV1u6njIdSNRZQTpI9Jt7rpM43aocJfwfXKF/rR750xvAB049y3U/1LcUxQG
+/+HykacJCSx/jIDogLGXdVIA5T5OOg9omGea4DPnUnuXFAAxbSX92JK39b24F7DQ
+j6haNqC/YPImanHThRfH41kQe/UBkK6vE54JLGoZfxYthHe/eLSbS26R/wN5cvxj
+CYqOG2kSqpuJ0zBnivXc/ttnd7zEcgw0mWXf3a6O9Rr/EVJjfp/16MdFpQCu9u3d
+9EdJk9OFYi+QWinh4AHY2VDc6k+71jqqv4kY9yruqMg/G/Z3Ybo66KmoPGXoLSmZ
+HuqzrvBmOcsD7L/UvHrO2frDfBdRORyv4M8WpdSWOLSSkKERICwXjZTw9h9ad1Ov
+V6nxN0IINRMhO5o5Bqddo8LRMSVMZnxKdvH5p/NsoxKNxT2HBKIFwpAZewkpkPwO
+1SWMMj8mI6dAdp8pghOB0CSWJM6h9jT3WmUZNwm3lp2EjSgfFv0PA2qj4lbxDQrM
+5YIwjed9q4O52drjM3B7g9ykLr3b48KYx1kiwsaJbvs8FfUMBRnmzch60pfb3ORf
+cYHTDvzvJIzYH7X1op8b5XOC46F/C3scEQkAzJw0jInvmMt2W1w=
+=KxW1
+-END PGP SIGNATURE-

Added: dev/flink/flink-cdc-3.1.1-rc0/flink-cdc-3.1.1-bin.tar.gz.sha512
==
--- dev/flink/flink-cdc-3.1.1-rc0/flink-cdc-3.1.1-bin.tar.gz.sha512 (added)
+++ dev/flink/flink-cdc-3.1.1-rc0/flink-cdc-3.1.1-bin.tar.gz.sha512 Thu Jun 13 15:26:50 2024
@@ -0,0 +1 @@
+580ac2e4ffb88cc98d3f747083c5340335b3aa79193a7fad5a782d43a49f11dc73871dc6fb92cdc93d41e499a84a28ee997ae17a4cf625d349a6735e49e98750  flink-cdc-3.1.1-bin.tar.gz

Added: dev/flink/flink-cdc-3.1.1-rc0/flink-cdc-3.1.1-src.tgz
==
Binary file - no diff available.

Propchange: dev/flink/flink-cdc-3.1.1-rc0/flink-cdc-3.1.1-src.tgz
--
svn:mime-type = application/octet-stream

Added: dev/flink/flink-cdc-3.1.1-rc0/flink-cdc-3.1.1-src.tgz.asc
==
--- dev/flink/flink-cdc-3.1.1-rc0/flink-cdc-3.1.1-src.tgz.asc (added)
+++ dev/flink/flink-cdc-3.1.1-rc0/flink-cdc-3.1.1-src.tgz.asc Thu Jun 13 15:26:50 2024
@@ -0,0 +1,16 @@
+-----BEGIN PGP SIGNATURE-----
+
+iQIzBAABCAAdFiEEob1Hf3nQNtLDDKfbyoruwvbrBAsFAmZrAqQACgkQyoruwvbr
+BAt22g/6Anb+Ze2Yycar6+FVCk/DBg4IJ34bN5aJTMLQHk+n/sDmeHUHHRBkhOxf
++ffeUO7tBpKC75XCheSVfCncgWCjCg4aIFSK168zYOjE/W56oZCLAUOK6ZURqVNJ
+QMoGK8jPlYTECfnG6rHbucM5e2IhCQIRQd3jqgoe1OXuPuyiVl9gmJ0fr393P+qB
+OiG38i/yzvtIusW4ElYWhDZ+OoW8LytOR4CDX6l5LMvRxg5XlGImnIYw8Bohobfm
++TC8Q/SMLpARtiguewEn/zp93zi5mps92R2LLm3zoXTZPpxPqKGyipV6W3cp3NzF
+fCMFq2qKHnL6u0zo0VQCfOm8/WZ+LmdMwIHE9ZyrBlIH2oAiwYDFNaJKqsbgfYZ7
+/o78lSzYrdwQ0txIVnFPEQWvKvDlRUkNJx9aRftGJR+4IJj7TO3wOnoXT8FDphiU
+O57owEzm6V0T4tIR+SlEHOjVrjDTMkznmLJgXiUFWJAsQeNHh2b4TqtO1veVS0dB
+3H6yjKaOsjWBuxOCCBZ0pAnIiSR9OUNwZlQ3sm2tEr9VBJzhtolSrFAJ6GeEhfNR
+aKqy0UUeoPqax0q6eM7vi69HBd4Ko64C4XzDxE4QRtVd/BG/K1Ka4Q1/jDgzk6x2
+2EGW2N/bAfX3Gna2SXOlpkWFrgNYRID/F3w55pqFo+Z/nX7QjtM=
+=Mk3F
+-----END PGP SIGNATURE-----

Added: dev/flink/flink-cdc-3.1.1-rc0/flink-cdc-3.1.1-src.tgz.sha512
==
--- dev/flink/flink-cdc-3.1.1-rc0/flink-cdc-3.1.1-src.tgz.sha512 (added)
+++ dev/flink/flink-cdc-3.1.1-rc0/flink-cdc-3.1.1-src.tgz.sha512 Thu Jun 13 15:26:50 2024
@@ -0,0 +1 @@
+fbd3a3c7e4769e59882a7823770f7408744a0ecf2aa0cf40d5149697375c99c4734fed54ba6902f86dc3531f910a7c1e4f975d2b3425d416f3c1e21c36cd75ad  flink-cdc-3.1.1-src.tgz




(flink-cdc) branch release-3.1 updated: [FLINK-35592][cdc] Fix MysqlDebeziumTimeConverter miss timezone convert to timestamp (#3380)

2024-06-13 Thread renqs
This is an automated email from the ASF dual-hosted git repository.

renqs pushed a commit to branch release-3.1
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git


The following commit(s) were added to refs/heads/release-3.1 by this push:
 new f476a5b1d [FLINK-35592][cdc] Fix MysqlDebeziumTimeConverter miss timezone convert to timestamp (#3380)
f476a5b1d is described below

commit f476a5b1dd99fc63962bc75048c0b12957e53da8
Author: ConradJam 
AuthorDate: Thu Jun 13 22:22:09 2024 +0800

[FLINK-35592][cdc] Fix MysqlDebeziumTimeConverter miss timezone convert to timestamp (#3380)
---
 .../converters/MysqlDebeziumTimeConverter.java | 112 -
 .../MysqlDebeziumTimeConverterITCase.java  |   8 +-
 .../src/test/resources/ddl/date_convert_test.sql   |   3 +-
 3 files changed, 73 insertions(+), 50 deletions(-)

diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/converters/MysqlDebeziumTimeConverter.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/converters/MysqlDebeziumTimeConverter.java
index 493fd682c..1eb98947d 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/converters/MysqlDebeziumTimeConverter.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/converters/MysqlDebeziumTimeConverter.java
@@ -113,54 +113,73 @@ public class MysqlDebeziumTimeConverter
         registration.register(
                 SchemaBuilder.string().name(schemaName).optional(),
                 value -> {
-                    log.debug(
-                            "find schema need to change dateType, field name:{} ,field type:{} ,field value:{} ,field "
-                                    + "default:{}",
-                            field.name(),
-                            columnType,
-                            value == null ? "null" : value,
-                            field.hasDefaultValue() ? field.defaultValue() : "null");
-                    if (value == null) {
-                        return convertDateDefaultValue(field);
-                    }
-                    switch (columnType.toUpperCase(Locale.ROOT)) {
-                        case "DATE":
-                            if (value instanceof Integer) {
-                                return this.convertToDate(
-                                        columnType, LocalDate.ofEpochDay((Integer) value));
-                            }
-                            return this.convertToDate(columnType, value);
-                        case "TIME":
-                            if (value instanceof Long) {
-                                long l =
-                                        Math.multiplyExact(
-                                                (Long) value, TimeUnit.MICROSECONDS.toNanos(1));
-                                return this.convertToTime(columnType, LocalTime.ofNanoOfDay(l));
-                            }
-                            return this.convertToTime(columnType, value);
-                        case "DATETIME":
-                            if (value instanceof Long) {
-                                if (getTimePrecision(field) <= 3) {
-                                    return this.convertToTimestamp(
-                                            columnType,
-                                            Conversions.toInstantFromMillis((Long) value));
-                                }
-                                if (getTimePrecision(field) <= 6) {
-                                    return this.convertToTimestamp(
-                                            columnType,
-                                            Conversions.toInstantFromMicros((Long) value));
-                                }
-                            }
-                            return this.convertToTimestamp(columnType, value);
-                        case "TIMESTAMP":
-                            return this.convertToTimestampWithTimezone(columnType, value);
-                        default:
-                            throw new IllegalArgumentException(
-                                    "Unknown field type  " + columnType.toUpperCase(Locale.ROOT));
+                    try {
+                        return convertDateObject(field, value, columnType);
+                    } catch (Exception e) {
+                        printConvertDateErrorClassLogs(field, registration, value);
+                        throw new RuntimeException("MysqlDebeziumConverter error", e);
                     }
                 });
     }
 
+    private void printConvertDateErrorClassLogs(
+            RelationalColumn field,
+            ConverterRegistration registration,
+            Object value) {
+        bool

(flink) branch master updated (9d169038784 -> d0f9bb40a61)

2024-06-13 Thread twalthr
This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


from 9d169038784 [FLINK-35164][table] Support `ALTER CATALOG RESET` syntax
 add d0f9bb40a61 [FLINK-34172] Add support for altering a distribution via ALTER TABLE

No new revisions were added by this update.

Summary of changes:
 .../src/main/codegen/data/Parser.tdd   |   1 +
 .../src/main/codegen/includes/parserImpls.ftl  |  21 +--
 .../flink/sql/parser/ddl/SqlAlterTableAdd.java |   9 +-
 ...ark.java => SqlAlterTableDropDistribution.java} |  14 +-
 .../flink/sql/parser/ddl/SqlAlterTableModify.java  |   9 +-
 .../flink/sql/parser/ddl/SqlAlterTableSchema.java  |  21 +++
 .../flink/sql/parser/ddl/SqlCreateTable.java   |   1 +
 .../flink/sql/parser/ddl/SqlDistribution.java  |  21 ++-
 .../flink/sql/parser/FlinkSqlParserImplTest.java   |  57 +++-
 .../operations/ddl/AlterTableChangeOperation.java  |   9 ++
 .../apache/flink/table/catalog/TableChange.java| 158 +
 .../planner/operations/AlterSchemaConverter.java   |  99 +++--
 .../operations/SqlCreateTableConverter.java|  29 +---
 .../operations/SqlNodeToOperationConversion.java   |  31 
 .../planner/utils/OperationConverterUtils.java |  31 
 .../operations/SqlDdlToOperationConverterTest.java | 143 +--
 16 files changed, 578 insertions(+), 76 deletions(-)
 copy flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/{SqlAlterTableDropWatermark.java => SqlAlterTableDropDistribution.java} (80%)



(flink) branch master updated: [FLINK-35164][table] Support `ALTER CATALOG RESET` syntax

2024-06-13 Thread jchan
This is an automated email from the ASF dual-hosted git repository.

jchan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new 9d169038784 [FLINK-35164][table] Support `ALTER CATALOG RESET` syntax
9d169038784 is described below

commit 9d1690387849303b27050bb0cefaa1bad6e3fb98
Author: Yubin Li 
AuthorDate: Thu Jun 13 19:19:47 2024 +0800

[FLINK-35164][table] Support `ALTER CATALOG RESET` syntax

This closes #24763
---
 docs/content.zh/docs/dev/table/sql/alter.md|  20 +-
 docs/content/docs/dev/table/sql/alter.md   |  20 +-
 .../src/test/resources/sql/catalog_database.q  | 187 ++
 .../src/test/resources/sql/catalog_database.q  | 218 -
 .../src/main/codegen/data/Parser.tdd   |   1 +
 .../src/main/codegen/includes/parserImpls.ftl  |  24 ++-
 .../flink/sql/parser/ddl/SqlAlterCatalogReset.java |  76 +++
 .../flink/sql/parser/FlinkSqlParserImplTest.java   |   3 +-
 .../apache/flink/table/catalog/CatalogManager.java |  12 +-
 .../ddl/AlterCatalogOptionsOperation.java  |   4 +-
 ...ration.java => AlterCatalogResetOperation.java} |  30 ++-
 .../converters/SqlAlterCatalogResetConverter.java  |  48 +
 .../operations/converters/SqlNodeConverters.java   |   1 +
 .../operations/SqlDdlToOperationConverterTest.java |  21 ++
 14 files changed, 448 insertions(+), 217 deletions(-)

diff --git a/docs/content.zh/docs/dev/table/sql/alter.md b/docs/content.zh/docs/dev/table/sql/alter.md
index 808ee893023..39de6985c9d 100644
--- a/docs/content.zh/docs/dev/table/sql/alter.md
+++ b/docs/content.zh/docs/dev/table/sql/alter.md
@@ -28,7 +28,7 @@ under the License.
 
 
 
-ALTER 语句用于修改一个已经在 [Catalog]({{< ref "docs/dev/table/catalogs" >}}) 中注册的表、视图或函数定义。
+ALTER 语句用于修改一个已经在 [Catalog]({{< ref "docs/dev/table/catalogs" >}}) 中注册的表、视图或函数定义,或 catalog 本身的定义。
 
 Flink SQL 目前支持以下 ALTER 语句:
 
@@ -36,6 +36,7 @@ Flink SQL 目前支持以下 ALTER 语句:
 - ALTER VIEW
 - ALTER DATABASE
 - ALTER FUNCTION
+- ALTER CATALOG
 
 ## 执行 ALTER 语句
 
@@ -538,10 +539,14 @@ ALTER [TEMPORARY|TEMPORARY SYSTEM] FUNCTION
 
 Language tag 用于指定 Flink runtime 如何执行这个函数。目前,只支持 JAVA,SCALA 和 PYTHON,且函数的默认语言为 JAVA。
 
+{{< top >}}
+
 ## ALTER CATALOG
 
 ```sql
-ALTER CATALOG catalog_name SET (key1=val1, ...)
+ALTER CATALOG catalog_name 
+SET (key1=val1, ...)
+  | RESET (key1, ...)
 ```
 
 ### SET
@@ -555,4 +560,15 @@ ALTER CATALOG catalog_name SET (key1=val1, ...)
 ALTER CATALOG cat2 SET ('default-database'='db');
 ```
 
+### RESET
+
+为指定的 catalog 重置一个或多个属性。
+
+`RESET` 语句示例如下。
+
+```sql
+-- reset 'default-database'
+ALTER CATALOG cat2 RESET ('default-database');
+```
+
 {{< top >}}
diff --git a/docs/content/docs/dev/table/sql/alter.md b/docs/content/docs/dev/table/sql/alter.md
index 54fdf8f15b0..e842d137ce0 100644
--- a/docs/content/docs/dev/table/sql/alter.md
+++ b/docs/content/docs/dev/table/sql/alter.md
@@ -28,7 +28,7 @@ under the License.
 
 
 
-ALTER statements are used to modified a registered table/view/function definition in the [Catalog]({{< ref "docs/dev/table/catalogs" >}}).
+ALTER statements are used to modify the definition of a table, view or function that has already been registered in the [Catalog]({{< ref "docs/dev/table/catalogs" >}}), or the definition of a catalog itself.
 
 Flink SQL supports the following ALTER statements for now:
 
@@ -36,6 +36,7 @@ Flink SQL supports the following ALTER statements for now:
 - ALTER VIEW
 - ALTER DATABASE
 - ALTER FUNCTION
+- ALTER CATALOG
 
 ## Run an ALTER statement
 
@@ -540,10 +541,14 @@ If the function doesn't exist, nothing happens.
 
 Language tag to instruct flink runtime how to execute the function. Currently only JAVA, SCALA and PYTHON are supported, the default language for a function is JAVA.
 
+{{< top >}}
+
 ## ALTER CATALOG
 
 ```sql
-ALTER CATALOG catalog_name SET (key1=val1, ...)
+ALTER CATALOG catalog_name 
+SET (key1=val1, ...)
+  | RESET (key1, ...)
 ```
 
 ### SET
 The following examples illustrate the usage of the `SET` statements.
 ALTER CATALOG cat2 SET ('default-database'='db');
 ```
 
+### RESET
+
+Reset one or more properties to their default values in the specified catalog.
+
+The following examples illustrate the usage of the `RESET` statements.
+
+```sql
+-- reset 'default-database'
+ALTER CATALOG cat2 RESET ('default-database');
+```
+
 {{< top >}}
diff --git a/flink-table/flink-sql-client/src/test/resources/sql/catalog_database.q b/flink-table/flink-sql-client/src/test/resources/sql/catalog_database.q
index c9a5b02116a..b99af7344f9 100644
--- a/flink-table/flink-sql-client/src/test/resources/sql/catalog_database.q
+++ b/flink-table/flink-sql-client/src/test/resources/sql/catalog_database.q
@@ -92,6 +92,110 @@ drop catalog c1;
 org.apache.flink.table.catalog.exceptions.CatalogException: Cannot drop a catalog which is currently in use.
 !error
 
+c

(flink) branch master updated (14fdd13b632 -> f0b01277dd2)

2024-06-13 Thread martijnvisser
This is an automated email from the ASF dual-hosted git repository.

martijnvisser pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


from 14fdd13b632 [FLINK-25537] [JUnit5 Migration] Module: flink-core with,Package: core (#24881)
 add f0b01277dd2 [FLINK-35378][FLIP-453] Promote Unified Sink API V2 to Public and Deprecate SinkFunction. This closes #24805

No new revisions were added by this update.

Summary of changes:
 .../18509c9e-3250-4c52-91b9-11ccefc85db1 |  9 +
 .../e5126cae-f3fe-48aa-b6fb-60ae6cc3fcd5 | 16 
 .../org/apache/flink/api/connector/sink2/Committer.java  |  6 +++---
 .../flink/api/connector/sink2/CommitterInitContext.java  |  4 ++--
 .../flink/api/connector/sink2/CommittingSinkWriter.java  |  4 ++--
 .../java/org/apache/flink/api/connector/sink2/Sink.java  |  3 ++-
 .../org/apache/flink/api/connector/sink2/SinkWriter.java |  6 +++---
 .../flink/api/connector/sink2/StatefulSinkWriter.java|  4 ++--
 .../flink/api/connector/sink2/SupportsCommitter.java |  4 ++--
 .../flink/api/connector/sink2/SupportsWriterState.java   |  3 ++-
 .../flink/api/connector/sink2/WriterInitContext.java |  6 ++
 .../streaming/api/functions/sink/DiscardingSink.java |  3 +++
 .../streaming/api/functions/sink/PrintSinkFunction.java  |  3 +++
 .../streaming/api/functions/sink/RichSinkFunction.java   |  8 +++-
 .../flink/streaming/api/functions/sink/SinkFunction.java |  3 +++
 .../streaming/api/functions/sink/SocketClientSink.java   |  3 +++
 .../api/functions/sink/TwoPhaseCommitSinkFunction.java   |  3 +++
 pom.xml  |  5 +
 18 files changed, 64 insertions(+), 29 deletions(-)