This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-4.1 by this push:
new c13c8b920e8 branch-4.1: [improve](streaming-job) support SSL and align
MySQL CDC source with PG #62700 (#63044)
c13c8b920e8 is described below
commit c13c8b920e8a925f8fa70a3320f89948d4a259d2
Author: wudi <[email protected]>
AuthorDate: Thu May 7 14:47:49 2026 +0800
branch-4.1: [improve](streaming-job) support SSL and align MySQL CDC source
with PG #62700 (#63044)
Cherry-picked from #62700
---
.licenserc.yaml | 1 +
.../docker-compose/mysql/certs/root.crt | 19 +++
.../docker-compose/mysql/certs/server.crt | 18 ++
.../docker-compose/mysql/certs/server.key | 28 +++
.../docker-compose/mysql/mysql-5.7.yaml.tpl | 19 ++-
.../apache/doris/job/cdc/DataSourceConfigKeys.java | 4 +
.../streaming/DataSourceConfigValidator.java | 19 +++
.../streaming/DataSourceConfigValidatorTest.java | 81 +++++++++
.../source/reader/mysql/MySqlSourceReader.java | 48 ++++++
.../apache/doris/cdcclient/utils/SmallFileMgr.java | 93 ++++++++++
.../source/reader/mysql/MySqlSourceReaderTest.java | 62 +++++++
.../doris/cdcclient/utils/SmallFileMgrTest.java | 119 +++++++++++++
.../cdc/test_streaming_mysql_job_col_filter.out | 9 +
.../cdc/test_streaming_mysql_job_ssl.out | 9 +
.../cdc/test_streaming_mysql_job_table_mapping.out | 19 +++
.../cdc/test_streaming_mysql_job_col_filter.groovy | 175 +++++++++++++++++++
.../cdc/test_streaming_mysql_job_ssl.groovy | 158 +++++++++++++++++
.../test_streaming_mysql_job_table_mapping.groovy | 190 +++++++++++++++++++++
18 files changed, 1069 insertions(+), 2 deletions(-)
diff --git a/.licenserc.yaml b/.licenserc.yaml
index 5cd7c30afa8..5bdfa221eb5 100644
--- a/.licenserc.yaml
+++ b/.licenserc.yaml
@@ -95,6 +95,7 @@ header:
- "docker/thirdparties/docker-compose/kerberos/sql/**"
- "docker/thirdparties/docker-compose/iceberg/spark-defaults.conf.tpl"
- "docker/thirdparties/docker-compose/postgresql/certs/**"
+ - "docker/thirdparties/docker-compose/mysql/certs/**"
- "conf/mysql_ssl_default_certificate/*"
- "conf/mysql_ssl_default_certificate/client_certificate/ca.pem"
- "conf/mysql_ssl_default_certificate/client_certificate/client-cert.pem"
diff --git a/docker/thirdparties/docker-compose/mysql/certs/root.crt
b/docker/thirdparties/docker-compose/mysql/certs/root.crt
new file mode 100644
index 00000000000..94095b762c6
--- /dev/null
+++ b/docker/thirdparties/docker-compose/mysql/certs/root.crt
@@ -0,0 +1,19 @@
+-----BEGIN CERTIFICATE-----
+MIIDDzCCAfegAwIBAgIULswy9ovSHXeKSxoEen2Y3xEZqBgwDQYJKoZIhvcNAQEL
+BQAwFzEVMBMGA1UEAwwMTG9jYWwtRGV2LUNBMB4XDTI2MDMwMzA4MjMxM1oXDTM2
+MDIyOTA4MjMxM1owFzEVMBMGA1UEAwwMTG9jYWwtRGV2LUNBMIIBIjANBgkqhkiG
+9w0BAQEFAAOCAQ8AMIIBCgKCAQEAsVFJhj3Y7zamNZiq9SefnnKAKaOXXUbXo/Fq
+V6VNzMSkZuwDfRo/RKjvVaUru/JSd7QoV5zGyUYb+oHx/R233R1M0sd23+eR1mRQ
+w771DmXthbdpIPBEwlmh0LMsiH9cJ7R2iRigCzfd2/SbJC3cvX6CtzyNqSkZboVO
+fswkotF4ZaJgOiBile4A/zWWqeA07QVd8tusdxaoOJv0E/pjcLi5peGXtQA6SSj4
+tp20K/tlrRS1Zc0dKgxU7YohxNBwW4QF0uOVR/QBmfzEpMdxKlwcEnHubPAemgt1
+bp9g9Buwo7oWMvDJuS40xMPOlDhshrzNM8CoWIihgndMPG/LsQIDAQABo1MwUTAd
+BgNVHQ4EFgQUHBKhmdKPD+b1xDjzzkQVaVETSfUwHwYDVR0jBBgwFoAUHBKhmdKP
+D+b1xDjzzkQVaVETSfUwDwYDVR0TAQH/BAUwAwEB/zANBgkqhkiG9w0BAQsFAAOC
+AQEAnueVOIAk/XLQx3msDY58Reo+D1f/AUy/WTPzxeXCxXLScrjFCLXjrIDzgslN
+WnP7E5xNJxdrWgskS36IJxVg0+cUfy5kQYYfmWo1vOYdW/AMNBdQwmK5ve3r3Z/3
+dE2cV4uvL6n0iZZMxnsL5KXwLeSQeTtJepvWi27Z0t8P23lJHJKfl/Ek49ILIDgB
+zZIMKPgm6w7/U3jUWMUyQ+iI/XiEPrnn4url1FNViC8ucoIm8EU4ZE01j1mbZO8M
+JSa6InQEIx/1P675qYtuKWF75Tq/qU7+uX7/07AiTyYSrHMT+024TfbRCi1PF/Ka
+cx+pSJLima+3GHhK2Rj437yx1Q==
+-----END CERTIFICATE-----
diff --git a/docker/thirdparties/docker-compose/mysql/certs/server.crt
b/docker/thirdparties/docker-compose/mysql/certs/server.crt
new file mode 100644
index 00000000000..e8aaecb71ec
--- /dev/null
+++ b/docker/thirdparties/docker-compose/mysql/certs/server.crt
@@ -0,0 +1,18 @@
+-----BEGIN CERTIFICATE-----
+MIIC+zCCAeOgAwIBAgIUG/9rYO8McYBH83YOe4nzMcb4YCAwDQYJKoZIhvcNAQEL
+BQAwFzEVMBMGA1UEAwwMTG9jYWwtRGV2LUNBMB4XDTI2MDMwMzA4MjMyNloXDTM2
+MDIyOTA4MjMyNlowFDESMBAGA1UEAwwJbG9jYWxob3N0MIIBIjANBgkqhkiG9w0B
+AQEFAAOCAQ8AMIIBCgKCAQEAtHYYwevcMqMPbCAaQlrX7qJtRXf/j+WfGFbM4/PZ
+Y6cjSsrqUgwHduMyE4yce9vWWygLJM/S9aBI3jsvhAdLVaFIXhOU4jMuyk+1RJvu
+k+iUJ3wabo2Zv6605wUU7wS0FCfJJMxG/zz5FYtX8kMw7sKJWLhB4C+oQlO+mSj4
+CKjg7mNZjgKz024/BW7FKhAaYYGI9GNmjIgvjSDXGOXzd2nM9XLoVNIkR8mgD69l
+yHHzhGUAdXDxaTr+026Z2uBrnip7ZjDIB65J/qrxSc8eK1ZhZzYdHBpLnP67zuWR
+iyKDNETpRa1SoWCk9/9+AGwygRcXC7h1GpMb46wce4/TtwIDAQABo0IwQDAdBgNV
+HQ4EFgQUEeFQVqK+A/H6R2iSiNW57cSilGcwHwYDVR0jBBgwFoAUHBKhmdKPD+b1
+xDjzzkQVaVETSfUwDQYJKoZIhvcNAQELBQADggEBAKpxfqPTPXL2+n/OW6F8cvwK
+aod3BOquIjIKm17+Uob0rcOnxssYNQa0g9pW2zgIlAS+QUZ1K46ygJWrLNKdpIzt
+mG2Hn6kUX9J7Xo+F5IldlX2bImi3b2/oI8IliLzawsofondCzL2BIfWLhE3LaISF
+iN8pfzjoHCZXfLm3oUzxaeltFqEP+cApig/hAO17FkMHY6sl9QII94MV2d9gVwVl
+pAi1ALOzOQKbsTCdRspoadPqmZ7AgbtS3RiVMmCZHwrtCvdIcaBuiPy5KiBFPCEX
+Cdia+GWqETKBNpornHeMQ7d/J2ilbFRs+mRAUtyeWK0ilcdOxbmMOzCsjIV8kgI=
+-----END CERTIFICATE-----
diff --git a/docker/thirdparties/docker-compose/mysql/certs/server.key
b/docker/thirdparties/docker-compose/mysql/certs/server.key
new file mode 100644
index 00000000000..d9eb2a8f4ad
--- /dev/null
+++ b/docker/thirdparties/docker-compose/mysql/certs/server.key
@@ -0,0 +1,28 @@
+-----BEGIN PRIVATE KEY-----
+MIIEvQIBADANBgkqhkiG9w0BAQEFAASCBKcwggSjAgEAAoIBAQC0dhjB69wyow9s
+IBpCWtfuom1Fd/+P5Z8YVszj89ljpyNKyupSDAd24zITjJx729ZbKAskz9L1oEje
+Oy+EB0tVoUheE5TiMy7KT7VEm+6T6JQnfBpujZm/rrTnBRTvBLQUJ8kkzEb/PPkV
+i1fyQzDuwolYuEHgL6hCU76ZKPgIqODuY1mOArPTbj8FbsUqEBphgYj0Y2aMiC+N
+INcY5fN3acz1cuhU0iRHyaAPr2XIcfOEZQB1cPFpOv7Tbpna4GueKntmMMgHrkn+
+qvFJzx4rVmFnNh0cGkuc/rvO5ZGLIoM0ROlFrVKhYKT3/34AbDKBFxcLuHUakxvj
+rBx7j9O3AgMBAAECggEABZ+8uxdWnQYl+4xlV5E0gmTx3dh8Qd351UfFsW0demDr
+lU1SI3I4I/Lelv8lyrLXZzjcwPfmezfec6RnF37p7ijSPgrIG2PLplCqJsy6BzK1
+ycH/yaYm6sIFSBqdF+ZO5QOaGOWZpA9lgsYHNVt/jdvJCq/50ZhJZO2fvfi9dr4I
+vLjcCX57t+V9n68zHCdw8pTw3eSvO34wv8FXXQyofYi6+swoV/NhGFS1xMlc2USO
+KQ0Do/Y8Dxr/5HawoiMTzO/o4M0Bdmb237fW4D0yVqaevjVWKe/wq2q3VZyBatB2
+XDMkL1ZaWiRsRZHoliiIh3K3gQ2jmtsMXjzv+IKdvQKBgQDgPsk7y5Ms5rjArL8g
+qCP2o8a/IvxzCwxcvK59nfmWFuFeJsxE3uvp89UriqC6yGD5yxAmjDKvHOFtV+CE
+KjCnMgt/jU6BpkaHzTRR8Gtt/RkILZTZiKoNdEgOTeBjHKCoOUoM7Dc78nW7Dp0F
+QoLdAe0g0pSRy5iFcWBiX7UP5QKBgQDOBBRfnaU6fICVH0SmqBoKVSCDm+saYMAW
+99mypm2xViP4VQOa1QjNRiEN9kllxD4I+S48kALSCpif+A/IE89bNgFNEOvTYbkW
++mvjoFLQtN79Tc8/G0CEi+WhRWWpY9WnMuzj1r/pAbC8uOEKvJ+tYfKmHZN5kvoC
+k0e2yMCDawKBgFi6Hw9sxkgO5m0+LMW0Ib62IK6CHlc6uOJ8uaH0fsvXM8b4HPzn
+I3tHQkJfMKeXH1/W7AYElQ1apQuJqMlClEujbo9CjxyXePLEy/3b3fYAHgZxWqMU
+Aw0dxGD8iVtN+Xd2a4lfcZ9jmRexeYmaPoNJ/tRs3eIuJ6QtLxDdg5vNAoGBAIqU
+C/BVZrN01Dl7Ev7XzMxufrSIyRixRAUvK20Urmy/eOqupQIdkxIhvlJZ/P1LiD8Y
+/pUWeg83uXrBrjvzt2OvbCie3UMPVSWzxacUTSC+ydCx6lqUxk1inVBiEgRjd3BE
+vTx1VBo0XOJVqmtCflZusH41HuKEj0/0KiU13OmJAoGAYkxy/U6uHHn6xB3KriID
+bZgfYRlLv1bD4AYiOcjFke3/4MZJ2U4t/x6uzEjQZd/0waSeE3YY/MfEXufdHM99
+ZUlAHwLhjLcY58HgkyMkw4sRaHYxTQdOuxcnmzX1+sHKxKXlYoboLgh8Qf9A4DcR
+HZde9n1uVLVtlBRTjjL5O84=
+-----END PRIVATE KEY-----
diff --git a/docker/thirdparties/docker-compose/mysql/mysql-5.7.yaml.tpl
b/docker/thirdparties/docker-compose/mysql/mysql-5.7.yaml.tpl
index 3ceeaa313e1..9fe7b1a38ef 100644
--- a/docker/thirdparties/docker-compose/mysql/mysql-5.7.yaml.tpl
+++ b/docker/thirdparties/docker-compose/mysql/mysql-5.7.yaml.tpl
@@ -20,7 +20,6 @@ version: "2.1"
services:
doris--mysql_57:
image: mysql:5.7.36
- command: --default-authentication-plugin=mysql_native_password
restart: always
environment:
MYSQL_ROOT_PASSWORD: 123456
@@ -29,15 +28,31 @@ services:
LANG: C.UTF-8
ports:
- ${DOCKER_MYSQL_57_EXTERNAL_PORT}:3306
+ entrypoint:
+ - bash
+ - -c
+ - |
+ chown mysql:mysql /etc/mysql/certs/*
+ chmod 600 /etc/mysql/certs/server.key
+ chmod 644 /etc/mysql/certs/server.crt /etc/mysql/certs/root.crt
+ exec docker-entrypoint.sh "$@"
+ - --
+ command:
+ - "mysqld"
+ - "--default-authentication-plugin=mysql_native_password"
+ - "--ssl-ca=/etc/mysql/certs/root.crt"
+ - "--ssl-cert=/etc/mysql/certs/server.crt"
+ - "--ssl-key=/etc/mysql/certs/server.key"
healthcheck:
test: mysqladmin ping -h 127.0.0.1 -u root
--password=$$MYSQL_ROOT_PASSWORD && mysql -h 127.0.0.1 -u root
--password=$$MYSQL_ROOT_PASSWORD -e "SELECT 1 FROM doris_test.deadline;"
interval: 5s
timeout: 60s
retries: 120
volumes:
- - ./data/:/var/lib/mysql
+ - ./data/:/var/lib/mysql
- ./init:/docker-entrypoint-initdb.d
- ./my.cnf:/etc/mysql/conf.d/my.cnf
+ - ./certs:/etc/mysql/certs
networks:
- doris--mysql_57
diff --git
a/fe/fe-common/src/main/java/org/apache/doris/job/cdc/DataSourceConfigKeys.java
b/fe/fe-common/src/main/java/org/apache/doris/job/cdc/DataSourceConfigKeys.java
index d31794766ad..72322da2668 100644
---
a/fe/fe-common/src/main/java/org/apache/doris/job/cdc/DataSourceConfigKeys.java
+++
b/fe/fe-common/src/main/java/org/apache/doris/job/cdc/DataSourceConfigKeys.java
@@ -40,6 +40,10 @@ public class DataSourceConfigKeys {
public static final String SNAPSHOT_PARALLELISM_DEFAULT = "1";
public static final String SSL_MODE = "ssl_mode";
public static final String SSL_ROOTCERT = "ssl_rootcert";
+ // PG-style spellings; the MySQL reader maps them to Debezium's values (disabled, required, verify_ca).
+ public static final String SSL_MODE_DISABLE = "disable";
+ public static final String SSL_MODE_REQUIRE = "require";
+ public static final String SSL_MODE_VERIFY_CA = "verify-ca";
// PostgreSQL replication slot and publication config
public static final String SLOT_NAME = "slot_name";
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/DataSourceConfigValidator.java
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/DataSourceConfigValidator.java
index c4aad00d33a..f37cbbff5dd 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/DataSourceConfigValidator.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/DataSourceConfigValidator.java
@@ -56,6 +56,12 @@ public class DataSourceConfigValidator {
DataSourceConfigKeys.PUBLICATION_NAME
);
+ private static final Set<String> ALLOW_SSL_MODES = Sets.newHashSet(
+ DataSourceConfigKeys.SSL_MODE_DISABLE,
+ DataSourceConfigKeys.SSL_MODE_REQUIRE,
+ DataSourceConfigKeys.SSL_MODE_VERIFY_CA
+ );
+
// Known suffixes for per-table config keys (format:
"table.<tableName>.<suffix>")
private static final Set<String> ALLOW_TABLE_LEVEL_SUFFIXES =
Sets.newHashSet(
DataSourceConfigKeys.TABLE_TARGET_TABLE_SUFFIX,
@@ -102,6 +108,16 @@ public class DataSourceConfigValidator {
throw new IllegalArgumentException("Invalid value for key '" +
key + "': " + value);
}
}
+
+ // Cross-field: verify-ca must be paired with a CA cert; otherwise the
reader will
+ // silently fall back to the JVM default truststore and likely fail to
connect.
+ if
(DataSourceConfigKeys.SSL_MODE_VERIFY_CA.equals(input.get(DataSourceConfigKeys.SSL_MODE))
+ && (input.get(DataSourceConfigKeys.SSL_ROOTCERT) == null
+ ||
input.get(DataSourceConfigKeys.SSL_ROOTCERT).trim().isEmpty())) {
+ throw new IllegalArgumentException(
+ "ssl_mode '" + DataSourceConfigKeys.SSL_MODE_VERIFY_CA
+ + "' requires ssl_rootcert to be set");
+ }
}
public static void validateTarget(Map<String, String> input) throws
IllegalArgumentException {
@@ -144,6 +160,9 @@ public class DataSourceConfigValidator {
return value.length() <= PG_MAX_IDENTIFIER_LENGTH
&& PG_IDENTIFIER_PATTERN.matcher(value).matches();
}
+ if (key.equals(DataSourceConfigKeys.SSL_MODE) &&
!ALLOW_SSL_MODES.contains(value)) {
+ return false;
+ }
return true;
}
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/job/extensions/insert/streaming/DataSourceConfigValidatorTest.java
b/fe/fe-core/src/test/java/org/apache/doris/job/extensions/insert/streaming/DataSourceConfigValidatorTest.java
index 2f71b7664a8..ce570f440a8 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/job/extensions/insert/streaming/DataSourceConfigValidatorTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/job/extensions/insert/streaming/DataSourceConfigValidatorTest.java
@@ -30,6 +30,87 @@ public class DataSourceConfigValidatorTest {
private static final int PG_MAX_IDENTIFIER_LENGTH = 63;
+ private static Map<String, String> sslModeInput(String value) {
+ Map<String, String> input = new HashMap<>();
+ input.put(DataSourceConfigKeys.SSL_MODE, value);
+ return input;
+ }
+
+ @Test
+ public void testSslModeLegalValues() {
+ DataSourceConfigValidator.validateSource(
+ sslModeInput(DataSourceConfigKeys.SSL_MODE_DISABLE),
DataSourceType.MYSQL.name());
+ DataSourceConfigValidator.validateSource(
+ sslModeInput(DataSourceConfigKeys.SSL_MODE_REQUIRE),
DataSourceType.MYSQL.name());
+ // verify-ca additionally requires ssl_rootcert; covered by
testVerifyCaWithRootcertPasses.
+ }
+
+ @Test
+ public void testSslModeRejectsMysqlUnderscoreSpelling() {
+ assertReject(sslModeInput("verify_ca"));
+ }
+
+ @Test
+ public void testSslModeRejectsVerifyFull() {
+ assertReject(sslModeInput("verify-full"));
+ }
+
+ @Test
+ public void testSslModeRejectsPreferredAndAllow() {
+ assertReject(sslModeInput("preferred"));
+ assertReject(sslModeInput("prefer"));
+ assertReject(sslModeInput("allow"));
+ }
+
+ @Test
+ public void testSslModeRejectsUppercaseVariants() {
+ assertReject(sslModeInput("DISABLE"));
+ assertReject(sslModeInput("Verify-CA"));
+ }
+
+ @Test
+ public void testSslModeRejectsEmpty() {
+ assertReject(sslModeInput(""));
+ }
+
+ @Test
+ public void testSslModeOptional() {
+ // ssl_mode is not required; validateSource should pass when absent
+ Map<String, String> input = new HashMap<>();
+ input.put(DataSourceConfigKeys.JDBC_URL, "jdbc:mysql://host/db");
+ DataSourceConfigValidator.validateSource(input,
DataSourceType.MYSQL.name());
+ }
+
+ @Test
+ public void testVerifyCaRequiresRootcert() {
+ Map<String, String> input =
sslModeInput(DataSourceConfigKeys.SSL_MODE_VERIFY_CA);
+ assertReject(input);
+ }
+
+ @Test
+ public void testVerifyCaWithRootcertPasses() {
+ Map<String, String> input =
sslModeInput(DataSourceConfigKeys.SSL_MODE_VERIFY_CA);
+ input.put(DataSourceConfigKeys.SSL_ROOTCERT, "FILE:ca.pem");
+ DataSourceConfigValidator.validateSource(input,
DataSourceType.MYSQL.name());
+ }
+
+ @Test
+ public void testDisableWithoutRootcertPasses() {
+ DataSourceConfigValidator.validateSource(
+ sslModeInput(DataSourceConfigKeys.SSL_MODE_DISABLE),
DataSourceType.MYSQL.name());
+ DataSourceConfigValidator.validateSource(
+ sslModeInput(DataSourceConfigKeys.SSL_MODE_REQUIRE),
DataSourceType.MYSQL.name());
+ }
+
+ private static void assertReject(Map<String, String> input) {
+ try {
+ DataSourceConfigValidator.validateSource(input,
DataSourceType.MYSQL.name());
+ Assert.fail("expected IllegalArgumentException for input: " +
input);
+ } catch (IllegalArgumentException ignored) {
+ // expected
+ }
+ }
+
@Test
public void testSlotNameAndPublicationNameAllowed() {
Map<String, String> props = new HashMap<>();
diff --git
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java
index 9aa268ef09b..bf8ac56312b 100644
---
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java
+++
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java
@@ -25,6 +25,7 @@ import
org.apache.doris.cdcclient.source.reader.SnapshotReaderContext;
import org.apache.doris.cdcclient.source.reader.SplitReadResult;
import org.apache.doris.cdcclient.source.reader.SplitRecords;
import org.apache.doris.cdcclient.utils.ConfigUtil;
+import org.apache.doris.cdcclient.utils.SmallFileMgr;
import org.apache.doris.job.cdc.DataSourceConfigKeys;
import org.apache.doris.job.cdc.request.CompareOffsetRequest;
import org.apache.doris.job.cdc.request.FetchTableSplitsRequest;
@@ -878,6 +879,28 @@ public class MySqlSourceReader extends
AbstractCdcSourceReader {
dbzProps.setProperty(
MySqlConnectorConfig.KEEP_ALIVE_INTERVAL_MS.name(),
DEBEZIUM_HEARTBEAT_INTERVAL_MS + "");
+
+ if (cdcConfig.containsKey(DataSourceConfigKeys.SSL_MODE)) {
+ String normalized =
+
normalizeSslModeForMysql(cdcConfig.get(DataSourceConfigKeys.SSL_MODE));
+ dbzProps.put("database.ssl.mode", normalized);
+ // Flink CDC's forked MySqlConnection drops Debezium SSL props
from the snapshot
+ // JDBC URL, so mirror to Connector/J native names.
+ jdbcProperteis.put("sslMode", normalized);
+ }
+ if (cdcConfig.containsKey(DataSourceConfigKeys.SSL_ROOTCERT)) {
+ String fileName = cdcConfig.get(DataSourceConfigKeys.SSL_ROOTCERT);
+ String truststorePath =
SmallFileMgr.getPkcs12TruststorePath(fileName);
+ LOG.info("Using SSL truststore file path: {}", truststorePath);
+ dbzProps.put("database.ssl.truststore", truststorePath);
+ dbzProps.put("database.ssl.truststore.password",
SmallFileMgr.TRUSTSTORE_PASSWORD);
+ jdbcProperteis.put("trustCertificateKeyStoreUrl", "file:" +
truststorePath);
+ // Connector/J defaults keystore type to JKS; we generate PKCS12.
+ jdbcProperteis.put("trustCertificateKeyStoreType", "PKCS12");
+ jdbcProperteis.put(
+ "trustCertificateKeyStorePassword",
SmallFileMgr.TRUSTSTORE_PASSWORD);
+ }
+
configFactory.debeziumProperties(dbzProps);
configFactory.heartbeatInterval(Duration.ofMillis(DEBEZIUM_HEARTBEAT_INTERVAL_MS));
@@ -1056,6 +1079,31 @@ public class MySqlSourceReader extends
AbstractCdcSourceReader {
return serializer.deserialize(config, element);
}
+ /** Map Doris ssl_mode (PG-style) to Debezium MySQL's spelling (disabled, required, verify_ca).
*/
+ static String normalizeSslModeForMysql(String sslMode) {
+ if (sslMode == null) {
+ throw new IllegalArgumentException("ssl_mode must not be null");
+ }
+ switch (sslMode) {
+ case DataSourceConfigKeys.SSL_MODE_DISABLE:
+ return
MySqlConnectorConfig.SecureConnectionMode.DISABLED.getValue();
+ case DataSourceConfigKeys.SSL_MODE_REQUIRE:
+ return
MySqlConnectorConfig.SecureConnectionMode.REQUIRED.getValue();
+ case DataSourceConfigKeys.SSL_MODE_VERIFY_CA:
+ return
MySqlConnectorConfig.SecureConnectionMode.VERIFY_CA.getValue();
+ default:
+ throw new IllegalArgumentException(
+ "Unsupported ssl_mode for MySQL: '"
+ + sslMode
+ + "'. Allowed: "
+ + String.join(
+ ", ",
+ DataSourceConfigKeys.SSL_MODE_DISABLE,
+ DataSourceConfigKeys.SSL_MODE_REQUIRE,
+
DataSourceConfigKeys.SSL_MODE_VERIFY_CA));
+ }
+ }
+
/**
* Filtered record iterator that only returns data change records,
filtering out watermark,
* heartbeat and other events. This is a private static inner class that
encapsulates record
diff --git
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/utils/SmallFileMgr.java
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/utils/SmallFileMgr.java
index 1e854097883..6f9616cadb7 100644
---
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/utils/SmallFileMgr.java
+++
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/utils/SmallFileMgr.java
@@ -29,7 +29,15 @@ import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
+import java.io.OutputStream;
import java.net.URL;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.StandardCopyOption;
+import java.security.KeyStore;
+import java.security.cert.Certificate;
+import java.security.cert.CertificateFactory;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@@ -47,9 +55,17 @@ public class SmallFileMgr {
private static final String FILE_PREFIX = "FILE:";
+ private static final String PKCS12_SUFFIX = ".p12";
+
+ /** JCA-required placeholder; a public-CA-only truststore has no secret to
protect. */
+ public static final String TRUSTSTORE_PASSWORD = "changeit";
+
/** In-memory cache: "file_id:md5" -> absolute local file path */
private static final Map<String, String> MEM_CACHE = new
ConcurrentHashMap<>();
+ /** In-memory cache for PKCS12 truststores derived from PEM CA certs. */
+ private static final Map<String, String> PKCS12_CACHE = new
ConcurrentHashMap<>();
+
/**
* Per-key locks to serialize concurrent downloads of the same file,
preventing tmp file
* corruption when multiple threads race on the same file_id:md5 key.
@@ -216,9 +232,86 @@ public class SmallFileMgr {
}
}
+ /**
+ * Resolve a FILE: reference to a PKCS12 truststore path, converting the
PEM on first access.
+ * For connectors (e.g. Debezium MySQL) that require JKS/PKCS12 rather
than raw PEM.
+ *
+ * @param filePath FILE reference, format: FILE:{file_id}:{md5}
+ * @return absolute local path to the PKCS12 truststore
+ */
+ public static String getPkcs12TruststorePath(String filePath) {
+ return pkcs12TruststorePath(getFilePath(filePath));
+ }
+
+ /** Package-private overload that accepts a custom local directory, used
for testing. */
+ static String getPkcs12TruststorePath(
+ String feMasterAddress, String filePath, String clusterToken,
String localDir) {
+ return pkcs12TruststorePath(getFilePath(feMasterAddress, filePath,
clusterToken, localDir));
+ }
+
+ private static String pkcs12TruststorePath(String pemPath) {
+ String cached = PKCS12_CACHE.get(pemPath);
+ if (cached != null && new File(cached).exists()) {
+ return cached;
+ }
+ Object lock = DOWNLOAD_LOCKS.computeIfAbsent(pemPath + PKCS12_SUFFIX,
k -> new Object());
+ synchronized (lock) {
+ String doubleChecked = PKCS12_CACHE.get(pemPath);
+ if (doubleChecked != null && new File(doubleChecked).exists()) {
+ return doubleChecked;
+ }
+ String p12Path = pemPath + PKCS12_SUFFIX;
+ if (!new File(p12Path).exists()) {
+ convertPemToPkcs12(pemPath, p12Path);
+ }
+ PKCS12_CACHE.put(pemPath, p12Path);
+ return p12Path;
+ }
+ }
+
+ private static void convertPemToPkcs12(String pemPath, String p12Path) {
+ Path tmpFile;
+ try {
+ Path p12 = Paths.get(p12Path);
+ tmpFile = Files.createTempFile(p12.getParent(), "p12-", ".tmp");
+ } catch (IOException e) {
+ throw new RuntimeException("Failed to create tmp file for PKCS12
truststore", e);
+ }
+ try {
+ KeyStore keyStore = KeyStore.getInstance("PKCS12");
+ keyStore.load(null);
+ CertificateFactory cf = CertificateFactory.getInstance("X.509");
+ try (InputStream in = new FileInputStream(pemPath)) {
+ // A CA PEM may contain a chain (intermediate + root); import
each with a
+ // distinct alias, otherwise later entries overwrite earlier
ones.
+ int i = 0;
+ for (Certificate cert : cf.generateCertificates(in)) {
+ keyStore.setCertificateEntry("ca" + (i++), cert);
+ }
+ }
+ try (OutputStream os = Files.newOutputStream(tmpFile)) {
+ keyStore.store(os, TRUSTSTORE_PASSWORD.toCharArray());
+ }
+ Files.move(
+ tmpFile,
+ Paths.get(p12Path),
+ StandardCopyOption.ATOMIC_MOVE,
+ StandardCopyOption.REPLACE_EXISTING);
+ LOG.info("Generated PKCS12 truststore: {}", p12Path);
+ } catch (Exception e) {
+ try {
+ Files.deleteIfExists(tmpFile);
+ } catch (IOException ignored) {
+ // best effort
+ }
+ throw new RuntimeException("Failed to convert PEM to PKCS12: " +
pemPath, e);
+ }
+ }
+
/** Clears the in-memory cache. Exposed for testing. */
static void clearCache() {
MEM_CACHE.clear();
+ PKCS12_CACHE.clear();
DOWNLOAD_LOCKS.clear();
}
}
diff --git
a/fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReaderTest.java
b/fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReaderTest.java
new file mode 100644
index 00000000000..1192df291d3
--- /dev/null
+++
b/fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReaderTest.java
@@ -0,0 +1,62 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.cdcclient.source.reader.mysql;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+import org.junit.jupiter.api.Test;
+
+public class MySqlSourceReaderTest {
+
+ @Test
+ void testNormalizeSslModeMapsAllLegalValues() {
+ assertEquals("disabled",
MySqlSourceReader.normalizeSslModeForMysql("disable"));
+ assertEquals("required",
MySqlSourceReader.normalizeSslModeForMysql("require"));
+ assertEquals("verify_ca",
MySqlSourceReader.normalizeSslModeForMysql("verify-ca"));
+ }
+
+ @Test
+ void testNormalizeSslModeRejectsNull() {
+ assertThrows(
+ IllegalArgumentException.class,
+ () -> MySqlSourceReader.normalizeSslModeForMysql(null));
+ }
+
+ @Test
+ void testNormalizeSslModeRejectsMysqlUnderscoreSpelling() {
+ // FE validator rejects this, but guard reader-side too.
+ assertThrows(
+ IllegalArgumentException.class,
+ () -> MySqlSourceReader.normalizeSslModeForMysql("verify_ca"));
+ }
+
+ @Test
+ void testNormalizeSslModeRejectsVerifyFull() {
+ assertThrows(
+ IllegalArgumentException.class,
+ () ->
MySqlSourceReader.normalizeSslModeForMysql("verify-full"));
+ }
+
+ @Test
+ void testNormalizeSslModeRejectsUppercase() {
+ assertThrows(
+ IllegalArgumentException.class,
+ () -> MySqlSourceReader.normalizeSslModeForMysql("DISABLE"));
+ }
+}
diff --git
a/fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/utils/SmallFileMgrTest.java
b/fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/utils/SmallFileMgrTest.java
index ae99d4db986..24bb4f5e615 100644
---
a/fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/utils/SmallFileMgrTest.java
+++
b/fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/utils/SmallFileMgrTest.java
@@ -25,9 +25,13 @@ import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
import java.net.InetSocketAddress;
import java.nio.file.Files;
import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.security.KeyStore;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
@@ -37,6 +41,7 @@ import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -375,4 +380,118 @@ class SmallFileMgrTest {
server.stop(0);
}
}
+
+ //
-------------------------------------------------------------------------
+ // PKCS12 truststore conversion
+ //
-------------------------------------------------------------------------
+
+ private static final String CA_PEM =
+ "-----BEGIN CERTIFICATE-----\n"
+ +
"MIIDDzCCAfegAwIBAgIULswy9ovSHXeKSxoEen2Y3xEZqBgwDQYJKoZIhvcNAQEL\n"
+ +
"BQAwFzEVMBMGA1UEAwwMTG9jYWwtRGV2LUNBMB4XDTI2MDMwMzA4MjMxM1oXDTM2\n"
+ +
"MDIyOTA4MjMxM1owFzEVMBMGA1UEAwwMTG9jYWwtRGV2LUNBMIIBIjANBgkqhkiG\n"
+ +
"9w0BAQEFAAOCAQ8AMIIBCgKCAQEAsVFJhj3Y7zamNZiq9SefnnKAKaOXXUbXo/Fq\n"
+ +
"V6VNzMSkZuwDfRo/RKjvVaUru/JSd7QoV5zGyUYb+oHx/R233R1M0sd23+eR1mRQ\n"
+ +
"w771DmXthbdpIPBEwlmh0LMsiH9cJ7R2iRigCzfd2/SbJC3cvX6CtzyNqSkZboVO\n"
+ +
"fswkotF4ZaJgOiBile4A/zWWqeA07QVd8tusdxaoOJv0E/pjcLi5peGXtQA6SSj4\n"
+ +
"tp20K/tlrRS1Zc0dKgxU7YohxNBwW4QF0uOVR/QBmfzEpMdxKlwcEnHubPAemgt1\n"
+ +
"bp9g9Buwo7oWMvDJuS40xMPOlDhshrzNM8CoWIihgndMPG/LsQIDAQABo1MwUTAd\n"
+ +
"BgNVHQ4EFgQUHBKhmdKPD+b1xDjzzkQVaVETSfUwHwYDVR0jBBgwFoAUHBKhmdKP\n"
+ +
"D+b1xDjzzkQVaVETSfUwDwYDVR0TAQH/BAUwAwEB/zANBgkqhkiG9w0BAQsFAAOC\n"
+ +
"AQEAnueVOIAk/XLQx3msDY58Reo+D1f/AUy/WTPzxeXCxXLScrjFCLXjrIDzgslN\n"
+ +
"WnP7E5xNJxdrWgskS36IJxVg0+cUfy5kQYYfmWo1vOYdW/AMNBdQwmK5ve3r3Z/3\n"
+ +
"dE2cV4uvL6n0iZZMxnsL5KXwLeSQeTtJepvWi27Z0t8P23lJHJKfl/Ek49ILIDgB\n"
+ +
"zZIMKPgm6w7/U3jUWMUyQ+iI/XiEPrnn4url1FNViC8ucoIm8EU4ZE01j1mbZO8M\n"
+ +
"JSa6InQEIx/1P675qYtuKWF75Tq/qU7+uX7/07AiTyYSrHMT+024TfbRCi1PF/Ka\n"
+ + "cx+pSJLima+3GHhK2Rj437yx1Q==\n"
+ + "-----END CERTIFICATE-----\n";
+
+ private String preloadPem(String fileId, byte[] pemBytes) throws
IOException {
+ String md5 = DigestUtils.md5Hex(pemBytes);
+ File cachedFile = tempDir.resolve(fileId + "." + md5).toFile();
+ Files.write(cachedFile.toPath(), pemBytes);
+ return "FILE:" + fileId + ":" + md5;
+ }
+
+ private KeyStore loadPkcs12(String p12Path) throws Exception {
+ KeyStore keyStore = KeyStore.getInstance("PKCS12");
+ try (InputStream in = Files.newInputStream(Paths.get(p12Path))) {
+ keyStore.load(in, SmallFileMgr.TRUSTSTORE_PASSWORD.toCharArray());
+ }
+ return keyStore;
+ }
+
+ @Test
+ void testPkcs12SingleCertConversion() throws Exception {
+ String filePath = preloadPem("40001", CA_PEM.getBytes());
+ String p12Path = SmallFileMgr.getPkcs12TruststorePath(
+ "host:8030", filePath, "token", tempDir.toString());
+
+ assertTrue(p12Path.endsWith(".p12"));
+ assertTrue(new File(p12Path).exists());
+
+ KeyStore keyStore = loadPkcs12(p12Path);
+ assertEquals(1, keyStore.size());
+ assertTrue(keyStore.containsAlias("ca0"));
+ }
+
+ /**
+ * PEM with a chain (intermediate + root) must produce one keystore entry
per certificate.
+ * Using the same cert twice here is sufficient to prove alias uniqueness
- without distinct
+ * aliases the second entry would silently overwrite the first.
+ */
+ @Test
+ void testPkcs12MultipleCertsPreserveAllEntries() throws Exception {
+ String chainPem = CA_PEM + CA_PEM;
+ String filePath = preloadPem("40002", chainPem.getBytes());
+ String p12Path = SmallFileMgr.getPkcs12TruststorePath(
+ "host:8030", filePath, "token", tempDir.toString());
+
+ KeyStore keyStore = loadPkcs12(p12Path);
+ assertEquals(2, keyStore.size(), "chain with 2 certs must produce 2
entries");
+ assertTrue(keyStore.containsAlias("ca0"));
+ assertTrue(keyStore.containsAlias("ca1"));
+ }
+
+ @Test
+ void testPkcs12SecondCallUsesMemoryCacheWhenFilePresent() throws Exception
{
+ String filePath = preloadPem("40003", CA_PEM.getBytes());
+ String first = SmallFileMgr.getPkcs12TruststorePath(
+ "host:8030", filePath, "token", tempDir.toString());
+
+ long firstMtime = new File(first).lastModified();
+ String second = SmallFileMgr.getPkcs12TruststorePath(
+ "host:8030", filePath, "token", tempDir.toString());
+ assertEquals(first, second);
+ assertEquals(firstMtime, new File(second).lastModified(),
+ "second call should hit memory cache and not regenerate .p12");
+ }
+
+ @Test
+ void testPkcs12RegeneratesWhenCachedFileMissing() throws Exception {
+ String filePath = preloadPem("40005", CA_PEM.getBytes());
+ String first = SmallFileMgr.getPkcs12TruststorePath(
+ "host:8030", filePath, "token", tempDir.toString());
+
+ // Simulate external deletion after the cache entry was stored.
+ assertTrue(new File(first).delete());
+
+ String second = SmallFileMgr.getPkcs12TruststorePath(
+ "host:8030", filePath, "token", tempDir.toString());
+ assertEquals(first, second);
+ assertTrue(new File(second).exists(),
+ "cached path whose file disappeared should be regenerated on
next call");
+ }
+
+ @Test
+ void testPkcs12InvalidPemThrows() throws Exception {
+ byte[] invalid = ("-----BEGIN CERTIFICATE-----\n"
+ + "this-is-not-valid-base64!!!\n"
+ + "-----END CERTIFICATE-----\n").getBytes();
+ String filePath = preloadPem("40004", invalid);
+
+ assertThrows(RuntimeException.class,
+ () -> SmallFileMgr.getPkcs12TruststorePath(
+ "host:8030", filePath, "token", tempDir.toString()));
+ }
}
diff --git
a/regression-test/data/job_p0/streaming_job/cdc/test_streaming_mysql_job_col_filter.out
b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_mysql_job_col_filter.out
new file mode 100644
index 00000000000..c7ca9579309
--- /dev/null
+++
b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_mysql_job_col_filter.out
@@ -0,0 +1,9 @@
+-- This file is automatically generated. You should know what you did if you
want to edit this
+-- !select_snapshot --
+A1 1
+B1 2
+
+-- !select_incremental --
+B1 20
+C1 3
+
diff --git
a/regression-test/data/job_p0/streaming_job/cdc/test_streaming_mysql_job_ssl.out
b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_mysql_job_ssl.out
new file mode 100644
index 00000000000..93141f92358
--- /dev/null
+++
b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_mysql_job_ssl.out
@@ -0,0 +1,9 @@
+-- This file is automatically generated. You should know what you did if you
want to edit this
+-- !select_snapshot_table1 --
+A1 1
+B1 2
+
+-- !select_binlog_table1 --
+B1 10
+Doris 18
+
diff --git
a/regression-test/data/job_p0/streaming_job/cdc/test_streaming_mysql_job_table_mapping.out
b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_mysql_job_table_mapping.out
new file mode 100644
index 00000000000..8d922a718f1
--- /dev/null
+++
b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_mysql_job_table_mapping.out
@@ -0,0 +1,19 @@
+-- This file is automatically generated. You should know what you did if you
want to edit this
+-- !select_snapshot --
+1 Alice
+2 Bob
+
+-- !select_incremental --
+2 Bob_v2
+3 Carol
+
+-- !select_merge_snapshot --
+100 Src1_A
+200 Src2_A
+
+-- !select_merge_incremental --
+100 Src1_A
+101 Src1_B
+200 Src2_A
+201 Src2_B
+
diff --git
a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_col_filter.groovy
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_col_filter.groovy
new file mode 100644
index 00000000000..f89776cd231
--- /dev/null
+++
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_col_filter.groovy
@@ -0,0 +1,175 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+import org.awaitility.Awaitility
+
+import static java.util.concurrent.TimeUnit.SECONDS
+
+suite("test_streaming_mysql_job_col_filter", // Regression suite: MySQL CDC streaming job with a per-table exclude_columns option
"p0,external,mysql,external_docker,external_docker_mysql,nondatalake") {
+ def jobName = "test_streaming_mysql_col_filter"
+ def currentDb = (sql "select database()")[0][0]
+ def table1 = "user_info_mysql_col_filter"
+ def mysqlDb = "test_cdc_db_col_filter"
+ // Start clean: drop the jobs and the Doris target table that an earlier run may have left behind
+ sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
+ sql """DROP JOB IF EXISTS where jobname = '${jobName}_err1'"""
+ sql """DROP JOB IF EXISTS where jobname = '${jobName}_err2'"""
+ sql """drop table if exists ${currentDb}.${table1} force"""
+
+ String enabled = context.config.otherConfigs.get("enableJdbcTest")
+ if (enabled != null && enabled.equalsIgnoreCase("true")) { // everything below runs only when enableJdbcTest is true
+ String mysql_port = context.config.otherConfigs.get("mysql_57_port")
+ String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+ String s3_endpoint = getS3Endpoint()
+ String bucket = getS3BucketName()
+ String driver_url =
"https://${bucket}.${s3_endpoint}/regression/jdbc_driver/mysql-connector-j-8.4.0.jar"
+
+ // Create MySQL table with an extra "secret" column to be excluded, and seed two snapshot rows
+ connect("root", "123456",
"jdbc:mysql://${externalEnvIp}:${mysql_port}") {
+ sql """CREATE DATABASE IF NOT EXISTS ${mysqlDb}"""
+ sql """DROP TABLE IF EXISTS ${mysqlDb}.${table1}"""
+ sql """CREATE TABLE ${mysqlDb}.${table1} (
+ `name` varchar(200) NOT NULL,
+ `age` int,
+ `secret` varchar(200),
+ PRIMARY KEY (`name`)
+ ) ENGINE=InnoDB"""
+ sql """INSERT INTO ${mysqlDb}.${table1} VALUES ('A1', 1,
'secret_A1')"""
+ sql """INSERT INTO ${mysqlDb}.${table1} VALUES ('B1', 2,
'secret_B1')"""
+ }
+
+ // ── Validation: exclude a non-existent column should fail
──────────────
+ try {
+ sql """CREATE JOB ${jobName}_err1
+ ON STREAMING
+ FROM MYSQL (
+ "jdbc_url" =
"jdbc:mysql://${externalEnvIp}:${mysql_port}",
+ "driver_url" = "${driver_url}",
+ "driver_class" = "com.mysql.cj.jdbc.Driver",
+ "user" = "root",
+ "password" = "123456",
+ "database" = "${mysqlDb}",
+ "include_tables" = "${table1}",
+ "offset" = "initial",
+ "table.${table1}.exclude_columns" = "nonexistent_col"
+ )
+ TO DATABASE ${currentDb} (
+ "table.create.properties.replication_num" = "1"
+ )"""
+ assert false : "Should have thrown exception for non-existent
excluded column"
+ } catch (Exception e) { // a failed assert raises AssertionError (an Error), so it is not swallowed by this catch
+ log.info("Expected error for non-existent column: " + e.message)
+ assert e.message.contains("does not exist") : "Unexpected error
message: " + e.message
+ }
+
+ // ── Validation: exclude a PK column should fail
────────────────────────
+ try {
+ sql """CREATE JOB ${jobName}_err2
+ ON STREAMING
+ FROM MYSQL (
+ "jdbc_url" =
"jdbc:mysql://${externalEnvIp}:${mysql_port}",
+ "driver_url" = "${driver_url}",
+ "driver_class" = "com.mysql.cj.jdbc.Driver",
+ "user" = "root",
+ "password" = "123456",
+ "database" = "${mysqlDb}",
+ "include_tables" = "${table1}",
+ "offset" = "initial",
+ "table.${table1}.exclude_columns" = "name"
+ )
+ TO DATABASE ${currentDb} (
+ "table.create.properties.replication_num" = "1"
+ )"""
+ assert false : "Should have thrown exception for excluding PK
column"
+ } catch (Exception e) { // same pattern as above: only the CREATE JOB failure is expected to land here
+ log.info("Expected error for PK column: " + e.message)
+ assert e.message.contains("primary key") : "Unexpected error
message: " + e.message
+ }
+
+ // ── Main job: exclude "secret" column
──────────────────────────────────
+ sql """CREATE JOB ${jobName}
+ ON STREAMING
+ FROM MYSQL (
+ "jdbc_url" =
"jdbc:mysql://${externalEnvIp}:${mysql_port}",
+ "driver_url" = "${driver_url}",
+ "driver_class" = "com.mysql.cj.jdbc.Driver",
+ "user" = "root",
+ "password" = "123456",
+ "database" = "${mysqlDb}",
+ "include_tables" = "${table1}",
+ "offset" = "initial",
+ "table.${table1}.exclude_columns" = "secret"
+ )
+ TO DATABASE ${currentDb} (
+ "table.create.properties.replication_num" = "1"
+ )"""
+
+ // Verify Doris table was created WITHOUT the excluded column (first field of each desc row is the column name)
+ def colNames = (sql """desc ${currentDb}.${table1}""").collect { it[0]
}
+ assert !colNames.contains("secret") : "Excluded column 'secret' must
not appear in Doris table"
+ assert colNames.contains("name")
+ assert colNames.contains("age")
+
+ // Wait for snapshot to complete: poll every second, up to 300s, until SucceedTaskCount >= 2
+ try {
+ Awaitility.await().atMost(300, SECONDS).pollInterval(1,
SECONDS).until({
+ def cnt = sql """select SucceedTaskCount from
jobs("type"="insert") where Name = '${jobName}' and ExecuteType='STREAMING'"""
+ cnt.size() == 1 && cnt.get(0).get(0).toLong() >= 2
+ })
+ } catch (Exception ex) { // on timeout or query failure: log job and task state for debugging, then rethrow
+ def showJob = sql """select * from jobs("type"="insert") where
Name='${jobName}'"""
+ def showTask = sql """select * from tasks("type"="insert") where
JobName='${jobName}'"""
+ log.info("show job: " + showJob)
+ log.info("show task: " + showTask)
+ throw ex
+ }
+
+ // Snapshot: only name and age, secret absent (compared against the select_snapshot section of the .out file)
+ qt_select_snapshot """ SELECT * FROM ${table1} ORDER BY name ASC """
+
+ // ── Incremental DML: secret values must not appear in Doris
───────────
+ connect("root", "123456",
"jdbc:mysql://${externalEnvIp}:${mysql_port}") {
+ sql """INSERT INTO ${mysqlDb}.${table1} VALUES ('C1', 3,
'secret_C1')"""
+ sql """UPDATE ${mysqlDb}.${table1} SET age = 20, secret =
'updated_secret' WHERE name = 'B1'"""
+ sql """DELETE FROM ${mysqlDb}.${table1} WHERE name = 'A1'"""
+ }
+ // Wait until C1 appears and A1 is gone (polls every 2s, up to 120s)
+ try {
+ Awaitility.await().atMost(120, SECONDS).pollInterval(2,
SECONDS).until({
+ def names = (sql """ SELECT name FROM ${table1} ORDER BY name
ASC """).collect { it[0] }
+ names.contains('C1') && !names.contains('A1')
+ })
+ } catch (Exception ex) { // same diagnostics as for the snapshot wait
+ def showJob = sql """select * from jobs("type"="insert") where
Name='${jobName}'"""
+ def showTask = sql """select * from tasks("type"="insert") where
JobName='${jobName}'"""
+ log.info("show job: " + showJob)
+ log.info("show task: " + showTask)
+ throw ex
+ }
+
+ qt_select_incremental """ SELECT * FROM ${table1} ORDER BY name ASC """
+
+ // Doris table still has no secret column after DML events on excluded
column
+ def colNamesAfterDml = (sql """desc ${currentDb}.${table1}""").collect
{ it[0] }
+ assert !colNamesAfterDml.contains("secret") : "secret column must not
appear in Doris after DML on excluded column"
+ // Tear down: drop the job and verify that jobs() no longer lists it
+ sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
+ def jobCountRsp = sql """select count(1) from jobs("type"="insert")
where Name = '${jobName}'"""
+ assert jobCountRsp.get(0).get(0) == 0
+ }
+}
diff --git
a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_ssl.groovy
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_ssl.groovy
new file mode 100644
index 00000000000..51359cb5983
--- /dev/null
+++
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_ssl.groovy
@@ -0,0 +1,158 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+import org.awaitility.Awaitility
+
+import static java.util.concurrent.TimeUnit.SECONDS
+
+suite("test_streaming_mysql_job_ssl", // Regression suite: MySQL CDC streaming job created with ssl_mode verify-ca and a CA file
"p0,external,mysql,external_docker,external_docker_mysql,nondatalake") {
+ def jobName = "test_streaming_mysql_job_name_ssl"
+ def currentDb = (sql "select database()")[0][0]
+ def table1 = "user_info_mysql_normal1_ssl"
+ def mysqlDb = "test_cdc_db_ssl"
+ // Start clean: drop the job and the Doris target table that an earlier run may have left behind
+ sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
+ sql """drop table if exists ${currentDb}.${table1} force"""
+
+
+ String enabled = context.config.otherConfigs.get("enableJdbcTest")
+ if (enabled != null && enabled.equalsIgnoreCase("true")) { // everything below runs only when enableJdbcTest is true
+ String mysql_port = context.config.otherConfigs.get("mysql_57_port");
+ String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+ String s3_endpoint = getS3Endpoint()
+ String bucket = getS3BucketName()
+ String driver_url =
"https://${bucket}.${s3_endpoint}/regression/jdbc_driver/mysql-connector-j-8.4.0.jar"
+
+ // create the upstream MySQL database and table, then seed two snapshot rows
+ connect("root", "123456",
"jdbc:mysql://${externalEnvIp}:${mysql_port}") {
+ sql """CREATE DATABASE IF NOT EXISTS ${mysqlDb}"""
+ sql """DROP TABLE IF EXISTS ${mysqlDb}.${table1}"""
+ sql """CREATE TABLE ${mysqlDb}.${table1} (
+ `name` varchar(200) NOT NULL,
+ `age` int DEFAULT NULL,
+ PRIMARY KEY (`name`)
+ ) ENGINE=InnoDB"""
+ sql """INSERT INTO ${mysqlDb}.${table1} (name, age) VALUES ('A1',
1);"""
+ sql """INSERT INTO ${mysqlDb}.${table1} (name, age) VALUES ('B1',
2);"""
+ }
+ // Remove any mysql_ca.pem registered by an earlier run so the CREATE FILE below starts from scratch
+ try {
+ sql """DROP FILE "mysql_ca.pem" FROM ${currentDb} PROPERTIES
("catalog" = "streaming_job")"""
+ } catch (Exception ignored) {
+ // best-effort cleanup: any failure of this DROP FILE is deliberately ignored
+ }
+ // NOTE(review): the CA is fetched from a fixed external URL; presumably it is the CA that signed the docker MySQL server cert - confirm
+ sql """CREATE FILE "mysql_ca.pem"
+ IN ${currentDb}
+ PROPERTIES
+ (
+ "url" =
"https://qa-build.oss-cn-beijing.aliyuncs.com/jianxu/root.crt",
+ "catalog" = "streaming_job"
+ )
+ """
+ // Create the job; ssl_rootcert refers to the file registered above through the FILE: prefix
+ sql """CREATE JOB ${jobName}
+ ON STREAMING
+ FROM MYSQL (
+ "jdbc_url" = "jdbc:mysql://${externalEnvIp}:${mysql_port}",
+ "driver_url" = "${driver_url}",
+ "driver_class" = "com.mysql.cj.jdbc.Driver",
+ "user" = "root",
+ "password" = "123456",
+ "database" = "${mysqlDb}",
+ "include_tables" = "${table1}",
+ "offset" = "initial",
+ "ssl_mode" = "verify-ca",
+ "ssl_rootcert" = "FILE:mysql_ca.pem"
+ )
+ TO DATABASE ${currentDb} (
+ "table.create.properties.replication_num" = "1"
+ )
+ """
+ def showAllTables = sql """ show tables from ${currentDb}"""
+ log.info("showAllTables: " + showAllTables)
+ // check that exactly one Doris table matching the source table name now exists
+ def showTables = sql """ show tables from ${currentDb} like
'${table1}'; """
+ assert showTables.size() == 1
+
+ // wait for the job to make progress: poll every second, up to 300s
+ try {
+ Awaitility.await().atMost(300, SECONDS)
+ .pollInterval(1, SECONDS).until(
+ {
+ def jobSuccendCount = sql """ select SucceedTaskCount
from jobs("type"="insert") where Name = '${jobName}' and
ExecuteType='STREAMING' """
+ log.info("jobSuccendCount: " + jobSuccendCount)
+ // done once exactly one job row is returned and its succeed task count is >=
2
+ jobSuccendCount.size() == 1 &&
jobSuccendCount.get(0).get(0).toString().toLong() >= 2L
+ }
+ )
+ } catch (Exception ex){ // on timeout or query failure: log job and task state for debugging, then rethrow
+ def showjob = sql """select * from jobs("type"="insert") where
Name='${jobName}'"""
+ def showtask = sql """select * from tasks("type"="insert") where
JobName='${jobName}'"""
+ log.info("show job: " + showjob)
+ log.info("show task: " + showtask)
+ throw ex;
+ }
+
+ // check snapshot data against the select_snapshot_table1 section of the .out file
+ qt_select_snapshot_table1 """ SELECT * FROM ${table1} order by name
asc """
+
+ // mock incremental changes upstream: one insert, one update and one delete
+ connect("root", "123456",
"jdbc:mysql://${externalEnvIp}:${mysql_port}") {
+ sql """INSERT INTO ${mysqlDb}.${table1} (name,age) VALUES
('Doris',18);"""
+ sql """UPDATE ${mysqlDb}.${table1} SET age = 10 WHERE name =
'B1';"""
+ sql """DELETE FROM ${mysqlDb}.${table1} WHERE name = 'A1';"""
+ }
+
+ // wait for cdc incremental data: poll every 2s, up to 120s, until Doris is present and A1 is gone
+ try {
+ Awaitility.await().atMost(120, SECONDS).pollInterval(2,
SECONDS).until({
+ def names = (sql """ SELECT name FROM ${table1} ORDER BY name
ASC """).collect { it[0] }
+ names.contains('Doris') && !names.contains('A1')
+ })
+ } catch (Exception ex) { // same diagnostics as for the snapshot wait
+ def showjob = sql """select * from jobs("type"="insert") where
Name='${jobName}'"""
+ def showtask = sql """select * from tasks("type"="insert") where
JobName='${jobName}'"""
+ log.info("show job: " + showjob)
+ log.info("show task: " + showtask)
+ throw ex
+ }
+
+ // check incremental data against the select_binlog_table1 section of the .out file
+ qt_select_binlog_table1 """ SELECT * FROM ${table1} order by name asc
"""
+ // the job must still be RUNNING after the incremental changes were applied
+ def jobInfo = sql """
+ select status from jobs("type"="insert") where Name='${jobName}'
+ """
+ log.info("jobInfo: " + jobInfo)
+ assert jobInfo.get(0).get(0) == "RUNNING"
+ // Tear down: drop the job, then the registered CA file
+ sql """
+ DROP JOB IF EXISTS where jobname = '${jobName}'
+ """
+
+ try {
+ sql """DROP FILE "mysql_ca.pem" FROM ${currentDb} PROPERTIES
("catalog" = "streaming_job")"""
+ } catch (Exception ignored) {
+ // best-effort cleanup: any failure of this DROP FILE is deliberately ignored
+ }
+ // verify that jobs() no longer lists the dropped job
+ def jobCountRsp = sql """select count(1) from jobs("type"="insert")
where Name ='${jobName}'"""
+ assert jobCountRsp.get(0).get(0) == 0
+ }
+}
diff --git
a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_table_mapping.groovy
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_table_mapping.groovy
new file mode 100644
index 00000000000..305290391e6
--- /dev/null
+++
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_table_mapping.groovy
@@ -0,0 +1,190 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+import org.awaitility.Awaitility
+
+import static java.util.concurrent.TimeUnit.SECONDS
+
+suite("test_streaming_mysql_job_table_mapping", // Regression suite: MySQL CDC streaming jobs using the per-table target_table option
"p0,external,mysql,external_docker,external_docker_mysql,nondatalake") {
+ def jobName = "test_streaming_mysql_table_mapping"
+ def jobNameMerge = "test_streaming_mysql_table_mapping_merge"
+ def currentDb = (sql "select database()")[0][0]
+ def mysqlSrcTable = "mysql_src_table" // upstream MySQL table name
+ def dorisDstTable = "doris_dst_table_mysql" // downstream Doris table
name (mapped)
+ def mysqlSrcTable2 = "mysql_src_table2" // second upstream table
(multi-table merge)
+ def dorisMergeTable = "doris_merge_table_mysql"
+ def mysqlDb = "test_cdc_db_table_mapping"
+
+ // Cleanup: drop both jobs and both Doris target tables that an earlier run may have left behind
+ sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
+ sql """DROP JOB IF EXISTS where jobname = '${jobNameMerge}'"""
+ sql """drop table if exists ${currentDb}.${dorisDstTable} force"""
+ sql """drop table if exists ${currentDb}.${dorisMergeTable} force"""
+
+ String enabled = context.config.otherConfigs.get("enableJdbcTest")
+ if (enabled != null && enabled.equalsIgnoreCase("true")) { // everything below runs only when enableJdbcTest is true
+ String mysql_port = context.config.otherConfigs.get("mysql_57_port")
+ String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+ String s3_endpoint = getS3Endpoint()
+ String bucket = getS3BucketName()
+ String driver_url =
"https://${bucket}.${s3_endpoint}/regression/jdbc_driver/mysql-connector-j-8.4.0.jar"
+
+ // ── Case 1: basic table name mapping
─────────────────────────────────
+ // MySQL table: mysql_src_table → Doris table: doris_dst_table_mysql
+ connect("root", "123456",
"jdbc:mysql://${externalEnvIp}:${mysql_port}") {
+ sql """CREATE DATABASE IF NOT EXISTS ${mysqlDb}"""
+ sql """DROP TABLE IF EXISTS ${mysqlDb}.${mysqlSrcTable}"""
+ sql """CREATE TABLE ${mysqlDb}.${mysqlSrcTable} (
+ `id` int NOT NULL,
+ `name` varchar(200),
+ PRIMARY KEY (`id`)
+ ) ENGINE=InnoDB"""
+ sql """INSERT INTO ${mysqlDb}.${mysqlSrcTable} VALUES (1,
'Alice')"""
+ sql """INSERT INTO ${mysqlDb}.${mysqlSrcTable} VALUES (2, 'Bob')"""
+ }
+ // Create the job that maps the single source table onto the differently named Doris table
+ sql """CREATE JOB ${jobName}
+ ON STREAMING
+ FROM MYSQL (
+ "jdbc_url" =
"jdbc:mysql://${externalEnvIp}:${mysql_port}",
+ "driver_url" = "${driver_url}",
+ "driver_class" = "com.mysql.cj.jdbc.Driver",
+ "user" = "root",
+ "password" = "123456",
+ "database" = "${mysqlDb}",
+ "include_tables" = "${mysqlSrcTable}",
+ "offset" = "initial",
+ "table.${mysqlSrcTable}.target_table" = "${dorisDstTable}"
+ )
+ TO DATABASE ${currentDb} (
+ "table.create.properties.replication_num" = "1"
+ )"""
+
+ // Verify the Doris table was created with the mapped name, not the
source name
+ def tables = (sql """show tables from ${currentDb}""").collect { it[0]
}
+ assert tables.contains(dorisDstTable) : "Doris target table
'${dorisDstTable}' should exist"
+ assert !tables.contains(mysqlSrcTable) : "Source table name
'${mysqlSrcTable}' must NOT exist in Doris"
+
+ // Wait for snapshot: poll every second, up to 300s, until SucceedTaskCount >= 2
+ try {
+ Awaitility.await().atMost(300, SECONDS).pollInterval(1,
SECONDS).until({
+ def cnt = sql """select SucceedTaskCount from
jobs("type"="insert") where Name = '${jobName}' and ExecuteType='STREAMING'"""
+ cnt.size() == 1 && cnt.get(0).get(0).toLong() >= 2
+ })
+ } catch (Exception ex) { // on timeout or query failure: log job and task state for debugging, then rethrow
+ log.info("show job: " + (sql """select * from
jobs("type"="insert") where Name='${jobName}'"""))
+ log.info("show task: " + (sql """select * from
tasks("type"="insert") where JobName='${jobName}'"""))
+ throw ex
+ }
+
+ qt_select_snapshot """ SELECT * FROM ${dorisDstTable} ORDER BY id ASC
"""
+
+ // Incremental: INSERT / UPDATE / DELETE must all land in
doris_dst_table_mysql
+ connect("root", "123456",
"jdbc:mysql://${externalEnvIp}:${mysql_port}") {
+ sql """INSERT INTO ${mysqlDb}.${mysqlSrcTable} VALUES (3,
'Carol')"""
+ sql """UPDATE ${mysqlDb}.${mysqlSrcTable} SET name = 'Bob_v2'
WHERE id = 2"""
+ sql """DELETE FROM ${mysqlDb}.${mysqlSrcTable} WHERE id = 1"""
+ }
+ try { // poll every 2s, up to 120s, until id 3 is present and id 1 is gone
+ Awaitility.await().atMost(120, SECONDS).pollInterval(2,
SECONDS).until({
+ def ids = (sql """ SELECT id FROM ${dorisDstTable} ORDER BY id
ASC """).collect { it[0].toInteger() }
+ ids.contains(3) && !ids.contains(1)
+ })
+ } catch (Exception ex) { // same diagnostics as for the snapshot wait
+ log.info("show job: " + (sql """select * from
jobs("type"="insert") where Name='${jobName}'"""))
+ log.info("show task: " + (sql """select * from
tasks("type"="insert") where JobName='${jobName}'"""))
+ throw ex
+ }
+
+ qt_select_incremental """ SELECT * FROM ${dorisDstTable} ORDER BY id
ASC """
+ // Case 1 is finished: drop its job before the source table is recreated for Case 2
+ sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
+
+ // ── Case 2: multi-table merge (two MySQL tables → one Doris table) ──
+ connect("root", "123456",
"jdbc:mysql://${externalEnvIp}:${mysql_port}") {
+ sql """DROP TABLE IF EXISTS ${mysqlDb}.${mysqlSrcTable}"""
+ sql """DROP TABLE IF EXISTS ${mysqlDb}.${mysqlSrcTable2}"""
+ sql """CREATE TABLE ${mysqlDb}.${mysqlSrcTable} (
+ `id` int NOT NULL,
+ `name` varchar(200),
+ PRIMARY KEY (`id`)
+ ) ENGINE=InnoDB"""
+ sql """CREATE TABLE ${mysqlDb}.${mysqlSrcTable2} (
+ `id` int NOT NULL,
+ `name` varchar(200),
+ PRIMARY KEY (`id`)
+ ) ENGINE=InnoDB"""
+ sql """INSERT INTO ${mysqlDb}.${mysqlSrcTable} VALUES (100,
'Src1_A')"""
+ sql """INSERT INTO ${mysqlDb}.${mysqlSrcTable2} VALUES (200,
'Src2_A')"""
+ }
+ // Both source tables are mapped to the same Doris target table; their seeded ids (100 and 200) do not collide
+ sql """CREATE JOB ${jobNameMerge}
+ ON STREAMING
+ FROM MYSQL (
+ "jdbc_url" =
"jdbc:mysql://${externalEnvIp}:${mysql_port}",
+ "driver_url" = "${driver_url}",
+ "driver_class" = "com.mysql.cj.jdbc.Driver",
+ "user" = "root",
+ "password" = "123456",
+ "database" = "${mysqlDb}",
+ "include_tables" = "${mysqlSrcTable},${mysqlSrcTable2}",
+ "offset" = "initial",
+ "table.${mysqlSrcTable}.target_table" =
"${dorisMergeTable}",
+ "table.${mysqlSrcTable2}.target_table" =
"${dorisMergeTable}"
+ )
+ TO DATABASE ${currentDb} (
+ "table.create.properties.replication_num" = "1"
+ )"""
+
+ // Wait for snapshot rows from both source tables (polls every 2s, up to 300s)
+ try {
+ Awaitility.await().atMost(300, SECONDS).pollInterval(2,
SECONDS).until({
+ def ids = (sql """ SELECT id FROM ${dorisMergeTable}
""").collect { it[0].toInteger() }
+ ids.contains(100) && ids.contains(200)
+ })
+ } catch (Exception ex) { // on timeout or query failure: log merge job and task state, then rethrow
+ log.info("show job: " + (sql """select * from
jobs("type"="insert") where Name='${jobNameMerge}'"""))
+ log.info("show task: " + (sql """select * from
tasks("type"="insert") where JobName='${jobNameMerge}'"""))
+ throw ex
+ }
+
+ qt_select_merge_snapshot """ SELECT * FROM ${dorisMergeTable} ORDER BY
id ASC """
+
+ // Incremental from both source tables: one new row each
+ connect("root", "123456",
"jdbc:mysql://${externalEnvIp}:${mysql_port}") {
+ sql """INSERT INTO ${mysqlDb}.${mysqlSrcTable} VALUES (101,
'Src1_B')"""
+ sql """INSERT INTO ${mysqlDb}.${mysqlSrcTable2} VALUES (201,
'Src2_B')"""
+ }
+ try { // poll every 2s, up to 120s, until both new ids are visible in the merged table
+ Awaitility.await().atMost(120, SECONDS).pollInterval(2,
SECONDS).until({
+ def ids = (sql """ SELECT id FROM ${dorisMergeTable}
""").collect { it[0].toInteger() }
+ ids.contains(101) && ids.contains(201)
+ })
+ } catch (Exception ex) { // same diagnostics as for the merge snapshot wait
+ log.info("show job: " + (sql """select * from
jobs("type"="insert") where Name='${jobNameMerge}'"""))
+ log.info("show task: " + (sql """select * from
tasks("type"="insert") where JobName='${jobNameMerge}'"""))
+ throw ex
+ }
+
+ qt_select_merge_incremental """ SELECT * FROM ${dorisMergeTable} ORDER
BY id ASC """
+ // Tear down: drop the merge job and verify that jobs() no longer lists it
+ sql """DROP JOB IF EXISTS where jobname = '${jobNameMerge}'"""
+ def mergeJobCnt = sql """select count(1) from jobs("type"="insert")
where Name = '${jobNameMerge}'"""
+ assert mergeJobCnt.get(0).get(0) == 0
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]