This is an automated email from the ASF dual-hosted git repository.

czweng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/master by this push:
     new 7e0d55ff [FLINK-30453] Fix 'can't find CatalogFactory' error when using FLINK sql-client to add table store bundle jar
7e0d55ff is described below

commit 7e0d55ff3dc9fd48455b17d9a439647b0554d020
Author: yuzelin <33053040+yuze...@users.noreply.github.com>
AuthorDate: Tue Dec 20 19:40:13 2022 +0800

    [FLINK-30453] Fix 'can't find CatalogFactory' error when using FLINK sql-client to add table store bundle jar
    
    This closes #442.
---
 .../flink/table/store/connector/FlinkCatalogFactory.java     | 12 +++++++++---
 .../apache/flink/table/store/connector/FlinkCatalogTest.java |  4 +++-
 .../flink/table/store/file/catalog/CatalogFactory.java       | 10 +++++++++-
 3 files changed, 21 insertions(+), 5 deletions(-)

diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/FlinkCatalogFactory.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/FlinkCatalogFactory.java
index 9c70ffe9..43665476 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/FlinkCatalogFactory.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/FlinkCatalogFactory.java
@@ -54,11 +54,17 @@ public class FlinkCatalogFactory implements org.apache.flink.table.factories.Cat
 
     @Override
     public FlinkCatalog createCatalog(Context context) {
-        return createCatalog(context.getName(), Configuration.fromMap(context.getOptions()));
+        return createCatalog(
+                context.getName(),
+                Configuration.fromMap(context.getOptions()),
+                context.getClassLoader());
     }
 
-    public static FlinkCatalog createCatalog(String catalogName, Configuration options) {
+    public static FlinkCatalog createCatalog(
+            String catalogName, Configuration options, ClassLoader classLoader) {
         return new FlinkCatalog(
-                CatalogFactory.createCatalog(options), catalogName, options.get(DEFAULT_DATABASE));
+                CatalogFactory.createCatalog(options, classLoader),
+                catalogName,
+                options.get(DEFAULT_DATABASE));
     }
 }
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FlinkCatalogTest.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FlinkCatalogTest.java
index e7a6d6fe..93271875 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FlinkCatalogTest.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FlinkCatalogTest.java
@@ -75,7 +75,9 @@ public class FlinkCatalogTest {
         String path = TEMPORARY_FOLDER.newFolder().toURI().toString();
         Configuration conf = new Configuration();
         conf.setString("warehouse", path);
-        catalog = FlinkCatalogFactory.createCatalog("test-catalog", conf);
+        catalog =
+                FlinkCatalogFactory.createCatalog(
+                        "test-catalog", conf, FlinkCatalogTest.class.getClassLoader());
     }
 
     private ResolvedSchema createSchema() {
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/CatalogFactory.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/CatalogFactory.java
index 66129710..53a68e02 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/CatalogFactory.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/CatalogFactory.java
@@ -49,7 +49,15 @@ public interface CatalogFactory {
         return new Path(warehouse);
     }
 
+    /**
+     * If the ClassLoader is not specified, using the context ClassLoader of current thread as
+     * default.
+     */
     static Catalog createCatalog(Configuration options) {
+        return createCatalog(options, Thread.currentThread().getContextClassLoader());
+    }
+
+    static Catalog createCatalog(Configuration options, ClassLoader classLoader) {
         // manual validation
         // because different catalog types may have different options
         // we can't list them all in the optionalOptions() method
@@ -57,7 +65,7 @@ public interface CatalogFactory {
 
         String metastore = options.get(METASTORE);
         List<CatalogFactory> factories = new ArrayList<>();
-        ServiceLoader.load(CatalogFactory.class, Thread.currentThread().getContextClassLoader())
+        ServiceLoader.load(CatalogFactory.class, classLoader)
                 .iterator()
                 .forEachRemaining(
                         f -> {

Reply via email to