This is an automated email from the ASF dual-hosted git repository.

zjureel pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 5628c709787 [FLINK-32653][docs] Add doc for catalog store (#23110)
5628c709787 is described below

commit 5628c7097875be4bd56fc7805dbdd727d92bdac7
Author: Feng Jin <jinfeng1...@gmail.com>
AuthorDate: Fri Aug 18 10:34:20 2023 +0800

    [FLINK-32653][docs] Add doc for catalog store (#23110)
    
    * [FLINK-32653][docs] Add doc for catalog store
---
 docs/content.zh/docs/dev/table/catalogs.md | 183 ++++++++++++++++++++++++++++
 docs/content/docs/dev/table/catalogs.md    | 188 +++++++++++++++++++++++++++++
 2 files changed, 371 insertions(+)

diff --git a/docs/content.zh/docs/dev/table/catalogs.md 
b/docs/content.zh/docs/dev/table/catalogs.md
index 8efee5d0e83..72928e0c374 100644
--- a/docs/content.zh/docs/dev/table/catalogs.md
+++ b/docs/content.zh/docs/dev/table/catalogs.md
@@ -803,3 +803,186 @@ the gateway, or you can also use `SET` to specify the 
listener for ddl, for exam
 Flink SQL> SET 'table.catalog-modification.listeners' = 'your_factory';
 Flink SQL> CREATE TABLE test_table(...);
 ```
+
+## Catalog Store
+
+Catalog Store 用于保存 Catalog 的配置信息, 配置 Catalog Store 之后,在 session 中创建的 catalog 
信息会持久化至
+Catalog Store 对应的外部系统中,即使 session 重建, 之前创建的 Catalog 依旧可以从 Catalog Store 中重新获取。
+
+### Catalog Store 的配置
+用户可以以不同的方式配置 Catalog Store,一种是使用Table API,另一种是使用 YAML 配置。
+
+在 Table API 中使用 Catalog Store 实例来注册 Catalog Store 。
+```java
+// Initialize a catalog Store
+CatalogStore catalogStore = new 
FileCatalogStore("file://path/to/catalog/store/");
+
+// set up the catalog store
+final EnvironmentSettings settings =
+        EnvironmentSettings.newInstance().inBatchMode()
+        .withCatalogStore(catalogStore)
+        .build();
+
+final TableEnvironment tableEnv = TableEnvironment.create(settings);
+```
+
+在 Table API 中使用 configuration 注册 Catalog Store 。
+```java
+// set up configuration
+Configuration configuration = new Configuration();
+configuration.set("table.catalog-store.kind", "file");
+configuration.set("table.catalog-store.file.path", 
"file://path/to/catalog/store/");
+// set up the configuration.
+final EnvironmentSettings settings =
+        EnvironmentSettings.newInstance().inBatchMode()
+        .withConfiguration(configuration)
+        .build();
+
+final TableEnvironment tableEnv = TableEnvironment.create(settings);
+```
+
+在 SQL Gateway 中,推荐在 `flink-conf.yaml` 文件中进行配置,所有的 session 可以自动使用已经创建好的 Catalog 
。
+配置的格式如下,一般情况下需要配置 Catalog Store 的 kind ,以及 Catalog Store 需要的其他参数配置。
+```yaml
+table.catalog-store.kind: file
+table.catalog-store.file.path: /path/to/catalog/store/
+```
+
+### Catalog Store 类型
+Flink 框架内置了两种 Catalog Store,分别是 GenericInMemoryCatalogStore 和 
FileCatalogStore。用户也可以自定义 Catalog Store 。
+
+#### GenericInMemoryCatalogStore
+GenericInMemoryCatalogStore 是基于内存实现的 Catalog Store,所有的 Catalog 配置只在 session 
的生命周期内可用,
+session 重建之后 store 中保存的 Catalog 配置也会自动清理。
+
+<table class="table table-bordered">
+    <thead>
+      <tr>
+        <th class="text-left" style="width: 25%">参数</th>
+        <th class="text-center" style="width: 45%">描述</th>
+      </tr>
+    </thead>
+    <tbody>
+    <tr>
+      <td><h5>kind</h5></td>
+      <td>指定要使用的 Catalog Store 类型,此处应为 'generic_in_memory'</td>
+    </tr>
+    </tbody>
+</table>
+
+#### FileCatalogStore
+FileCatalogStore 可以将用户的 Catalog 配置信息保存至文件中,使用 FileCatalogStore 需要指定 Catalog 
配置需要
+保存的目录,不同的 Catalog 会对应不同的文件并和 Catalog Name 一一对应。
+
+这是一个示例目录结构,用于表示使用 `FileCatalogStore` 保存 `catalog` 配置的情况:
+```shell
+- /path/to/save/the/catalog/
+  - catalog1.yaml
+  - catalog2.yaml
+  - catalog3.yaml
+```
+
+<table class="table table-bordered">
+    <thead>
+      <tr>
+        <th class="text-left" style="width: 25%">参数</th>
+        <th class="text-center" style="width: 45%">描述</th>
+      </tr>
+    </thead>
+    <tbody>
+    <tr>
+      <td><h5>kind</h5></td>
+      <td>指定要使用的 Catalog Store 类型,此处应为 'file'</td>
+    </tr>
+    <tr>
+      <td><h5>path</h5></td>
+      <td>指定要使用的 Catalog Store 保存的路径,必须是一个合法的目录,当前只支持本地目录</td>
+    </tr>
+    </tbody>
+</table>
+
+#### 用户自定义 Catalog Store
+Catalog Store 是可拓展的, 用户可以通过实现 Catalog Store 的接口来自定义 Catalog Store。如果需要 SQL CLI 
或者 SQL Gateway 中使用
+Catalog Store,还需要这个 Catalog Store 实现对应的 CatalogStoreFactory 接口。
+
+```java
+public class CustomCatalogStoreFactory implements CatalogStoreFactory {
+
+    public static final String IDENTIFIER = "custom-kind";
+    
+    // Used to connect external storage systems
+    private CustomClient client;
+    
+    @Override
+    public CatalogStore createCatalogStore() {
+        return new CustomCatalogStore();
+    }
+
+    @Override
+    public void open(Context context) throws CatalogException {
+        // initialize the resources, such as http client
+        client = initClient(context);
+    }
+
+    @Override
+    public void close() throws CatalogException {
+        // release the resources
+    }
+
+    @Override
+    public String factoryIdentifier() {
+        // table store kind identifier
+        return IDENTIFIER;
+    }
+
+    @Override
+    public Set<ConfigOption<?>> requiredOptions() {
+        // define the required options
+        Set<ConfigOption> options = new HashSet();
+        options.add(OPTION_1);
+        options.add(OPTION_2);
+        
+        return options;
+    }
+
+    @Override
+    public Set<ConfigOption<?>> optionalOptions() {
+        // define the optional options
+    }
+}
+
+public class CustomCatalogStore extends AbstractCatalogStore {
+
+    private Client client;
+    
+    public CustomCatalogStore(Client client) {
+        this.client = client;
+    }
+
+    @Override
+    public void storeCatalog(String catalogName, CatalogDescriptor catalog)
+            throws CatalogException {
+        // store the catalog
+    }
+
+    @Override
+    public void removeCatalog(String catalogName, boolean ignoreIfNotExists)
+            throws CatalogException {
+        // remove the catalog descriptor
+    }
+
+    @Override
+    public Optional<CatalogDescriptor> getCatalog(String catalogName) {
+        // retrieve the catalog configuration and build the catalog descriptor
+    }
+
+    @Override
+    public Set<String> listCatalogs() {
+        // list all catalogs
+    }
+
+    @Override
+    public boolean contains(String catalogName) {
+    }
+}
+```
diff --git a/docs/content/docs/dev/table/catalogs.md 
b/docs/content/docs/dev/table/catalogs.md
index 9af98e2c9e6..5ec357bbfc2 100644
--- a/docs/content/docs/dev/table/catalogs.md
+++ b/docs/content/docs/dev/table/catalogs.md
@@ -807,3 +807,191 @@ the gateway, or you can also use `SET` to specify the 
listener for ddl, for exam
 Flink SQL> SET 'table.catalog-modification.listeners' = 'your_factory';
 Flink SQL> CREATE TABLE test_table(...);
 ```
+
+## Catalog Store
+
+Catalog Store is used to store the configuration of catalogs. When using 
Catalog Store, the configurations
+of catalogs created in the session will be persisted in the corresponding 
external system of Catalog Store.
+Even if the session is reconstructed, previously created catalogs can still be 
retrieved from Catalog Store.
+
+### Configure Catalog Store
+Users can configure the Catalog Store in different ways, one is to use the 
Table API, and another is to use YAML configuration.
+
+Register a catalog store using catalog store instance.
+
+```java
+// Initialize a catalog Store instance
+CatalogStore catalogStore = new 
FileCatalogStore("file://path/to/catalog/store/");
+
+// set up the catalog store
+final EnvironmentSettings settings =
+        EnvironmentSettings.newInstance().inBatchMode()
+        .withCatalogStore(catalogStore)
+        .build();
+```
+
+Register a catalog store using configuration.
+
+```java 
+// Set up configuration
+Configuration configuration = new Configuration();
+configuration.set("table.catalog-store.kind", "file");
+configuration.set("table.catalog-store.file.path", 
"file://path/to/catalog/store/");
+// set up the configuration.
+final EnvironmentSettings settings =
+        EnvironmentSettings.newInstance().inBatchMode()
+        .withConfiguration(configuration)
+        .build();
+
+final TableEnvironment tableEnv = TableEnvironment.create(settings);
+```
+
+In SQL Gateway, it is recommended to configure the settings in a yaml file so 
that all sessions can automatically
+use the pre-created Catalog. Usually, you need to configure the kind of 
Catalog Store and other
+required parameters for the Catalog Store.
+```yaml
+table.catalog-store.kind: file
+table.catalog-store.file.path: /path/to/catalog/store/
+```
+
+### Catalog Store Type
+Flink has two built-in Catalog Stores, namely GenericInMemoryCatalogStore and 
FileCatalogStore.
+Users can also customize their own Catalog Store.
+
+#### GenericInMemoryCatalogStore
+GenericInMemoryCatalogStore is an implementation of CatalogStore that saves 
configuration information in memory.
+All catalog configurations are only available within the session's lifecycle, 
and the stored catalog configurations will be
+automatically cleared after session reconstruction.
+
+<table class="table table-bordered">
+    <thead>
+      <tr>
+        <th class="text-left" style="width: 25%">Option</th>
+        <th class="text-center" style="width: 45%">Description</th>
+      </tr>
+    </thead>
+    <tbody>
+    <tr>
+      <td><h5>kind</h5></td>
+      <td>Specify the Catalog Store type to be used, which should be 
'generic_in_memory'</td>
+    </tr>
+    </tbody>
+</table>
+
+#### FileCatalogStore
+FileCatalogStore can save the user's Catalog configuration to a file. To use 
FileCatalogStore, you need to specify the directory where the Catalog 
configuration
+needs to be saved. Different Catalogs will correspond to different files and 
each file will correspond to a Catalog Name.
+
+Here's an example directory structure representing the storage of Catalog 
configurations using FileCatalogStore:
+
+```shell
+- /path/to/save/the/catalog/
+  - catalog1.yaml
+  - catalog2.yaml
+  - catalog3.yaml
+```
+
+<table class="table table-bordered">
+    <thead>
+      <tr>
+        <th class="text-left" style="width: 25%">Option</th>
+        <th class="text-center" style="width: 45%">Description</th>
+      </tr>
+    </thead>
+    <tbody>
+    <tr>
+      <td><h5>kind</h5></td>
+      <td>Specify the Catalog Store type to be used, which should be 
'file'</td>
+    </tr>
+    <tr>
+      <td><h5>path</h5></td>
+      <td>Specify the path to be used for saving in the Catalog Store, it must 
be a valid directory and currently only supports local directories.</td>
+    </tr>
+    </tbody>
+</table>
+
+#### Custom Catalog Store
+Catalog Store is extensible, and users can customize Catalog Store by 
implementing its interface.
+If SQL CLI or SQL Gateway needs to use Catalog Store, the corresponding 
CatalogStoreFactory interface
+also needs to be implemented for this Catalog Store.
+
+```java
+public class CustomCatalogStoreFactory implements CatalogStoreFactory {
+
+    public static final String IDENTIFIER = "custom-kind";
+    
+    // Used to connect external storage systems
+    private CustomClient client;
+    
+    @Override
+    public CatalogStore createCatalogStore() {
+        return new CustomCatalogStore();
+    }
+
+    @Override
+    public void open(Context context) throws CatalogException {
+        // initialize the resources, such as http client
+        client = initClient(context);
+    }
+
+    @Override
+    public void close() throws CatalogException {
+        // release the resources
+    }
+
+    @Override
+    public String factoryIdentifier() {
+        // table store kind identifier
+        return IDENTIFIER;
+    }
+    
+    public Set<ConfigOption<?>> requiredOptions() {
+        // define the required options
+        Set<ConfigOption> options = new HashSet();
+        options.add(OPTION_1);
+        options.add(OPTION_2);
+        
+        return options;
+    }
+
+    @Override
+    public Set<ConfigOption<?>> optionalOptions() {
+        // define the optional options
+    }
+}
+
+public class CustomCatalogStore extends AbstractCatalogStore {
+
+    private Client client;
+    
+    public CustomCatalogStore(Client client) {
+        this.client = client;
+    }
+
+    @Override
+    public void storeCatalog(String catalogName, CatalogDescriptor catalog)
+            throws CatalogException {
+        // store the catalog
+    }
+
+    @Override
+    public void removeCatalog(String catalogName, boolean ignoreIfNotExists)
+            throws CatalogException {
+        // remove the catalog descriptor
+    }
+
+    @Override
+    public Optional<CatalogDescriptor> getCatalog(String catalogName) {
+        // retrieve the catalog configuration and build the catalog descriptor
+    }
+
+    @Override
+    public Set<String> listCatalogs() {
+        // list all catalogs
+    }
+
+    @Override
+    public boolean contains(String catalogName) {
+    }
+}
+```

Reply via email to