This is an automated email from the ASF dual-hosted git repository.

zjureel pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new f68967a7c93 [FLINK-32660][table] Introduce external FS compatible FileCatalogStore implementation (#23063)
f68967a7c93 is described below

commit f68967a7c938f6258125c3221be74522965d1ff2
Author: Ferenc Csaky <ferenc.cs...@pm.me>
AuthorDate: Fri Aug 11 11:56:00 2023 +0200

    [FLINK-32660][table] Introduce external FS compatible FileCatalogStore implementation (#23063)
---
 .../flink/table/catalog/FileCatalogStore.java      | 208 +++++++++++----------
 .../table/catalog/FileCatalogStoreFactory.java     |  28 ++-
 .../catalog/FileCatalogStoreFactoryOptions.java    |   9 -
 .../table/catalog/FileCatalogStoreFactoryTest.java |  55 ++++++
 .../flink/table/catalog/FileCatalogStoreTest.java  | 192 ++++++++++---------
 5 files changed, 282 insertions(+), 210 deletions(-)

diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FileCatalogStore.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FileCatalogStore.java
index e19cee17cb6..b1b5c32f46c 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FileCatalogStore.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FileCatalogStore.java
@@ -19,23 +19,26 @@
 package org.apache.flink.table.catalog;
 
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.FileSystem.WriteMode;
+import org.apache.flink.core.fs.Path;
 import org.apache.flink.table.catalog.exceptions.CatalogException;
-import org.apache.flink.util.FileUtils;
 
-import org.apache.flink.shaded.jackson2.org.yaml.snakeyaml.Yaml;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.type.TypeReference;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.yaml.YAMLGenerator;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.yaml.YAMLMapper;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.File;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.Paths;
-import java.util.Collections;
-import java.util.HashMap;
+import java.util.Arrays;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
+import java.util.stream.Collectors;
 
 /**
  * A {@link CatalogStore} that stores all catalog configuration to a 
directory. Configuration of
@@ -46,42 +49,56 @@ public class FileCatalogStore extends AbstractCatalogStore {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(FileCatalogStore.class);
 
-    private static final String FILE_EXTENSION = ".yaml";
+    static final String FILE_EXTENSION = ".yaml";
 
-    /** The directory path where catalog configurations will be stored. */
-    private final String catalogStoreDirectory;
-
-    /** The character set to use when reading and writing catalog files. */
-    private final String charset;
+    /** The YAML mapper to use when reading and writing catalog files. */
+    private static final YAMLMapper YAML_MAPPER =
+            new 
YAMLMapper().disable(YAMLGenerator.Feature.WRITE_DOC_START_MARKER);
 
-    /** The YAML parser to use when reading and writing catalog files. */
-    private final Yaml yaml = new Yaml();
+    /** The directory path where catalog configurations will be stored. */
+    private final Path catalogStorePath;
 
     /**
      * Creates a new {@link FileCatalogStore} instance with the specified 
directory path.
      *
-     * @param catalogStoreDirectory the directory path where catalog 
configurations will be stored
+     * @param catalogStorePath the directory path where catalog configurations 
will be stored
      */
-    public FileCatalogStore(String catalogStoreDirectory, String charset) {
-        this.catalogStoreDirectory = catalogStoreDirectory;
-        this.charset = charset;
+    public FileCatalogStore(String catalogStorePath) {
+        this.catalogStorePath = new Path(catalogStorePath);
     }
 
     /**
      * Opens the catalog store and initializes the catalog file map.
      *
-     * @throws CatalogException if the catalog store directory does not exist 
or if there is an
-     *     error reading the directory
+     * @throws CatalogException if the catalog store directory does not exist, 
not a directory, or
+     *     if there is an error reading the directory
      */
     @Override
     public void open() throws CatalogException {
-        super.open();
-
         try {
+            FileSystem fs = catalogStorePath.getFileSystem();
+            if (!fs.exists(catalogStorePath)) {
+                throw new CatalogException(
+                        String.format(
+                                "Failed to open catalog store. The catalog 
store directory %s does not exist.",
+                                catalogStorePath));
+            }
 
-        } catch (Throwable e) {
-            throw new CatalogException("Failed to open file catalog store 
directory", e);
+            if (!fs.getFileStatus(catalogStorePath).isDir()) {
+                throw new CatalogException(
+                        String.format(
+                                "Failed to open catalog store. The given 
catalog store path %s is not a directory.",
+                                catalogStorePath));
+            }
+        } catch (CatalogException e) {
+            throw e;
+        } catch (Exception e) {
+            throw new CatalogException(
+                    String.format(
+                            "Failed to open file catalog store directory %s.", 
catalogStorePath),
+                    e);
         }
+        super.open();
     }
 
     /**
@@ -97,25 +114,29 @@ public class FileCatalogStore extends AbstractCatalogStore {
             throws CatalogException {
         checkOpenState();
 
-        Path filePath = getCatalogPath(catalogName);
+        Path catalogPath = getCatalogPath(catalogName);
         try {
-            File file = filePath.toFile();
-            if (file.exists()) {
+            FileSystem fs = catalogPath.getFileSystem();
+
+            if (fs.exists(catalogPath)) {
                 throw new CatalogException(
                         String.format(
                                 "Catalog %s's store file %s is already exist.",
-                                catalogName, filePath));
+                                catalogName, catalogPath));
+            }
+
+            try (FSDataOutputStream os = fs.create(catalogPath, 
WriteMode.NO_OVERWRITE)) {
+                YAML_MAPPER.writeValue(os, catalog.getConfiguration().toMap());
             }
-            // create a new file
-            file.createNewFile();
-            String yamlString = 
yaml.dumpAsMap(catalog.getConfiguration().toMap());
-            FileUtils.writeFile(file, yamlString, charset);
-            LOG.info("Catalog {}'s configuration saved to file {}", 
catalogName, filePath);
-        } catch (Throwable e) {
+
+            LOG.info("Catalog {}'s configuration saved to file {}", 
catalogName, catalogPath);
+        } catch (CatalogException e) {
+            throw e;
+        } catch (Exception e) {
             throw new CatalogException(
                     String.format(
-                            "Failed to save catalog %s's configuration to file 
%s : %s",
-                            catalogName, filePath, e.getMessage()),
+                            "Failed to store catalog %s's configuration to 
file %s.",
+                            catalogName, catalogPath),
                     e);
         }
     }
@@ -132,31 +153,23 @@ public class FileCatalogStore extends AbstractCatalogStore {
     public void removeCatalog(String catalogName, boolean ignoreIfNotExists)
             throws CatalogException {
         checkOpenState();
-
-        Path path = getCatalogPath(catalogName);
+        Path catalogPath = getCatalogPath(catalogName);
         try {
-            File file = path.toFile();
-            if (file.exists()) {
-                if (!file.isFile()) {
-                    throw new CatalogException(
-                            String.format(
-                                    "Catalog %s's store file %s is not a 
regular file",
-                                    catalogName, path.getFileName()));
-                }
-                Files.deleteIfExists(path);
-            } else {
-                if (!ignoreIfNotExists) {
-                    throw new CatalogException(
-                            String.format(
-                                    "Catalog %s's store file %s is not exist", 
catalogName, path));
-                }
+            FileSystem fs = catalogPath.getFileSystem();
+
+            if (fs.exists(catalogPath)) {
+                fs.delete(catalogPath, false);
+            } else if (!ignoreIfNotExists) {
+                throw new CatalogException(
+                        String.format(
+                                "Catalog %s's store file %s does not exist.",
+                                catalogName, catalogPath));
             }
-        } catch (Throwable e) {
+        } catch (CatalogException e) {
+            throw e;
+        } catch (Exception e) {
             throw new CatalogException(
-                    String.format(
-                            "Failed to delete catalog %s's store file: %s",
-                            catalogName, e.getMessage()),
-                    e);
+                    String.format("Failed to remove catalog %s's store file.", 
catalogName), e);
         }
     }
 
@@ -172,22 +185,28 @@ public class FileCatalogStore extends AbstractCatalogStore {
     @Override
     public Optional<CatalogDescriptor> getCatalog(String catalogName) throws 
CatalogException {
         checkOpenState();
-
-        Path path = getCatalogPath(catalogName);
+        Path catalogPath = getCatalogPath(catalogName);
         try {
-            File file = path.toFile();
-            if (!file.exists()) {
-                LOG.warn("Catalog {}'s store file %s does not exist.", 
catalogName, path);
+            FileSystem fs = catalogPath.getFileSystem();
+
+            if (!fs.exists(catalogPath)) {
                 return Optional.empty();
             }
-            String content = FileUtils.readFile(file, charset);
-            Map<String, String> options = yaml.load(content);
-            return Optional.of(CatalogDescriptor.of(catalogName, 
Configuration.fromMap(options)));
-        } catch (Throwable t) {
+
+            try (FSDataInputStream is = fs.open(catalogPath)) {
+                Map<String, String> configMap =
+                        YAML_MAPPER.readValue(is, new 
TypeReference<Map<String, String>>() {});
+
+                CatalogDescriptor catalog =
+                        CatalogDescriptor.of(catalogName, 
Configuration.fromMap(configMap));
+
+                return Optional.of(catalog);
+            }
+        } catch (Exception e) {
             throw new CatalogException(
                     String.format(
-                            "Failed to load catalog %s's configuration from 
file", catalogName),
-                    t);
+                            "Failed to load catalog %s's configuration from 
file.", catalogName),
+                    e);
         }
     }
 
@@ -201,8 +220,21 @@ public class FileCatalogStore extends AbstractCatalogStore {
     @Override
     public Set<String> listCatalogs() throws CatalogException {
         checkOpenState();
+        try {
+            FileStatus[] statusArr = 
catalogStorePath.getFileSystem().listStatus(catalogStorePath);
 
-        return Collections.unmodifiableSet(listAllCatalogFiles().keySet());
+            return Arrays.stream(statusArr)
+                    .filter(status -> !status.isDir())
+                    .map(FileStatus::getPath)
+                    .map(Path::getName)
+                    .map(filename -> filename.replace(FILE_EXTENSION, ""))
+                    .collect(Collectors.toSet());
+        } catch (Exception e) {
+            throw new CatalogException(
+                    String.format(
+                            "Failed to list file catalog store directory %s.", 
catalogStorePath),
+                    e);
+        }
     }
 
     /**
@@ -216,33 +248,19 @@ public class FileCatalogStore extends AbstractCatalogStore {
     @Override
     public boolean contains(String catalogName) throws CatalogException {
         checkOpenState();
-
-        return listAllCatalogFiles().containsKey(catalogName);
-    }
-
-    private Map<String, Path> listAllCatalogFiles() throws CatalogException {
-        Map<String, Path> files = new HashMap<>();
-        File directoryFile = new File(catalogStoreDirectory);
-        if (!directoryFile.isDirectory()) {
-            throw new CatalogException("File catalog store only support local 
directory");
-        }
-
+        Path catalogPath = getCatalogPath(catalogName);
         try {
-            Files.list(directoryFile.toPath())
-                    .filter(file -> 
file.getFileName().toString().endsWith(FILE_EXTENSION))
-                    .filter(Files::isRegularFile)
-                    .forEach(
-                            p ->
-                                    files.put(
-                                            
p.getFileName().toString().replace(FILE_EXTENSION, ""),
-                                            p));
-        } catch (Throwable t) {
-            throw new CatalogException("Failed to list file catalog store 
directory", t);
+            return catalogPath.getFileSystem().exists(catalogPath);
+        } catch (Exception e) {
+            throw new CatalogException(
+                    String.format(
+                            "Failed to check if catalog %s exists in the 
catalog store.",
+                            catalogName),
+                    e);
         }
-        return files;
     }
 
     private Path getCatalogPath(String catalogName) {
-        return Paths.get(catalogStoreDirectory, catalogName + FILE_EXTENSION);
+        return new Path(catalogStorePath, catalogName + FILE_EXTENSION);
     }
 }
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FileCatalogStoreFactory.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FileCatalogStoreFactory.java
index 57d26f008e4..844220c22d2 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FileCatalogStoreFactory.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FileCatalogStoreFactory.java
@@ -20,42 +20,39 @@ package org.apache.flink.table.catalog;
 
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.ReadableConfig;
-import org.apache.flink.table.catalog.exceptions.CatalogException;
 import org.apache.flink.table.factories.CatalogStoreFactory;
-import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.factories.FactoryUtil.FactoryHelper;
 
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.Set;
 
-import static 
org.apache.flink.table.catalog.FileCatalogStoreFactoryOptions.CHARSET;
 import static 
org.apache.flink.table.catalog.FileCatalogStoreFactoryOptions.IDENTIFIER;
 import static 
org.apache.flink.table.catalog.FileCatalogStoreFactoryOptions.PATH;
 import static 
org.apache.flink.table.factories.FactoryUtil.createCatalogStoreFactoryHelper;
 
-/** CatalogStore factory for {@link FileCatalogStore}. */
+/** Catalog store factory for {@link FileCatalogStore}. */
 public class FileCatalogStoreFactory implements CatalogStoreFactory {
 
     private String path;
 
-    private String charset;
-
     @Override
     public CatalogStore createCatalogStore() {
-        return new FileCatalogStore(path, charset);
+        return new FileCatalogStore(path);
     }
 
     @Override
-    public void open(Context context) throws CatalogException {
-        FactoryUtil.FactoryHelper factoryHelper = 
createCatalogStoreFactoryHelper(this, context);
+    public void open(Context context) {
+        FactoryHelper<CatalogStoreFactory> factoryHelper =
+                createCatalogStoreFactoryHelper(this, context);
         factoryHelper.validate();
-        ReadableConfig options = factoryHelper.getOptions();
 
+        ReadableConfig options = factoryHelper.getOptions();
         path = options.get(PATH);
-        charset = options.get(CHARSET);
     }
 
     @Override
-    public void close() throws CatalogException {}
+    public void close() {}
 
     @Override
     public String factoryIdentifier() {
@@ -66,13 +63,12 @@ public class FileCatalogStoreFactory implements CatalogStoreFactory {
     public Set<ConfigOption<?>> requiredOptions() {
         Set<ConfigOption<?>> options = new HashSet<>();
         options.add(PATH);
-        return options;
+
+        return Collections.unmodifiableSet(options);
     }
 
     @Override
     public Set<ConfigOption<?>> optionalOptions() {
-        Set<ConfigOption<?>> options = new HashSet<>();
-        options.add(CHARSET);
-        return options;
+        return Collections.emptySet();
     }
 }
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FileCatalogStoreFactoryOptions.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FileCatalogStoreFactoryOptions.java
index 51501a50912..7e1a30c1c51 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FileCatalogStoreFactoryOptions.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FileCatalogStoreFactoryOptions.java
@@ -21,8 +21,6 @@ package org.apache.flink.table.catalog;
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.ConfigOptions;
 
-import java.nio.charset.StandardCharsets;
-
 /** {@link ConfigOption}s for {@link FileCatalogStoreFactory}. */
 public class FileCatalogStoreFactoryOptions {
 
@@ -35,12 +33,5 @@ public class FileCatalogStoreFactoryOptions {
                     .withDescription(
                             "The configuration option for specifying the path 
to the file catalog store.");
 
-    public static final ConfigOption<String> CHARSET =
-            ConfigOptions.key("charset")
-                    .stringType()
-                    .defaultValue(StandardCharsets.UTF_8.displayName())
-                    .withDescription(
-                            "The charset used for storing/reading the catalog 
configuration.");
-
     private FileCatalogStoreFactoryOptions() {}
 }
diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/FileCatalogStoreFactoryTest.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/FileCatalogStoreFactoryTest.java
new file mode 100644
index 00000000000..bfc7fc8e227
--- /dev/null
+++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/FileCatalogStoreFactoryTest.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.table.factories.CatalogStoreFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
+
+/** Tests for {@link FileCatalogStoreFactory}. */
+class FileCatalogStoreFactoryTest {
+
+    @Test
+    void testFileCatalogStoreFactoryDiscovery(@TempDir File tempFolder) {
+
+        String factoryIdentifier = FileCatalogStoreFactoryOptions.IDENTIFIER;
+        Map<String, String> options = new HashMap<>();
+        options.put("path", tempFolder.getAbsolutePath());
+        ClassLoader classLoader = 
Thread.currentThread().getContextClassLoader();
+        final FactoryUtil.DefaultCatalogStoreContext discoveryContext =
+                new FactoryUtil.DefaultCatalogStoreContext(options, null, 
classLoader);
+        final CatalogStoreFactory factory =
+                FactoryUtil.discoverFactory(
+                        classLoader, CatalogStoreFactory.class, 
factoryIdentifier);
+        factory.open(discoveryContext);
+
+        CatalogStore catalogStore = factory.createCatalogStore();
+        assertThat(catalogStore instanceof FileCatalogStore).isTrue();
+
+        factory.close();
+    }
+}
diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/FileCatalogStoreTest.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/FileCatalogStoreTest.java
index ada5a11380f..f79df92b32e 100644
--- a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/FileCatalogStoreTest.java
+++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/FileCatalogStoreTest.java
@@ -20,130 +20,142 @@ package org.apache.flink.table.catalog;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.table.catalog.exceptions.CatalogException;
-import org.apache.flink.table.factories.CatalogStoreFactory;
-import org.apache.flink.table.factories.FactoryUtil;
-import org.apache.flink.util.FileUtils;
 
+import org.assertj.core.api.ThrowableAssert;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
 
 import java.io.File;
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.stream.Collectors;
+import java.nio.file.Path;
+import java.util.Set;
 
 import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
 import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy;
-import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertTrue;
 
-/** Test for {@link FileCatalogStore}. */
-public class FileCatalogStoreTest {
-    @Test
-    void testFileCatalogStoreFactoryDiscovery(@TempDir File tempFolder) {
-
-        String factoryIdentifier = FileCatalogStoreFactoryOptions.IDENTIFIER;
-        Map<String, String> options = new HashMap<>();
-        options.put("path", tempFolder.getAbsolutePath());
-        ClassLoader classLoader = 
Thread.currentThread().getContextClassLoader();
-        final FactoryUtil.DefaultCatalogStoreContext discoveryContext =
-                new FactoryUtil.DefaultCatalogStoreContext(options, null, 
classLoader);
-        final CatalogStoreFactory factory =
-                FactoryUtil.discoverFactory(
-                        classLoader, CatalogStoreFactory.class, 
factoryIdentifier);
-        factory.open(discoveryContext);
-
-        CatalogStore catalogStore = factory.createCatalogStore();
-        assertThat(catalogStore instanceof FileCatalogStore).isTrue();
-
-        factory.close();
+/** Tests for {@link FileCatalogStore}. */
+class FileCatalogStoreTest {
+
+    private static final String CATALOG_STORE_DIR_NAME = "dummy-catalog-store";
+    private static final String DUMMY = "dummy";
+    private static final CatalogDescriptor DUMMY_CATALOG;
+
+    static {
+        Configuration conf = new Configuration();
+        conf.set(CommonCatalogOptions.CATALOG_TYPE, DUMMY);
+        conf.set(GenericInMemoryCatalogFactoryOptions.DEFAULT_DATABASE, 
"dummy_db");
+
+        DUMMY_CATALOG = CatalogDescriptor.of(DUMMY, conf);
     }
 
+    @TempDir private Path tempDir;
+
     @Test
-    void testFileCatalogStoreSaveAndRead(@TempDir File tempFolder) throws 
IOException {
-        CatalogStore catalogStore = new 
FileCatalogStore(tempFolder.getAbsolutePath(), "utf-8");
-        catalogStore.open();
+    void testNotOpened() {
+        CatalogStore catalogStore = initCatalogStore(false);
+
+        assertCatalogStoreNotOpened(catalogStore::listCatalogs);
+        assertCatalogStoreNotOpened(() -> catalogStore.contains(DUMMY));
+        assertCatalogStoreNotOpened(() -> catalogStore.getCatalog(DUMMY));
+        assertCatalogStoreNotOpened(() -> catalogStore.storeCatalog(DUMMY, 
DUMMY_CATALOG));
+        assertCatalogStoreNotOpened(() -> catalogStore.removeCatalog(DUMMY, 
true));
+    }
 
-        Configuration catalogConfiguration = new Configuration();
-        catalogConfiguration.setString("type", "generic_in_memory");
+    @Test
+    void testStoreDirNotExists() {
+        CatalogStore catalogStore = initCatalogStore(false);
+        Path catalogStorePath = tempDir.resolve(CATALOG_STORE_DIR_NAME);
 
-        // store catalog to catalog store
-        catalogStore.storeCatalog("cat1", CatalogDescriptor.of("cat1", 
catalogConfiguration));
-        catalogStore.storeCatalog("cat2", CatalogDescriptor.of("cat2", 
catalogConfiguration));
+        assertThatThrownBy(catalogStore::open)
+                .isInstanceOf(CatalogException.class)
+                .hasMessageContaining(
+                        "Failed to open catalog store. The catalog store 
directory "
+                                + catalogStorePath
+                                + " does not exist.");
+    }
 
-        assertTrue(catalogStore.contains("cat1"));
-        assertTrue(catalogStore.contains("cat2"));
+    @Test
+    void testStore() {
+        CatalogStore catalogStore = initCatalogStore(true);
+        catalogStore.open();
 
-        // check catalog file is right created
-        List<String> files =
-                Arrays.stream(tempFolder.listFiles())
-                        .map(file -> file.getName())
-                        .collect(Collectors.toList());
-        assertTrue(files.contains("cat1.yaml"));
-        assertTrue(files.contains("cat2.yaml"));
+        catalogStore.storeCatalog(DUMMY, DUMMY_CATALOG);
 
-        // check file content
-        String yamlPath1 = String.format("%s/%s", tempFolder, "cat1.yaml");
-        String content = FileUtils.readFileUtf8(new File(yamlPath1));
-        assertThat(content).isEqualTo("type: generic_in_memory\n");
+        File catalog = getCatalogFile();
+        assertThat(catalog.exists()).isTrue();
+        assertThat(catalog.isFile()).isTrue();
+        assertThat(catalogStore.contains(DUMMY)).isTrue();
 
-        catalogStore.close();
+        Set<String> storedCatalogs = catalogStore.listCatalogs();
+        assertThat(storedCatalogs.size()).isEqualTo(1);
+        assertThat(storedCatalogs.contains(DUMMY)).isTrue();
+    }
 
-        // create a new FileCatalogStore, check catalog is right loaded.
-        catalogStore = new FileCatalogStore(tempFolder.getAbsolutePath(), 
"utf-8");
+    @Test
+    void testRemoveExisting() {
+        CatalogStore catalogStore = initCatalogStore(true);
         catalogStore.open();
 
-        assertTrue(catalogStore.listCatalogs().contains("cat1"));
-        assertTrue(catalogStore.listCatalogs().contains("cat2"));
+        catalogStore.storeCatalog(DUMMY, DUMMY_CATALOG);
+        assertThat(catalogStore.listCatalogs().size()).isEqualTo(1);
 
-        // test remove operation.
-        catalogStore.removeCatalog("cat1", false);
-        catalogStore.close();
+        catalogStore.removeCatalog(DUMMY, false);
+        assertThat(catalogStore.listCatalogs().size()).isEqualTo(0);
+        assertThat(catalogStore.contains(DUMMY)).isFalse();
 
-        catalogStore = new FileCatalogStore(tempFolder.getAbsolutePath(), 
"utf-8");
-        catalogStore.open();
-        assertFalse(catalogStore.listCatalogs().contains("cat1"));
-        assertTrue(catalogStore.listCatalogs().contains("cat2"));
+        File catalog = getCatalogFile();
+        assertThat(catalog.exists()).isFalse();
     }
 
     @Test
-    void testInvalidCases(@TempDir File tempFolder) {
-        // test catalog store the catalog already exists.
-        final CatalogStore catalogStore =
-                new FileCatalogStore(tempFolder.getAbsolutePath(), "utf-8");
+    void testRemoveNonExisting() {
+        CatalogStore catalogStore = initCatalogStore(true);
         catalogStore.open();
 
-        Configuration catalogConfiguration = new Configuration();
-        catalogConfiguration.setString("type", "generic_in_memory");
+        catalogStore.removeCatalog(DUMMY, true);
 
-        // store catalog to catalog store
-        catalogStore.storeCatalog("cat1", CatalogDescriptor.of("cat1", 
catalogConfiguration));
-        assertThatThrownBy(
-                        () ->
-                                catalogStore.storeCatalog(
-                                        "cat1", CatalogDescriptor.of("cat1", 
catalogConfiguration)))
+        File catalog = getCatalogFile();
+        assertThatThrownBy(() -> catalogStore.removeCatalog(DUMMY, false))
                 .isInstanceOf(CatalogException.class)
-                .hasMessageContaining("Failed to save catalog cat1's 
configuration to file");
+                .hasMessageContaining(
+                        "Catalog " + DUMMY + "'s store file " + catalog + " 
does not exist.");
+    }
 
-        // get a no exist catalog
-        assertThat(catalogStore.getCatalog("cat3")).isEmpty();
+    @Test
+    void testClose() {
+        CatalogStore catalogStore = initCatalogStore(true);
+        catalogStore.open();
 
-        // remove a no exist catalog
-        assertThatThrownBy(() -> catalogStore.removeCatalog("cat3", false))
-                .isInstanceOf(CatalogException.class)
-                .hasMessageContaining("Failed to delete catalog cat3's store 
file");
+        catalogStore.storeCatalog(DUMMY, DUMMY_CATALOG);
+        assertThat(catalogStore.listCatalogs().size()).isEqualTo(1);
 
         catalogStore.close();
 
-        // test unsupported file schema
-        CatalogStore catalogStore2 =
-                new 
FileCatalogStore("hdfs://namenode:14565/some/path/to/a/file", "utf-8");
-        catalogStore2.open();
-        assertThatThrownBy(() -> catalogStore2.listCatalogs())
-                .isInstanceOf(CatalogException.class)
-                .hasMessageContaining("File catalog store only support local 
directory");
+        assertCatalogStoreNotOpened(catalogStore::listCatalogs);
+        assertCatalogStoreNotOpened(() -> catalogStore.contains(DUMMY));
+        assertCatalogStoreNotOpened(() -> catalogStore.getCatalog(DUMMY));
+        assertCatalogStoreNotOpened(() -> catalogStore.storeCatalog(DUMMY, 
DUMMY_CATALOG));
+        assertCatalogStoreNotOpened(() -> catalogStore.removeCatalog(DUMMY, 
true));
+    }
+
+    private void assertCatalogStoreNotOpened(
+            ThrowableAssert.ThrowingCallable shouldRaiseThrowable) {
+        assertThatThrownBy(shouldRaiseThrowable)
+                .isInstanceOf(IllegalStateException.class)
+                .hasMessageContaining("CatalogStore is not opened yet.");
+    }
+
+    private CatalogStore initCatalogStore(boolean createDir) {
+        Path catalogStorePath = tempDir.resolve(CATALOG_STORE_DIR_NAME);
+        if (createDir) {
+            catalogStorePath.toFile().mkdir();
+        }
+
+        return new FileCatalogStore(catalogStorePath.toString());
+    }
+
+    private File getCatalogFile() {
+        return tempDir.resolve(CATALOG_STORE_DIR_NAME)
+                .resolve(DUMMY + FileCatalogStore.FILE_EXTENSION)
+                .toFile();
     }
 }

Reply via email to