This is an automated email from the ASF dual-hosted git repository.

bli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new a740f0c  [FLINK-13176][SQL CLI] remember current catalog and database in SQL CLI SessionContext
a740f0c is described below

commit a740f0c2799ba48e19acb185c1138115fd63aa65
Author: bowen.li <bowenl...@gmail.com>
AuthorDate: Wed Jul 10 16:44:30 2019 -0700

    [FLINK-13176][SQL CLI] remember current catalog and database in SQL CLI SessionContext
    
    This PR supports remembering the current catalog and database that users set in the SQL CLI SessionContext.
    
    This closes #9049.
---
 .../client/config/entries/ExecutionEntry.java      |  4 +-
 .../flink/table/client/gateway/SessionContext.java | 25 ++++++++++++
 .../client/gateway/local/ExecutionContext.java     | 17 ++++----
 .../table/client/gateway/local/LocalExecutor.java  | 19 +++++++--
 .../table/client/gateway/local/DependencyTest.java | 22 ++++++++++-
 .../client/gateway/local/LocalExecutorITCase.java  | 46 ++++++++++++++++++++++
 6 files changed, 119 insertions(+), 14 deletions(-)

diff --git 
a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/config/entries/ExecutionEntry.java
 
b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/config/entries/ExecutionEntry.java
index a1d47a0..80d3efb 100644
--- 
a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/config/entries/ExecutionEntry.java
+++ 
b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/config/entries/ExecutionEntry.java
@@ -97,9 +97,9 @@ public class ExecutionEntry extends ConfigEntry {
 
        private static final String 
EXECUTION_RESTART_STRATEGY_MAX_FAILURES_PER_INTERVAL = 
"restart-strategy.max-failures-per-interval";
 
-       private static final String EXECUTION_CURRNET_CATALOG = 
"current-catalog";
+       public static final String EXECUTION_CURRNET_CATALOG = 
"current-catalog";
 
-       private static final String EXECUTION_CURRNET_DATABASE = 
"current-database";
+       public static final String EXECUTION_CURRNET_DATABASE = 
"current-database";
 
        private ExecutionEntry(DescriptorProperties properties) {
                super(properties);
diff --git 
a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/SessionContext.java
 
b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/SessionContext.java
index af17941..d2a7da2 100644
--- 
a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/SessionContext.java
+++ 
b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/SessionContext.java
@@ -19,13 +19,18 @@
 package org.apache.flink.table.client.gateway;
 
 import org.apache.flink.table.client.config.Environment;
+import org.apache.flink.table.client.config.entries.ExecutionEntry;
 import org.apache.flink.table.client.config.entries.ViewEntry;
+import org.apache.flink.util.StringUtils;
 
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Optional;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
 
 /**
  * Context describing a session.
@@ -73,6 +78,26 @@ public class SessionContext {
                return name;
        }
 
+       public Optional<String> getCurrentCatalog() {
+               return 
Optional.ofNullable(sessionProperties.get(ExecutionEntry.EXECUTION_CURRNET_CATALOG));
+       }
+
+       public void setCurrentCatalog(String currentCatalog) {
+               
checkArgument(!StringUtils.isNullOrWhitespaceOnly(currentCatalog));
+
+               sessionProperties.put(ExecutionEntry.EXECUTION_CURRNET_CATALOG, 
currentCatalog);
+       }
+
+       public Optional<String> getCurrentDatabase() {
+               return 
Optional.ofNullable(sessionProperties.get(ExecutionEntry.EXECUTION_CURRNET_DATABASE));
+       }
+
+       public void setCurrentDatabase(String currentDatabase) {
+               
checkArgument(!StringUtils.isNullOrWhitespaceOnly(currentDatabase));
+
+               
sessionProperties.put(ExecutionEntry.EXECUTION_CURRNET_DATABASE, 
currentDatabase);
+       }
+
        public Environment getEnvironment() {
                return Environment.enrich(
                        defaultEnvironment,
diff --git 
a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
 
b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
index 344505f..0df7fba 100644
--- 
a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
+++ 
b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
@@ -78,7 +78,6 @@ import java.net.URL;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Optional;
 import java.util.function.Supplier;
 
 /**
@@ -303,14 +302,18 @@ public class ExecutionContext<T> {
                        // register catalogs
                        catalogs.forEach(tableEnv::registerCatalog);
 
-                       Optional<String> potentialCurrentCatalog = 
mergedEnv.getExecution().getCurrentCatalog();
-                       if (potentialCurrentCatalog.isPresent()) {
-                               
tableEnv.useCatalog(potentialCurrentCatalog.get());
+                       // set current catalog
+                       if (sessionContext.getCurrentCatalog().isPresent()) {
+                               
tableEnv.useCatalog(sessionContext.getCurrentCatalog().get());
+                       } else if 
(mergedEnv.getExecution().getCurrentCatalog().isPresent()) {
+                               
tableEnv.useCatalog(mergedEnv.getExecution().getCurrentCatalog().get());
                        }
 
-                       Optional<String> potentialCurrentDatabase = 
mergedEnv.getExecution().getCurrentDatabase();
-                       if (potentialCurrentDatabase.isPresent()) {
-                               
tableEnv.useDatabase(potentialCurrentDatabase.get());
+                       // set current database
+                       if (sessionContext.getCurrentDatabase().isPresent()) {
+                               
tableEnv.useDatabase(sessionContext.getCurrentDatabase().get());
+                       } else if 
(mergedEnv.getExecution().getCurrentDatabase().isPresent()) {
+                               
tableEnv.useDatabase(mergedEnv.getExecution().getCurrentDatabase().get());
                        }
 
                        // create query config
diff --git 
a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
 
b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
index af128ef..7e41f1f 100644
--- 
a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
+++ 
b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
@@ -192,18 +192,24 @@ public class LocalExecutor implements Executor {
 
        @Override
        public List<String> listCatalogs(SessionContext session) throws 
SqlExecutionException {
-               final TableEnvironment tableEnv = 
getOrCreateExecutionContext(session)
+               final ExecutionContext<?> context = 
getOrCreateExecutionContext(session);
+
+               final TableEnvironment tableEnv = context
                        .createEnvironmentInstance()
                        .getTableEnvironment();
-               return Arrays.asList(tableEnv.listCatalogs());
+
+               return context.wrapClassLoader(() -> 
Arrays.asList(tableEnv.listCatalogs()));
        }
 
        @Override
        public List<String> listDatabases(SessionContext session) throws 
SqlExecutionException {
-               final TableEnvironment tableEnv = 
getOrCreateExecutionContext(session)
+               final ExecutionContext<?> context = 
getOrCreateExecutionContext(session);
+
+               final TableEnvironment tableEnv = context
                        .createEnvironmentInstance()
                        .getTableEnvironment();
-               return Arrays.asList(tableEnv.listDatabases());
+
+               return context.wrapClassLoader(() -> 
Arrays.asList(tableEnv.listDatabases()));
        }
 
        @Override
@@ -232,7 +238,10 @@ public class LocalExecutor implements Executor {
                        .getTableEnvironment();
 
                context.wrapClassLoader(() -> {
+                       // Rely on TableEnvironment/CatalogManager to validate 
input
                        tableEnv.useCatalog(catalogName);
+                       session.setCurrentCatalog(catalogName);
+                       
session.setCurrentDatabase(tableEnv.getCurrentDatabase());
                        return null;
                });
        }
@@ -245,7 +254,9 @@ public class LocalExecutor implements Executor {
                        .getTableEnvironment();
 
                context.wrapClassLoader(() -> {
+                       // Rely on TableEnvironment/CatalogManager to validate 
input
                        tableEnv.useDatabase(databaseName);
+                       session.setCurrentDatabase(databaseName);
                        return null;
                });
        }
diff --git 
a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/DependencyTest.java
 
b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/DependencyTest.java
index 286d76a..5fabe83 100644
--- 
a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/DependencyTest.java
+++ 
b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/DependencyTest.java
@@ -20,13 +20,19 @@ package org.apache.flink.table.client.gateway.local;
 
 import org.apache.flink.client.cli.DefaultCLI;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.api.Types;
 import org.apache.flink.table.catalog.Catalog;
 import org.apache.flink.table.catalog.CatalogDatabaseImpl;
+import org.apache.flink.table.catalog.CatalogTableImpl;
 import org.apache.flink.table.catalog.GenericInMemoryCatalog;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.config.CatalogConfig;
 import org.apache.flink.table.catalog.exceptions.CatalogException;
 import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
 import org.apache.flink.table.catalog.hive.HiveCatalog;
 import org.apache.flink.table.catalog.hive.HiveTestUtils;
 import org.apache.flink.table.catalog.hive.descriptors.HiveCatalogValidator;
@@ -165,6 +171,7 @@ public class DependencyTest {
         */
        public static class TestHiveCatalogFactory extends HiveCatalogFactory {
                public static final String ADDITIONAL_TEST_DATABASE = 
"additional_test_database";
+               public static final String TEST_TABLE = "test_table";
 
                @Override
                public Map<String, String> requiredContext() {
@@ -193,7 +200,20 @@ public class DependencyTest {
                                        ADDITIONAL_TEST_DATABASE,
                                        new CatalogDatabaseImpl(new 
HashMap<>(), null),
                                        false);
-                       } catch (DatabaseAlreadyExistException e) {
+                               hiveCatalog.createTable(
+                                       new 
ObjectPath(ADDITIONAL_TEST_DATABASE, TEST_TABLE),
+                                       new CatalogTableImpl(
+                                               TableSchema.builder()
+                                                       .field("testcol", 
DataTypes.INT())
+                                                       .build(),
+                                               new HashMap<String, String>() {{
+                                                       
put(CatalogConfig.IS_GENERIC, String.valueOf(true));
+                                               }},
+                                               ""
+                                       ),
+                                       false
+                               );
+                       } catch (DatabaseAlreadyExistException | 
TableAlreadyExistException | DatabaseNotExistException e) {
                                throw new CatalogException(e);
                        }
 
diff --git 
a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
 
b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
index 023d656..ac1a7ae 100644
--- 
a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
+++ 
b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
@@ -32,6 +32,7 @@ import org.apache.flink.configuration.WebOptions;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
 import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.catalog.hive.HiveCatalog;
 import org.apache.flink.table.client.config.Environment;
 import org.apache.flink.table.client.config.entries.ViewEntry;
 import org.apache.flink.table.client.gateway.Executor;
@@ -75,6 +76,8 @@ import static org.junit.Assert.fail;
 public class LocalExecutorITCase extends TestLogger {
 
        private static final String DEFAULTS_ENVIRONMENT_FILE = 
"test-sql-client-defaults.yaml";
+       private static final String CATALOGS_ENVIRONMENT_FILE = 
"test-sql-client-catalogs.yaml";
+
        private static final int NUM_TMS = 2;
        private static final int NUM_SLOTS_PER_TM = 2;
 
@@ -426,6 +429,40 @@ public class LocalExecutorITCase extends TestLogger {
                }
        }
 
+       @Test
+       public void testUseCatalogAndUseDatabase() throws Exception {
+               final String csvOutputPath = new 
File(tempFolder.newFolder().getAbsolutePath(), 
"test-out.csv").toURI().toString();
+               final URL url = 
getClass().getClassLoader().getResource("test-data.csv");
+               Objects.requireNonNull(url);
+               final Map<String, String> replaceVars = new HashMap<>();
+               replaceVars.put("$VAR_SOURCE_PATH1", url.getPath());
+               replaceVars.put("$VAR_EXECUTION_TYPE", "streaming");
+               replaceVars.put("$VAR_SOURCE_SINK_PATH", csvOutputPath);
+               replaceVars.put("$VAR_UPDATE_MODE", "update-mode: append");
+               replaceVars.put("$VAR_MAX_ROWS", "100");
+
+               final Executor executor = 
createModifiedExecutor(CATALOGS_ENVIRONMENT_FILE, clusterClient, replaceVars);
+               final SessionContext session = new 
SessionContext("test-session", new Environment());
+
+               try {
+                       assertEquals(Arrays.asList("mydatabase"), 
executor.listDatabases(session));
+
+                       executor.useCatalog(session, "hivecatalog");
+
+                       assertEquals(
+                               
Arrays.asList(DependencyTest.TestHiveCatalogFactory.ADDITIONAL_TEST_DATABASE, 
HiveCatalog.DEFAULT_DB),
+                               executor.listDatabases(session));
+
+                       assertEquals(Collections.emptyList(), 
executor.listTables(session));
+
+                       executor.useDatabase(session, 
DependencyTest.TestHiveCatalogFactory.ADDITIONAL_TEST_DATABASE);
+
+                       
assertEquals(Arrays.asList(DependencyTest.TestHiveCatalogFactory.TEST_TABLE), 
executor.listTables(session));
+               } finally {
+                       executor.stop(session);
+               }
+       }
+
        private void executeStreamQueryTable(
                        Map<String, String> replaceVars,
                        String query,
@@ -481,6 +518,15 @@ public class LocalExecutorITCase extends TestLogger {
                        new DummyCustomCommandLine<T>(clusterClient));
        }
 
+       private <T> LocalExecutor createModifiedExecutor(
+                       String yamlFile, ClusterClient<T> clusterClient, 
Map<String, String> replaceVars) throws Exception {
+               return new LocalExecutor(
+                       EnvironmentFileUtil.parseModified(yamlFile, 
replaceVars),
+                       Collections.emptyList(),
+                       clusterClient.getFlinkConfiguration(),
+                       new DummyCustomCommandLine<T>(clusterClient));
+       }
+
        private List<String> retrieveTableResult(
                        Executor executor,
                        SessionContext session,

Reply via email to