(flink-cdc) branch master updated: [FLINK-35121][common] Adds validation for pipeline definition options
This is an automated email from the ASF dual-hosted git repository. jiabaosun pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink-cdc.git The following commit(s) were added to refs/heads/master by this push: new 2bd2e4ce2 [FLINK-35121][common] Adds validation for pipeline definition options 2bd2e4ce2 is described below commit 2bd2e4ce24ec0cc6a11129e3e3b32af6a09dd977 Author: yux <34335406+yuxiq...@users.noreply.github.com> AuthorDate: Fri Jun 14 12:17:38 2024 +0800 [FLINK-35121][common] Adds validation for pipeline definition options --- .../docs/connectors/pipeline-connectors/mysql.md | 15 ++ .../docs/connectors/pipeline-connectors/mysql.md | 15 ++ .../org/apache/flink/cdc/cli/CliFrontendTest.java | 3 +- .../parser/YamlPipelineDefinitionParserTest.java | 8 +- .../definitions/pipeline-definition-full.yaml | 2 +- .../definitions/pipeline-definition-minimized.yaml | 3 + .../resources/global-config/global-config.yaml | 2 +- .../cdc/common/configuration/Configuration.java| 17 ++ .../flink/cdc/common/factories/FactoryHelper.java | 121 ++ .../cdc/common/factories/FactoryHelperTests.java | 174 + .../flink/cdc/composer/definition/PipelineDef.java | 45 ++ .../definition/PipelineValidationTest.java | 85 ++ .../doris/factory/DorisDataSinkFactory.java| 9 +- .../kafka/sink/KafkaDataSinkFactory.java | 15 +- .../kafka/sink/KafkaDataSinkFactoryTest.java | 54 ++- .../mysql/factory/MySqlDataSourceFactory.java | 26 +-- .../mysql/source/MySqlDataSourceFactoryTest.java | 74 + .../paimon/sink/PaimonDataSinkFactory.java | 17 +- .../paimon/sink/PaimonDataSinkOptions.java | 2 +- .../paimon/sink/PaimonDataSinkFactoryTest.java | 112 - .../starrocks/sink/StarRocksDataSinkFactory.java | 9 +- .../sink/StarRocksDataSinkFactoryTest.java | 108 - .../values/factory/ValuesDataFactory.java | 5 + 23 files changed, 876 insertions(+), 45 deletions(-) diff --git a/docs/content.zh/docs/connectors/pipeline-connectors/mysql.md b/docs/content.zh/docs/connectors/pipeline-connectors/mysql.md index b1f4a945e..9f9465ed7 100644 --- a/docs/content.zh/docs/connectors/pipeline-connectors/mysql.md +++ b/docs/content.zh/docs/connectors/pipeline-connectors/mysql.md @@ -275,6 +275,21 @@ pipeline: - `specific-offset`:跳过快照阶段,从指定的 binlog 位点开始读取。位点可通过 binlog 文件名和位置指定,或者在 GTID 在集群上启用时通过 GTID 集合指定。 - `timestamp`:跳过快照阶段,从指定的时间戳开始读取 binlog 事件。 +例如,可以在 YAML 配置文件中这样指定启动模式: + +```yaml +source: + type: mysql + scan.startup.mode: earliest-offset# Start from earliest offset + scan.startup.mode: latest-offset # Start from latest offset + scan.startup.mode: specific-offset# Start from specific offset + scan.startup.mode: timestamp # Start from timestamp + scan.startup.specific-offset.file: 'mysql-bin.03' # Binlog filename under specific offset startup mode + scan.startup.specific-offset.pos: 4 # Binlog position under specific offset mode + scan.startup.specific-offset.gtid-set: 24DA167-...# GTID set under specific offset startup mode + scan.startup.timestamp-millis: 166723200 # Timestamp under timestamp startup mode + # ... +``` ## 数据类型映射 diff --git a/docs/content/docs/connectors/pipeline-connectors/mysql.md b/docs/content/docs/connectors/pipeline-connectors/mysql.md index b450c4017..30feb47f7 100644 --- a/docs/content/docs/connectors/pipeline-connectors/mysql.md +++ b/docs/content/docs/connectors/pipeline-connectors/mysql.md @@ -284,6 +284,21 @@ The config option `scan.startup.mode` specifies the startup mode for MySQL CDC c specified with binlog filename and position, or a GTID set if GTID is enabled on server. - `timestamp`: Skip snapshot phase and start reading binlog events from a specific timestamp. +For example in YAML definition: + +```yaml +source: + type: mysql + scan.startup.mode: earliest-offset# Start from earliest offset + scan.startup.mode: latest-offset # Start from latest offset + scan.startup.mode: specific-offset# Start from specific offset + scan.startup.mode: timestamp # Start from timestamp + scan.startup.specific-offset.file: 'mysql-bin.03' # Binlog filename under specific offset startup mode + scan.startup.specific-offset.pos: 4 # Binlog position under specific offset mode + scan.startup.specific-offset.gtid-set: 24DA167-...# GTID set under specific offset startup mode + scan.startup.timestamp-millis: 166723200 # Timestamp under timestamp startup mode + # ... +``` ## Data Type Mapping diff --git a/flink-cdc-cli/src/test/java/org/apache/flink/cdc/cli/CliFrontendTest.jav
(flink) branch master updated: [FLINK-35543][HIVE] Upgrade Hive 2.3 connector to version 2.3.10 (#24905)
This is an automated email from the ASF dual-hosted git repository. fanrui pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new 6c05981948e [FLINK-35543][HIVE] Upgrade Hive 2.3 connector to version 2.3.10 (#24905) 6c05981948e is described below commit 6c05981948ea72f79fef94282d6e8c648951092b Author: Cheng Pan AuthorDate: Fri Jun 14 11:40:52 2024 +0800 [FLINK-35543][HIVE] Upgrade Hive 2.3 connector to version 2.3.10 (#24905) --- .../docs/connectors/table/hive/overview.md | 9 ++--- .../content/docs/connectors/table/hive/overview.md | 9 ++--- .../table/catalog/hive/client/HiveShimLoader.java | 4 +++ .../table/catalog/hive/client/HiveShimV2310.java | 22 + .../table/endpoint/hive/HiveServer2Endpoint.java | 5 --- .../connectors/hive/HiveRunnerShimLoader.java | 1 + .../pom.xml| 10 ++ .../src/main/resources/META-INF/NOTICE | 38 +- .../main/resources/META-INF/licenses/LICENSE.antlr | 0 .../resources/META-INF/licenses/LICENSE.javolution | 0 .../main/resources/META-INF/licenses/LICENSE.kryo | 0 .../resources/META-INF/licenses/LICENSE.minlog | 0 .../resources/META-INF/licenses/LICENSE.protobuf | 0 .../resources/META-INF/licenses/LICENSE.reflectasm | 0 .../resources/META-INF/licenses/LICENSE.slf4j-api | 0 .../main/resources/META-INF/licenses/LICENSE.jodd | 25 -- flink-connectors/pom.xml | 2 +- pom.xml| 2 +- ...modules-defining-excess-dependencies.modulelist | 2 +- 19 files changed, 58 insertions(+), 71 deletions(-) diff --git a/docs/content.zh/docs/connectors/table/hive/overview.md b/docs/content.zh/docs/connectors/table/hive/overview.md index ea2a72056ed..49dcdd96292 100644 --- a/docs/content.zh/docs/connectors/table/hive/overview.md +++ b/docs/content.zh/docs/connectors/table/hive/overview.md @@ -54,6 +54,7 @@ Flink 支持以下的 Hive 版本。 - 2.3.7 - 2.3.8 - 2.3.9 +- 2.3.10 - 3.1 - 3.1.0 - 3.1.1 @@ -87,10 +88,10 @@ export HADOOP_CLASSPATH=`hadoop classpath` 下表列出了所有可用的 Hive jar。您可以选择一个并放在 Flink 发行版的`/lib/` 目录中。 -| Metastore version | Maven dependency | SQL Client JAR | -|:--|:-|:-| -| 2.3.0 - 2.3.9 | `flink-sql-connector-hive-2.3.9` | {{< stable >}}[Download](https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-hive-2.3.9{{< scala_version >}}/{{< version >}}/flink-sql-connector-hive-2.3.9{{< scala_version >}}-{{< version >}}.jar) {{< /stable >}}{{< unstable >}} Only available for stable releases {{< /unstable >}} | -| 3.0.0 - 3.1.3 | `flink-sql-connector-hive-3.1.3` | {{< stable >}}[Download](https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-hive-3.1.3{{< scala_version >}}/{{< version >}}/flink-sql-connector-hive-3.1.3{{< scala_version >}}-{{< version >}}.jar) {{< /stable >}}{{< unstable >}} Only available for stable releases {{< /unstable >}} | +| Metastore version | Maven dependency | SQL Client JAR | +|:--|:--|:---| +| 2.3.0 - 2.3.10| `flink-sql-connector-hive-2.3.10` | {{< stable >}}[Download](https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-hive-2.3.10{{< scala_version >}}/{{< version >}}/flink-sql-connector-hive-2.3.10{{< scala_version >}}-{{< version >}}.jar) {{< /stable >}}{{< unstable >}} Only available for stable releases {{< /unstable >}} | +| 3.0.0 - 3.1.3 | `flink-sql-connector-hive-3.1.3` | {{< stable >}}[Download](https://
(flink-kubernetes-operator) branch main updated: [FLINK-35448] Translate pod templates documentation into Chinese
This is an automated email from the ASF dual-hosted git repository. fanrui pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git The following commit(s) were added to refs/heads/main by this push: new d920a235 [FLINK-35448] Translate pod templates documentation into Chinese d920a235 is described below commit d920a235a3357311b96e3c087d5918d83da27853 Author: caicancai <2356672...@qq.com> AuthorDate: Fri May 24 21:17:39 2024 +0800 [FLINK-35448] Translate pod templates documentation into Chinese Co-authored-by: Yuepeng Pan Co-authored-by: 1996fanrui <1996fan...@gmail.com> --- .../docs/custom-resource/pod-template.md | 42 +++--- docs/content/docs/custom-resource/pod-template.md | 2 +- 2 files changed, 22 insertions(+), 22 deletions(-) diff --git a/docs/content.zh/docs/custom-resource/pod-template.md b/docs/content.zh/docs/custom-resource/pod-template.md index bf7097e5..a10efc79 100644 --- a/docs/content.zh/docs/custom-resource/pod-template.md +++ b/docs/content.zh/docs/custom-resource/pod-template.md @@ -26,21 +26,18 @@ under the License. # Pod template -The operator CRD is designed to have a minimal set of direct, short-hand CRD settings to express the most -basic attributes of a deployment. For all other settings the CRD provides the `flinkConfiguration` and -`podTemplate` fields. + -Pod templates permit customization of the Flink job and task manager pods, for example to specify -volume mounts, ephemeral storage, sidecar containers etc. +Operator CRD 被设计为一组直接、简短的 CRD 设置,以表达 deployment 的最基本属性。对于所有其他设置,CRD 提供了 `flinkConfiguration` 和 `podTemplate` 字段。 -Pod templates can be layered, as shown in the example below. -A common pod template may hold the settings that apply to both job and task manager, -like `volumeMounts`. Another template under job or task manager can define additional settings that supplement or override those -in the common template, such as a task manager sidecar. +Pod templates 允许自定义 Flink Job 和 Task Manager 的 pod,例如指定卷挂载、临时存储、sidecar 容器等。 -The operator is going to merge the common and specific templates for job and task manager respectively. +Pod template 可以被分层,如下面的示例所示。 +一个通用的 pod template 可以保存适用于作业和 task manager 的设置,比如 `volumeMounts`。作业或 task manager 下的另一个模板可以定义补充或覆盖通用模板中的其他设置,比如一个 task manager sidecar。 -Here the full example: +Operator 将分别合并作业和 task manager 的通用和特定模板。 + +下面是一个完整的示例: ```yaml apiVersion: flink.apache.org/v1beta1 @@ -93,18 +90,21 @@ spec: ``` {{< hint info >}} -When using the operator with Flink native Kubernetes integration, please refer to [pod template field precedence]( -https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/resource-providers/native_kubernetes/#fields-overwritten-by-flink). +当使用与 Flink 原生 Kubernetes 集成的 operator 时,请参考 [pod template 字段优先级]( +https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/resource-providers/native_kubernetes/#fields-overwritten-by-flink)。 {{< /hint >}} + ## Array Merging Behaviour -When layering pod templates (defining both a top level and jobmanager specific podtemplate for example) the corresponding yamls are merged together. + + +当分层 pod templates(例如同时定义顶层和 jobmanager 特定的 pod 模板)时,相应的 yaml 会合并在一起。 -The default behaviour of the pod template mechanism is to merge array arrays by merging the objects in the respective array positions. -This requires that containers in the podTemplates are defined in the same order otherwise the results may be undefined. +Pod 模板机制的默认行为是通过合并相应数组位置的对象来合并数组。 +这要求 podTemplates 中的容器以相同的顺序定义,否则结果可能未定义。 -Default behaviour (merge by position): +默认行为(按位置合并): ``` arr1: [{name: a, p1: v1}, {name: b, p1: v1}] @@ -113,10 +113,10 @@ arr1: [{name: a, p2: v2}, {name: c, p2: v2}] merged: [{name: a, p1: v1, p2: v2}, {name: c, p1: v1, p2: v2}] ``` -The operator supports an alternative array merging mechanism that can be enabled by the `kubernetes.operator.pod-template.merge-arrays-by-name` flag. -When true, instead of the default positional merging, object array elements that have a `name` property defined will be merged by their name and the resulting array will be a union of the two input arrays. +Operator 支持另一种数组合并机制,可以通过 `kubernetes.operator.pod-template.merge-arrays-by-name` 标志启用。 +当为 true 时,不会进行默认的位置合并,而是根据名称合并定义了 `name` 属性的对象数组元素,并且生成的数组将是两个输入数组的并集。 -Merge by name: +通过名称合并: ``` arr1: [{name: a, p1: v1}, {name: b, p1: v1}] @@ -125,4 +125,4 @@ arr1: [{name: a, p2: v2}, {name: c, p2: v2}] merged: [{name: a, p1: v1, p2: v2}, {name: b, p1: v1}, {name: c, p2: v2}] ``` -Merging by name can we be very convenient when merging container specs or when the base and override templates are not defined together. +当合并容器规格或者当基础模板和覆盖模板没有一起定义时,按名称合并可以非常方便。 diff --git a/docs/content/docs/custom-resource/pod-template.md b/docs/content/docs/custom-resource/pod-template.md index bf7097e5..3695864c 100644 --- a/docs/content/doc
(flink-connector-jdbc) 01/02: [FLINK-33461][Connector/JDBC] Support streaming related semantics for the new JDBC source
This is an automated email from the ASF dual-hosted git repository. fanrui pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-connector-jdbc.git commit 190238a05bebe9a092e9cec84627127781d4d859 Author: Roc Marshal AuthorDate: Fri May 10 21:18:11 2024 +0800 [FLINK-33461][Connector/JDBC] Support streaming related semantics for the new JDBC source --- .../connections/SimpleJdbcConnectionProvider.java | 1 + .../flink/connector/jdbc/source/JdbcSource.java| 40 +-- .../connector/jdbc/source/JdbcSourceBuilder.java | 87 - .../source/enumerator/JdbcSourceEnumerator.java| 88 - .../enumerator/JdbcSqlSplitEnumeratorBase.java | 6 +- .../enumerator/SqlTemplateSplitEnumerator.java | 36 +- .../jdbc/source/reader/JdbcSourceSplitReader.java | 186 +++--- .../jdbc/source/split/JdbcSourceSplit.java | 28 +- .../source/split/JdbcSourceSplitSerializer.java| 6 +- .../jdbc/source/split/JdbcSourceSplitState.java| 2 +- .../jdbc/split/JdbcParameterValuesProvider.java| 8 + .../split/JdbcSlideTimingParameterProvider.java| 94 ++ .../jdbc/utils/ContinuousUnBoundingSettings.java | 86 + .../jdbc/source/JdbcSourceBuilderTest.java | 43 ++- .../jdbc/source/JdbcSourceStreamRelatedITCase.java | 374 + .../JdbcSourceEnumStateSerializerTest.java | 5 +- .../enumerator/JdbcSourceEnumeratorTest.java | 8 +- .../jdbc/source/reader/JdbcSourceReaderTest.java | 2 +- .../source/reader/JdbcSourceSplitReaderTest.java | 2 +- .../split/JdbcSourceSplitSerializerTest.java | 3 +- 20 files changed, 971 insertions(+), 134 deletions(-) diff --git a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/datasource/connections/SimpleJdbcConnectionProvider.java b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/datasource/connections/SimpleJdbcConnectionProvider.java index 811210af..4c48f799 100644 --- a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/datasource/connections/SimpleJdbcConnectionProvider.java +++ b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/datasource/connections/SimpleJdbcConnectionProvider.java @@ -79,6 +79,7 @@ public class SimpleJdbcConnectionProvider implements JdbcConnectionProvider, Ser @Override public boolean isConnectionValid() throws SQLException { return connection != null +&& !connection.isClosed() && connection.isValid(jdbcOptions.getConnectionCheckTimeoutSeconds()); } diff --git a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/source/JdbcSource.java b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/source/JdbcSource.java index fc145feb..08e4a777 100644 --- a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/source/JdbcSource.java +++ b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/source/JdbcSource.java @@ -40,9 +40,12 @@ import org.apache.flink.connector.jdbc.source.reader.JdbcSourceSplitReader; import org.apache.flink.connector.jdbc.source.reader.extractor.ResultExtractor; import org.apache.flink.connector.jdbc.source.split.JdbcSourceSplit; import org.apache.flink.connector.jdbc.source.split.JdbcSourceSplitSerializer; +import org.apache.flink.connector.jdbc.utils.ContinuousUnBoundingSettings; import org.apache.flink.core.io.SimpleVersionedSerializer; import org.apache.flink.util.Preconditions; +import javax.annotation.Nullable; + import java.io.Serializable; import java.util.ArrayList; import java.util.Objects; @@ -55,6 +58,7 @@ public class JdbcSource private final Boundedness boundedness; private final TypeInformation typeInformation; +private final @Nullable ContinuousUnBoundingSettings continuousUnBoundingSettings; private final Configuration configuration; private final JdbcSqlSplitEnumeratorBase.Provider sqlSplitEnumeratorProvider; @@ -69,29 +73,20 @@ public class JdbcSource JdbcSqlSplitEnumeratorBase.Provider sqlSplitEnumeratorProvider, ResultExtractor resultExtractor, TypeInformation typeInformation, -DeliveryGuarantee deliveryGuarantee) { +@Nullable DeliveryGuarantee deliveryGuarantee, +@Nullable ContinuousUnBoundingSettings continuousUnBoundingSettings) { this.configuration = Preconditions.checkNotNull(configuration); this.connectionProvider = Preconditions.checkNotNull(connectionProvider); this.sqlSplitEnumeratorProvider = Preconditions.checkNotNull(sqlSplitEnumeratorProvider); this.resultExtractor = Preconditions.checkNotNull(resultExtractor); -this.deliveryGuarantee = Preconditions.checkNotNull(deliveryGuarantee); +this.deliveryGuarantee = +Objects.isNull(deliveryGuarantee) ? DeliveryGuarantee.NONE : deliveryGuarante
(flink-connector-jdbc) 02/02: [FLINK-33462][Connector/JDBC] Sort out the document page about the new Jdbc source.
This is an automated email from the ASF dual-hosted git repository. fanrui pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-connector-jdbc.git commit 1bab53304c65384b4fbe6a5fe71de71a344a78fe Author: Roc Marshal AuthorDate: Tue May 28 23:53:27 2024 +0800 [FLINK-33462][Connector/JDBC] Sort out the document page about the new Jdbc source. --- docs/content.zh/docs/connectors/datastream/jdbc.md | 486 - docs/content/docs/connectors/datastream/jdbc.md| 200 - 2 files changed, 660 insertions(+), 26 deletions(-) diff --git a/docs/content.zh/docs/connectors/datastream/jdbc.md b/docs/content.zh/docs/connectors/datastream/jdbc.md index a57cf0bb..fddaa9f6 100644 --- a/docs/content.zh/docs/connectors/datastream/jdbc.md +++ b/docs/content.zh/docs/connectors/datastream/jdbc.md @@ -26,38 +26,341 @@ under the License. # JDBC Connector -该连接器可以向 JDBC 数据库写入数据。 +This connector provides a source that read data from a JDBC database and +provides a sink that writes data to a JDBC database. -添加下面的依赖以便使用该连接器(同时添加 JDBC 驱动): +To use it, add the following dependency to your project (along with your JDBC driver): {{< connector_artifact flink-connector-jdbc jdbc >}} -注意该连接器目前还 __不是__ 二进制发行版的一部分,如何在集群中运行请参考 [这里]({{< ref "docs/dev/configuration/overview" >}})。 +Note that the streaming connectors are currently __NOT__ part of the binary distribution. +See how to link with them for cluster execution [here]({{< ref "docs/dev/configuration/overview" >}}). +A driver dependency is also required to connect to a specified database. +Please consult your database documentation on how to add the corresponding driver. -已创建的 JDBC Sink 能够保证至少一次的语义。 -更有效的精确执行一次可以通过 upsert 语句或幂等更新实现。 +## JDBC Source -用法示例: -{{< tabs "4ab65f13-608a-411a-8d24-e303f384ab5d" >}} +Configuration goes as follow (see also {{< javadoc file="org/apache/flink/connector/jdbc/source/JdbcSource.html" name="JdbcSource javadoc" >}} +and {{< javadoc file="org/apache/flink/connector/jdbc/source/JdbcSourceBuilder.html" name="JdbcSourceBuilder javadoc" >}}). + +### Usage + +{{< tabs "4ab65f13-607a-411a-8d24-e709f701cd41" >}} {{< tab "Java" >}} ```java +JdbcSource source = JdbcSourceBuilder.builder() +// Required +.setSql(...) +.setResultExtractor(...) +.setUsername(...) +.setPassword(...) +.setDriverName(...) +.setDBUrl(...) +.setTypeInformation(...) + + // Optional +.setContinuousUnBoundingSettings(...) +.setJdbcParameterValuesProvider(...) +.setDeliveryGuarantee(...) +.setConnectionCheckTimeoutSeconds(...) + +// The extended JDBC connection property passing +.setConnectionProperty("key", "value") + +// other attributes +.setSplitReaderFetchBatchSize(...) +.setResultSetType(...) +.setResultSetConcurrency(...) +.setAutoCommit(...) +.setResultSetFetchSize(...) +.setConnectionProvider(...) +.build(); + +``` +{{< /tab >}} +{{< tab "Python" >}} +```python +Still not supported in Python API. +``` +{{< /tab >}} +{{< /tabs >}} + +### Delivery guarantee + +The JDBC source provides `at-least-once`/`at-most-once(default)`/`exactly-once` guarantee. +The `JdbcSource` supports `Delivery guarantee` semantic based on `Concur` of `ResultSet`. + +**NOTE:** Here's a few disadvantage. It only makes sense for corresponding semantic +that the `ResultSet` corresponding to this SQL(`JdbcSourceSplit`) +remains unchanged in the whole lifecycle of `JdbcSourceSplit` processing. +Unfortunately, this condition is not met in most databases and data scenarios. +See [FLIP-239](https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=217386271) for more details. + +### ResultExtractor + +An `Extractor` to extract a record from `ResultSet` executed by a sql. + +```java +import org.apache.flink.connector.jdbc.source.reader.extractor.ResultExtractor; + +import java.sql.ResultSet; +import java.sql.SQLException; + +class Book { +public Book(Long id, String title) { +this.id = id; +this.title = title; +} + +final Long id; +final String title; +}; + +ResultExtractor resultExtractor = new ResultExtractor() { +@Override +public Object extract(ResultSet resultSet) throws SQLException { +return new Book(resultSet.getLong("id"), resultSet.getString("titile")); +} +}; + +``` + +### JdbcParameterValuesProvider + +A provider to provide parameters in sql to fulfill actual value in the corresponding placeholders, which is in the form of two-dimension array. +See {{< javadoc file="org/apache/flink/connector/jdbc/split/JdbcParameterValuesProvider.html" name="JdbcParameterValuesProvider javadoc" >}} for more details. + +```java + +class TestEntry { +... +}; + +ResultSetExtractor extractor = ...; + StreamExecutionEnvironment env = StreamExecutionEnvironment.g
(flink-connector-jdbc) branch main updated (3b063c9c -> 1bab5330)
This is an automated email from the ASF dual-hosted git repository. fanrui pushed a change to branch main in repository https://gitbox.apache.org/repos/asf/flink-connector-jdbc.git from 3b063c9c [FLINK-35137][Connectors/JDBC] Update website data for 3.3-SNAPSHOT new 190238a0 [FLINK-33461][Connector/JDBC] Support streaming related semantics for the new JDBC source new 1bab5330 [FLINK-33462][Connector/JDBC] Sort out the document page about the new Jdbc source. The 2 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: docs/content.zh/docs/connectors/datastream/jdbc.md | 486 - docs/content/docs/connectors/datastream/jdbc.md| 200 - .../connections/SimpleJdbcConnectionProvider.java | 1 + .../flink/connector/jdbc/source/JdbcSource.java| 40 +- .../connector/jdbc/source/JdbcSourceBuilder.java | 87 +++- .../source/enumerator/JdbcSourceEnumerator.java| 88 +++- .../enumerator/JdbcSqlSplitEnumeratorBase.java | 6 +- .../enumerator/SqlTemplateSplitEnumerator.java | 36 +- .../jdbc/source/reader/JdbcSourceSplitReader.java | 186 +--- .../jdbc/source/split/JdbcSourceSplit.java | 28 +- .../source/split/JdbcSourceSplitSerializer.java| 6 +- .../jdbc/source/split/JdbcSourceSplitState.java| 2 +- .../jdbc/split/JdbcParameterValuesProvider.java| 8 + .../split/JdbcSlideTimingParameterProvider.java| 94 .../jdbc/utils/ContinuousUnBoundingSettings.java | 86 .../jdbc/source/JdbcSourceBuilderTest.java | 43 +- .../jdbc/source/JdbcSourceStreamRelatedITCase.java | 374 .../JdbcSourceEnumStateSerializerTest.java | 5 +- .../enumerator/JdbcSourceEnumeratorTest.java | 8 +- .../jdbc/source/reader/JdbcSourceReaderTest.java | 2 +- .../source/reader/JdbcSourceSplitReaderTest.java | 2 +- .../split/JdbcSourceSplitSerializerTest.java | 3 +- 22 files changed, 1631 insertions(+), 160 deletions(-) create mode 100644 flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/split/JdbcSlideTimingParameterProvider.java create mode 100644 flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/utils/ContinuousUnBoundingSettings.java create mode 100644 flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/source/JdbcSourceStreamRelatedITCase.java
(flink) branch master updated: [FLINK-35157][runtime] Sources with watermark alignment get stuck once some subtasks finish (#24757)
This is an automated email from the ASF dual-hosted git repository. fanrui pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new 93b9bdf798a [FLINK-35157][runtime] Sources with watermark alignment get stuck once some subtasks finish (#24757) 93b9bdf798a is described below commit 93b9bdf798aec933122bdee6e9b70860eadd0864 Author: elon-X <34712973+elo...@users.noreply.github.com> AuthorDate: Fri Jun 14 09:26:56 2024 +0800 [FLINK-35157][runtime] Sources with watermark alignment get stuck once some subtasks finish (#24757) --- .../source/coordinator/SourceCoordinator.java | 14 +++-- .../streaming/api/operators/SourceOperator.java| 8 ++- .../api/operators/SourceOperatorAlignmentTest.java | 31 ++ .../api/datastream/WatermarkAlignmentITCase.java | 72 ++ 4 files changed, 118 insertions(+), 7 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java index e64eb0424ca..3133bbe7ce7 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java @@ -198,12 +198,16 @@ public class SourceCoordinator subTaskIds, operatorName); -// Subtask maybe during deploying or restarting, so we only send WatermarkAlignmentEvent -// to ready task to avoid period task fail (Java-ThreadPoolExecutor will not schedule -// the period task if it throws an exception). for (Integer subtaskId : subTaskIds) { -context.sendEventToSourceOperatorIfTaskReady( -subtaskId, new WatermarkAlignmentEvent(maxAllowedWatermark)); +// when subtask have been finished, do not send event. +if (!context.hasNoMoreSplits(subtaskId)) { +// Subtask maybe during deploying or restarting, so we only send +// WatermarkAlignmentEvent to ready task to avoid period task fail +// (Java-ThreadPoolExecutor will not schedule the period task if it throws an +// exception). +context.sendEventToSourceOperatorIfTaskReady( +subtaskId, new WatermarkAlignmentEvent(maxAllowedWatermark)); +} } } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java index d49de8c622c..972415c1ff0 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java @@ -434,7 +434,7 @@ public class SourceOperator extends AbstractStr // introduces a small performance regression (probably because of an extra // virtual call) processingTimeService.scheduleWithFixedDelay( -this::emitLatestWatermark, +time -> emitLatestWatermark(), watermarkAlignmentParams.getUpdateInterval(), watermarkAlignmentParams.getUpdateInterval()); } @@ -449,6 +449,10 @@ public class SourceOperator extends AbstractStr sourceMetricGroup.idlingStarted(); return DataInputStatus.END_OF_DATA; case DATA_FINISHED: +if (watermarkAlignmentParams.isEnabled()) { +latestWatermark = Watermark.MAX_WATERMARK.getTimestamp(); +emitLatestWatermark(); +} sourceMetricGroup.idlingStarted(); return DataInputStatus.END_OF_INPUT; case WAITING_FOR_ALIGNMENT: @@ -507,7 +511,7 @@ public class SourceOperator extends AbstractStr } } -private void emitLatestWatermark(long time) { +private void emitLatestWatermark() { checkState(currentMainOutput != null); if (latestWatermark == Watermark.UNINITIALIZED.getTimestamp()) { return; diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorAlignmentTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorAlignmentTest.java index db3f8f0ed1f..30b8fdd5063 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorAlignmentTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorAlignmentTest.java @@ -27,
(flink-cdc) annotated tag release-3.1.1-rc0 updated (b163b8e15 -> 8df995069)
This is an automated email from the ASF dual-hosted git repository. renqs pushed a change to annotated tag release-3.1.1-rc0 in repository https://gitbox.apache.org/repos/asf/flink-cdc.git *** WARNING: tag release-3.1.1-rc0 was modified! *** from b163b8e15 (commit) to 8df995069 (tag) tagging b163b8e1589184bd7716cf6d002f3472766eb5db (commit) by Qingsheng Ren on Thu Jun 13 23:29:43 2024 +0800 - Log - release-3.1.1-rc0 -BEGIN PGP SIGNATURE- iQIzBAABCAAdFiEEob1Hf3nQNtLDDKfbyoruwvbrBAsFAmZrEGcACgkQyoruwvbr BAukxA/+LOk/veVgrl9wyYYuYiqx5DwrWLjzxMTMAcgwt9bBo2Lr4G5vVov6tQA/ d+1zkcmHnlyHfayV3ac5Tn2+rJgQyprqj5L7V9/E1en+pW07g/ZF5d8wYvqFCe8x XYGooyLzeGRt4Q+DgXEZ3HxFBcyMarA4/FCCiAgQjhQ8ph2EvUBJC1DQlqz/yPaY 5s03BnMlJ+BK/yNPOApBSv3X56ysDNsISabqO3qSq1z/1VyaJEW8ALf687rVj5v4 o8hedD4BDaeJSyPzJrhag//hPU7VIC5wXg8IQp8mP/bEjbQJ+JdkHrrs8HY6QEQt ksB+HsL+hWLBZPOzutnfcYjfVqPe/Y4pAwUxRIdlSXo7vnlITq2LOC/gD9zaypzz SHlc5NQnHVydDOt2U+4b/SsBfwImEWekJWIpSh6Cq1YrmyeQVcMV0+7xf/Y9rdby 4+BAAzZfG9q6g27mJmjOrb7Vjq8qSy1PRk6jgv0IsaiNt6NpUwBRTAxwNlDxgHqW T/BCKmHTpVD6TYvz/yLKUydD9uJR+CoruWoYBXbvDJuvQw0udB+YOl34zpvkpYhF KUfnA7DhjnODR3u3EmIke5xfNbCl47L0jYtkdwD6LG1NeYFAoIT0MPypguVo1FPW NpVNObpThZR45G+yERSU/lfJfgFl1xsRs0Ktnype9OeybGM/nAQ= =cQ+s -END PGP SIGNATURE- --- No new revisions were added by this update. Summary of changes:
svn commit: r69697 - in /dev/flink/flink-cdc-3.1.1-rc0: ./ flink-cdc-3.1.1-bin.tar.gz flink-cdc-3.1.1-bin.tar.gz.asc flink-cdc-3.1.1-bin.tar.gz.sha512 flink-cdc-3.1.1-src.tgz flink-cdc-3.1.1-src.tgz.a
Author: renqs Date: Thu Jun 13 15:26:50 2024 New Revision: 69697 Log: Apache Flink CDC, version 3.1.1, release candidate 0 Added: dev/flink/flink-cdc-3.1.1-rc0/ dev/flink/flink-cdc-3.1.1-rc0/flink-cdc-3.1.1-bin.tar.gz (with props) dev/flink/flink-cdc-3.1.1-rc0/flink-cdc-3.1.1-bin.tar.gz.asc dev/flink/flink-cdc-3.1.1-rc0/flink-cdc-3.1.1-bin.tar.gz.sha512 dev/flink/flink-cdc-3.1.1-rc0/flink-cdc-3.1.1-src.tgz (with props) dev/flink/flink-cdc-3.1.1-rc0/flink-cdc-3.1.1-src.tgz.asc dev/flink/flink-cdc-3.1.1-rc0/flink-cdc-3.1.1-src.tgz.sha512 Added: dev/flink/flink-cdc-3.1.1-rc0/flink-cdc-3.1.1-bin.tar.gz == Binary file - no diff available. Propchange: dev/flink/flink-cdc-3.1.1-rc0/flink-cdc-3.1.1-bin.tar.gz -- svn:mime-type = application/octet-stream Added: dev/flink/flink-cdc-3.1.1-rc0/flink-cdc-3.1.1-bin.tar.gz.asc == --- dev/flink/flink-cdc-3.1.1-rc0/flink-cdc-3.1.1-bin.tar.gz.asc (added) +++ dev/flink/flink-cdc-3.1.1-rc0/flink-cdc-3.1.1-bin.tar.gz.asc Thu Jun 13 15:26:50 2024 @@ -0,0 +1,16 @@ +-BEGIN PGP SIGNATURE- + +iQIzBAABCAAdFiEEob1Hf3nQNtLDDKfbyoruwvbrBAsFAmZrDwgACgkQyoruwvbr +BAsIZxAAmS2cYYWKsaRbQg0AJ41ZODUrCL/8pedFx9moKdERxQbBnrYKvkiI9Num +IPUdIV1u6njIdSNRZQTpI9Jt7rpM43aocJfwfXKF/rR750xvAB049y3U/1LcUxQG +/+HykacJCSx/jIDogLGXdVIA5T5OOg9omGea4DPnUnuXFAAxbSX92JK39b24F7DQ +j6haNqC/YPImanHThRfH41kQe/UBkK6vE54JLGoZfxYthHe/eLSbS26R/wN5cvxj +CYqOG2kSqpuJ0zBnivXc/ttnd7zEcgw0mWXf3a6O9Rr/EVJjfp/16MdFpQCu9u3d +9EdJk9OFYi+QWinh4AHY2VDc6k+71jqqv4kY9yruqMg/G/Z3Ybo66KmoPGXoLSmZ +HuqzrvBmOcsD7L/UvHrO2frDfBdRORyv4M8WpdSWOLSSkKERICwXjZTw9h9ad1Ov +V6nxN0IINRMhO5o5Bqddo8LRMSVMZnxKdvH5p/NsoxKNxT2HBKIFwpAZewkpkPwO +1SWMMj8mI6dAdp8pghOB0CSWJM6h9jT3WmUZNwm3lp2EjSgfFv0PA2qj4lbxDQrM +5YIwjed9q4O52drjM3B7g9ykLr3b48KYx1kiwsaJbvs8FfUMBRnmzch60pfb3ORf +cYHTDvzvJIzYH7X1op8b5XOC46F/C3scEQkAzJw0jInvmMt2W1w= +=KxW1 +-END PGP SIGNATURE- Added: dev/flink/flink-cdc-3.1.1-rc0/flink-cdc-3.1.1-bin.tar.gz.sha512 == --- dev/flink/flink-cdc-3.1.1-rc0/flink-cdc-3.1.1-bin.tar.gz.sha512 (added) +++ dev/flink/flink-cdc-3.1.1-rc0/flink-cdc-3.1.1-bin.tar.gz.sha512 Thu Jun 13 15:26:50 2024 @@ -0,0 +1 @@ +580ac2e4ffb88cc98d3f747083c5340335b3aa79193a7fad5a782d43a49f11dc73871dc6fb92cdc93d41e499a84a28ee997ae17a4cf625d349a6735e49e98750 flink-cdc-3.1.1-bin.tar.gz Added: dev/flink/flink-cdc-3.1.1-rc0/flink-cdc-3.1.1-src.tgz == Binary file - no diff available. Propchange: dev/flink/flink-cdc-3.1.1-rc0/flink-cdc-3.1.1-src.tgz -- svn:mime-type = application/octet-stream Added: dev/flink/flink-cdc-3.1.1-rc0/flink-cdc-3.1.1-src.tgz.asc == --- dev/flink/flink-cdc-3.1.1-rc0/flink-cdc-3.1.1-src.tgz.asc (added) +++ dev/flink/flink-cdc-3.1.1-rc0/flink-cdc-3.1.1-src.tgz.asc Thu Jun 13 15:26:50 2024 @@ -0,0 +1,16 @@ +-BEGIN PGP SIGNATURE- + +iQIzBAABCAAdFiEEob1Hf3nQNtLDDKfbyoruwvbrBAsFAmZrAqQACgkQyoruwvbr +BAt22g/6Anb+Ze2Yycar6+FVCk/DBg4IJ34bN5aJTMLQHk+n/sDmeHUHHRBkhOxf ++ffeUO7tBpKC75XCheSVfCncgWCjCg4aIFSK168zYOjE/W56oZCLAUOK6ZURqVNJ +QMoGK8jPlYTECfnG6rHbucM5e2IhCQIRQd3jqgoe1OXuPuyiVl9gmJ0fr393P+qB +OiG38i/yzvtIusW4ElYWhDZ+OoW8LytOR4CDX6l5LMvRxg5XlGImnIYw8Bohobfm ++TC8Q/SMLpARtiguewEn/zp93zi5mps92R2LLm3zoXTZPpxPqKGyipV6W3cp3NzF +fCMFq2qKHnL6u0zo0VQCfOm8/WZ+LmdMwIHE9ZyrBlIH2oAiwYDFNaJKqsbgfYZ7 +/o78lSzYrdwQ0txIVnFPEQWvKvDlRUkNJx9aRftGJR+4IJj7TO3wOnoXT8FDphiU +O57owEzm6V0T4tIR+SlEHOjVrjDTMkznmLJgXiUFWJAsQeNHh2b4TqtO1veVS0dB +3H6yjKaOsjWBuxOCCBZ0pAnIiSR9OUNwZlQ3sm2tEr9VBJzhtolSrFAJ6GeEhfNR +aKqy0UUeoPqax0q6eM7vi69HBd4Ko64C4XzDxE4QRtVd/BG/K1Ka4Q1/jDgzk6x2 +2EGW2N/bAfX3Gna2SXOlpkWFrgNYRID/F3w55pqFo+Z/nX7QjtM= +=Mk3F +-END PGP SIGNATURE- Added: dev/flink/flink-cdc-3.1.1-rc0/flink-cdc-3.1.1-src.tgz.sha512 == --- dev/flink/flink-cdc-3.1.1-rc0/flink-cdc-3.1.1-src.tgz.sha512 (added) +++ dev/flink/flink-cdc-3.1.1-rc0/flink-cdc-3.1.1-src.tgz.sha512 Thu Jun 13 15:26:50 2024 @@ -0,0 +1 @@ +fbd3a3c7e4769e59882a7823770f7408744a0ecf2aa0cf40d5149697375c99c4734fed54ba6902f86dc3531f910a7c1e4f975d2b3425d416f3c1e21c36cd75ad flink-cdc-3.1.1-src.tgz
(flink-cdc) branch release-3.1 updated: [FLINK-35592][cdc] Fix MysqlDebeziumTimeConverter miss timezone convert to timestamp (#3380)
This is an automated email from the ASF dual-hosted git repository. renqs pushed a commit to branch release-3.1 in repository https://gitbox.apache.org/repos/asf/flink-cdc.git The following commit(s) were added to refs/heads/release-3.1 by this push: new f476a5b1d [FLINK-35592][cdc] Fix MysqlDebeziumTimeConverter miss timezone convert to timestamp (#3380) f476a5b1d is described below commit f476a5b1dd99fc63962bc75048c0b12957e53da8 Author: ConradJam AuthorDate: Thu Jun 13 22:22:09 2024 +0800 [FLINK-35592][cdc] Fix MysqlDebeziumTimeConverter miss timezone convert to timestamp (#3380) --- .../converters/MysqlDebeziumTimeConverter.java | 112 - .../MysqlDebeziumTimeConverterITCase.java | 8 +- .../src/test/resources/ddl/date_convert_test.sql | 3 +- 3 files changed, 73 insertions(+), 50 deletions(-) diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/converters/MysqlDebeziumTimeConverter.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/converters/MysqlDebeziumTimeConverter.java index 493fd682c..1eb98947d 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/converters/MysqlDebeziumTimeConverter.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/converters/MysqlDebeziumTimeConverter.java @@ -113,54 +113,73 @@ public class MysqlDebeziumTimeConverter registration.register( SchemaBuilder.string().name(schemaName).optional(), value -> { -log.debug( -"find schema need to change dateType, field name:{} ,field type:{} ,field value:{} ,field " -+ "default:{}", -field.name(), -columnType, -value == null ? "null" : value, -field.hasDefaultValue() ? field.defaultValue() : "null"); -if (value == null) { -return convertDateDefaultValue(field); -} -switch (columnType.toUpperCase(Locale.ROOT)) { -case "DATE": -if (value instanceof Integer) { -return this.convertToDate( -columnType, LocalDate.ofEpochDay((Integer) value)); -} -return this.convertToDate(columnType, value); -case "TIME": -if (value instanceof Long) { -long l = -Math.multiplyExact( -(Long) value, TimeUnit.MICROSECONDS.toNanos(1)); -return this.convertToTime(columnType, LocalTime.ofNanoOfDay(l)); -} -return this.convertToTime(columnType, value); -case "DATETIME": -if (value instanceof Long) { -if (getTimePrecision(field) <= 3) { -return this.convertToTimestamp( -columnType, - Conversions.toInstantFromMillis((Long) value)); -} -if (getTimePrecision(field) <= 6) { -return this.convertToTimestamp( -columnType, - Conversions.toInstantFromMicros((Long) value)); -} -} -return this.convertToTimestamp(columnType, value); -case "TIMESTAMP": -return this.convertToTimestampWithTimezone(columnType, value); -default: -throw new IllegalArgumentException( -"Unknown field type " + columnType.toUpperCase(Locale.ROOT)); +try { +return convertDateObject(field, value, columnType); +} catch (Exception e) { +printConvertDateErrorClassLogs(field, registration, value); +throw new RuntimeException("MysqlDebeziumConverter error", e); } }); } +private void printConvertDateErrorClassLogs( +RelationalColumn field, +ConverterRegistration registration, +Object value) { +bool
(flink) branch master updated (9d169038784 -> d0f9bb40a61)
This is an automated email from the ASF dual-hosted git repository. twalthr pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git from 9d169038784 [FLINK-35164][table] Support `ALTER CATALOG RESET` syntax add d0f9bb40a61 [FLINK-34172] Add support for altering a distribution via ALTER TABLE No new revisions were added by this update. Summary of changes: .../src/main/codegen/data/Parser.tdd | 1 + .../src/main/codegen/includes/parserImpls.ftl | 21 +-- .../flink/sql/parser/ddl/SqlAlterTableAdd.java | 9 +- ...ark.java => SqlAlterTableDropDistribution.java} | 14 +- .../flink/sql/parser/ddl/SqlAlterTableModify.java | 9 +- .../flink/sql/parser/ddl/SqlAlterTableSchema.java | 21 +++ .../flink/sql/parser/ddl/SqlCreateTable.java | 1 + .../flink/sql/parser/ddl/SqlDistribution.java | 21 ++- .../flink/sql/parser/FlinkSqlParserImplTest.java | 57 +++- .../operations/ddl/AlterTableChangeOperation.java | 9 ++ .../apache/flink/table/catalog/TableChange.java| 158 + .../planner/operations/AlterSchemaConverter.java | 99 +++-- .../operations/SqlCreateTableConverter.java| 29 +--- .../operations/SqlNodeToOperationConversion.java | 31 .../planner/utils/OperationConverterUtils.java | 31 .../operations/SqlDdlToOperationConverterTest.java | 143 +-- 16 files changed, 578 insertions(+), 76 deletions(-) copy flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/{SqlAlterTableDropWatermark.java => SqlAlterTableDropDistribution.java} (80%)
(flink) branch master updated: [FLINK-35164][table] Support `ALTER CATALOG RESET` syntax
This is an automated email from the ASF dual-hosted git repository. jchan pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new 9d169038784 [FLINK-35164][table] Support `ALTER CATALOG RESET` syntax 9d169038784 is described below commit 9d1690387849303b27050bb0cefaa1bad6e3fb98 Author: Yubin Li AuthorDate: Thu Jun 13 19:19:47 2024 +0800 [FLINK-35164][table] Support `ALTER CATALOG RESET` syntax This closes #24763 --- docs/content.zh/docs/dev/table/sql/alter.md| 20 +- docs/content/docs/dev/table/sql/alter.md | 20 +- .../src/test/resources/sql/catalog_database.q | 187 ++ .../src/test/resources/sql/catalog_database.q | 218 - .../src/main/codegen/data/Parser.tdd | 1 + .../src/main/codegen/includes/parserImpls.ftl | 24 ++- .../flink/sql/parser/ddl/SqlAlterCatalogReset.java | 76 +++ .../flink/sql/parser/FlinkSqlParserImplTest.java | 3 +- .../apache/flink/table/catalog/CatalogManager.java | 12 +- .../ddl/AlterCatalogOptionsOperation.java | 4 +- ...ration.java => AlterCatalogResetOperation.java} | 30 ++- .../converters/SqlAlterCatalogResetConverter.java | 48 + .../operations/converters/SqlNodeConverters.java | 1 + .../operations/SqlDdlToOperationConverterTest.java | 21 ++ 14 files changed, 448 insertions(+), 217 deletions(-) diff --git a/docs/content.zh/docs/dev/table/sql/alter.md b/docs/content.zh/docs/dev/table/sql/alter.md index 808ee893023..39de6985c9d 100644 --- a/docs/content.zh/docs/dev/table/sql/alter.md +++ b/docs/content.zh/docs/dev/table/sql/alter.md @@ -28,7 +28,7 @@ under the License. -ALTER 语句用于修改一个已经在 [Catalog]({{< ref "docs/dev/table/catalogs" >}}) 中注册的表、视图或函数定义。 +ALTER 语句用于修改一个已经在 [Catalog]({{< ref "docs/dev/table/catalogs" >}}) 中注册的表、视图或函数定义,或 catalog 本身的定义。 Flink SQL 目前支持以下 ALTER 语句: @@ -36,6 +36,7 @@ Flink SQL 目前支持以下 ALTER 语句: - ALTER VIEW - ALTER DATABASE - ALTER FUNCTION +- ALTER CATALOG ## 执行 ALTER 语句 @@ -538,10 +539,14 @@ ALTER [TEMPORARY|TEMPORARY SYSTEM] FUNCTION Language tag 用于指定 Flink runtime 如何执行这个函数。目前,只支持 JAVA,SCALA 和 PYTHON,且函数的默认语言为 JAVA。 +{{< top >}} + ## ALTER CATALOG ```sql -ALTER CATALOG catalog_name SET (key1=val1, ...) +ALTER CATALOG catalog_name +SET (key1=val1, ...) + | RESET (key1, ...) ``` ### SET @@ -555,4 +560,15 @@ ALTER CATALOG catalog_name SET (key1=val1, ...) ALTER CATALOG cat2 SET ('default-database'='db'); ``` +### RESET + +为指定的 catalog 重置一个或多个属性。 + +`RESET` 语句示例如下。 + +```sql +-- reset 'default-database' +ALTER CATALOG cat2 RESET ('default-database'); +``` + {{< top >}} diff --git a/docs/content/docs/dev/table/sql/alter.md b/docs/content/docs/dev/table/sql/alter.md index 54fdf8f15b0..e842d137ce0 100644 --- a/docs/content/docs/dev/table/sql/alter.md +++ b/docs/content/docs/dev/table/sql/alter.md @@ -28,7 +28,7 @@ under the License. -ALTER statements are used to modified a registered table/view/function definition in the [Catalog]({{< ref "docs/dev/table/catalogs" >}}). +ALTER statements are used to modify the definition of a table, view or function that has already been registered in the [Catalog]({{< ref "docs/dev/table/catalogs" >}}), or the definition of a catalog itself. Flink SQL supports the following ALTER statements for now: @@ -36,6 +36,7 @@ Flink SQL supports the following ALTER statements for now: - ALTER VIEW - ALTER DATABASE - ALTER FUNCTION +- ALTER CATALOG ## Run an ALTER statement @@ -540,10 +541,14 @@ If the function doesn't exist, nothing happens. Language tag to instruct flink runtime how to execute the function. Currently only JAVA, SCALA and PYTHON are supported, the default language for a function is JAVA. +{{< top >}} + ## ALTER CATALOG ```sql -ALTER CATALOG catalog_name SET (key1=val1, ...) +ALTER CATALOG catalog_name +SET (key1=val1, ...) + | RESET (key1, ...) ``` ### SET @@ -557,4 +562,15 @@ The following examples illustrate the usage of the `SET` statements. ALTER CATALOG cat2 SET ('default-database'='db'); ``` +### RESET + +Reset one or more properties to its default value in the specified catalog. + +The following examples illustrate the usage of the `RESET` statements. + +```sql +-- reset 'default-database' +ALTER CATALOG cat2 RESET ('default-database'); +``` + {{< top >}} diff --git a/flink-table/flink-sql-client/src/test/resources/sql/catalog_database.q b/flink-table/flink-sql-client/src/test/resources/sql/catalog_database.q index c9a5b02116a..b99af7344f9 100644 --- a/flink-table/flink-sql-client/src/test/resources/sql/catalog_database.q +++ b/flink-table/flink-sql-client/src/test/resources/sql/catalog_database.q @@ -92,6 +92,110 @@ drop catalog c1; org.apache.flink.table.catalog.exceptions.CatalogException: Cannot drop a catalog which is currently in use. !error +c
(flink) branch master updated (14fdd13b632 -> f0b01277dd2)
This is an automated email from the ASF dual-hosted git repository. martijnvisser pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git from 14fdd13b632 [FLINK-25537] [JUnit5 Migration] Module: flink-core with,Package: core (#24881) add f0b01277dd2 [FLINK-35378][FLIP-453] Promote Unified Sink API V2 to Public and Deprecate SinkFunction. This closes #24805 No new revisions were added by this update. Summary of changes: .../18509c9e-3250-4c52-91b9-11ccefc85db1 | 9 + .../e5126cae-f3fe-48aa-b6fb-60ae6cc3fcd5 | 16 .../org/apache/flink/api/connector/sink2/Committer.java | 6 +++--- .../flink/api/connector/sink2/CommitterInitContext.java | 4 ++-- .../flink/api/connector/sink2/CommittingSinkWriter.java | 4 ++-- .../java/org/apache/flink/api/connector/sink2/Sink.java | 3 ++- .../org/apache/flink/api/connector/sink2/SinkWriter.java | 6 +++--- .../flink/api/connector/sink2/StatefulSinkWriter.java| 4 ++-- .../flink/api/connector/sink2/SupportsCommitter.java | 4 ++-- .../flink/api/connector/sink2/SupportsWriterState.java | 3 ++- .../flink/api/connector/sink2/WriterInitContext.java | 6 ++ .../streaming/api/functions/sink/DiscardingSink.java | 3 +++ .../streaming/api/functions/sink/PrintSinkFunction.java | 3 +++ .../streaming/api/functions/sink/RichSinkFunction.java | 8 +++- .../flink/streaming/api/functions/sink/SinkFunction.java | 3 +++ .../streaming/api/functions/sink/SocketClientSink.java | 3 +++ .../api/functions/sink/TwoPhaseCommitSinkFunction.java | 3 +++ pom.xml | 5 + 18 files changed, 64 insertions(+), 29 deletions(-)