This is an automated email from the ASF dual-hosted git repository.

openinx pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 0564fbf  Flink: Use Namespace in FlinkCatalog. (#2392)
0564fbf is described below

commit 0564fbf821298e57f63f85438f46c257cc563eb9
Author: openinx <[email protected]>
AuthorDate: Wed Mar 31 16:36:03 2021 +0800

    Flink: Use Namespace in FlinkCatalog. (#2392)
---
 .../main/java/org/apache/iceberg/flink/FlinkCatalog.java   | 14 ++++++--------
 .../java/org/apache/iceberg/flink/FlinkCatalogFactory.java | 11 +++++++----
 2 files changed, 13 insertions(+), 12 deletions(-)

diff --git a/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java b/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java
index b9cb501..de62e67 100644
--- a/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java
+++ b/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java
@@ -91,17 +91,15 @@ public class FlinkCatalog extends AbstractCatalog {
 
   private final CatalogLoader catalogLoader;
   private final Catalog icebergCatalog;
-  private final String[] baseNamespace;
+  private final Namespace baseNamespace;
   private final SupportsNamespaces asNamespaceCatalog;
   private final Closeable closeable;
   private final boolean cacheEnabled;
 
-  // TODO - Update baseNamespace to use Namespace class
-  // https://github.com/apache/iceberg/issues/1541
   public FlinkCatalog(
       String catalogName,
       String defaultDatabase,
-      String[] baseNamespace,
+      Namespace baseNamespace,
       CatalogLoader catalogLoader,
       boolean cacheEnabled) {
     super(catalogName, defaultDatabase);
@@ -141,9 +139,9 @@ public class FlinkCatalog extends AbstractCatalog {
   }
 
   private Namespace toNamespace(String database) {
-    String[] namespace = new String[baseNamespace.length + 1];
-    System.arraycopy(baseNamespace, 0, namespace, 0, baseNamespace.length);
-    namespace[baseNamespace.length] = database;
+    String[] namespace = new String[baseNamespace.levels().length + 1];
+    System.arraycopy(baseNamespace.levels(), 0, namespace, 0, baseNamespace.levels().length);
+    namespace[baseNamespace.levels().length] = database;
     return Namespace.of(namespace);
   }
 
@@ -157,7 +155,7 @@ public class FlinkCatalog extends AbstractCatalog {
       return Collections.singletonList(getDefaultDatabase());
     }
 
-    return asNamespaceCatalog.listNamespaces(Namespace.of(baseNamespace)).stream()
+    return asNamespaceCatalog.listNamespaces(baseNamespace).stream()
         .map(n -> n.level(n.levels().length - 1))
         .collect(Collectors.toList());
   }
diff --git a/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalogFactory.java b/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalogFactory.java
index 2ee63e4..b129d0b 100644
--- a/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalogFactory.java
+++ b/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalogFactory.java
@@ -33,8 +33,8 @@ import org.apache.flink.table.factories.CatalogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.catalog.Namespace;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
-import org.apache.iceberg.relocated.com.google.common.base.Splitter;
 import org.apache.iceberg.relocated.com.google.common.base.Strings;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
@@ -120,9 +120,12 @@ public class FlinkCatalogFactory implements CatalogFactory {
   protected Catalog createCatalog(String name, Map<String, String> properties, Configuration hadoopConf) {
     CatalogLoader catalogLoader = createCatalogLoader(name, properties, hadoopConf);
     String defaultDatabase = properties.getOrDefault(DEFAULT_DATABASE, "default");
-    String[] baseNamespace = properties.containsKey(BASE_NAMESPACE) ?
-        Splitter.on('.').splitToList(properties.get(BASE_NAMESPACE)).toArray(new String[0]) :
-        new String[0];
+
+    Namespace baseNamespace = Namespace.empty();
+    if (properties.containsKey(BASE_NAMESPACE)) {
+      baseNamespace = Namespace.of(properties.get(BASE_NAMESPACE).split("\\."));
+    }
+
     boolean cacheEnabled = Boolean.parseBoolean(properties.getOrDefault(CACHE_ENABLED, "true"));
     return new FlinkCatalog(name, defaultDatabase, baseNamespace, catalogLoader, cacheEnabled);
   }

Reply via email to