This is an automated email from the ASF dual-hosted git repository.

zjureel pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
     new 5628c709787 [FLINK-32653][docs] Add doc for catalog store (#23110)

5628c709787 is described below

commit 5628c7097875be4bd56fc7805dbdd727d92bdac7
Author: Feng Jin <jinfeng1...@gmail.com>
AuthorDate: Fri Aug 18 10:34:20 2023 +0800

    [FLINK-32653][docs] Add doc for catalog store (#23110)

    * [FLINK-32653][docs] Add doc for catalog store
---
 docs/content.zh/docs/dev/table/catalogs.md | 183 ++++++++++++++++++++++++++++
 docs/content/docs/dev/table/catalogs.md    | 188 +++++++++++++++++++++++++++++
 2 files changed, 371 insertions(+)

diff --git a/docs/content.zh/docs/dev/table/catalogs.md b/docs/content.zh/docs/dev/table/catalogs.md
index 8efee5d0e83..72928e0c374 100644
--- a/docs/content.zh/docs/dev/table/catalogs.md
+++ b/docs/content.zh/docs/dev/table/catalogs.md
@@ -803,3 +803,186 @@ the gateway, or you can also use `SET` to specify the listener for ddl, for exam
 Flink SQL> SET 'table.catalog-modification.listeners' = 'your_factory';
 Flink SQL> CREATE TABLE test_table(...);
 ```
+
+## Catalog Store
+
+The Catalog Store saves catalog configurations. Once a Catalog Store is configured, the configurations of catalogs created in a session are
+persisted to the external system backing the Catalog Store, so previously created catalogs can still be retrieved from it even after the session is re-created.
+
+### Configure the Catalog Store
+Users can configure the Catalog Store in two ways: through the Table API or through YAML configuration.
+
+Register a Catalog Store in the Table API with a Catalog Store instance:
+```java
+// Initialize a catalog store instance
+CatalogStore catalogStore = new FileCatalogStore("file://path/to/catalog/store/");
+
+// Set up the environment settings with the catalog store
+final EnvironmentSettings settings =
+    EnvironmentSettings.newInstance().inBatchMode()
+        .withCatalogStore(catalogStore)
+        .build();
+
+final TableEnvironment tableEnv = TableEnvironment.create(settings);
+```
+
+Register a Catalog Store in the Table API with configuration:
+```java
+// Set up the configuration
+Configuration configuration = new Configuration();
+configuration.setString("table.catalog-store.kind", "file");
+configuration.setString("table.catalog-store.file.path", "file://path/to/catalog/store/");
+// Set up the environment settings with the configuration
+final EnvironmentSettings settings =
+    EnvironmentSettings.newInstance().inBatchMode()
+        .withConfiguration(configuration)
+        .build();
+
+final TableEnvironment tableEnv = TableEnvironment.create(settings);
+```
+
+In SQL Gateway, it is recommended to configure the Catalog Store in `flink-conf.yaml` so that all sessions can automatically use the catalogs that have already been created.
+The format is shown below; usually you need to set the `kind` of the Catalog Store, plus any other options that Catalog Store requires.
+```yaml
+table.catalog-store.kind: file
+table.catalog-store.file.path: /path/to/catalog/store/
+```
+
+### Catalog Store Types
+Flink ships with two built-in Catalog Stores: GenericInMemoryCatalogStore and FileCatalogStore. Users can also implement a custom Catalog Store.
+
+#### GenericInMemoryCatalogStore
+GenericInMemoryCatalogStore is an in-memory Catalog Store. All catalog configurations are available only within the session's lifecycle;
+once the session is re-created, the catalog configurations held in the store are cleared automatically.
+
+<table class="table table-bordered">
+  <thead>
+    <tr>
+      <th class="text-left" style="width: 25%">Option</th>
+      <th class="text-center" style="width: 45%">Description</th>
+    </tr>
+  </thead>
+  <tbody>
+    <tr>
+      <td><h5>kind</h5></td>
+      <td>Specifies the Catalog Store type to use; must be 'generic_in_memory'.</td>
+    </tr>
+  </tbody>
+</table>
+
+#### FileCatalogStore
+FileCatalogStore saves users' catalog configurations to files. To use FileCatalogStore, you must specify the directory in which catalog
+configurations are saved. Each catalog is stored in its own file, and the files map one-to-one to catalog names.
+
+Here is an example directory structure for catalog configurations saved by `FileCatalogStore`:
+```text
+- /path/to/save/the/catalog/
+  - catalog1.yaml
+  - catalog2.yaml
+  - catalog3.yaml
+```
+
+<table class="table table-bordered">
+  <thead>
+    <tr>
+      <th class="text-left" style="width: 25%">Option</th>
+      <th class="text-center" style="width: 45%">Description</th>
+    </tr>
+  </thead>
+  <tbody>
+    <tr>
+      <td><h5>kind</h5></td>
+      <td>Specifies the Catalog Store type to use; must be 'file'.</td>
+    </tr>
+    <tr>
+      <td><h5>path</h5></td>
+      <td>Specifies the directory in which the Catalog Store saves catalog configurations. It must be a valid directory; currently only local directories are supported.</td>
+    </tr>
+  </tbody>
+</table>
+
+#### Custom Catalog Store
+The Catalog Store is extensible: users can build a custom Catalog Store by implementing the CatalogStore interface. To use it from the SQL CLI or SQL Gateway,
+a corresponding CatalogStoreFactory must also be implemented for that Catalog Store.
+
+```java
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.table.catalog.AbstractCatalogStore;
+import org.apache.flink.table.catalog.CatalogDescriptor;
+import org.apache.flink.table.catalog.CatalogStore;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.factories.CatalogStoreFactory;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Optional;
+import java.util.Set;
+
+public class CustomCatalogStoreFactory implements CatalogStoreFactory {
+
+    public static final String IDENTIFIER = "custom-kind";
+
+    // Used to connect to the external storage system (CustomClient is a placeholder for your own client)
+    private CustomClient client;
+
+    @Override
+    public CatalogStore createCatalogStore() {
+        // pass the client created in open() to the store
+        return new CustomCatalogStore(client);
+    }
+
+    @Override
+    public void open(Context context) throws CatalogException {
+        // initialize the resources, such as an HTTP client (initClient is a placeholder)
+        client = initClient(context);
+    }
+
+    @Override
+    public void close() throws CatalogException {
+        // release the resources
+    }
+
+    @Override
+    public String factoryIdentifier() {
+        // catalog store kind identifier, matched against 'table.catalog-store.kind'
+        return IDENTIFIER;
+    }
+
+    @Override
+    public Set<ConfigOption<?>> requiredOptions() {
+        // define the required options (OPTION_1 and OPTION_2 are placeholders)
+        Set<ConfigOption<?>> options = new HashSet<>();
+        options.add(OPTION_1);
+        options.add(OPTION_2);
+
+        return options;
+    }
+
+    @Override
+    public Set<ConfigOption<?>> optionalOptions() {
+        // define the optional options; none in this example
+        return Collections.emptySet();
+    }
+}
+
+public class CustomCatalogStore extends AbstractCatalogStore {
+
+    private final CustomClient client;
+
+    public CustomCatalogStore(CustomClient client) {
+        this.client = client;
+    }
+
+    @Override
+    public void storeCatalog(String catalogName, CatalogDescriptor catalog)
+            throws CatalogException {
+        // store the catalog descriptor in the external system
+    }
+
+    @Override
+    public void removeCatalog(String catalogName, boolean ignoreIfNotExists)
+            throws CatalogException {
+        // remove the catalog descriptor
+    }
+
+    @Override
+    public Optional<CatalogDescriptor> getCatalog(String catalogName) throws CatalogException {
+        // retrieve the catalog configuration and build the catalog descriptor;
+        // return Optional.empty() if no such catalog is stored
+        return Optional.empty();
+    }
+
+    @Override
+    public Set<String> listCatalogs() throws CatalogException {
+        // list the names of all stored catalogs
+        return Collections.emptySet();
+    }
+
+    @Override
+    public boolean contains(String catalogName) throws CatalogException {
+        // check whether a catalog with this name is stored
+        return false;
+    }
+}
+```
diff --git a/docs/content/docs/dev/table/catalogs.md b/docs/content/docs/dev/table/catalogs.md
index 9af98e2c9e6..5ec357bbfc2 100644
--- a/docs/content/docs/dev/table/catalogs.md
+++ b/docs/content/docs/dev/table/catalogs.md
@@ -807,3 +807,191 @@ the gateway, or you can also use `SET` to specify the listener for ddl, for exam
 Flink SQL> SET 'table.catalog-modification.listeners' = 'your_factory';
 Flink SQL> CREATE TABLE test_table(...);
 ```
+
+## Catalog Store
+
+The Catalog Store saves catalog configurations. When a Catalog Store is configured, the configurations
+of catalogs created in a session are persisted to the external system backing the Catalog Store,
+so previously created catalogs can still be retrieved from it even after the session is re-created.
+
+### Configure the Catalog Store
+Users can configure the Catalog Store in two ways: through the Table API or through YAML configuration.
+
+Register a catalog store with the Table API using a catalog store instance:
+
+```java
+// Initialize a catalog store instance
+CatalogStore catalogStore = new FileCatalogStore("file://path/to/catalog/store/");
+
+// Set up the environment settings with the catalog store
+final EnvironmentSettings settings =
+    EnvironmentSettings.newInstance().inBatchMode()
+        .withCatalogStore(catalogStore)
+        .build();
+
+final TableEnvironment tableEnv = TableEnvironment.create(settings);
+```
+
+Register a catalog store with the Table API using configuration:
+
+```java
+// Set up the configuration
+Configuration configuration = new Configuration();
+configuration.setString("table.catalog-store.kind", "file");
+configuration.setString("table.catalog-store.file.path", "file://path/to/catalog/store/");
+// Set up the environment settings with the configuration
+final EnvironmentSettings settings =
+    EnvironmentSettings.newInstance().inBatchMode()
+        .withConfiguration(configuration)
+        .build();
+
+final TableEnvironment tableEnv = TableEnvironment.create(settings);
+```
+
+In SQL Gateway, it is recommended to configure the Catalog Store in `flink-conf.yaml` so that all sessions can automatically
+use the catalogs that have already been created. Usually, you need to set the `kind` of the Catalog Store and any other
+options that Catalog Store requires.
+```yaml
+table.catalog-store.kind: file
+table.catalog-store.file.path: /path/to/catalog/store/
+```
+
+### Catalog Store Types
+Flink has two built-in Catalog Stores: GenericInMemoryCatalogStore and FileCatalogStore.
+Users can also implement their own Catalog Store.
+
+#### GenericInMemoryCatalogStore
+GenericInMemoryCatalogStore is an implementation of CatalogStore that keeps catalog configurations in memory.
+The configurations are available only within the session's lifecycle and are cleared automatically
+once the session is re-created.
+
+<table class="table table-bordered">
+  <thead>
+    <tr>
+      <th class="text-left" style="width: 25%">Option</th>
+      <th class="text-center" style="width: 45%">Description</th>
+    </tr>
+  </thead>
+  <tbody>
+    <tr>
+      <td><h5>kind</h5></td>
+      <td>Specifies the Catalog Store type to use; must be 'generic_in_memory'.</td>
+    </tr>
+  </tbody>
+</table>
+
+#### FileCatalogStore
+FileCatalogStore saves the user's catalog configurations to files. To use FileCatalogStore, you need to specify the directory
+where the catalog configurations are saved. Each catalog is stored in its own file, named after the catalog.
+
+Here is an example directory structure for catalog configurations saved by FileCatalogStore:
+
+```text
+- /path/to/save/the/catalog/
+  - catalog1.yaml
+  - catalog2.yaml
+  - catalog3.yaml
+```
+
+<table class="table table-bordered">
+  <thead>
+    <tr>
+      <th class="text-left" style="width: 25%">Option</th>
+      <th class="text-center" style="width: 45%">Description</th>
+    </tr>
+  </thead>
+  <tbody>
+    <tr>
+      <td><h5>kind</h5></td>
+      <td>Specifies the Catalog Store type to use; must be 'file'.</td>
+    </tr>
+    <tr>
+      <td><h5>path</h5></td>
+      <td>Specifies the directory in which the Catalog Store saves catalog configurations. It must be a valid directory; currently only local directories are supported.</td>
+    </tr>
+  </tbody>
+</table>
+
+#### Custom Catalog Store
+The Catalog Store is extensible: users can build a custom Catalog Store by implementing the CatalogStore interface.
+To use it from the SQL CLI or SQL Gateway, a corresponding CatalogStoreFactory
+must also be implemented for that Catalog Store.
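+
+```java
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.table.catalog.AbstractCatalogStore;
+import org.apache.flink.table.catalog.CatalogDescriptor;
+import org.apache.flink.table.catalog.CatalogStore;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.factories.CatalogStoreFactory;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Optional;
+import java.util.Set;
+
+public class CustomCatalogStoreFactory implements CatalogStoreFactory {
+
+    public static final String IDENTIFIER = "custom-kind";
+
+    // Used to connect to the external storage system (CustomClient is a placeholder for your own client)
+    private CustomClient client;
+
+    @Override
+    public CatalogStore createCatalogStore() {
+        // pass the client created in open() to the store
+        return new CustomCatalogStore(client);
+    }
+
+    @Override
+    public void open(Context context) throws CatalogException {
+        // initialize the resources, such as an HTTP client (initClient is a placeholder)
+        client = initClient(context);
+    }
+
+    @Override
+    public void close() throws CatalogException {
+        // release the resources
+    }
+
+    @Override
+    public String factoryIdentifier() {
+        // catalog store kind identifier, matched against 'table.catalog-store.kind'
+        return IDENTIFIER;
+    }
+
+    @Override
+    public Set<ConfigOption<?>> requiredOptions() {
+        // define the required options (OPTION_1 and OPTION_2 are placeholders)
+        Set<ConfigOption<?>> options = new HashSet<>();
+        options.add(OPTION_1);
+        options.add(OPTION_2);
+
+        return options;
+    }
+
+    @Override
+    public Set<ConfigOption<?>> optionalOptions() {
+        // define the optional options; none in this example
+        return Collections.emptySet();
+    }
+}
+
+public class CustomCatalogStore extends AbstractCatalogStore {
+
+    private final CustomClient client;
+
+    public CustomCatalogStore(CustomClient client) {
+        this.client = client;
+    }
+
+    @Override
+    public void storeCatalog(String catalogName, CatalogDescriptor catalog)
+            throws CatalogException {
+        // store the catalog descriptor in the external system
+    }
+
+    @Override
+    public void removeCatalog(String catalogName, boolean ignoreIfNotExists)
+            throws CatalogException {
+        // remove the catalog descriptor
+    }
+
+    @Override
+    public Optional<CatalogDescriptor> getCatalog(String catalogName) throws CatalogException {
+        // retrieve the catalog configuration and build the catalog descriptor;
+        // return Optional.empty() if no such catalog is stored
+        return Optional.empty();
+    }
+
+    @Override
+    public Set<String> listCatalogs() throws CatalogException {
+        // list the names of all stored catalogs
+        return Collections.emptySet();
+    }
+
+    @Override
+    public boolean contains(String catalogName) throws CatalogException {
+        // check whether a catalog with this name is stored
+        return false;
+    }
+}
+```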
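
The custom-store skeleton above leaves the storage logic to the implementer. As a hedged, dependency-free illustration of the one-file-per-catalog layout described for FileCatalogStore, the sketch below saves each catalog's options as `<catalogName>.yaml` containing flat `key: value` lines and exposes methods named after the CatalogStore contract. `SimpleFileCatalogStore` and its `Map<String, String>` option format are hypothetical: this is not Flink's FileCatalogStore, which stores `CatalogDescriptor` objects and may serialize them differently, and the naive format assumes option values contain no line breaks.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.TreeSet;

/**
 * Hypothetical, Flink-free sketch of a file-backed catalog store: each catalog is
 * saved as "<catalogName>.yaml" holding flat "key: value" lines. It is NOT Flink's
 * FileCatalogStore; it only mirrors the CatalogStore method names.
 */
class SimpleFileCatalogStore {
    private static final String EXTENSION = ".yaml";
    private final Path directory;

    SimpleFileCatalogStore(Path directory) {
        try {
            // create the store directory if it does not exist yet
            this.directory = Files.createDirectories(directory);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    private Path fileOf(String catalogName) {
        // one file per catalog; the file name is derived from the catalog name
        return directory.resolve(catalogName + EXTENSION);
    }

    public void storeCatalog(String catalogName, Map<String, String> options) {
        if (contains(catalogName)) {
            throw new IllegalStateException("Catalog " + catalogName + " already exists");
        }
        StringBuilder yaml = new StringBuilder();
        options.forEach((key, value) -> yaml.append(key).append(": ").append(value).append('\n'));
        try {
            Files.write(fileOf(catalogName), yaml.toString().getBytes(StandardCharsets.UTF_8));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public Optional<Map<String, String>> getCatalog(String catalogName) {
        if (!contains(catalogName)) {
            return Optional.empty();
        }
        Map<String, String> options = new LinkedHashMap<>();
        try {
            // parse each "key: value" line back into an option entry
            for (String line : Files.readAllLines(fileOf(catalogName), StandardCharsets.UTF_8)) {
                int separator = line.indexOf(": ");
                if (separator > 0) {
                    options.put(line.substring(0, separator), line.substring(separator + 2));
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return Optional.of(options);
    }

    public Set<String> listCatalogs() {
        Set<String> names = new TreeSet<>();
        // every *.yaml file in the directory is one stored catalog
        try (DirectoryStream<Path> files = Files.newDirectoryStream(directory, "*" + EXTENSION)) {
            for (Path file : files) {
                String fileName = file.getFileName().toString();
                names.add(fileName.substring(0, fileName.length() - EXTENSION.length()));
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return names;
    }

    public void removeCatalog(String catalogName, boolean ignoreIfNotExists) {
        try {
            if (!Files.deleteIfExists(fileOf(catalogName)) && !ignoreIfNotExists) {
                throw new IllegalStateException("Catalog " + catalogName + " does not exist");
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public boolean contains(String catalogName) {
        return Files.exists(fileOf(catalogName));
    }
}
```

Because all state lives in files, a second `SimpleFileCatalogStore` opened on the same directory sees the catalogs stored by the first; that is the property that lets catalogs survive a session being re-created. A real implementation would extend `AbstractCatalogStore`, convert between `CatalogDescriptor` and the stored options, and throw `CatalogException` instead of `IllegalStateException`.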