This is an automated email from the ASF dual-hosted git repository. The user bli pushed a commit to the master branch of the repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new a740f0c [FLINK-13176][SQL CLI] remember current catalog and database in SQL CLI SessionContext a740f0c is described below commit a740f0c2799ba48e19acb185c1138115fd63aa65 Author: bowen.li <bowenl...@gmail.com> AuthorDate: Wed Jul 10 16:44:30 2019 -0700 [FLINK-13176][SQL CLI] remember current catalog and database in SQL CLI SessionContext This PR supports remembering current catalog and database that users set in SQL CLI SessionContext. This closes #9049. --- .../client/config/entries/ExecutionEntry.java | 4 +- .../flink/table/client/gateway/SessionContext.java | 25 ++++++++++++ .../client/gateway/local/ExecutionContext.java | 17 ++++---- .../table/client/gateway/local/LocalExecutor.java | 19 +++++++-- .../table/client/gateway/local/DependencyTest.java | 22 ++++++++++- .../client/gateway/local/LocalExecutorITCase.java | 46 ++++++++++++++++++++++ 6 files changed, 119 insertions(+), 14 deletions(-) diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/config/entries/ExecutionEntry.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/config/entries/ExecutionEntry.java index a1d47a0..80d3efb 100644 --- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/config/entries/ExecutionEntry.java +++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/config/entries/ExecutionEntry.java @@ -97,9 +97,9 @@ public class ExecutionEntry extends ConfigEntry { private static final String EXECUTION_RESTART_STRATEGY_MAX_FAILURES_PER_INTERVAL = "restart-strategy.max-failures-per-interval"; - private static final String EXECUTION_CURRNET_CATALOG = "current-catalog"; + public static final String EXECUTION_CURRNET_CATALOG = "current-catalog"; - private static final String EXECUTION_CURRNET_DATABASE = "current-database"; + public static final String EXECUTION_CURRNET_DATABASE = "current-database"; private 
ExecutionEntry(DescriptorProperties properties) { super(properties); diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/SessionContext.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/SessionContext.java index af17941..d2a7da2 100644 --- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/SessionContext.java +++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/SessionContext.java @@ -19,13 +19,18 @@ package org.apache.flink.table.client.gateway; import org.apache.flink.table.client.config.Environment; +import org.apache.flink.table.client.config.entries.ExecutionEntry; import org.apache.flink.table.client.config.entries.ViewEntry; +import org.apache.flink.util.StringUtils; import java.util.Collections; import java.util.HashMap; import java.util.LinkedHashMap; import java.util.Map; import java.util.Objects; +import java.util.Optional; + +import static org.apache.flink.util.Preconditions.checkArgument; /** * Context describing a session. 
@@ -73,6 +78,26 @@ public class SessionContext { return name; } + public Optional<String> getCurrentCatalog() { + return Optional.ofNullable(sessionProperties.get(ExecutionEntry.EXECUTION_CURRNET_CATALOG)); + } + + public void setCurrentCatalog(String currentCatalog) { + checkArgument(!StringUtils.isNullOrWhitespaceOnly(currentCatalog)); + + sessionProperties.put(ExecutionEntry.EXECUTION_CURRNET_CATALOG, currentCatalog); + } + + public Optional<String> getCurrentDatabase() { + return Optional.ofNullable(sessionProperties.get(ExecutionEntry.EXECUTION_CURRNET_DATABASE)); + } + + public void setCurrentDatabase(String currentDatabase) { + checkArgument(!StringUtils.isNullOrWhitespaceOnly(currentDatabase)); + + sessionProperties.put(ExecutionEntry.EXECUTION_CURRNET_DATABASE, currentDatabase); + } + public Environment getEnvironment() { return Environment.enrich( defaultEnvironment, diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java index 344505f..0df7fba 100644 --- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java +++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java @@ -78,7 +78,6 @@ import java.net.URL; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; -import java.util.Optional; import java.util.function.Supplier; /** @@ -303,14 +302,18 @@ public class ExecutionContext<T> { // register catalogs catalogs.forEach(tableEnv::registerCatalog); - Optional<String> potentialCurrentCatalog = mergedEnv.getExecution().getCurrentCatalog(); - if (potentialCurrentCatalog.isPresent()) { - tableEnv.useCatalog(potentialCurrentCatalog.get()); + // set current catalog + if (sessionContext.getCurrentCatalog().isPresent()) { + 
tableEnv.useCatalog(sessionContext.getCurrentCatalog().get()); + } else if (mergedEnv.getExecution().getCurrentCatalog().isPresent()) { + tableEnv.useCatalog(mergedEnv.getExecution().getCurrentCatalog().get()); } - Optional<String> potentialCurrentDatabase = mergedEnv.getExecution().getCurrentDatabase(); - if (potentialCurrentDatabase.isPresent()) { - tableEnv.useDatabase(potentialCurrentDatabase.get()); + // set current database + if (sessionContext.getCurrentDatabase().isPresent()) { + tableEnv.useDatabase(sessionContext.getCurrentDatabase().get()); + } else if (mergedEnv.getExecution().getCurrentDatabase().isPresent()) { + tableEnv.useDatabase(mergedEnv.getExecution().getCurrentDatabase().get()); } // create query config diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java index af128ef..7e41f1f 100644 --- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java +++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java @@ -192,18 +192,24 @@ public class LocalExecutor implements Executor { @Override public List<String> listCatalogs(SessionContext session) throws SqlExecutionException { - final TableEnvironment tableEnv = getOrCreateExecutionContext(session) + final ExecutionContext<?> context = getOrCreateExecutionContext(session); + + final TableEnvironment tableEnv = context .createEnvironmentInstance() .getTableEnvironment(); - return Arrays.asList(tableEnv.listCatalogs()); + + return context.wrapClassLoader(() -> Arrays.asList(tableEnv.listCatalogs())); } @Override public List<String> listDatabases(SessionContext session) throws SqlExecutionException { - final TableEnvironment tableEnv = getOrCreateExecutionContext(session) + final ExecutionContext<?> context = 
getOrCreateExecutionContext(session); + + final TableEnvironment tableEnv = context .createEnvironmentInstance() .getTableEnvironment(); - return Arrays.asList(tableEnv.listDatabases()); + + return context.wrapClassLoader(() -> Arrays.asList(tableEnv.listDatabases())); } @Override @@ -232,7 +238,10 @@ public class LocalExecutor implements Executor { .getTableEnvironment(); context.wrapClassLoader(() -> { + // Rely on TableEnvironment/CatalogManager to validate input tableEnv.useCatalog(catalogName); + session.setCurrentCatalog(catalogName); + session.setCurrentDatabase(tableEnv.getCurrentDatabase()); return null; }); } @@ -245,7 +254,9 @@ public class LocalExecutor implements Executor { .getTableEnvironment(); context.wrapClassLoader(() -> { + // Rely on TableEnvironment/CatalogManager to validate input tableEnv.useDatabase(databaseName); + session.setCurrentDatabase(databaseName); return null; }); } diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/DependencyTest.java b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/DependencyTest.java index 286d76a..5fabe83 100644 --- a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/DependencyTest.java +++ b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/DependencyTest.java @@ -20,13 +20,19 @@ package org.apache.flink.table.client.gateway.local; import org.apache.flink.client.cli.DefaultCLI; import org.apache.flink.configuration.Configuration; +import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.api.Types; import org.apache.flink.table.catalog.Catalog; import org.apache.flink.table.catalog.CatalogDatabaseImpl; +import org.apache.flink.table.catalog.CatalogTableImpl; import org.apache.flink.table.catalog.GenericInMemoryCatalog; +import org.apache.flink.table.catalog.ObjectPath; +import 
org.apache.flink.table.catalog.config.CatalogConfig; import org.apache.flink.table.catalog.exceptions.CatalogException; import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException; +import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException; +import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException; import org.apache.flink.table.catalog.hive.HiveCatalog; import org.apache.flink.table.catalog.hive.HiveTestUtils; import org.apache.flink.table.catalog.hive.descriptors.HiveCatalogValidator; @@ -165,6 +171,7 @@ public class DependencyTest { */ public static class TestHiveCatalogFactory extends HiveCatalogFactory { public static final String ADDITIONAL_TEST_DATABASE = "additional_test_database"; + public static final String TEST_TABLE = "test_table"; @Override public Map<String, String> requiredContext() { @@ -193,7 +200,20 @@ public class DependencyTest { ADDITIONAL_TEST_DATABASE, new CatalogDatabaseImpl(new HashMap<>(), null), false); - } catch (DatabaseAlreadyExistException e) { + hiveCatalog.createTable( + new ObjectPath(ADDITIONAL_TEST_DATABASE, TEST_TABLE), + new CatalogTableImpl( + TableSchema.builder() + .field("testcol", DataTypes.INT()) + .build(), + new HashMap<String, String>() {{ + put(CatalogConfig.IS_GENERIC, String.valueOf(true)); + }}, + "" + ), + false + ); + } catch (DatabaseAlreadyExistException | TableAlreadyExistException | DatabaseNotExistException e) { throw new CatalogException(e); } diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java index 023d656..ac1a7ae 100644 --- a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java +++ b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java @@ -32,6 +32,7 
@@ import org.apache.flink.configuration.WebOptions; import org.apache.flink.runtime.jobgraph.JobStatus; import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.catalog.hive.HiveCatalog; import org.apache.flink.table.client.config.Environment; import org.apache.flink.table.client.config.entries.ViewEntry; import org.apache.flink.table.client.gateway.Executor; @@ -75,6 +76,8 @@ import static org.junit.Assert.fail; public class LocalExecutorITCase extends TestLogger { private static final String DEFAULTS_ENVIRONMENT_FILE = "test-sql-client-defaults.yaml"; + private static final String CATALOGS_ENVIRONMENT_FILE = "test-sql-client-catalogs.yaml"; + private static final int NUM_TMS = 2; private static final int NUM_SLOTS_PER_TM = 2; @@ -426,6 +429,40 @@ public class LocalExecutorITCase extends TestLogger { } } + @Test + public void testUseCatalogAndUseDatabase() throws Exception { + final String csvOutputPath = new File(tempFolder.newFolder().getAbsolutePath(), "test-out.csv").toURI().toString(); + final URL url = getClass().getClassLoader().getResource("test-data.csv"); + Objects.requireNonNull(url); + final Map<String, String> replaceVars = new HashMap<>(); + replaceVars.put("$VAR_SOURCE_PATH1", url.getPath()); + replaceVars.put("$VAR_EXECUTION_TYPE", "streaming"); + replaceVars.put("$VAR_SOURCE_SINK_PATH", csvOutputPath); + replaceVars.put("$VAR_UPDATE_MODE", "update-mode: append"); + replaceVars.put("$VAR_MAX_ROWS", "100"); + + final Executor executor = createModifiedExecutor(CATALOGS_ENVIRONMENT_FILE, clusterClient, replaceVars); + final SessionContext session = new SessionContext("test-session", new Environment()); + + try { + assertEquals(Arrays.asList("mydatabase"), executor.listDatabases(session)); + + executor.useCatalog(session, "hivecatalog"); + + assertEquals( + Arrays.asList(DependencyTest.TestHiveCatalogFactory.ADDITIONAL_TEST_DATABASE, 
HiveCatalog.DEFAULT_DB), + executor.listDatabases(session)); + + assertEquals(Collections.emptyList(), executor.listTables(session)); + + executor.useDatabase(session, DependencyTest.TestHiveCatalogFactory.ADDITIONAL_TEST_DATABASE); + + assertEquals(Arrays.asList(DependencyTest.TestHiveCatalogFactory.TEST_TABLE), executor.listTables(session)); + } finally { + executor.stop(session); + } + } + private void executeStreamQueryTable( Map<String, String> replaceVars, String query, @@ -481,6 +518,15 @@ public class LocalExecutorITCase extends TestLogger { new DummyCustomCommandLine<T>(clusterClient)); } + private <T> LocalExecutor createModifiedExecutor( + String yamlFile, ClusterClient<T> clusterClient, Map<String, String> replaceVars) throws Exception { + return new LocalExecutor( + EnvironmentFileUtil.parseModified(yamlFile, replaceVars), + Collections.emptyList(), + clusterClient.getFlinkConfiguration(), + new DummyCustomCommandLine<T>(clusterClient)); + } + private List<String> retrieveTableResult( Executor executor, SessionContext session,