This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 79153d305ba [feat](paimon) support jdbc catalog type (#61094)
79153d305ba is described below
commit 79153d305bafcc0ae3df0150410e5a2cc145fba4
Author: Chenjunwei <[email protected]>
AuthorDate: Thu Mar 12 14:49:29 2026 +0800
[feat](paimon) support jdbc catalog type (#61094)
## Summary
- add Paimon JDBC metastore properties and driver loading support
- register paimon.catalog.type=jdbc in paimon properties/catalog factory
- allow paimon jdbc type in external table/system table thrift conversion
- add FE unit tests for jdbc properties mapping and validation
## Test
- ./run-fe-ut.sh --run org.apache.doris.datasource.property.metastore.PaimonJdbcMetaStorePropertiesTest
---
.../datasource/paimon/PaimonExternalCatalog.java | 1 +
.../paimon/PaimonExternalCatalogFactory.java | 1 +
.../datasource/paimon/PaimonExternalTable.java | 5 +-
.../datasource/paimon/PaimonSysExternalTable.java | 5 +-
.../metastore/PaimonJdbcMetaStoreProperties.java | 247 +++++++++++++++++++++
.../metastore/PaimonPropertiesFactory.java | 1 +
.../PaimonJdbcMetaStorePropertiesTest.java | 155 +++++++++++++
.../paimon/test_paimon_jdbc_catalog.out | 5 +
.../paimon/test_paimon_jdbc_catalog.groovy | 213 ++++++++++++++++++
9 files changed, 629 insertions(+), 4 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalCatalog.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalCatalog.java
index 87f606dfcca..1cb69fa423b 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalCatalog.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalCatalog.java
@@ -47,6 +47,7 @@ public class PaimonExternalCatalog extends ExternalCatalog {
public static final String PAIMON_HMS = "hms";
public static final String PAIMON_DLF = "dlf";
public static final String PAIMON_REST = "rest";
+ public static final String PAIMON_JDBC = "jdbc";
public static final String PAIMON_TABLE_CACHE_ENABLE =
"meta.cache.paimon.table.enable";
public static final String PAIMON_TABLE_CACHE_TTL_SECOND =
"meta.cache.paimon.table.ttl-second";
public static final String PAIMON_TABLE_CACHE_CAPACITY =
"meta.cache.paimon.table.capacity";
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalCatalogFactory.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalCatalogFactory.java
index ae19907d932..affe4995f10 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalCatalogFactory.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalCatalogFactory.java
@@ -38,6 +38,7 @@ public class PaimonExternalCatalogFactory {
case PaimonExternalCatalog.PAIMON_FILESYSTEM:
case PaimonExternalCatalog.PAIMON_DLF:
case PaimonExternalCatalog.PAIMON_REST:
+ case PaimonExternalCatalog.PAIMON_JDBC:
return new PaimonExternalCatalog(catalogId, name, resource,
props, comment);
default:
throw new DdlException("Unknown " +
PaimonExternalCatalog.PAIMON_CATALOG_TYPE
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java
index 8e17599c7dd..4c3ee9a3947 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java
@@ -181,7 +181,8 @@ public class PaimonExternalTable extends ExternalTable
implements MTMVRelatedTab
if (PaimonExternalCatalog.PAIMON_HMS.equals(getPaimonCatalogType())
||
PaimonExternalCatalog.PAIMON_FILESYSTEM.equals(getPaimonCatalogType())
||
PaimonExternalCatalog.PAIMON_DLF.equals(getPaimonCatalogType())
- ||
PaimonExternalCatalog.PAIMON_REST.equals(getPaimonCatalogType())) {
+ ||
PaimonExternalCatalog.PAIMON_REST.equals(getPaimonCatalogType())
+ ||
PaimonExternalCatalog.PAIMON_JDBC.equals(getPaimonCatalogType())) {
THiveTable tHiveTable = new THiveTable(dbName, name, new
HashMap<>());
TTableDescriptor tTableDescriptor = new TTableDescriptor(getId(),
TTableType.HIVE_TABLE, schema.size(), 0,
getName(), dbName);
@@ -189,7 +190,7 @@ public class PaimonExternalTable extends ExternalTable
implements MTMVRelatedTab
return tTableDescriptor;
} else {
throw new IllegalArgumentException(
- "Currently only supports hms/dlf/rest/filesystem catalog,
do not support :"
+ "Currently only supports hms/dlf/rest/filesystem/jdbc
catalog, do not support: "
+ getPaimonCatalogType());
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSysExternalTable.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSysExternalTable.java
index 3245b909562..e2300e20555 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSysExternalTable.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSysExternalTable.java
@@ -198,7 +198,8 @@ public class PaimonSysExternalTable extends ExternalTable {
if (PaimonExternalCatalog.PAIMON_HMS.equals(catalogType)
|| PaimonExternalCatalog.PAIMON_FILESYSTEM.equals(catalogType)
|| PaimonExternalCatalog.PAIMON_DLF.equals(catalogType)
- || PaimonExternalCatalog.PAIMON_REST.equals(catalogType)) {
+ || PaimonExternalCatalog.PAIMON_REST.equals(catalogType)
+ || PaimonExternalCatalog.PAIMON_JDBC.equals(catalogType)) {
THiveTable tHiveTable = new THiveTable(dbName, name, new
HashMap<>());
TTableDescriptor tTableDescriptor = new TTableDescriptor(getId(),
TTableType.HIVE_TABLE, schema.size(), 0,
getName(), dbName);
@@ -206,7 +207,7 @@ public class PaimonSysExternalTable extends ExternalTable {
return tTableDescriptor;
} else {
throw new IllegalArgumentException(
- "Currently only supports hms/dlf/rest/filesystem catalog,
do not support :" + catalogType);
+ "Currently only supports hms/dlf/rest/filesystem/jdbc
catalog, do not support: " + catalogType);
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/PaimonJdbcMetaStoreProperties.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/PaimonJdbcMetaStoreProperties.java
new file mode 100644
index 00000000000..2b32367aba1
--- /dev/null
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/PaimonJdbcMetaStoreProperties.java
@@ -0,0 +1,247 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.property.metastore;
+
+import org.apache.doris.catalog.JdbcResource;
+import
org.apache.doris.common.security.authentication.HadoopExecutionAuthenticator;
+import org.apache.doris.datasource.paimon.PaimonExternalCatalog;
+import org.apache.doris.datasource.property.ConnectorProperty;
+import org.apache.doris.datasource.property.storage.HdfsProperties;
+import org.apache.doris.datasource.property.storage.StorageProperties;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.catalog.CatalogFactory;
+import org.apache.paimon.jdbc.JdbcCatalogFactory;
+import org.apache.paimon.options.CatalogOptions;
+
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+public class PaimonJdbcMetaStoreProperties extends AbstractPaimonProperties {
+ private static final Logger LOG =
LogManager.getLogger(PaimonJdbcMetaStoreProperties.class);
+ private static final String JDBC_PREFIX = "jdbc.";
+ private static final Map<URL, ClassLoader> DRIVER_CLASS_LOADER_CACHE = new
ConcurrentHashMap<>();
+ private static final Set<String> REGISTERED_DRIVER_KEYS =
ConcurrentHashMap.newKeySet();
+
+ @ConnectorProperty(
+ names = {"uri", "paimon.jdbc.uri"},
+ required = true,
+ description = "JDBC connection URI for the Paimon JDBC catalog."
+ )
+ private String uri = "";
+
+ @ConnectorProperty(
+ names = {"paimon.jdbc.user", "jdbc.user"},
+ required = false,
+ description = "Username for the Paimon JDBC catalog."
+ )
+ private String jdbcUser;
+
+ @ConnectorProperty(
+ names = {"paimon.jdbc.password", "jdbc.password"},
+ required = false,
+ sensitive = true,
+ description = "Password for the Paimon JDBC catalog."
+ )
+ private String jdbcPassword;
+
+ @ConnectorProperty(
+ names = {"paimon.jdbc.driver_url", "jdbc.driver_url"},
+ required = false,
+ description = "JDBC driver JAR file path or URL. "
+ + "Can be a local file name (will look in
$DORIS_HOME/plugins/jdbc_drivers/) "
+ + "or a full URL (http://, https://, file://)."
+ )
+ private String driverUrl;
+
+ @ConnectorProperty(
+ names = {"paimon.jdbc.driver_class", "jdbc.driver_class"},
+ required = false,
+ description = "JDBC driver class name. If specified with
paimon.jdbc.driver_url, "
+ + "the driver will be loaded dynamically."
+ )
+ private String driverClass;
+
+ protected PaimonJdbcMetaStoreProperties(Map<String, String> props) {
+ super(props);
+ }
+
+ @Override
+ public String getPaimonCatalogType() {
+ return PaimonExternalCatalog.PAIMON_JDBC;
+ }
+
+ @Override
+ protected void checkRequiredProperties() {
+ super.checkRequiredProperties();
+ if (StringUtils.isBlank(warehouse)) {
+ throw new IllegalArgumentException("Property warehouse is
required.");
+ }
+ }
+
+ @Override
+ public Catalog initializeCatalog(String catalogName,
List<StorageProperties> storagePropertiesList) {
+ buildCatalogOptions();
+ Configuration conf = new Configuration();
+ for (StorageProperties storageProperties : storagePropertiesList) {
+ if (storageProperties.getHadoopStorageConfig() != null) {
+ conf.addResource(storageProperties.getHadoopStorageConfig());
+ }
+ if
(storageProperties.getType().equals(StorageProperties.Type.HDFS)) {
+ this.executionAuthenticator = new
HadoopExecutionAuthenticator(((HdfsProperties) storageProperties)
+ .getHadoopAuthenticator());
+ }
+ }
+ appendUserHadoopConfig(conf);
+ if (StringUtils.isNotBlank(driverUrl)) {
+ registerJdbcDriver(driverUrl, driverClass);
+ LOG.info("Using dynamic JDBC driver for Paimon JDBC catalog from:
{}", driverUrl);
+ }
+ CatalogContext catalogContext = CatalogContext.create(catalogOptions,
conf);
+ try {
+ return this.executionAuthenticator.execute(() ->
CatalogFactory.createCatalog(catalogContext));
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to create Paimon catalog with
JDBC metastore: " + e.getMessage(), e);
+ }
+ }
+
+ @Override
+ protected void appendCustomCatalogOptions() {
+ catalogOptions.set(CatalogOptions.URI.key(), uri);
+ addIfNotBlank("jdbc.user", jdbcUser);
+ addIfNotBlank("jdbc.password", jdbcPassword);
+ appendRawJdbcCatalogOptions();
+ }
+
+ @Override
+ protected String getMetastoreType() {
+ return JdbcCatalogFactory.IDENTIFIER;
+ }
+
+ private void addIfNotBlank(String key, String value) {
+ if (StringUtils.isNotBlank(value)) {
+ catalogOptions.set(key, value);
+ }
+ }
+
+ private void appendRawJdbcCatalogOptions() {
+ origProps.forEach((key, value) -> {
+ if (key != null && key.startsWith(JDBC_PREFIX) &&
!catalogOptions.keySet().contains(key)) {
+ catalogOptions.set(key, value);
+ }
+ });
+ }
+
+ /**
+ * Register JDBC driver with DriverManager.
+ * This is necessary because DriverManager.getConnection() doesn't use
Thread.contextClassLoader.
+ */
+ private void registerJdbcDriver(String driverUrl, String driverClassName) {
+ try {
+ if (StringUtils.isBlank(driverClassName)) {
+ throw new IllegalArgumentException(
+ "jdbc.driver_class or paimon.jdbc.driver_class is
required when jdbc.driver_url "
+ + "or paimon.jdbc.driver_url is specified");
+ }
+
+ String fullDriverUrl = JdbcResource.getFullDriverUrl(driverUrl);
+ URL url = new URL(fullDriverUrl);
+ String driverKey = fullDriverUrl + "#" + driverClassName;
+ if (!REGISTERED_DRIVER_KEYS.add(driverKey)) {
+ LOG.info("JDBC driver already registered for Paimon catalog:
{} from {}",
+ driverClassName, fullDriverUrl);
+ return;
+ }
+ try {
+ ClassLoader classLoader =
DRIVER_CLASS_LOADER_CACHE.computeIfAbsent(url, u -> {
+ ClassLoader parent = getClass().getClassLoader();
+ return URLClassLoader.newInstance(new URL[] {u}, parent);
+ });
+ Class<?> loadedDriverClass = Class.forName(driverClassName,
true, classLoader);
+ java.sql.Driver driver = (java.sql.Driver)
loadedDriverClass.getDeclaredConstructor().newInstance();
+ java.sql.DriverManager.registerDriver(new DriverShim(driver));
+ LOG.info("Successfully registered JDBC driver for Paimon
catalog: {} from {}",
+ driverClassName, fullDriverUrl);
+ } catch (ClassNotFoundException e) {
+ REGISTERED_DRIVER_KEYS.remove(driverKey);
+ throw new IllegalArgumentException("Failed to load JDBC driver
class: " + driverClassName, e);
+ } catch (Exception e) {
+ REGISTERED_DRIVER_KEYS.remove(driverKey);
+ throw new RuntimeException("Failed to register JDBC driver: "
+ driverClassName, e);
+ }
+ } catch (MalformedURLException e) {
+ throw new IllegalArgumentException("Invalid driver URL: " +
driverUrl, e);
+ } catch (IllegalArgumentException e) {
+ throw e;
+ }
+ }
+
+ private static class DriverShim implements java.sql.Driver {
+ private final java.sql.Driver delegate;
+
+ DriverShim(java.sql.Driver delegate) {
+ this.delegate = delegate;
+ }
+
+ @Override
+ public java.sql.Connection connect(String url, java.util.Properties
info) throws java.sql.SQLException {
+ return delegate.connect(url, info);
+ }
+
+ @Override
+ public boolean acceptsURL(String url) throws java.sql.SQLException {
+ return delegate.acceptsURL(url);
+ }
+
+ @Override
+ public java.sql.DriverPropertyInfo[] getPropertyInfo(String url,
java.util.Properties info)
+ throws java.sql.SQLException {
+ return delegate.getPropertyInfo(url, info);
+ }
+
+ @Override
+ public int getMajorVersion() {
+ return delegate.getMajorVersion();
+ }
+
+ @Override
+ public int getMinorVersion() {
+ return delegate.getMinorVersion();
+ }
+
+ @Override
+ public boolean jdbcCompliant() {
+ return delegate.jdbcCompliant();
+ }
+
+ @Override
+ public java.util.logging.Logger getParentLogger() throws
java.sql.SQLFeatureNotSupportedException {
+ return delegate.getParentLogger();
+ }
+ }
+}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/PaimonPropertiesFactory.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/PaimonPropertiesFactory.java
index 958daafd175..cc9b1ecef49 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/PaimonPropertiesFactory.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/PaimonPropertiesFactory.java
@@ -29,6 +29,7 @@ public class PaimonPropertiesFactory extends
AbstractMetastorePropertiesFactory
register("filesystem", PaimonFileSystemMetaStoreProperties::new);
register("hms", PaimonHMSMetaStoreProperties::new);
register("rest", PaimonRestMetaStoreProperties::new);
+ register("jdbc", PaimonJdbcMetaStoreProperties::new);
}
@Override
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/datasource/property/metastore/PaimonJdbcMetaStorePropertiesTest.java
b/fe/fe-core/src/test/java/org/apache/doris/datasource/property/metastore/PaimonJdbcMetaStorePropertiesTest.java
new file mode 100644
index 00000000000..471d2f05a95
--- /dev/null
+++
b/fe/fe-core/src/test/java/org/apache/doris/datasource/property/metastore/PaimonJdbcMetaStorePropertiesTest.java
@@ -0,0 +1,155 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.property.metastore;
+
+import org.apache.doris.datasource.paimon.PaimonExternalCatalog;
+
+import org.apache.paimon.options.CatalogOptions;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+public class PaimonJdbcMetaStorePropertiesTest {
+
+ @Test
+ public void testBasicJdbcProperties() throws Exception {
+ Map<String, String> props = new HashMap<>();
+ props.put("type", "paimon");
+ props.put("paimon.catalog.type", "jdbc");
+ props.put("uri", "jdbc:mysql://localhost:3306/paimon");
+ props.put("warehouse", "s3://warehouse/path");
+ props.put("paimon.jdbc.user", "paimon");
+ props.put("paimon.jdbc.password", "secret");
+
+ PaimonJdbcMetaStoreProperties jdbcProps =
(PaimonJdbcMetaStoreProperties) MetastoreProperties.create(props);
+ jdbcProps.initNormalizeAndCheckProps();
+ jdbcProps.buildCatalogOptions();
+
+ Assertions.assertEquals(PaimonExternalCatalog.PAIMON_JDBC,
jdbcProps.getPaimonCatalogType());
+ Assertions.assertEquals("jdbc",
jdbcProps.getCatalogOptions().get(CatalogOptions.METASTORE.key()));
+ Assertions.assertEquals("jdbc:mysql://localhost:3306/paimon",
+ jdbcProps.getCatalogOptions().get(CatalogOptions.URI.key()));
+ Assertions.assertEquals("paimon",
jdbcProps.getCatalogOptions().get("jdbc.user"));
+ Assertions.assertEquals("secret",
jdbcProps.getCatalogOptions().get("jdbc.password"));
+ }
+
+ @Test
+ public void testJdbcPrefixPassthrough() throws Exception {
+ Map<String, String> props = new HashMap<>();
+ props.put("type", "paimon");
+ props.put("paimon.catalog.type", "jdbc");
+ props.put("uri", "jdbc:mysql://localhost:3306/paimon");
+ props.put("warehouse", "s3://warehouse/path");
+ props.put("paimon.jdbc.useSSL", "true");
+ props.put("paimon.jdbc.verifyServerCertificate", "true");
+
+ PaimonJdbcMetaStoreProperties jdbcProps =
(PaimonJdbcMetaStoreProperties) MetastoreProperties.create(props);
+ jdbcProps.initNormalizeAndCheckProps();
+ jdbcProps.buildCatalogOptions();
+
+ Assertions.assertEquals("true",
jdbcProps.getCatalogOptions().get("jdbc.useSSL"));
+ Assertions.assertEquals("true",
jdbcProps.getCatalogOptions().get("jdbc.verifyServerCertificate"));
+ }
+
+ @Test
+ public void testRawJdbcPrefixPassthrough() throws Exception {
+ Map<String, String> props = new HashMap<>();
+ props.put("type", "paimon");
+ props.put("paimon.catalog.type", "jdbc");
+ props.put("uri", "jdbc:mysql://localhost:3306/paimon");
+ props.put("warehouse", "s3://warehouse/path");
+ props.put("jdbc.user", "raw_user");
+ props.put("jdbc.password", "raw_password");
+ props.put("jdbc.useSSL", "true");
+ props.put("jdbc.verifyServerCertificate", "true");
+
+ PaimonJdbcMetaStoreProperties jdbcProps =
(PaimonJdbcMetaStoreProperties) MetastoreProperties.create(props);
+ jdbcProps.initNormalizeAndCheckProps();
+ jdbcProps.buildCatalogOptions();
+
+ Assertions.assertEquals("raw_user",
jdbcProps.getCatalogOptions().get("jdbc.user"));
+ Assertions.assertEquals("raw_password",
jdbcProps.getCatalogOptions().get("jdbc.password"));
+ Assertions.assertEquals("true",
jdbcProps.getCatalogOptions().get("jdbc.useSSL"));
+ Assertions.assertEquals("true",
jdbcProps.getCatalogOptions().get("jdbc.verifyServerCertificate"));
+ }
+
+ @Test
+ public void testFactoryCreateJdbcType() throws Exception {
+ Map<String, String> props = new HashMap<>();
+ props.put("type", "paimon");
+ props.put("paimon.catalog.type", "jdbc");
+ props.put("uri", "jdbc:mysql://localhost:3306/paimon");
+ props.put("warehouse", "s3://warehouse/path");
+
+ MetastoreProperties properties = MetastoreProperties.create(props);
+ Assertions.assertEquals(PaimonJdbcMetaStoreProperties.class,
properties.getClass());
+ }
+
+ @Test
+ public void testMissingWarehouse() throws Exception {
+ Map<String, String> props = new HashMap<>();
+ props.put("type", "paimon");
+ props.put("paimon.catalog.type", "jdbc");
+ props.put("uri", "jdbc:mysql://localhost:3306/paimon");
+
+ Assertions.assertThrows(IllegalArgumentException.class, () ->
MetastoreProperties.create(props));
+ }
+
+ @Test
+ public void testMissingUri() throws Exception {
+ Map<String, String> props = new HashMap<>();
+ props.put("type", "paimon");
+ props.put("paimon.catalog.type", "jdbc");
+ props.put("warehouse", "s3://warehouse/path");
+
+ Assertions.assertThrows(IllegalArgumentException.class, () ->
MetastoreProperties.create(props));
+ }
+
+ @Test
+ public void testDriverClassRequiredWhenDriverUrlIsSet() throws Exception {
+ Map<String, String> props = new HashMap<>();
+ props.put("type", "paimon");
+ props.put("paimon.catalog.type", "jdbc");
+ props.put("uri", "jdbc:mysql://localhost:3306/paimon");
+ props.put("warehouse", "s3://warehouse/path");
+ props.put("paimon.jdbc.driver_url",
"https://example.com/mysql-connector-java.jar");
+
+ PaimonJdbcMetaStoreProperties jdbcProps =
(PaimonJdbcMetaStoreProperties) MetastoreProperties.create(props);
+ jdbcProps.initNormalizeAndCheckProps();
+ Assertions.assertThrows(IllegalArgumentException.class,
+ () -> jdbcProps.initializeCatalog("paimon_catalog",
Collections.emptyList()));
+ }
+
+ @Test
+ public void testRawDriverClassRequiredWhenDriverUrlIsSet() throws
Exception {
+ Map<String, String> props = new HashMap<>();
+ props.put("type", "paimon");
+ props.put("paimon.catalog.type", "jdbc");
+ props.put("uri", "jdbc:mysql://localhost:3306/paimon");
+ props.put("warehouse", "s3://warehouse/path");
+ props.put("jdbc.driver_url",
"https://example.com/mysql-connector-java.jar");
+
+ PaimonJdbcMetaStoreProperties jdbcProps =
(PaimonJdbcMetaStoreProperties) MetastoreProperties.create(props);
+ jdbcProps.initNormalizeAndCheckProps();
+ Assertions.assertThrows(IllegalArgumentException.class,
+ () -> jdbcProps.initializeCatalog("paimon_catalog",
Collections.emptyList()));
+ }
+}
diff --git
a/regression-test/data/external_table_p0/paimon/test_paimon_jdbc_catalog.out
b/regression-test/data/external_table_p0/paimon/test_paimon_jdbc_catalog.out
new file mode 100644
index 00000000000..0866e5961ba
--- /dev/null
+++ b/regression-test/data/external_table_p0/paimon/test_paimon_jdbc_catalog.out
@@ -0,0 +1,5 @@
+-- This file is automatically generated. You should know what you did if you
want to edit this
+-- !paimon_jdbc_select --
+1 alice 2025-01-01
+2 bob 2025-01-02
+
diff --git
a/regression-test/suites/external_table_p0/paimon/test_paimon_jdbc_catalog.groovy
b/regression-test/suites/external_table_p0/paimon/test_paimon_jdbc_catalog.groovy
new file mode 100644
index 00000000000..0f653063f14
--- /dev/null
+++
b/regression-test/suites/external_table_p0/paimon/test_paimon_jdbc_catalog.groovy
@@ -0,0 +1,213 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_paimon_jdbc_catalog", "p0,external") {
+ String enabled = context.config.otherConfigs.get("enablePaimonTest")
+ if (enabled == null || !enabled.equalsIgnoreCase("true")) {
+ logger.info("Paimon test is not enabled, skip this test")
+ return
+ }
+
+ String enabledJdbc = context.config.otherConfigs.get("enableJdbcTest")
+ if (enabledJdbc == null || !enabledJdbc.equalsIgnoreCase("true")) {
+ logger.info("Paimon JDBC catalog test requires enableJdbcTest, skip
this test")
+ return
+ }
+
+ String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+ String minioPort =
context.config.otherConfigs.get("paimon_jdbc_minio_port")
+ if (minioPort == null || minioPort.isEmpty()) {
+ minioPort = context.config.otherConfigs.get("iceberg_minio_port")
+ }
+ String jdbcPort = context.config.otherConfigs.get("pg_14_port")
+ if (externalEnvIp == null || externalEnvIp.isEmpty()
+ || minioPort == null || minioPort.isEmpty()
+ || jdbcPort == null || jdbcPort.isEmpty()) {
+ logger.info("Paimon JDBC catalog test environment is not fully
configured, skip this test")
+ return
+ }
+
+ String minioAk = context.config.otherConfigs.get("paimon_jdbc_minio_ak")
+ if (minioAk == null || minioAk.isEmpty()) {
+ minioAk = "admin"
+ }
+ String minioSk = context.config.otherConfigs.get("paimon_jdbc_minio_sk")
+ if (minioSk == null || minioSk.isEmpty()) {
+ minioSk = "password"
+ }
+ String warehouseBucket =
context.config.otherConfigs.get("paimon_jdbc_warehouse_bucket")
+ if (warehouseBucket == null || warehouseBucket.isEmpty()) {
+ warehouseBucket = "warehouse"
+ }
+
+ String catalogName = "test_paimon_jdbc_catalog"
+ String dbName = "paimon_jdbc_db"
+ String driverName = "postgresql-42.5.0.jar"
+ String driverDownloadUrl =
"${getS3Url()}/regression/jdbc_driver/${driverName}"
+ String jdbcDriversDir = getFeConfig("jdbc_drivers_dir")
+ String localDriverDir = "${context.config.dataPath}/jdbc_driver"
+ String localDriverPath = "${localDriverDir}/${driverName}"
+ String sparkDriverPath = "/tmp/${driverName}"
+ String sparkSeedCatalogName = "${catalogName}_seed"
+
+ assertTrue(jdbcDriversDir != null && !jdbcDriversDir.isEmpty(),
"jdbc_drivers_dir must be configured")
+
+ def executeCommand = { String cmd, Boolean mustSuc ->
+ try {
+ logger.info("execute ${cmd}")
+ def proc = new ProcessBuilder("/bin/bash", "-c",
cmd).redirectErrorStream(true).start()
+ int exitcode = proc.waitFor()
+ String output = proc.text
+ if (exitcode != 0) {
+ logger.info("exit code: ${exitcode}, output\n: ${output}")
+ if (mustSuc) {
+ assertTrue(false, "Execute failed: ${cmd}")
+ }
+ }
+ return output
+ } catch (IOException e) {
+ assertTrue(false, "Execute timeout: ${cmd}")
+ }
+ }
+
+ executeCommand("mkdir -p ${localDriverDir}", false)
+ executeCommand("mkdir -p ${jdbcDriversDir}", true)
+ if (!new File(localDriverPath).exists()) {
+ executeCommand("/usr/bin/curl --max-time 600 ${driverDownloadUrl}
--output ${localDriverPath}", true)
+ }
+ executeCommand("cp -f ${localDriverPath} ${jdbcDriversDir}/${driverName}",
true)
+
+ String sparkContainerName = executeCommand("docker ps --filter
name=spark-iceberg --format {{.Names}}", false)
+ ?.trim()
+ if (sparkContainerName == null || sparkContainerName.isEmpty()) {
+ logger.info("spark-iceberg container not found, skip this test")
+ return
+ }
+ executeCommand("docker cp ${localDriverPath}
${sparkContainerName}:${sparkDriverPath}", true)
+
+ def sparkPaimonJdbc = { String sqlText ->
+ String escapedSql = sqlText.replaceAll('"', '\\\\"')
+ String command = """docker exec ${sparkContainerName} spark-sql
--master spark://${sparkContainerName}:7077 \
+--jars ${sparkDriverPath} \
+--driver-class-path ${sparkDriverPath} \
+--conf spark.driver.extraClassPath=${sparkDriverPath} \
+--conf spark.executor.extraClassPath=${sparkDriverPath} \
+--conf
spark.sql.extensions=org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions
\
+--conf
spark.sql.catalog.${sparkSeedCatalogName}=org.apache.paimon.spark.SparkCatalog \
+--conf
spark.sql.catalog.${sparkSeedCatalogName}.warehouse=s3://${warehouseBucket}/paimon_jdbc_catalog/
\
+--conf spark.sql.catalog.${sparkSeedCatalogName}.metastore=jdbc \
+--conf
spark.sql.catalog.${sparkSeedCatalogName}.uri=jdbc:postgresql://${externalEnvIp}:${jdbcPort}/postgres
\
+--conf spark.sql.catalog.${sparkSeedCatalogName}.catalog-key=${catalogName} \
+--conf spark.sql.catalog.${sparkSeedCatalogName}.jdbc.user=postgres \
+--conf spark.sql.catalog.${sparkSeedCatalogName}.jdbc.password=123456 \
+--conf spark.sql.catalog.${sparkSeedCatalogName}.lock.enabled=false \
+--conf
spark.sql.catalog.${sparkSeedCatalogName}.s3.endpoint=http://${externalEnvIp}:${minioPort}
\
+--conf spark.sql.catalog.${sparkSeedCatalogName}.s3.access-key=${minioAk} \
+--conf spark.sql.catalog.${sparkSeedCatalogName}.s3.secret-key=${minioSk} \
+--conf spark.sql.catalog.${sparkSeedCatalogName}.s3.region=us-east-1 \
+--conf spark.sql.catalog.${sparkSeedCatalogName}.s3.path.style.access=true \
+-e "${escapedSql}" """
+ executeCommand(command, true)
+ }
+
+ try {
+ sql """switch internal"""
+ sql """DROP CATALOG IF EXISTS ${catalogName}"""
+ sql """
+ CREATE CATALOG ${catalogName} PROPERTIES (
+ 'type' = 'paimon',
+ 'paimon.catalog.type' = 'jdbc',
+ 'uri' =
'jdbc:postgresql://${externalEnvIp}:${jdbcPort}/postgres',
+ 'warehouse' = 's3://${warehouseBucket}/paimon_jdbc_catalog/',
+ 'paimon.catalog-key' = '${catalogName}',
+ 'paimon.jdbc.driver_url' =
'file://${jdbcDriversDir}/${driverName}',
+ 'paimon.jdbc.driver_class' = 'org.postgresql.Driver',
+ 'paimon.jdbc.user' = 'postgres',
+ 'paimon.jdbc.password' = '123456',
+ 's3.endpoint' = 'http://${externalEnvIp}:${minioPort}',
+ 's3.access_key' = '${minioAk}',
+ 's3.secret_key' = '${minioSk}',
+ 's3.region' = 'us-east-1',
+ 'use_path_style' = 'true'
+ )
+ """
+
+ sql """SWITCH ${catalogName}"""
+ def catalogs = sql """SHOW CATALOGS"""
+ assertTrue(catalogs.toString().contains(catalogName))
+
+ sql """DROP DATABASE IF EXISTS ${dbName} FORCE"""
+ sql """CREATE DATABASE ${dbName}"""
+ def databases = sql """SHOW DATABASES"""
+ assertTrue(databases.toString().contains(dbName))
+
+ sql """USE ${dbName}"""
+ sql """DROP TABLE IF EXISTS paimon_jdbc_tbl"""
+ sql """
+ CREATE TABLE ${dbName}.paimon_jdbc_tbl (
+ id INT,
+ name STRING,
+ dt DATE
+ ) ENGINE=paimon
+ PROPERTIES (
+ 'primary-key' = 'id',
+ 'bucket' = '2'
+ )
+ """
+
+ def tables = sql """SHOW TABLES"""
+ assertTrue(tables.toString().contains("paimon_jdbc_tbl"))
+
+ sparkPaimonJdbc """
+ INSERT INTO ${sparkSeedCatalogName}.${dbName}.paimon_jdbc_tbl
VALUES
+ (1, 'alice', DATE '2025-01-01'),
+ (2, 'bob', DATE '2025-01-02')
+ """
+
+ def descResult = sql """DESC paimon_jdbc_tbl"""
+ assertTrue(descResult.toString().contains("id"))
+ assertTrue(descResult.toString().contains("name"))
+ assertTrue(descResult.toString().contains("dt"))
+
+ order_qt_paimon_jdbc_select """SELECT * FROM paimon_jdbc_tbl ORDER BY
id"""
+
+ def rowCount = sql """SELECT COUNT(*) FROM paimon_jdbc_tbl"""
+ assertEquals(1, rowCount.size())
+ assertEquals("2", rowCount[0][0].toString())
+
+ def schemaDesc = sql """DESC paimon_jdbc_tbl\$schemas"""
+ assertTrue(schemaDesc.toString().contains("schema_id"))
+
+ def schemaCount = sql """SELECT COUNT(*) FROM
paimon_jdbc_tbl\$schemas"""
+ assertEquals(1, schemaCount.size())
+ assertTrue(schemaCount[0][0].toString().toInteger() >= 1)
+ } finally {
+ try {
+ sql """SWITCH ${catalogName}"""
+ sql """DROP TABLE IF EXISTS ${dbName}.paimon_jdbc_tbl"""
+ sql """DROP DATABASE IF EXISTS ${dbName} FORCE"""
+ } catch (Exception e) {
+ logger.info("Cleanup in catalog ${catalogName} failed:
${e.getMessage()}")
+ }
+ try {
+ sql """SWITCH internal"""
+ } catch (Exception e) {
+ logger.info("Switch back to internal catalog failed:
${e.getMessage()}")
+ }
+ sql """DROP CATALOG IF EXISTS ${catalogName}"""
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]