This is an automated email from the ASF dual-hosted git repository.
openinx pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 0564fbf Flink: Use Namespace in FlinkCatalog. (#2392)
0564fbf is described below
commit 0564fbf821298e57f63f85438f46c257cc563eb9
Author: openinx <[email protected]>
AuthorDate: Wed Mar 31 16:36:03 2021 +0800
Flink: Use Namespace in FlinkCatalog. (#2392)
---
.../main/java/org/apache/iceberg/flink/FlinkCatalog.java | 14 ++++++--------
.../java/org/apache/iceberg/flink/FlinkCatalogFactory.java | 11 +++++++----
2 files changed, 13 insertions(+), 12 deletions(-)
diff --git a/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java
b/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java
index b9cb501..de62e67 100644
--- a/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java
+++ b/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java
@@ -91,17 +91,15 @@ public class FlinkCatalog extends AbstractCatalog {
private final CatalogLoader catalogLoader;
private final Catalog icebergCatalog;
- private final String[] baseNamespace;
+ private final Namespace baseNamespace;
private final SupportsNamespaces asNamespaceCatalog;
private final Closeable closeable;
private final boolean cacheEnabled;
- // TODO - Update baseNamespace to use Namespace class
- // https://github.com/apache/iceberg/issues/1541
public FlinkCatalog(
String catalogName,
String defaultDatabase,
- String[] baseNamespace,
+ Namespace baseNamespace,
CatalogLoader catalogLoader,
boolean cacheEnabled) {
super(catalogName, defaultDatabase);
@@ -141,9 +139,9 @@ public class FlinkCatalog extends AbstractCatalog {
}
private Namespace toNamespace(String database) {
- String[] namespace = new String[baseNamespace.length + 1];
- System.arraycopy(baseNamespace, 0, namespace, 0, baseNamespace.length);
- namespace[baseNamespace.length] = database;
+ String[] namespace = new String[baseNamespace.levels().length + 1];
+ System.arraycopy(baseNamespace.levels(), 0, namespace, 0,
baseNamespace.levels().length);
+ namespace[baseNamespace.levels().length] = database;
return Namespace.of(namespace);
}
@@ -157,7 +155,7 @@ public class FlinkCatalog extends AbstractCatalog {
return Collections.singletonList(getDefaultDatabase());
}
- return
asNamespaceCatalog.listNamespaces(Namespace.of(baseNamespace)).stream()
+ return asNamespaceCatalog.listNamespaces(baseNamespace).stream()
.map(n -> n.level(n.levels().length - 1))
.collect(Collectors.toList());
}
diff --git
a/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalogFactory.java
b/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalogFactory.java
index 2ee63e4..b129d0b 100644
--- a/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalogFactory.java
+++ b/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalogFactory.java
@@ -33,8 +33,8 @@ import org.apache.flink.table.factories.CatalogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
-import org.apache.iceberg.relocated.com.google.common.base.Splitter;
import org.apache.iceberg.relocated.com.google.common.base.Strings;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
@@ -120,9 +120,12 @@ public class FlinkCatalogFactory implements CatalogFactory
{
protected Catalog createCatalog(String name, Map<String, String> properties,
Configuration hadoopConf) {
CatalogLoader catalogLoader = createCatalogLoader(name, properties,
hadoopConf);
String defaultDatabase = properties.getOrDefault(DEFAULT_DATABASE,
"default");
- String[] baseNamespace = properties.containsKey(BASE_NAMESPACE) ?
-
Splitter.on('.').splitToList(properties.get(BASE_NAMESPACE)).toArray(new
String[0]) :
- new String[0];
+
+ Namespace baseNamespace = Namespace.empty();
+ if (properties.containsKey(BASE_NAMESPACE)) {
+ baseNamespace =
Namespace.of(properties.get(BASE_NAMESPACE).split("\\."));
+ }
+
boolean cacheEnabled =
Boolean.parseBoolean(properties.getOrDefault(CACHE_ENABLED, "true"));
return new FlinkCatalog(name, defaultDatabase, baseNamespace,
catalogLoader, cacheEnabled);
}