This is an automated email from the ASF dual-hosted git repository. leonard pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new 61374638c02 [FLINK-25485][connector/jdbc] Add default jdbc option 'rewriteBatchedStatements' for MySQL dialect 61374638c02 is described below commit 61374638c02e5f996cad678253c99363f7ae01a5 Author: Luning (Lucas) Wang <wang4lun...@gmail.com> AuthorDate: Tue Aug 9 16:18:59 2022 +0800 [FLINK-25485][connector/jdbc] Add default jdbc option 'rewriteBatchedStatements' for MySQL dialect This closes #18469. --- .../flink/connector/jdbc/dialect/JdbcDialect.java | 12 +++++++ .../connector/jdbc/dialect/mysql/MySqlDialect.java | 18 ++++++++++ .../internal/options/JdbcConnectorOptions.java | 2 +- .../jdbc/dialect/mysql/MySqlDialectTest.java | 41 ++++++++++++++++++++++ 4 files changed, 72 insertions(+), 1 deletion(-) diff --git a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/dialect/JdbcDialect.java b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/dialect/JdbcDialect.java index 6d4c1db91b4..6cc6bbd57d1 100644 --- a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/dialect/JdbcDialect.java +++ b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/dialect/JdbcDialect.java @@ -142,4 +142,16 @@ public interface JdbcDialect extends Serializable { */ String getSelectFromStatement( String tableName, String[] selectFields, String[] conditionFields); + + /** + * Appends default JDBC properties to url for current dialect. Some database dialects will set + * default JDBC properties for performance or optimization consideration, such as MySQL dialect + * uses 'rewriteBatchedStatements=true' to enable execute multiple MySQL statements in batch + * mode. + * + * @return A JDBC url that has appended the default properties. + */ + default String appendDefaultUrlProperties(String url) { + return url; + } } diff --git a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/dialect/mysql/MySqlDialect.java b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/dialect/mysql/MySqlDialect.java index 918af883685..b44155913aa 100644 --- a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/dialect/mysql/MySqlDialect.java +++ b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/dialect/mysql/MySqlDialect.java @@ -47,6 +47,10 @@ public class MySqlDialect extends AbstractDialect { private static final int MAX_DECIMAL_PRECISION = 65; private static final int MIN_DECIMAL_PRECISION = 1; + // The JDBC option to enable execute multiple MySQL statements in batch mode: + // https://dev.mysql.com/doc/connector-j/8.0/en/connector-j-connp-props-performance-extensions.html#cj-conn-prop_rewriteBatchedStatements + private static final String REWRITE_BATCHED_STATEMENTS = "rewriteBatchedStatements"; + @Override public JdbcRowConverter getRowConverter(RowType rowType) { return new MySQLRowConverter(rowType); @@ -126,4 +130,18 @@ public class MySqlDialect extends AbstractDialect { LogicalTypeRoot.TIME_WITHOUT_TIME_ZONE, LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE); } + + @Override + public String appendDefaultUrlProperties(String url) { + if (!url.contains(REWRITE_BATCHED_STATEMENTS)) { + String defaultUrlProperties = REWRITE_BATCHED_STATEMENTS + "=true"; + if (url.contains("?")) { + return url + "&" + defaultUrlProperties; + } else { + return url + "?" + defaultUrlProperties; + } + } else { + return url; + } + } } diff --git a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/internal/options/JdbcConnectorOptions.java b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/internal/options/JdbcConnectorOptions.java index 6158475a6aa..89d22bd3067 100644 --- a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/internal/options/JdbcConnectorOptions.java +++ b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/internal/options/JdbcConnectorOptions.java @@ -181,7 +181,7 @@ public class JdbcConnectorOptions extends JdbcConnectionOptions { } return new JdbcConnectorOptions( - dbURL, + dialect.appendDefaultUrlProperties(dbURL), tableName, driverName, username, diff --git a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/dialect/mysql/MySqlDialectTest.java b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/dialect/mysql/MySqlDialectTest.java new file mode 100644 index 00000000000..006cc7685c6 --- /dev/null +++ b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/dialect/mysql/MySqlDialectTest.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.jdbc.dialect.mysql; + +import org.junit.jupiter.api.Test; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for {@link MySqlDialect}. */ +public class MySqlDialectTest { + + @Test + void testAppendDefaultUrlProperties() { + MySqlDialect dialect = new MySqlDialect(); + assertThat(dialect.appendDefaultUrlProperties("jdbc:mysql://localhost:3306/foo")) + .isEqualTo("jdbc:mysql://localhost:3306/foo?rewriteBatchedStatements=true"); + assertThat(dialect.appendDefaultUrlProperties("jdbc:mysql://localhost:3306/foo?foo=bar")) + .isEqualTo("jdbc:mysql://localhost:3306/foo?foo=bar&rewriteBatchedStatements=true"); + assertThat( + dialect.appendDefaultUrlProperties( + "jdbc:mysql://localhost:3306/foo?foo=bar&rewriteBatchedStatements=false")) + .isEqualTo( + "jdbc:mysql://localhost:3306/foo?foo=bar&rewriteBatchedStatements=false"); + } +}