This is an automated email from the ASF dual-hosted git repository. zjureel pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new f68967a7c93 [FLINK-32660][table] Introduce external FS compatible FileCatalogStore implementation (#23063) f68967a7c93 is described below commit f68967a7c938f6258125c3221be74522965d1ff2 Author: Ferenc Csaky <ferenc.cs...@pm.me> AuthorDate: Fri Aug 11 11:56:00 2023 +0200 [FLINK-32660][table] Introduce external FS compatible FileCatalogStore implementation (#23063) --- .../flink/table/catalog/FileCatalogStore.java | 208 +++++++++++---------- .../table/catalog/FileCatalogStoreFactory.java | 28 ++- .../catalog/FileCatalogStoreFactoryOptions.java | 9 - .../table/catalog/FileCatalogStoreFactoryTest.java | 55 ++++++ .../flink/table/catalog/FileCatalogStoreTest.java | 192 ++++++++++--------- 5 files changed, 282 insertions(+), 210 deletions(-) diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FileCatalogStore.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FileCatalogStore.java index e19cee17cb6..b1b5c32f46c 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FileCatalogStore.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FileCatalogStore.java @@ -19,23 +19,26 @@ package org.apache.flink.table.catalog; import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.FSDataInputStream; +import org.apache.flink.core.fs.FSDataOutputStream; +import org.apache.flink.core.fs.FileStatus; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.FileSystem.WriteMode; +import org.apache.flink.core.fs.Path; import org.apache.flink.table.catalog.exceptions.CatalogException; -import org.apache.flink.util.FileUtils; -import org.apache.flink.shaded.jackson2.org.yaml.snakeyaml.Yaml; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.type.TypeReference; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.yaml.YAMLGenerator; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.yaml.YAMLMapper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.File; -import java.nio.file.Files; -import java.nio.file.Path; -import java.nio.file.Paths; -import java.util.Collections; -import java.util.HashMap; +import java.util.Arrays; import java.util.Map; import java.util.Optional; import java.util.Set; +import java.util.stream.Collectors; /** * A {@link CatalogStore} that stores all catalog configuration to a directory. Configuration of @@ -46,42 +49,56 @@ public class FileCatalogStore extends AbstractCatalogStore { private static final Logger LOG = LoggerFactory.getLogger(FileCatalogStore.class); - private static final String FILE_EXTENSION = ".yaml"; + static final String FILE_EXTENSION = ".yaml"; - /** The directory path where catalog configurations will be stored. */ - private final String catalogStoreDirectory; - - /** The character set to use when reading and writing catalog files. */ - private final String charset; + /** The YAML mapper to use when reading and writing catalog files. */ + private static final YAMLMapper YAML_MAPPER = + new YAMLMapper().disable(YAMLGenerator.Feature.WRITE_DOC_START_MARKER); - /** The YAML parser to use when reading and writing catalog files. */ - private final Yaml yaml = new Yaml(); + /** The directory path where catalog configurations will be stored. */ + private final Path catalogStorePath; /** * Creates a new {@link FileCatalogStore} instance with the specified directory path. * - * @param catalogStoreDirectory the directory path where catalog configurations will be stored + * @param catalogStorePath the directory path where catalog configurations will be stored */ - public FileCatalogStore(String catalogStoreDirectory, String charset) { - this.catalogStoreDirectory = catalogStoreDirectory; - this.charset = charset; + public FileCatalogStore(String catalogStorePath) { + this.catalogStorePath = new Path(catalogStorePath); } /** * Opens the catalog store and initializes the catalog file map. * - * @throws CatalogException if the catalog store directory does not exist or if there is an - * error reading the directory + * @throws CatalogException if the catalog store directory does not exist, not a directory, or + * if there is an error reading the directory */ @Override public void open() throws CatalogException { - super.open(); - try { + FileSystem fs = catalogStorePath.getFileSystem(); + if (!fs.exists(catalogStorePath)) { + throw new CatalogException( + String.format( + "Failed to open catalog store. The catalog store directory %s does not exist.", + catalogStorePath)); + } - } catch (Throwable e) { - throw new CatalogException("Failed to open file catalog store directory", e); + if (!fs.getFileStatus(catalogStorePath).isDir()) { + throw new CatalogException( + String.format( + "Failed to open catalog store. The given catalog store path %s is not a directory.", + catalogStorePath)); + } + } catch (CatalogException e) { + throw e; + } catch (Exception e) { + throw new CatalogException( + String.format( + "Failed to open file catalog store directory %s.", catalogStorePath), + e); } + super.open(); } /** @@ -97,25 +114,29 @@ public class FileCatalogStore extends AbstractCatalogStore { throws CatalogException { checkOpenState(); - Path filePath = getCatalogPath(catalogName); + Path catalogPath = getCatalogPath(catalogName); try { - File file = filePath.toFile(); - if (file.exists()) { + FileSystem fs = catalogPath.getFileSystem(); + + if (fs.exists(catalogPath)) { throw new CatalogException( String.format( "Catalog %s's store file %s is already exist.", - catalogName, filePath)); + catalogName, catalogPath)); + } + + try (FSDataOutputStream os = fs.create(catalogPath, WriteMode.NO_OVERWRITE)) { + YAML_MAPPER.writeValue(os, catalog.getConfiguration().toMap()); } - // create a new file - file.createNewFile(); - String yamlString = yaml.dumpAsMap(catalog.getConfiguration().toMap()); - FileUtils.writeFile(file, yamlString, charset); - LOG.info("Catalog {}'s configuration saved to file {}", catalogName, filePath); - } catch (Throwable e) { + + LOG.info("Catalog {}'s configuration saved to file {}", catalogName, catalogPath); + } catch (CatalogException e) { + throw e; + } catch (Exception e) { throw new CatalogException( String.format( - "Failed to save catalog %s's configuration to file %s : %s", - catalogName, filePath, e.getMessage()), + "Failed to store catalog %s's configuration to file %s.", + catalogName, catalogPath), e); } } @@ -132,31 +153,23 @@ public class FileCatalogStore extends AbstractCatalogStore { public void removeCatalog(String catalogName, boolean ignoreIfNotExists) throws CatalogException { checkOpenState(); - - Path path = getCatalogPath(catalogName); + Path catalogPath = getCatalogPath(catalogName); try { - File file = path.toFile(); - if (file.exists()) { - if (!file.isFile()) { - throw new CatalogException( - String.format( - "Catalog %s's store file %s is not a regular file", - catalogName, path.getFileName())); - } - Files.deleteIfExists(path); - } else { - if (!ignoreIfNotExists) { - throw new CatalogException( - String.format( - "Catalog %s's store file %s is not exist", catalogName, path)); - } + FileSystem fs = catalogPath.getFileSystem(); + + if (fs.exists(catalogPath)) { + fs.delete(catalogPath, false); + } else if (!ignoreIfNotExists) { + throw new CatalogException( + String.format( + "Catalog %s's store file %s does not exist.", + catalogName, catalogPath)); } - } catch (Throwable e) { + } catch (CatalogException e) { + throw e; + } catch (Exception e) { throw new CatalogException( - String.format( - "Failed to delete catalog %s's store file: %s", - catalogName, e.getMessage()), - e); + String.format("Failed to remove catalog %s's store file.", catalogName), e); } } @@ -172,22 +185,28 @@ public class FileCatalogStore extends AbstractCatalogStore { @Override public Optional<CatalogDescriptor> getCatalog(String catalogName) throws CatalogException { checkOpenState(); - - Path path = getCatalogPath(catalogName); + Path catalogPath = getCatalogPath(catalogName); try { - File file = path.toFile(); - if (!file.exists()) { - LOG.warn("Catalog {}'s store file %s does not exist.", catalogName, path); + FileSystem fs = catalogPath.getFileSystem(); + + if (!fs.exists(catalogPath)) { return Optional.empty(); } - String content = FileUtils.readFile(file, charset); - Map<String, String> options = yaml.load(content); - return Optional.of(CatalogDescriptor.of(catalogName, Configuration.fromMap(options))); - } catch (Throwable t) { + + try (FSDataInputStream is = fs.open(catalogPath)) { + Map<String, String> configMap = + YAML_MAPPER.readValue(is, new TypeReference<Map<String, String>>() {}); + + CatalogDescriptor catalog = + CatalogDescriptor.of(catalogName, Configuration.fromMap(configMap)); + + return Optional.of(catalog); + } + } catch (Exception e) { throw new CatalogException( String.format( - "Failed to load catalog %s's configuration from file", catalogName), - t); + "Failed to load catalog %s's configuration from file.", catalogName), + e); } } @@ -201,8 +220,21 @@ public class FileCatalogStore extends AbstractCatalogStore { @Override public Set<String> listCatalogs() throws CatalogException { checkOpenState(); + try { + FileStatus[] statusArr = catalogStorePath.getFileSystem().listStatus(catalogStorePath); - return Collections.unmodifiableSet(listAllCatalogFiles().keySet()); + return Arrays.stream(statusArr) + .filter(status -> !status.isDir()) + .map(FileStatus::getPath) + .map(Path::getName) + .map(filename -> filename.replace(FILE_EXTENSION, "")) + .collect(Collectors.toSet()); + } catch (Exception e) { + throw new CatalogException( + String.format( + "Failed to list file catalog store directory %s.", catalogStorePath), + e); + } } /** @@ -216,33 +248,19 @@ public class FileCatalogStore extends AbstractCatalogStore { @Override public boolean contains(String catalogName) throws CatalogException { checkOpenState(); - - return listAllCatalogFiles().containsKey(catalogName); - } - - private Map<String, Path> listAllCatalogFiles() throws CatalogException { - Map<String, Path> files = new HashMap<>(); - File directoryFile = new File(catalogStoreDirectory); - if (!directoryFile.isDirectory()) { - throw new CatalogException("File catalog store only support local directory"); - } - + Path catalogPath = getCatalogPath(catalogName); try { - Files.list(directoryFile.toPath()) - .filter(file -> file.getFileName().toString().endsWith(FILE_EXTENSION)) - .filter(Files::isRegularFile) - .forEach( - p -> - files.put( - p.getFileName().toString().replace(FILE_EXTENSION, ""), - p)); - } catch (Throwable t) { - throw new CatalogException("Failed to list file catalog store directory", t); + return catalogPath.getFileSystem().exists(catalogPath); + } catch (Exception e) { + throw new CatalogException( + String.format( + "Failed to check if catalog %s exists in the catalog store.", + catalogName), + e); } - return files; } private Path getCatalogPath(String catalogName) { - return Paths.get(catalogStoreDirectory, catalogName + FILE_EXTENSION); + return new Path(catalogStorePath, catalogName + FILE_EXTENSION); } } diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FileCatalogStoreFactory.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FileCatalogStoreFactory.java index 57d26f008e4..844220c22d2 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FileCatalogStoreFactory.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FileCatalogStoreFactory.java @@ -20,42 +20,39 @@ package org.apache.flink.table.catalog; import org.apache.flink.configuration.ConfigOption; import org.apache.flink.configuration.ReadableConfig; -import org.apache.flink.table.catalog.exceptions.CatalogException; import org.apache.flink.table.factories.CatalogStoreFactory; -import org.apache.flink.table.factories.FactoryUtil; +import org.apache.flink.table.factories.FactoryUtil.FactoryHelper; +import java.util.Collections; import java.util.HashSet; import java.util.Set; -import static org.apache.flink.table.catalog.FileCatalogStoreFactoryOptions.CHARSET; import static org.apache.flink.table.catalog.FileCatalogStoreFactoryOptions.IDENTIFIER; import static org.apache.flink.table.catalog.FileCatalogStoreFactoryOptions.PATH; import static org.apache.flink.table.factories.FactoryUtil.createCatalogStoreFactoryHelper; -/** CatalogStore factory for {@link FileCatalogStore}. */ +/** Catalog store factory for {@link FileCatalogStore}. */ public class FileCatalogStoreFactory implements CatalogStoreFactory { private String path; - private String charset; - @Override public CatalogStore createCatalogStore() { - return new FileCatalogStore(path, charset); + return new FileCatalogStore(path); } @Override - public void open(Context context) throws CatalogException { - FactoryUtil.FactoryHelper factoryHelper = createCatalogStoreFactoryHelper(this, context); + public void open(Context context) { + FactoryHelper<CatalogStoreFactory> factoryHelper = + createCatalogStoreFactoryHelper(this, context); factoryHelper.validate(); - ReadableConfig options = factoryHelper.getOptions(); + ReadableConfig options = factoryHelper.getOptions(); path = options.get(PATH); - charset = options.get(CHARSET); } @Override - public void close() throws CatalogException {} + public void close() {} @Override public String factoryIdentifier() { @@ -66,13 +63,12 @@ public class FileCatalogStoreFactory implements CatalogStoreFactory { public Set<ConfigOption<?>> requiredOptions() { Set<ConfigOption<?>> options = new HashSet<>(); options.add(PATH); - return options; + + return Collections.unmodifiableSet(options); } @Override public Set<ConfigOption<?>> optionalOptions() { - Set<ConfigOption<?>> options = new HashSet<>(); - options.add(CHARSET); - return options; + return Collections.emptySet(); } } diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FileCatalogStoreFactoryOptions.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FileCatalogStoreFactoryOptions.java index 51501a50912..7e1a30c1c51 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FileCatalogStoreFactoryOptions.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FileCatalogStoreFactoryOptions.java @@ -21,8 +21,6 @@ package org.apache.flink.table.catalog; import org.apache.flink.configuration.ConfigOption; import org.apache.flink.configuration.ConfigOptions; -import java.nio.charset.StandardCharsets; - /** {@link ConfigOption}s for {@link FileCatalogStoreFactory}. */ public class FileCatalogStoreFactoryOptions { @@ -35,12 +33,5 @@ public class FileCatalogStoreFactoryOptions { .withDescription( "The configuration option for specifying the path to the file catalog store."); - public static final ConfigOption<String> CHARSET = - ConfigOptions.key("charset") - .stringType() - .defaultValue(StandardCharsets.UTF_8.displayName()) - .withDescription( - "The charset used for storing/reading the catalog configuration."); - private FileCatalogStoreFactoryOptions() {} } diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/FileCatalogStoreFactoryTest.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/FileCatalogStoreFactoryTest.java new file mode 100644 index 00000000000..bfc7fc8e227 --- /dev/null +++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/FileCatalogStoreFactoryTest.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.catalog; + +import org.apache.flink.table.factories.CatalogStoreFactory; +import org.apache.flink.table.factories.FactoryUtil; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +import java.io.File; +import java.util.HashMap; +import java.util.Map; + +import static org.assertj.core.api.AssertionsForClassTypes.assertThat; + +/** Tests for {@link FileCatalogStoreFactory}. */ +class FileCatalogStoreFactoryTest { + + @Test + void testFileCatalogStoreFactoryDiscovery(@TempDir File tempFolder) { + + String factoryIdentifier = FileCatalogStoreFactoryOptions.IDENTIFIER; + Map<String, String> options = new HashMap<>(); + options.put("path", tempFolder.getAbsolutePath()); + ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); + final FactoryUtil.DefaultCatalogStoreContext discoveryContext = + new FactoryUtil.DefaultCatalogStoreContext(options, null, classLoader); + final CatalogStoreFactory factory = + FactoryUtil.discoverFactory( + classLoader, CatalogStoreFactory.class, factoryIdentifier); + factory.open(discoveryContext); + + CatalogStore catalogStore = factory.createCatalogStore(); + assertThat(catalogStore instanceof FileCatalogStore).isTrue(); + + factory.close(); + } +} diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/FileCatalogStoreTest.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/FileCatalogStoreTest.java index ada5a11380f..f79df92b32e 100644 --- a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/FileCatalogStoreTest.java +++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/FileCatalogStoreTest.java @@ -20,130 +20,142 @@ package org.apache.flink.table.catalog; import org.apache.flink.configuration.Configuration; import org.apache.flink.table.catalog.exceptions.CatalogException; -import org.apache.flink.table.factories.CatalogStoreFactory; -import org.apache.flink.table.factories.FactoryUtil; -import org.apache.flink.util.FileUtils; +import org.assertj.core.api.ThrowableAssert; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; import java.io.File; -import java.io.IOException; -import java.util.Arrays; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.stream.Collectors; +import java.nio.file.Path; +import java.util.Set; import static org.assertj.core.api.AssertionsForClassTypes.assertThat; import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy; -import static org.junit.jupiter.api.Assertions.assertFalse; -import static org.junit.jupiter.api.Assertions.assertTrue; -/** Test for {@link FileCatalogStore}. */ -public class FileCatalogStoreTest { - @Test - void testFileCatalogStoreFactoryDiscovery(@TempDir File tempFolder) { - - String factoryIdentifier = FileCatalogStoreFactoryOptions.IDENTIFIER; - Map<String, String> options = new HashMap<>(); - options.put("path", tempFolder.getAbsolutePath()); - ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); - final FactoryUtil.DefaultCatalogStoreContext discoveryContext = - new FactoryUtil.DefaultCatalogStoreContext(options, null, classLoader); - final CatalogStoreFactory factory = - FactoryUtil.discoverFactory( - classLoader, CatalogStoreFactory.class, factoryIdentifier); - factory.open(discoveryContext); - - CatalogStore catalogStore = factory.createCatalogStore(); - assertThat(catalogStore instanceof FileCatalogStore).isTrue(); - - factory.close(); +/** Tests for {@link FileCatalogStore}. */ +class FileCatalogStoreTest { + + private static final String CATALOG_STORE_DIR_NAME = "dummy-catalog-store"; + private static final String DUMMY = "dummy"; + private static final CatalogDescriptor DUMMY_CATALOG; + + static { + Configuration conf = new Configuration(); + conf.set(CommonCatalogOptions.CATALOG_TYPE, DUMMY); + conf.set(GenericInMemoryCatalogFactoryOptions.DEFAULT_DATABASE, "dummy_db"); + + DUMMY_CATALOG = CatalogDescriptor.of(DUMMY, conf); } + @TempDir private Path tempDir; + @Test - void testFileCatalogStoreSaveAndRead(@TempDir File tempFolder) throws IOException { - CatalogStore catalogStore = new FileCatalogStore(tempFolder.getAbsolutePath(), "utf-8"); - catalogStore.open(); + void testNotOpened() { + CatalogStore catalogStore = initCatalogStore(false); + + assertCatalogStoreNotOpened(catalogStore::listCatalogs); + assertCatalogStoreNotOpened(() -> catalogStore.contains(DUMMY)); + assertCatalogStoreNotOpened(() -> catalogStore.getCatalog(DUMMY)); + assertCatalogStoreNotOpened(() -> catalogStore.storeCatalog(DUMMY, DUMMY_CATALOG)); + assertCatalogStoreNotOpened(() -> catalogStore.removeCatalog(DUMMY, true)); + } - Configuration catalogConfiguration = new Configuration(); - catalogConfiguration.setString("type", "generic_in_memory"); + @Test + void testStoreDirNotExists() { + CatalogStore catalogStore = initCatalogStore(false); + Path catalogStorePath = tempDir.resolve(CATALOG_STORE_DIR_NAME); - // store catalog to catalog store - catalogStore.storeCatalog("cat1", CatalogDescriptor.of("cat1", catalogConfiguration)); - catalogStore.storeCatalog("cat2", CatalogDescriptor.of("cat2", catalogConfiguration)); + assertThatThrownBy(catalogStore::open) + .isInstanceOf(CatalogException.class) + .hasMessageContaining( + "Failed to open catalog store. The catalog store directory " + + catalogStorePath + + " does not exist."); + } - assertTrue(catalogStore.contains("cat1")); - assertTrue(catalogStore.contains("cat2")); + @Test + void testStore() { + CatalogStore catalogStore = initCatalogStore(true); + catalogStore.open(); - // check catalog file is right created - List<String> files = - Arrays.stream(tempFolder.listFiles()) - .map(file -> file.getName()) - .collect(Collectors.toList()); - assertTrue(files.contains("cat1.yaml")); - assertTrue(files.contains("cat2.yaml")); + catalogStore.storeCatalog(DUMMY, DUMMY_CATALOG); - // check file content - String yamlPath1 = String.format("%s/%s", tempFolder, "cat1.yaml"); - String content = FileUtils.readFileUtf8(new File(yamlPath1)); - assertThat(content).isEqualTo("type: generic_in_memory\n"); + File catalog = getCatalogFile(); + assertThat(catalog.exists()).isTrue(); + assertThat(catalog.isFile()).isTrue(); + assertThat(catalogStore.contains(DUMMY)).isTrue(); - catalogStore.close(); + Set<String> storedCatalogs = catalogStore.listCatalogs(); + assertThat(storedCatalogs.size()).isEqualTo(1); + assertThat(storedCatalogs.contains(DUMMY)).isTrue(); + } - // create a new FileCatalogStore, check catalog is right loaded. - catalogStore = new FileCatalogStore(tempFolder.getAbsolutePath(), "utf-8"); + @Test + void testRemoveExisting() { + CatalogStore catalogStore = initCatalogStore(true); catalogStore.open(); - assertTrue(catalogStore.listCatalogs().contains("cat1")); - assertTrue(catalogStore.listCatalogs().contains("cat2")); + catalogStore.storeCatalog(DUMMY, DUMMY_CATALOG); + assertThat(catalogStore.listCatalogs().size()).isEqualTo(1); - // test remove operation. - catalogStore.removeCatalog("cat1", false); - catalogStore.close(); + catalogStore.removeCatalog(DUMMY, false); + assertThat(catalogStore.listCatalogs().size()).isEqualTo(0); + assertThat(catalogStore.contains(DUMMY)).isFalse(); - catalogStore = new FileCatalogStore(tempFolder.getAbsolutePath(), "utf-8"); - catalogStore.open(); - assertFalse(catalogStore.listCatalogs().contains("cat1")); - assertTrue(catalogStore.listCatalogs().contains("cat2")); + File catalog = getCatalogFile(); + assertThat(catalog.exists()).isFalse(); } @Test - void testInvalidCases(@TempDir File tempFolder) { - // test catalog store the catalog already exists. - final CatalogStore catalogStore = - new FileCatalogStore(tempFolder.getAbsolutePath(), "utf-8"); + void testRemoveNonExisting() { + CatalogStore catalogStore = initCatalogStore(true); catalogStore.open(); - Configuration catalogConfiguration = new Configuration(); - catalogConfiguration.setString("type", "generic_in_memory"); + catalogStore.removeCatalog(DUMMY, true); - // store catalog to catalog store - catalogStore.storeCatalog("cat1", CatalogDescriptor.of("cat1", catalogConfiguration)); - assertThatThrownBy( - () -> - catalogStore.storeCatalog( - "cat1", CatalogDescriptor.of("cat1", catalogConfiguration))) + File catalog = getCatalogFile(); + assertThatThrownBy(() -> catalogStore.removeCatalog(DUMMY, false)) .isInstanceOf(CatalogException.class) - .hasMessageContaining("Failed to save catalog cat1's configuration to file"); + .hasMessageContaining( + "Catalog " + DUMMY + "'s store file " + catalog + " does not exist."); + } - // get a no exist catalog - assertThat(catalogStore.getCatalog("cat3")).isEmpty(); + @Test + void testClose() { + CatalogStore catalogStore = initCatalogStore(true); + catalogStore.open(); - // remove a no exist catalog - assertThatThrownBy(() -> catalogStore.removeCatalog("cat3", false)) - .isInstanceOf(CatalogException.class) - .hasMessageContaining("Failed to delete catalog cat3's store file"); + catalogStore.storeCatalog(DUMMY, DUMMY_CATALOG); + assertThat(catalogStore.listCatalogs().size()).isEqualTo(1); catalogStore.close(); - // test unsupported file schema - CatalogStore catalogStore2 = - new FileCatalogStore("hdfs://namenode:14565/some/path/to/a/file", "utf-8"); - catalogStore2.open(); - assertThatThrownBy(() -> catalogStore2.listCatalogs()) - .isInstanceOf(CatalogException.class) - .hasMessageContaining("File catalog store only support local directory"); + assertCatalogStoreNotOpened(catalogStore::listCatalogs); + assertCatalogStoreNotOpened(() -> catalogStore.contains(DUMMY)); + assertCatalogStoreNotOpened(() -> catalogStore.getCatalog(DUMMY)); + assertCatalogStoreNotOpened(() -> catalogStore.storeCatalog(DUMMY, DUMMY_CATALOG)); + assertCatalogStoreNotOpened(() -> catalogStore.removeCatalog(DUMMY, true)); + } + + private void assertCatalogStoreNotOpened( + ThrowableAssert.ThrowingCallable shouldRaiseThrowable) { + assertThatThrownBy(shouldRaiseThrowable) + .isInstanceOf(IllegalStateException.class) + .hasMessageContaining("CatalogStore is not opened yet."); + } + + private CatalogStore initCatalogStore(boolean createDir) { + Path catalogStorePath = tempDir.resolve(CATALOG_STORE_DIR_NAME); + if (createDir) { + catalogStorePath.toFile().mkdir(); + } + + return new FileCatalogStore(catalogStorePath.toString()); + } + + private File getCatalogFile() { + return tempDir.resolve(CATALOG_STORE_DIR_NAME) + .resolve(DUMMY + FileCatalogStore.FILE_EXTENSION) + .toFile(); } }