This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 79153d305ba [feat](paimon) support jdbc catalog type (#61094)
79153d305ba is described below

commit 79153d305bafcc0ae3df0150410e5a2cc145fba4
Author: Chenjunwei <[email protected]>
AuthorDate: Thu Mar 12 14:49:29 2026 +0800

    [feat](paimon) support jdbc catalog type (#61094)
    
    ## Summary
    - add Paimon JDBC metastore properties and driver loading support
    - register paimon.catalog.type=jdbc in paimon properties/catalog factory
    - allow paimon jdbc type in external table/system table thrift
    conversion
    - add FE unit tests for jdbc properties mapping and validation
    
    ## Test
    - ./run-fe-ut.sh --run org.apache.doris.datasource.property.metastore.PaimonJdbcMetaStorePropertiesTest
---
 .../datasource/paimon/PaimonExternalCatalog.java   |   1 +
 .../paimon/PaimonExternalCatalogFactory.java       |   1 +
 .../datasource/paimon/PaimonExternalTable.java     |   5 +-
 .../datasource/paimon/PaimonSysExternalTable.java  |   5 +-
 .../metastore/PaimonJdbcMetaStoreProperties.java   | 247 +++++++++++++++++++++
 .../metastore/PaimonPropertiesFactory.java         |   1 +
 .../PaimonJdbcMetaStorePropertiesTest.java         | 155 +++++++++++++
 .../paimon/test_paimon_jdbc_catalog.out            |   5 +
 .../paimon/test_paimon_jdbc_catalog.groovy         | 213 ++++++++++++++++++
 9 files changed, 629 insertions(+), 4 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalCatalog.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalCatalog.java
index 87f606dfcca..1cb69fa423b 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalCatalog.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalCatalog.java
@@ -47,6 +47,7 @@ public class PaimonExternalCatalog extends ExternalCatalog {
     public static final String PAIMON_HMS = "hms";
     public static final String PAIMON_DLF = "dlf";
     public static final String PAIMON_REST = "rest";
+    public static final String PAIMON_JDBC = "jdbc";
     public static final String PAIMON_TABLE_CACHE_ENABLE = 
"meta.cache.paimon.table.enable";
     public static final String PAIMON_TABLE_CACHE_TTL_SECOND = 
"meta.cache.paimon.table.ttl-second";
     public static final String PAIMON_TABLE_CACHE_CAPACITY = 
"meta.cache.paimon.table.capacity";
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalCatalogFactory.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalCatalogFactory.java
index ae19907d932..affe4995f10 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalCatalogFactory.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalCatalogFactory.java
@@ -38,6 +38,7 @@ public class PaimonExternalCatalogFactory {
             case PaimonExternalCatalog.PAIMON_FILESYSTEM:
             case PaimonExternalCatalog.PAIMON_DLF:
             case PaimonExternalCatalog.PAIMON_REST:
+            case PaimonExternalCatalog.PAIMON_JDBC:
                 return new PaimonExternalCatalog(catalogId, name, resource, 
props, comment);
             default:
                 throw new DdlException("Unknown " + 
PaimonExternalCatalog.PAIMON_CATALOG_TYPE
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java
index 8e17599c7dd..4c3ee9a3947 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java
@@ -181,7 +181,8 @@ public class PaimonExternalTable extends ExternalTable 
implements MTMVRelatedTab
         if (PaimonExternalCatalog.PAIMON_HMS.equals(getPaimonCatalogType())
                 || 
PaimonExternalCatalog.PAIMON_FILESYSTEM.equals(getPaimonCatalogType())
                 || 
PaimonExternalCatalog.PAIMON_DLF.equals(getPaimonCatalogType())
-                || 
PaimonExternalCatalog.PAIMON_REST.equals(getPaimonCatalogType())) {
+                || 
PaimonExternalCatalog.PAIMON_REST.equals(getPaimonCatalogType())
+                || 
PaimonExternalCatalog.PAIMON_JDBC.equals(getPaimonCatalogType())) {
             THiveTable tHiveTable = new THiveTable(dbName, name, new 
HashMap<>());
             TTableDescriptor tTableDescriptor = new TTableDescriptor(getId(), 
TTableType.HIVE_TABLE, schema.size(), 0,
                     getName(), dbName);
@@ -189,7 +190,7 @@ public class PaimonExternalTable extends ExternalTable 
implements MTMVRelatedTab
             return tTableDescriptor;
         } else {
             throw new IllegalArgumentException(
-                    "Currently only supports hms/dlf/rest/filesystem catalog, 
do not support :"
+                    "Currently only supports hms/dlf/rest/filesystem/jdbc 
catalog, do not support: "
                     + getPaimonCatalogType());
         }
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSysExternalTable.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSysExternalTable.java
index 3245b909562..e2300e20555 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSysExternalTable.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSysExternalTable.java
@@ -198,7 +198,8 @@ public class PaimonSysExternalTable extends ExternalTable {
         if (PaimonExternalCatalog.PAIMON_HMS.equals(catalogType)
                 || PaimonExternalCatalog.PAIMON_FILESYSTEM.equals(catalogType)
                 || PaimonExternalCatalog.PAIMON_DLF.equals(catalogType)
-                || PaimonExternalCatalog.PAIMON_REST.equals(catalogType)) {
+                || PaimonExternalCatalog.PAIMON_REST.equals(catalogType)
+                || PaimonExternalCatalog.PAIMON_JDBC.equals(catalogType)) {
             THiveTable tHiveTable = new THiveTable(dbName, name, new 
HashMap<>());
             TTableDescriptor tTableDescriptor = new TTableDescriptor(getId(), 
TTableType.HIVE_TABLE, schema.size(), 0,
                     getName(), dbName);
@@ -206,7 +207,7 @@ public class PaimonSysExternalTable extends ExternalTable {
             return tTableDescriptor;
         } else {
             throw new IllegalArgumentException(
-                    "Currently only supports hms/dlf/rest/filesystem catalog, 
do not support :" + catalogType);
+                    "Currently only supports hms/dlf/rest/filesystem/jdbc 
catalog, do not support: " + catalogType);
         }
     }
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/PaimonJdbcMetaStoreProperties.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/PaimonJdbcMetaStoreProperties.java
new file mode 100644
index 00000000000..2b32367aba1
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/PaimonJdbcMetaStoreProperties.java
@@ -0,0 +1,247 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.property.metastore;
+
+import org.apache.doris.catalog.JdbcResource;
+import 
org.apache.doris.common.security.authentication.HadoopExecutionAuthenticator;
+import org.apache.doris.datasource.paimon.PaimonExternalCatalog;
+import org.apache.doris.datasource.property.ConnectorProperty;
+import org.apache.doris.datasource.property.storage.HdfsProperties;
+import org.apache.doris.datasource.property.storage.StorageProperties;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.catalog.CatalogFactory;
+import org.apache.paimon.jdbc.JdbcCatalogFactory;
+import org.apache.paimon.options.CatalogOptions;
+
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+public class PaimonJdbcMetaStoreProperties extends AbstractPaimonProperties {
+    private static final Logger LOG = 
LogManager.getLogger(PaimonJdbcMetaStoreProperties.class);
+    private static final String JDBC_PREFIX = "jdbc.";
+    private static final Map<URL, ClassLoader> DRIVER_CLASS_LOADER_CACHE = new 
ConcurrentHashMap<>();
+    private static final Set<String> REGISTERED_DRIVER_KEYS = 
ConcurrentHashMap.newKeySet();
+
+    @ConnectorProperty(
+            names = {"uri", "paimon.jdbc.uri"},
+            required = true,
+            description = "JDBC connection URI for the Paimon JDBC catalog."
+    )
+    private String uri = "";
+
+    @ConnectorProperty(
+            names = {"paimon.jdbc.user", "jdbc.user"},
+            required = false,
+            description = "Username for the Paimon JDBC catalog."
+    )
+    private String jdbcUser;
+
+    @ConnectorProperty(
+            names = {"paimon.jdbc.password", "jdbc.password"},
+            required = false,
+            sensitive = true,
+            description = "Password for the Paimon JDBC catalog."
+    )
+    private String jdbcPassword;
+
+    @ConnectorProperty(
+            names = {"paimon.jdbc.driver_url", "jdbc.driver_url"},
+            required = false,
+            description = "JDBC driver JAR file path or URL. "
+                    + "Can be a local file name (will look in 
$DORIS_HOME/plugins/jdbc_drivers/) "
+                    + "or a full URL (http://, https://, file://)."
+    )
+    private String driverUrl;
+
+    @ConnectorProperty(
+            names = {"paimon.jdbc.driver_class", "jdbc.driver_class"},
+            required = false,
+            description = "JDBC driver class name. If specified with 
paimon.jdbc.driver_url, "
+                    + "the driver will be loaded dynamically."
+    )
+    private String driverClass;
+
+    protected PaimonJdbcMetaStoreProperties(Map<String, String> props) {
+        super(props);
+    }
+
+    @Override
+    public String getPaimonCatalogType() {
+        return PaimonExternalCatalog.PAIMON_JDBC;
+    }
+
+    @Override
+    protected void checkRequiredProperties() {
+        super.checkRequiredProperties();
+        if (StringUtils.isBlank(warehouse)) {
+            throw new IllegalArgumentException("Property warehouse is 
required.");
+        }
+    }
+
+    @Override
+    public Catalog initializeCatalog(String catalogName, 
List<StorageProperties> storagePropertiesList) {
+        buildCatalogOptions();
+        Configuration conf = new Configuration();
+        for (StorageProperties storageProperties : storagePropertiesList) {
+            if (storageProperties.getHadoopStorageConfig() != null) {
+                conf.addResource(storageProperties.getHadoopStorageConfig());
+            }
+            if 
(storageProperties.getType().equals(StorageProperties.Type.HDFS)) {
+                this.executionAuthenticator = new 
HadoopExecutionAuthenticator(((HdfsProperties) storageProperties)
+                        .getHadoopAuthenticator());
+            }
+        }
+        appendUserHadoopConfig(conf);
+        if (StringUtils.isNotBlank(driverUrl)) {
+            registerJdbcDriver(driverUrl, driverClass);
+            LOG.info("Using dynamic JDBC driver for Paimon JDBC catalog from: 
{}", driverUrl);
+        }
+        CatalogContext catalogContext = CatalogContext.create(catalogOptions, 
conf);
+        try {
+            return this.executionAuthenticator.execute(() -> 
CatalogFactory.createCatalog(catalogContext));
+        } catch (Exception e) {
+            throw new RuntimeException("Failed to create Paimon catalog with 
JDBC metastore: " + e.getMessage(), e);
+        }
+    }
+
+    @Override
+    protected void appendCustomCatalogOptions() {
+        catalogOptions.set(CatalogOptions.URI.key(), uri);
+        addIfNotBlank("jdbc.user", jdbcUser);
+        addIfNotBlank("jdbc.password", jdbcPassword);
+        appendRawJdbcCatalogOptions();
+    }
+
+    @Override
+    protected String getMetastoreType() {
+        return JdbcCatalogFactory.IDENTIFIER;
+    }
+
+    private void addIfNotBlank(String key, String value) {
+        if (StringUtils.isNotBlank(value)) {
+            catalogOptions.set(key, value);
+        }
+    }
+
+    private void appendRawJdbcCatalogOptions() {
+        origProps.forEach((key, value) -> {
+            if (key != null && key.startsWith(JDBC_PREFIX) && 
!catalogOptions.keySet().contains(key)) {
+                catalogOptions.set(key, value);
+            }
+        });
+    }
+
+    /**
+     * Register JDBC driver with DriverManager.
+     * This is necessary because DriverManager.getConnection() doesn't use 
Thread.contextClassLoader.
+     */
+    private void registerJdbcDriver(String driverUrl, String driverClassName) {
+        try {
+            if (StringUtils.isBlank(driverClassName)) {
+                throw new IllegalArgumentException(
+                        "jdbc.driver_class or paimon.jdbc.driver_class is 
required when jdbc.driver_url "
+                                + "or paimon.jdbc.driver_url is specified");
+            }
+
+            String fullDriverUrl = JdbcResource.getFullDriverUrl(driverUrl);
+            URL url = new URL(fullDriverUrl);
+            String driverKey = fullDriverUrl + "#" + driverClassName;
+            if (!REGISTERED_DRIVER_KEYS.add(driverKey)) {
+                LOG.info("JDBC driver already registered for Paimon catalog: 
{} from {}",
+                        driverClassName, fullDriverUrl);
+                return;
+            }
+            try {
+                ClassLoader classLoader = 
DRIVER_CLASS_LOADER_CACHE.computeIfAbsent(url, u -> {
+                    ClassLoader parent = getClass().getClassLoader();
+                    return URLClassLoader.newInstance(new URL[] {u}, parent);
+                });
+                Class<?> loadedDriverClass = Class.forName(driverClassName, 
true, classLoader);
+                java.sql.Driver driver = (java.sql.Driver) 
loadedDriverClass.getDeclaredConstructor().newInstance();
+                java.sql.DriverManager.registerDriver(new DriverShim(driver));
+                LOG.info("Successfully registered JDBC driver for Paimon 
catalog: {} from {}",
+                        driverClassName, fullDriverUrl);
+            } catch (ClassNotFoundException e) {
+                REGISTERED_DRIVER_KEYS.remove(driverKey);
+                throw new IllegalArgumentException("Failed to load JDBC driver 
class: " + driverClassName, e);
+            } catch (Exception e) {
+                REGISTERED_DRIVER_KEYS.remove(driverKey);
+                throw new RuntimeException("Failed to register JDBC driver: " 
+ driverClassName, e);
+            }
+        } catch (MalformedURLException e) {
+            throw new IllegalArgumentException("Invalid driver URL: " + 
driverUrl, e);
+        } catch (IllegalArgumentException e) {
+            throw e;
+        }
+    }
+
+    private static class DriverShim implements java.sql.Driver {
+        private final java.sql.Driver delegate;
+
+        DriverShim(java.sql.Driver delegate) {
+            this.delegate = delegate;
+        }
+
+        @Override
+        public java.sql.Connection connect(String url, java.util.Properties 
info) throws java.sql.SQLException {
+            return delegate.connect(url, info);
+        }
+
+        @Override
+        public boolean acceptsURL(String url) throws java.sql.SQLException {
+            return delegate.acceptsURL(url);
+        }
+
+        @Override
+        public java.sql.DriverPropertyInfo[] getPropertyInfo(String url, 
java.util.Properties info)
+                throws java.sql.SQLException {
+            return delegate.getPropertyInfo(url, info);
+        }
+
+        @Override
+        public int getMajorVersion() {
+            return delegate.getMajorVersion();
+        }
+
+        @Override
+        public int getMinorVersion() {
+            return delegate.getMinorVersion();
+        }
+
+        @Override
+        public boolean jdbcCompliant() {
+            return delegate.jdbcCompliant();
+        }
+
+        @Override
+        public java.util.logging.Logger getParentLogger() throws 
java.sql.SQLFeatureNotSupportedException {
+            return delegate.getParentLogger();
+        }
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/PaimonPropertiesFactory.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/PaimonPropertiesFactory.java
index 958daafd175..cc9b1ecef49 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/PaimonPropertiesFactory.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/PaimonPropertiesFactory.java
@@ -29,6 +29,7 @@ public class PaimonPropertiesFactory extends 
AbstractMetastorePropertiesFactory
         register("filesystem", PaimonFileSystemMetaStoreProperties::new);
         register("hms", PaimonHMSMetaStoreProperties::new);
         register("rest", PaimonRestMetaStoreProperties::new);
+        register("jdbc", PaimonJdbcMetaStoreProperties::new);
     }
 
     @Override
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/datasource/property/metastore/PaimonJdbcMetaStorePropertiesTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/datasource/property/metastore/PaimonJdbcMetaStorePropertiesTest.java
new file mode 100644
index 00000000000..471d2f05a95
--- /dev/null
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/datasource/property/metastore/PaimonJdbcMetaStorePropertiesTest.java
@@ -0,0 +1,155 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.property.metastore;
+
+import org.apache.doris.datasource.paimon.PaimonExternalCatalog;
+
+import org.apache.paimon.options.CatalogOptions;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+public class PaimonJdbcMetaStorePropertiesTest {
+
+    @Test
+    public void testBasicJdbcProperties() throws Exception {
+        Map<String, String> props = new HashMap<>();
+        props.put("type", "paimon");
+        props.put("paimon.catalog.type", "jdbc");
+        props.put("uri", "jdbc:mysql://localhost:3306/paimon");
+        props.put("warehouse", "s3://warehouse/path");
+        props.put("paimon.jdbc.user", "paimon");
+        props.put("paimon.jdbc.password", "secret");
+
+        PaimonJdbcMetaStoreProperties jdbcProps = 
(PaimonJdbcMetaStoreProperties) MetastoreProperties.create(props);
+        jdbcProps.initNormalizeAndCheckProps();
+        jdbcProps.buildCatalogOptions();
+
+        Assertions.assertEquals(PaimonExternalCatalog.PAIMON_JDBC, 
jdbcProps.getPaimonCatalogType());
+        Assertions.assertEquals("jdbc", 
jdbcProps.getCatalogOptions().get(CatalogOptions.METASTORE.key()));
+        Assertions.assertEquals("jdbc:mysql://localhost:3306/paimon",
+                jdbcProps.getCatalogOptions().get(CatalogOptions.URI.key()));
+        Assertions.assertEquals("paimon", 
jdbcProps.getCatalogOptions().get("jdbc.user"));
+        Assertions.assertEquals("secret", 
jdbcProps.getCatalogOptions().get("jdbc.password"));
+    }
+
+    @Test
+    public void testJdbcPrefixPassthrough() throws Exception {
+        Map<String, String> props = new HashMap<>();
+        props.put("type", "paimon");
+        props.put("paimon.catalog.type", "jdbc");
+        props.put("uri", "jdbc:mysql://localhost:3306/paimon");
+        props.put("warehouse", "s3://warehouse/path");
+        props.put("paimon.jdbc.useSSL", "true");
+        props.put("paimon.jdbc.verifyServerCertificate", "true");
+
+        PaimonJdbcMetaStoreProperties jdbcProps = 
(PaimonJdbcMetaStoreProperties) MetastoreProperties.create(props);
+        jdbcProps.initNormalizeAndCheckProps();
+        jdbcProps.buildCatalogOptions();
+
+        Assertions.assertEquals("true", 
jdbcProps.getCatalogOptions().get("jdbc.useSSL"));
+        Assertions.assertEquals("true", 
jdbcProps.getCatalogOptions().get("jdbc.verifyServerCertificate"));
+    }
+
+    @Test
+    public void testRawJdbcPrefixPassthrough() throws Exception {
+        Map<String, String> props = new HashMap<>();
+        props.put("type", "paimon");
+        props.put("paimon.catalog.type", "jdbc");
+        props.put("uri", "jdbc:mysql://localhost:3306/paimon");
+        props.put("warehouse", "s3://warehouse/path");
+        props.put("jdbc.user", "raw_user");
+        props.put("jdbc.password", "raw_password");
+        props.put("jdbc.useSSL", "true");
+        props.put("jdbc.verifyServerCertificate", "true");
+
+        PaimonJdbcMetaStoreProperties jdbcProps = 
(PaimonJdbcMetaStoreProperties) MetastoreProperties.create(props);
+        jdbcProps.initNormalizeAndCheckProps();
+        jdbcProps.buildCatalogOptions();
+
+        Assertions.assertEquals("raw_user", 
jdbcProps.getCatalogOptions().get("jdbc.user"));
+        Assertions.assertEquals("raw_password", 
jdbcProps.getCatalogOptions().get("jdbc.password"));
+        Assertions.assertEquals("true", 
jdbcProps.getCatalogOptions().get("jdbc.useSSL"));
+        Assertions.assertEquals("true", 
jdbcProps.getCatalogOptions().get("jdbc.verifyServerCertificate"));
+    }
+
+    @Test
+    public void testFactoryCreateJdbcType() throws Exception {
+        Map<String, String> props = new HashMap<>();
+        props.put("type", "paimon");
+        props.put("paimon.catalog.type", "jdbc");
+        props.put("uri", "jdbc:mysql://localhost:3306/paimon");
+        props.put("warehouse", "s3://warehouse/path");
+
+        MetastoreProperties properties = MetastoreProperties.create(props);
+        Assertions.assertEquals(PaimonJdbcMetaStoreProperties.class, 
properties.getClass());
+    }
+
+    @Test
+    public void testMissingWarehouse() throws Exception {
+        Map<String, String> props = new HashMap<>();
+        props.put("type", "paimon");
+        props.put("paimon.catalog.type", "jdbc");
+        props.put("uri", "jdbc:mysql://localhost:3306/paimon");
+
+        Assertions.assertThrows(IllegalArgumentException.class, () -> 
MetastoreProperties.create(props));
+    }
+
+    @Test
+    public void testMissingUri() throws Exception {
+        Map<String, String> props = new HashMap<>();
+        props.put("type", "paimon");
+        props.put("paimon.catalog.type", "jdbc");
+        props.put("warehouse", "s3://warehouse/path");
+
+        Assertions.assertThrows(IllegalArgumentException.class, () -> 
MetastoreProperties.create(props));
+    }
+
+    @Test
+    public void testDriverClassRequiredWhenDriverUrlIsSet() throws Exception {
+        Map<String, String> props = new HashMap<>();
+        props.put("type", "paimon");
+        props.put("paimon.catalog.type", "jdbc");
+        props.put("uri", "jdbc:mysql://localhost:3306/paimon");
+        props.put("warehouse", "s3://warehouse/path");
+        props.put("paimon.jdbc.driver_url", 
"https://example.com/mysql-connector-java.jar";);
+
+        PaimonJdbcMetaStoreProperties jdbcProps = 
(PaimonJdbcMetaStoreProperties) MetastoreProperties.create(props);
+        jdbcProps.initNormalizeAndCheckProps();
+        Assertions.assertThrows(IllegalArgumentException.class,
+                () -> jdbcProps.initializeCatalog("paimon_catalog", 
Collections.emptyList()));
+    }
+
+    @Test
+    public void testRawDriverClassRequiredWhenDriverUrlIsSet() throws 
Exception {
+        Map<String, String> props = new HashMap<>();
+        props.put("type", "paimon");
+        props.put("paimon.catalog.type", "jdbc");
+        props.put("uri", "jdbc:mysql://localhost:3306/paimon");
+        props.put("warehouse", "s3://warehouse/path");
+        props.put("jdbc.driver_url", 
"https://example.com/mysql-connector-java.jar";);
+
+        PaimonJdbcMetaStoreProperties jdbcProps = 
(PaimonJdbcMetaStoreProperties) MetastoreProperties.create(props);
+        jdbcProps.initNormalizeAndCheckProps();
+        Assertions.assertThrows(IllegalArgumentException.class,
+                () -> jdbcProps.initializeCatalog("paimon_catalog", 
Collections.emptyList()));
+    }
+}
diff --git 
a/regression-test/data/external_table_p0/paimon/test_paimon_jdbc_catalog.out 
b/regression-test/data/external_table_p0/paimon/test_paimon_jdbc_catalog.out
new file mode 100644
index 00000000000..0866e5961ba
--- /dev/null
+++ b/regression-test/data/external_table_p0/paimon/test_paimon_jdbc_catalog.out
@@ -0,0 +1,5 @@
+-- This file is automatically generated. You should know what you did if you 
want to edit this
+-- !paimon_jdbc_select --
+1      alice   2025-01-01
+2      bob     2025-01-02
+
diff --git 
a/regression-test/suites/external_table_p0/paimon/test_paimon_jdbc_catalog.groovy
 
b/regression-test/suites/external_table_p0/paimon/test_paimon_jdbc_catalog.groovy
new file mode 100644
index 00000000000..0f653063f14
--- /dev/null
+++ 
b/regression-test/suites/external_table_p0/paimon/test_paimon_jdbc_catalog.groovy
@@ -0,0 +1,213 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
// End-to-end test for the Paimon JDBC catalog type: Doris creates a Paimon
// catalog backed by a PostgreSQL JDBC metastore + MinIO warehouse, Spark seeds
// data through the same JDBC catalog, and Doris reads it back.
suite("test_paimon_jdbc_catalog", "p0,external") {
    String enabled = context.config.otherConfigs.get("enablePaimonTest")
    if (enabled == null || !enabled.equalsIgnoreCase("true")) {
        logger.info("Paimon test is not enabled, skip this test")
        return
    }

    String enabledJdbc = context.config.otherConfigs.get("enableJdbcTest")
    if (enabledJdbc == null || !enabledJdbc.equalsIgnoreCase("true")) {
        logger.info("Paimon JDBC catalog test requires enableJdbcTest, skip this test")
        return
    }

    String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
    String minioPort = context.config.otherConfigs.get("paimon_jdbc_minio_port")
    if (minioPort == null || minioPort.isEmpty()) {
        // Fall back to the shared iceberg docker-compose MinIO port.
        minioPort = context.config.otherConfigs.get("iceberg_minio_port")
    }
    String jdbcPort = context.config.otherConfigs.get("pg_14_port")
    if (externalEnvIp == null || externalEnvIp.isEmpty()
            || minioPort == null || minioPort.isEmpty()
            || jdbcPort == null || jdbcPort.isEmpty()) {
        logger.info("Paimon JDBC catalog test environment is not fully configured, skip this test")
        return
    }

    // MinIO credentials and bucket, with the docker-compose defaults as fallback.
    String minioAk = context.config.otherConfigs.get("paimon_jdbc_minio_ak")
    if (minioAk == null || minioAk.isEmpty()) {
        minioAk = "admin"
    }
    String minioSk = context.config.otherConfigs.get("paimon_jdbc_minio_sk")
    if (minioSk == null || minioSk.isEmpty()) {
        minioSk = "password"
    }
    String warehouseBucket = context.config.otherConfigs.get("paimon_jdbc_warehouse_bucket")
    if (warehouseBucket == null || warehouseBucket.isEmpty()) {
        warehouseBucket = "warehouse"
    }

    String catalogName = "test_paimon_jdbc_catalog"
    String dbName = "paimon_jdbc_db"
    String driverName = "postgresql-42.5.0.jar"
    String driverDownloadUrl = "${getS3Url()}/regression/jdbc_driver/${driverName}"
    String jdbcDriversDir = getFeConfig("jdbc_drivers_dir")
    String localDriverDir = "${context.config.dataPath}/jdbc_driver"
    String localDriverPath = "${localDriverDir}/${driverName}"
    String sparkDriverPath = "/tmp/${driverName}"
    String sparkSeedCatalogName = "${catalogName}_seed"

    assertTrue(jdbcDriversDir != null && !jdbcDriversDir.isEmpty(), "jdbc_drivers_dir must be configured")

    // Runs a shell command; returns its combined stdout/stderr.
    // When mustSuc is true, a non-zero exit code fails the test.
    def executeCommand = { String cmd, Boolean mustSuc ->
        try {
            logger.info("execute ${cmd}")
            def proc = new ProcessBuilder("/bin/bash", "-c", cmd).redirectErrorStream(true).start()
            // Consume the output BEFORE waiting for exit: if the child writes
            // more than the pipe buffer holds, waitFor()-first deadlocks.
            String output = proc.text
            int exitcode = proc.waitFor()
            if (exitcode != 0) {
                logger.info("exit code: ${exitcode}, output\n: ${output}")
                if (mustSuc) {
                    assertTrue(false, "Execute failed: ${cmd}")
                }
            }
            return output
        } catch (IOException e) {
            // An IOException here means the process could not be started or
            // its output could not be read — not a timeout.
            assertTrue(false, "Execute failed with IOException: ${cmd}, ${e.getMessage()}")
        }
    }

    // Stage the PostgreSQL JDBC driver for both FE and the Spark container.
    executeCommand("mkdir -p ${localDriverDir}", false)
    executeCommand("mkdir -p ${jdbcDriversDir}", true)
    if (!new File(localDriverPath).exists()) {
        executeCommand("/usr/bin/curl --max-time 600 ${driverDownloadUrl} --output ${localDriverPath}", true)
    }
    executeCommand("cp -f ${localDriverPath} ${jdbcDriversDir}/${driverName}", true)

    String sparkContainerName = executeCommand("docker ps --filter name=spark-iceberg --format {{.Names}}", false)
            ?.trim()
    if (sparkContainerName == null || sparkContainerName.isEmpty()) {
        logger.info("spark-iceberg container not found, skip this test")
        return
    }
    executeCommand("docker cp ${localDriverPath} ${sparkContainerName}:${sparkDriverPath}", true)

    // Executes a SQL statement through spark-sql configured with a Paimon JDBC
    // catalog that points at the same PostgreSQL metastore and MinIO warehouse.
    def sparkPaimonJdbc = { String sqlText ->
        String escapedSql = sqlText.replaceAll('"', '\\\\"')
        String command = """docker exec ${sparkContainerName} spark-sql --master spark://${sparkContainerName}:7077 \
--jars ${sparkDriverPath} \
--driver-class-path ${sparkDriverPath} \
--conf spark.driver.extraClassPath=${sparkDriverPath} \
--conf spark.executor.extraClassPath=${sparkDriverPath} \
--conf spark.sql.extensions=org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions \
--conf spark.sql.catalog.${sparkSeedCatalogName}=org.apache.paimon.spark.SparkCatalog \
--conf spark.sql.catalog.${sparkSeedCatalogName}.warehouse=s3://${warehouseBucket}/paimon_jdbc_catalog/ \
--conf spark.sql.catalog.${sparkSeedCatalogName}.metastore=jdbc \
--conf spark.sql.catalog.${sparkSeedCatalogName}.uri=jdbc:postgresql://${externalEnvIp}:${jdbcPort}/postgres \
--conf spark.sql.catalog.${sparkSeedCatalogName}.catalog-key=${catalogName} \
--conf spark.sql.catalog.${sparkSeedCatalogName}.jdbc.user=postgres \
--conf spark.sql.catalog.${sparkSeedCatalogName}.jdbc.password=123456 \
--conf spark.sql.catalog.${sparkSeedCatalogName}.lock.enabled=false \
--conf spark.sql.catalog.${sparkSeedCatalogName}.s3.endpoint=http://${externalEnvIp}:${minioPort} \
--conf spark.sql.catalog.${sparkSeedCatalogName}.s3.access-key=${minioAk} \
--conf spark.sql.catalog.${sparkSeedCatalogName}.s3.secret-key=${minioSk} \
--conf spark.sql.catalog.${sparkSeedCatalogName}.s3.region=us-east-1 \
--conf spark.sql.catalog.${sparkSeedCatalogName}.s3.path.style.access=true \
-e "${escapedSql}" """
        executeCommand(command, true)
    }

    try {
        sql """switch internal"""
        sql """DROP CATALOG IF EXISTS ${catalogName}"""
        sql """
            CREATE CATALOG ${catalogName} PROPERTIES (
                'type' = 'paimon',
                'paimon.catalog.type' = 'jdbc',
                'uri' = 'jdbc:postgresql://${externalEnvIp}:${jdbcPort}/postgres',
                'warehouse' = 's3://${warehouseBucket}/paimon_jdbc_catalog/',
                'paimon.catalog-key' = '${catalogName}',
                'paimon.jdbc.driver_url' = 'file://${jdbcDriversDir}/${driverName}',
                'paimon.jdbc.driver_class' = 'org.postgresql.Driver',
                'paimon.jdbc.user' = 'postgres',
                'paimon.jdbc.password' = '123456',
                's3.endpoint' = 'http://${externalEnvIp}:${minioPort}',
                's3.access_key' = '${minioAk}',
                's3.secret_key' = '${minioSk}',
                's3.region' = 'us-east-1',
                'use_path_style' = 'true'
            )
        """

        sql """SWITCH ${catalogName}"""
        def catalogs = sql """SHOW CATALOGS"""
        assertTrue(catalogs.toString().contains(catalogName))

        sql """DROP DATABASE IF EXISTS ${dbName} FORCE"""
        sql """CREATE DATABASE ${dbName}"""
        def databases = sql """SHOW DATABASES"""
        assertTrue(databases.toString().contains(dbName))

        sql """USE ${dbName}"""
        sql """DROP TABLE IF EXISTS paimon_jdbc_tbl"""
        sql """
            CREATE TABLE ${dbName}.paimon_jdbc_tbl (
                id INT,
                name STRING,
                dt DATE
            ) ENGINE=paimon
            PROPERTIES (
                'primary-key' = 'id',
                'bucket' = '2'
            )
        """

        def tables = sql """SHOW TABLES"""
        assertTrue(tables.toString().contains("paimon_jdbc_tbl"))

        // Seed rows from Spark via the same JDBC-backed catalog.
        sparkPaimonJdbc """
            INSERT INTO ${sparkSeedCatalogName}.${dbName}.paimon_jdbc_tbl VALUES
            (1, 'alice', DATE '2025-01-01'),
            (2, 'bob', DATE '2025-01-02')
        """

        def descResult = sql """DESC paimon_jdbc_tbl"""
        assertTrue(descResult.toString().contains("id"))
        assertTrue(descResult.toString().contains("name"))
        assertTrue(descResult.toString().contains("dt"))

        order_qt_paimon_jdbc_select """SELECT * FROM paimon_jdbc_tbl ORDER BY id"""

        def rowCount = sql """SELECT COUNT(*) FROM paimon_jdbc_tbl"""
        assertEquals(1, rowCount.size())
        assertEquals("2", rowCount[0][0].toString())

        // System table access must also work through the JDBC catalog.
        def schemaDesc = sql """DESC paimon_jdbc_tbl\$schemas"""
        assertTrue(schemaDesc.toString().contains("schema_id"))

        def schemaCount = sql """SELECT COUNT(*) FROM paimon_jdbc_tbl\$schemas"""
        assertEquals(1, schemaCount.size())
        assertTrue(schemaCount[0][0].toString().toInteger() >= 1)
    } finally {
        // Best-effort cleanup; never let cleanup failures mask the test result.
        try {
            sql """SWITCH ${catalogName}"""
            sql """DROP TABLE IF EXISTS ${dbName}.paimon_jdbc_tbl"""
            sql """DROP DATABASE IF EXISTS ${dbName} FORCE"""
        } catch (Exception e) {
            logger.info("Cleanup in catalog ${catalogName} failed: ${e.getMessage()}")
        }
        try {
            sql """SWITCH internal"""
        } catch (Exception e) {
            logger.info("Switch back to internal catalog failed: ${e.getMessage()}")
        }
        sql """DROP CATALOG IF EXISTS ${catalogName}"""
    }
}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to