This is an automated email from the ASF dual-hosted git repository. twalthr pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new 6cb6dc1 [FLINK-13521][sql-client] Allow setting configurations in SQL CLI 6cb6dc1 is described below commit 6cb6dc1a7074406651b1d547eee45b14f29033c4 Author: TsReaper <tsreape...@gmail.com> AuthorDate: Fri Aug 2 11:15:23 2019 +0800 [FLINK-13521][sql-client] Allow setting configurations in SQL CLI This closes #9328. --- docs/dev/table/sqlClient.md | 19 +++-- .../flink-sql-client/conf/sql-client-defaults.yaml | 19 ++++- .../flink/table/client/config/Environment.java | 22 ++++++ .../client/config/entries/ConfigurationEntry.java | 81 ++++++++++++++++++++++ .../client/gateway/local/ExecutionContext.java | 4 ++ .../table/client/gateway/local/LocalExecutor.java | 1 + .../client/gateway/local/EnvironmentTest.java | 5 ++ .../client/gateway/local/ExecutionContextTest.java | 43 ++++++++++++ .../client/gateway/local/LocalExecutorITCase.java | 1 + ...ory.yaml => test-sql-client-configuration.yaml} | 41 +++-------- .../test/resources/test-sql-client-defaults.yaml | 3 + .../test/resources/test-sql-client-factory.yaml | 3 + .../table/api/config/ExecutionConfigOptions.java | 2 + .../table/api/config/OptimizerConfigOptions.java | 2 + 14 files changed, 206 insertions(+), 40 deletions(-) diff --git a/docs/dev/table/sqlClient.md b/docs/dev/table/sqlClient.md index 8ec569d..059cd94 100644 --- a/docs/dev/table/sqlClient.md +++ b/docs/dev/table/sqlClient.md @@ -213,7 +213,7 @@ catalogs: hive-conf-dir: ... hive-version: 1.2.1 -# Execution properties allow for changing the behavior of a table program. +# Properties that change the fundamental execution behavior of a table program. execution: planner: old # optional: either 'old' (default) or 'blink' @@ -233,7 +233,16 @@ execution: restart-strategy: # optional: restart strategy type: fallback # "fallback" to global restart strategy by default -# Deployment properties allow for describing the cluster to which table programs are submitted to. +# Configuration options for adjusting and tuning table programs. + +# A full list of options and their default values can be found +# on the dedicated "Configuration" page. +configuration: + table.optimizer.join-reorder-enabled: true + table.exec.spill-compression.enabled: true + table.exec.spill-compression.block-size: 128kb + +# Properties that describe the cluster to which table programs are submitted to. deployment: response-timeout: 5000 @@ -245,9 +254,9 @@ This configuration: - defines a view `MyCustomView` that declares a virtual table using a SQL query, - defines a user-defined function `myUDF` that can be instantiated using the class name and two constructor parameters, - connects to two Hive catalogs and uses `catalog_1` as the current catalog with `mydb1` as the current database of the catalog, -- specifies a parallelism of 1 for queries executed in this streaming environment, -- specifies an event-time characteristic, and -- runs queries in the `table` result mode. +- uses the old planner in streaming mode for running statements with event-time characteristic and a parallelism of 1, +- runs exploratory queries in the `table` result mode, +- and makes some planner adjustments around join reordering and spilling via configuration options. Depending on the use case, a configuration can be split into multiple files. Therefore, environment files can be created for general purposes (*defaults environment file* using `--defaults`) as well as on a per-session basis (*session environment file* using `--environment`). Every CLI session is initialized with the default properties followed by the session properties. For example, the defaults environment file could specify all table sources that should be available for querying in ev [...] diff --git a/flink-table/flink-sql-client/conf/sql-client-defaults.yaml b/flink-table/flink-sql-client/conf/sql-client-defaults.yaml index db6a0ea..f6e8340 100644 --- a/flink-table/flink-sql-client/conf/sql-client-defaults.yaml +++ b/flink-table/flink-sql-client/conf/sql-client-defaults.yaml @@ -83,7 +83,7 @@ catalogs: [] # empty list # Execution properties #============================================================================== -# Execution properties allow for changing the behavior of a table program. +# Properties that change the fundamental execution behavior of a table program. execution: # select the implementation responsible for planning table programs @@ -117,13 +117,26 @@ execution: # possible values are "fixed-delay", "failure-rate", "none", or "fallback" (default) type: fallback +#============================================================================== +# Configuration options +#============================================================================== + +# Configuration options for adjusting and tuning table programs. + +# A full list of options and their default values can be found +# on the dedicated "Configuration" web page. + +# A configuration can look like: +# configuration: +# table.exec.spill-compression.enabled: true +# table.exec.spill-compression.block-size: 128kb +# table.optimizer.join-reorder-enabled: true #============================================================================== # Deployment properties #============================================================================== -# Deployment properties allow for describing the cluster to which table -# programs are submitted to. +# Properties that describe the cluster to which table programs are submitted to. deployment: # general cluster communication timeout in ms diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Environment.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Environment.java index 64c9453..39bced9 100644 --- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Environment.java +++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Environment.java @@ -20,6 +20,7 @@ package org.apache.flink.table.client.config; import org.apache.flink.table.client.SqlClientException; import org.apache.flink.table.client.config.entries.CatalogEntry; +import org.apache.flink.table.client.config.entries.ConfigurationEntry; import org.apache.flink.table.client.config.entries.DeploymentEntry; import org.apache.flink.table.client.config.entries.ExecutionEntry; import org.apache.flink.table.client.config.entries.FunctionEntry; @@ -48,6 +49,8 @@ public class Environment { public static final String EXECUTION_ENTRY = "execution"; + public static final String CONFIGURATION_ENTRY = "table"; + public static final String DEPLOYMENT_ENTRY = "deployment"; private Map<String, CatalogEntry> catalogs; @@ -58,6 +61,8 @@ public class Environment { private ExecutionEntry execution; + private ConfigurationEntry configuration; + private DeploymentEntry deployment; public Environment() { @@ -65,6 +70,7 @@ public class Environment { this.tables = Collections.emptyMap(); this.functions = Collections.emptyMap(); this.execution = ExecutionEntry.DEFAULT_INSTANCE; + this.configuration = ConfigurationEntry.DEFAULT_INSTANCE; this.deployment = DeploymentEntry.DEFAULT_INSTANCE; } @@ -128,6 +134,14 @@ public class Environment { return execution; } + public void setConfiguration(Map<String, Object> config) { + this.configuration = ConfigurationEntry.create(config); + } + + public ConfigurationEntry getConfiguration() { + return configuration; + } + public void setDeployment(Map<String, Object> config) { this.deployment = DeploymentEntry.create(config); } @@ -156,6 +170,8 @@ public class Environment { }); sb.append("=================== Execution ====================\n"); execution.asTopLevelMap().forEach((k, v) -> sb.append(k).append(": ").append(v).append('\n')); + sb.append("================== Configuration =================\n"); + configuration.asMap().forEach((k, v) -> sb.append(k).append(": ").append(v).append('\n')); sb.append("=================== Deployment ===================\n"); deployment.asTopLevelMap().forEach((k, v) -> sb.append(k).append(": ").append(v).append('\n')); return sb.toString(); @@ -209,6 +225,9 @@ public class Environment { // merge execution properties mergedEnv.execution = ExecutionEntry.merge(env1.getExecution(), env2.getExecution()); + // merge configuration properties + mergedEnv.configuration = ConfigurationEntry.merge(env1.getConfiguration(), env2.getConfiguration()); + // merge deployment properties mergedEnv.deployment = DeploymentEntry.merge(env1.getDeployment(), env2.getDeployment()); @@ -237,6 +256,9 @@ public class Environment { // enrich execution properties enrichedEnv.execution = ExecutionEntry.enrich(env.execution, properties); + // enrich configuration properties + enrichedEnv.configuration = ConfigurationEntry.enrich(env.configuration, properties); + // enrich deployment properties enrichedEnv.deployment = DeploymentEntry.enrich(env.deployment, properties); diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/config/entries/ConfigurationEntry.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/config/entries/ConfigurationEntry.java new file mode 100644 index 0000000..08c0c67 --- /dev/null +++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/config/entries/ConfigurationEntry.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.client.config.entries; + +import org.apache.flink.table.client.config.ConfigUtil; +import org.apache.flink.table.descriptors.DescriptorProperties; + +import java.util.HashMap; +import java.util.Map; + +import static org.apache.flink.table.client.config.Environment.CONFIGURATION_ENTRY; + +/** + * Configuration for configuring {@link org.apache.flink.table.api.TableConfig}. + */ +public class ConfigurationEntry extends ConfigEntry { + + public static final ConfigurationEntry DEFAULT_INSTANCE = + new ConfigurationEntry(new DescriptorProperties(true)); + + private ConfigurationEntry(DescriptorProperties properties) { + super(properties); + } + + @Override + protected void validate(DescriptorProperties properties) { + // Nothing to validate as the planner will check the options + } + + // -------------------------------------------------------------------------------------------- + + public static ConfigurationEntry create(Map<String, Object> config) { + return new ConfigurationEntry(ConfigUtil.normalizeYaml(config)); + } + + /** + * Merges two configuration entries. The properties of the first configuration entry might be + * overwritten by the second one. + */ + public static ConfigurationEntry merge(ConfigurationEntry configuration1, ConfigurationEntry configuration2) { + final Map<String, String> mergedProperties = new HashMap<>(configuration1.asMap()); + mergedProperties.putAll(configuration2.asMap()); + + final DescriptorProperties properties = new DescriptorProperties(true); + properties.putProperties(mergedProperties); + + return new ConfigurationEntry(properties); + } + + public static ConfigurationEntry enrich(ConfigurationEntry configuration, Map<String, String> prefixedProperties) { + final Map<String, String> enrichedProperties = new HashMap<>(configuration.asMap()); + + prefixedProperties.forEach((k, v) -> { + final String normalizedKey = k.toLowerCase(); + if (k.startsWith(CONFIGURATION_ENTRY + ".")) { + enrichedProperties.put(normalizedKey, v); + } + }); + + final DescriptorProperties properties = new DescriptorProperties(true); + properties.putProperties(enrichedProperties); + + return new ConfigurationEntry(properties); + } +} diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java index f212e98..27c3ee2 100644 --- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java +++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java @@ -368,6 +368,10 @@ public class ExecutionContext<T> { throw new SqlExecutionException("Unsupported execution type specified."); } + // set table configuration + mergedEnv.getConfiguration().asMap().forEach((k, v) -> + tableEnv.getConfig().getConfiguration().setString(k, v)); + // register catalogs catalogs.forEach(tableEnv::registerCatalog); diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java index a7fd6b1..3394df7 100644 --- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java +++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java @@ -187,6 +187,7 @@ public class LocalExecutor implements Executor { final Map<String, String> properties = new HashMap<>(); properties.putAll(env.getExecution().asTopLevelMap()); properties.putAll(env.getDeployment().asTopLevelMap()); + properties.putAll(env.getConfiguration().asMap()); return properties; } diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/EnvironmentTest.java b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/EnvironmentTest.java index 2ad69e4..94d6fa2 100644 --- a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/EnvironmentTest.java +++ b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/EnvironmentTest.java @@ -79,6 +79,11 @@ public class EnvironmentTest { assertEquals(tables, merged.getTables().keySet()); assertTrue(merged.getExecution().inStreamingMode()); assertEquals(16, merged.getExecution().getMaxParallelism()); + + final Map<String, String> configuration = new HashMap<>(); + configuration.put("table.optimizer.join-reorder-enabled", "true"); + + assertEquals(configuration, merged.getConfiguration().asMap()); } @Test diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/ExecutionContextTest.java b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/ExecutionContextTest.java index aaa8321..a3a9ce9 100644 --- a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/ExecutionContextTest.java +++ b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/ExecutionContextTest.java @@ -25,6 +25,8 @@ import org.apache.flink.client.cli.DefaultCLI; import org.apache.flink.configuration.Configuration; import org.apache.flink.table.api.TableEnvironment; import org.apache.flink.table.api.Types; +import org.apache.flink.table.api.config.ExecutionConfigOptions; +import org.apache.flink.table.api.config.OptimizerConfigOptions; import org.apache.flink.table.api.java.StreamTableEnvironment; import org.apache.flink.table.catalog.Catalog; import org.apache.flink.table.catalog.hive.HiveCatalog; @@ -60,6 +62,7 @@ public class ExecutionContextTest { private static final String DEFAULTS_ENVIRONMENT_FILE = "test-sql-client-defaults.yaml"; private static final String CATALOGS_ENVIRONMENT_FILE = "test-sql-client-catalogs.yaml"; private static final String STREAMING_ENVIRONMENT_FILE = "test-sql-client-streaming.yaml"; + private static final String CONFIGURATION_ENVIRONMENT_FILE = "test-sql-client-configuration.yaml"; @Test public void testExecutionConfig() throws Exception { @@ -230,6 +233,42 @@ public class ExecutionContextTest { tableEnv.scan("TemporalTableUsage").getSchema().getFieldNames()); } + @Test + public void testConfiguration() throws Exception { + final ExecutionContext<?> context = createConfigurationExecutionContext(); + final TableEnvironment tableEnv = context.createEnvironmentInstance().getTableEnvironment(); + + assertEquals( + 100, + tableEnv.getConfig().getConfiguration().getInteger( + ExecutionConfigOptions.TABLE_EXEC_SORT_DEFAULT_LIMIT)); + assertTrue( + tableEnv.getConfig().getConfiguration().getBoolean( + ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_ENABLED)); + assertEquals( + "128kb", + tableEnv.getConfig().getConfiguration().getString( + ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_BLOCK_SIZE)); + + assertTrue( + tableEnv.getConfig().getConfiguration().getBoolean( + OptimizerConfigOptions.TABLE_OPTIMIZER_JOIN_REORDER_ENABLED)); + + // these options are not modified and should be equal to their default value + assertEquals( + ExecutionConfigOptions.TABLE_EXEC_SORT_ASYNC_MERGE_ENABLED.defaultValue(), + tableEnv.getConfig().getConfiguration().getBoolean( + ExecutionConfigOptions.TABLE_EXEC_SORT_ASYNC_MERGE_ENABLED)); + assertEquals( + ExecutionConfigOptions.TABLE_EXEC_SHUFFLE_MODE.defaultValue(), + tableEnv.getConfig().getConfiguration().getString( + ExecutionConfigOptions.TABLE_EXEC_SHUFFLE_MODE)); + assertEquals( + OptimizerConfigOptions.TABLE_OPTIMIZER_BROADCAST_JOIN_THRESHOLD.defaultValue().longValue(), + tableEnv.getConfig().getConfiguration().getLong( + OptimizerConfigOptions.TABLE_OPTIMIZER_BROADCAST_JOIN_THRESHOLD)); + } + private <T> ExecutionContext<T> createExecutionContext(String file, Map<String, String> replaceVars) throws Exception { final Environment env = EnvironmentFileUtil.parseModified( file, @@ -272,4 +311,8 @@ public class ExecutionContextTest { replaceVars.put("$VAR_CONNECTOR_PROPERTY_VALUE", ""); return createExecutionContext(STREAMING_ENVIRONMENT_FILE, replaceVars); } + + private <T> ExecutionContext<T> createConfigurationExecutionContext() throws Exception { + return createExecutionContext(CONFIGURATION_ENVIRONMENT_FILE, new HashMap<>()); + } } diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java index 838b95a..963314b 100644 --- a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java +++ b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java @@ -250,6 +250,7 @@ public class LocalExecutorITCase extends TestLogger { expectedProperties.put("execution.restart-strategy.max-failures-per-interval", "10"); expectedProperties.put("execution.restart-strategy.failure-rate-interval", "99000"); expectedProperties.put("execution.restart-strategy.delay", "1000"); + expectedProperties.put("table.optimizer.join-reorder-enabled", "false"); expectedProperties.put("deployment.response-timeout", "5000"); assertEquals(expectedProperties, actualProperties); diff --git a/flink-table/flink-sql-client/src/test/resources/test-sql-client-factory.yaml b/flink-table/flink-sql-client/src/test/resources/test-sql-client-configuration.yaml similarity index 59% copy from flink-table/flink-sql-client/src/test/resources/test-sql-client-factory.yaml copy to flink-table/flink-sql-client/src/test/resources/test-sql-client-configuration.yaml index de60538..2e1b324 100644 --- a/flink-table/flink-sql-client/src/test/resources/test-sql-client-factory.yaml +++ b/flink-table/flink-sql-client/src/test/resources/test-sql-client-configuration.yaml @@ -18,39 +18,16 @@ #============================================================================== # TEST ENVIRONMENT FILE -# Intended for org.apache.flink.table.client.gateway.local.DependencyTest. +# This test file is to check whether the configuration can be successfully set. #============================================================================== -# this file has variables that can be filled with content by replacing $VAR_XXX - -tables: - - name: TableNumber1 - type: source-sink-table - update-mode: append - schema: - - name: IntegerField1 - type: INT - - name: StringField1 - type: VARCHAR - - name: rowtimeField - type: TIMESTAMP - rowtime: - timestamps: - type: from-field - from: rowtimeField - watermarks: - type: from-source - connector: - type: "$VAR_CONNECTOR_TYPE" - $VAR_CONNECTOR_PROPERTY: "$VAR_CONNECTOR_PROPERTY_VALUE" - execution: - type: streaming - parallelism: 1 - -deployment: - response-timeout: 5000 + planner: blink + type: batch + result-mode: table -catalogs: - - name: catalog2 - type: DependencyTest +configuration: + table.exec.sort.default-limit: 100 + table.exec.spill-compression.enabled: true + table.exec.spill-compression.block-size: 128kb + table.optimizer.join-reorder-enabled: true diff --git a/flink-table/flink-sql-client/src/test/resources/test-sql-client-defaults.yaml b/flink-table/flink-sql-client/src/test/resources/test-sql-client-defaults.yaml index d3e917d..9b6b50c 100644 --- a/flink-table/flink-sql-client/src/test/resources/test-sql-client-defaults.yaml +++ b/flink-table/flink-sql-client/src/test/resources/test-sql-client-defaults.yaml @@ -139,5 +139,8 @@ execution: failure-rate-interval: 99000 delay: 1000 +configuration: + table.optimizer.join-reorder-enabled: false + deployment: response-timeout: 5000 diff --git a/flink-table/flink-sql-client/src/test/resources/test-sql-client-factory.yaml b/flink-table/flink-sql-client/src/test/resources/test-sql-client-factory.yaml index de60538..6c6ec14 100644 --- a/flink-table/flink-sql-client/src/test/resources/test-sql-client-factory.yaml +++ b/flink-table/flink-sql-client/src/test/resources/test-sql-client-factory.yaml @@ -48,6 +48,9 @@ execution: type: streaming parallelism: 1 +configuration: + table.optimizer.join-reorder-enabled: true + deployment: response-timeout: 5000 diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java index 81b464d..8d14dd6 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java @@ -27,6 +27,8 @@ import static org.apache.flink.configuration.ConfigOptions.key; * This class holds configuration constants used by Flink's table module. * * <p>This is only used for the Blink planner. + * + * <p>NOTE: All option keys in this class must start with "table.exec". */ public class ExecutionConfigOptions { diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/OptimizerConfigOptions.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/OptimizerConfigOptions.java index c9ad6ef..2256954 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/OptimizerConfigOptions.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/OptimizerConfigOptions.java @@ -27,6 +27,8 @@ import static org.apache.flink.configuration.ConfigOptions.key; * This class holds configuration constants used by Flink's table planner module. * * <p>This is only used for the Blink planner. + * + * <p>NOTE: All option keys in this class must start with "table.optimizer". */ public class OptimizerConfigOptions {