bowenli86 commented on a change in pull request #8214: [FLINK-11476] [table] 
Create CatalogManager to manage multiple catalogs and encapsulate Calcite schema
URL: https://github.com/apache/flink/pull/8214#discussion_r280866583
 
 

 ##########
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
 ##########
 @@ -215,6 +218,74 @@ abstract class TableEnvironment(val config: TableConfig) {
     rels.map(_.asInstanceOf[ExecNode[_, _]])
   }
 
+  /**
+    * Register an [[ReadableCatalog]] under a unique name.
+    *
+    * @param name the name under which the catalog will be registered
+    * @param catalog the catalog to register
+    * @throws CatalogAlreadyExistsException thrown if the catalog already 
exists
+    */
+  @throws[CatalogAlreadyExistsException]
+  def registerCatalog(name: String, catalog: ReadableCatalog): Unit = {
+    catalogManager.registerCatalog(name, catalog)
+  }
+
+  /**
+    * Get a registered [[ReadableCatalog]].
+    *
+    * @param catalogName name of the catalog to get
+    * @return the requested catalog
+    * @throws CatalogNotExistException thrown if the catalog doesn't exist
+    */
+  @throws[CatalogNotExistException]
+  def getCatalog(catalogName: String): ReadableCatalog = {
+    catalogManager.getCatalog(catalogName)
+  }
+
+  /**
+    * Get the current catalog.
+    *
+    * @return the current catalog in CatalogManager
+    */
+  def getCurrentCatalog(): ReadableCatalog = {
+    catalogManager.getCurrentCatalog
+  }
+
+  /**
+    * Get the current database name.
+    *
+    * @return the current database of the current catalog
+    */
+  def getCurrentDatabaseName(): String = {
+    catalogManager.getCurrentCatalog.getCurrentDatabase
+  }
+
+  /**
+    * Set the current catalog.
+    *
+    * @param name name of the catalog to set as current catalog
+    * @throws CatalogNotExistException thrown if the catalog doesn't exist
+    */
+  @throws[CatalogNotExistException]
+  def setCurrentCatalog(name: String): Unit = {
+    catalogManager.setCurrentCatalog(name)
+  }
+
+  /**
+    * Set the current catalog and current database.
+    *
+    * @param catalogName name of the catalog to set as current catalog
+    * @param databaseName name of the database to set as current database
+    * @throws CatalogNotExistException  thrown if the catalog doesn't exist
+    * @throws DatabaseNotExistException thrown if the database doesn't exist
+    */
+  @throws[CatalogNotExistException]
+  @throws[DatabaseNotExistException]
+  def setCurrentDatabase(catalogName: String, databaseName: String): Unit = {
+    catalogManager.setCurrentCatalog(catalogName)
+    catalogManager.getCurrentCatalog.setCurrentDatabase(databaseName)
 
 Review comment:
   I agree the "current database" thing is now confusing because it actually 
presents two things: 1) the database of the session if its catalog is current; 
2) which will be the current db when users switch to its catalog, it will be 
the "default_db" in each catalog if users don't overwrite it, that's why 
CatalogManager needs to keep a map if we wanna extract it out. We separated 
these two concepts In Blink, and we probably do that too in Flink by replacing 
`get/setCurrentDatabase` with `String getDefaultDatabase()` (if that's way, we 
should do it in a different JIRA/PR).
   
   IMHO, this is not important at this moment, as it's only a small usability 
thing from users' perspective. I can remove parts related to 
`setCurrentDatabase` from this PR, and we can bake it for more time.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to