This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 6cb6dc1  [FLINK-13521][sql-client] Allow setting configurations in SQL 
CLI
6cb6dc1 is described below

commit 6cb6dc1a7074406651b1d547eee45b14f29033c4
Author: TsReaper <tsreape...@gmail.com>
AuthorDate: Fri Aug 2 11:15:23 2019 +0800

    [FLINK-13521][sql-client] Allow setting configurations in SQL CLI
    
    This closes #9328.
---
 docs/dev/table/sqlClient.md                        | 19 +++--
 .../flink-sql-client/conf/sql-client-defaults.yaml | 19 ++++-
 .../flink/table/client/config/Environment.java     | 22 ++++++
 .../client/config/entries/ConfigurationEntry.java  | 81 ++++++++++++++++++++++
 .../client/gateway/local/ExecutionContext.java     |  4 ++
 .../table/client/gateway/local/LocalExecutor.java  |  1 +
 .../client/gateway/local/EnvironmentTest.java      |  5 ++
 .../client/gateway/local/ExecutionContextTest.java | 43 ++++++++++++
 .../client/gateway/local/LocalExecutorITCase.java  |  1 +
 ...ory.yaml => test-sql-client-configuration.yaml} | 41 +++--------
 .../test/resources/test-sql-client-defaults.yaml   |  3 +
 .../test/resources/test-sql-client-factory.yaml    |  3 +
 .../table/api/config/ExecutionConfigOptions.java   |  2 +
 .../table/api/config/OptimizerConfigOptions.java   |  2 +
 14 files changed, 206 insertions(+), 40 deletions(-)

diff --git a/docs/dev/table/sqlClient.md b/docs/dev/table/sqlClient.md
index 8ec569d..059cd94 100644
--- a/docs/dev/table/sqlClient.md
+++ b/docs/dev/table/sqlClient.md
@@ -213,7 +213,7 @@ catalogs:
      hive-conf-dir: ...
      hive-version: 1.2.1
 
-# Execution properties allow for changing the behavior of a table program.
+# Properties that change the fundamental execution behavior of a table program.
 
 execution:
   planner: old                      # optional: either 'old' (default) or 
'blink'
@@ -233,7 +233,16 @@ execution:
   restart-strategy:                 # optional: restart strategy
     type: fallback                  #   "fallback" to global restart strategy 
by default
 
-# Deployment properties allow for describing the cluster to which table 
programs are submitted to.
+# Configuration options for adjusting and tuning table programs.
+
+# A full list of options and their default values can be found
+# on the dedicated "Configuration" page.
+configuration:
+  table.optimizer.join-reorder-enabled: true
+  table.exec.spill-compression.enabled: true
+  table.exec.spill-compression.block-size: 128kb
+
+# Properties that describe the cluster to which table programs are submitted 
to.
 
 deployment:
   response-timeout: 5000
@@ -245,9 +254,9 @@ This configuration:
 - defines a view `MyCustomView` that declares a virtual table using a SQL 
query,
 - defines a user-defined function `myUDF` that can be instantiated using the 
class name and two constructor parameters,
 - connects to two Hive catalogs and uses `catalog_1` as the current catalog 
with `mydb1` as the current database of the catalog,
-- specifies a parallelism of 1 for queries executed in this streaming 
environment,
-- specifies an event-time characteristic, and
-- runs queries in the `table` result mode.
+- uses the old planner in streaming mode for running statements with 
event-time characteristic and a parallelism of 1,
+- runs exploratory queries in the `table` result mode,
+- and makes some planner adjustments around join reordering and spilling via 
configuration options.
 
 Depending on the use case, a configuration can be split into multiple files. 
Therefore, environment files can be created for general purposes (*defaults 
environment file* using `--defaults`) as well as on a per-session basis 
(*session environment file* using `--environment`). Every CLI session is 
initialized with the default properties followed by the session properties. For 
example, the defaults environment file could specify all table sources that 
should be available for querying in ev [...]
 
diff --git a/flink-table/flink-sql-client/conf/sql-client-defaults.yaml 
b/flink-table/flink-sql-client/conf/sql-client-defaults.yaml
index db6a0ea..f6e8340 100644
--- a/flink-table/flink-sql-client/conf/sql-client-defaults.yaml
+++ b/flink-table/flink-sql-client/conf/sql-client-defaults.yaml
@@ -83,7 +83,7 @@ catalogs: [] # empty list
 # Execution properties
 #==============================================================================
 
-# Execution properties allow for changing the behavior of a table program.
+# Properties that change the fundamental execution behavior of a table program.
 
 execution:
   # select the implementation responsible for planning table programs
@@ -117,13 +117,26 @@ execution:
     # possible values are "fixed-delay", "failure-rate", "none", or "fallback" 
(default)
     type: fallback
 
+#==============================================================================
+# Configuration options
+#==============================================================================
+
+# Configuration options for adjusting and tuning table programs.
+
+# A full list of options and their default values can be found
+# on the dedicated "Configuration" web page.
+
+# A configuration can look like:
+# configuration:
+#   table.exec.spill-compression.enabled: true
+#   table.exec.spill-compression.block-size: 128kb
+#   table.optimizer.join-reorder-enabled: true
 
 #==============================================================================
 # Deployment properties
 #==============================================================================
 
-# Deployment properties allow for describing the cluster to which table
-# programs are submitted to.
+# Properties that describe the cluster to which table programs are submitted 
to.
 
 deployment:
   # general cluster communication timeout in ms
diff --git 
a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Environment.java
 
b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Environment.java
index 64c9453..39bced9 100644
--- 
a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Environment.java
+++ 
b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Environment.java
@@ -20,6 +20,7 @@ package org.apache.flink.table.client.config;
 
 import org.apache.flink.table.client.SqlClientException;
 import org.apache.flink.table.client.config.entries.CatalogEntry;
+import org.apache.flink.table.client.config.entries.ConfigurationEntry;
 import org.apache.flink.table.client.config.entries.DeploymentEntry;
 import org.apache.flink.table.client.config.entries.ExecutionEntry;
 import org.apache.flink.table.client.config.entries.FunctionEntry;
@@ -48,6 +49,8 @@ public class Environment {
 
        public static final String EXECUTION_ENTRY = "execution";
 
+       public static final String CONFIGURATION_ENTRY = "table";
+
        public static final String DEPLOYMENT_ENTRY = "deployment";
 
        private Map<String, CatalogEntry> catalogs;
@@ -58,6 +61,8 @@ public class Environment {
 
        private ExecutionEntry execution;
 
+       private ConfigurationEntry configuration;
+
        private DeploymentEntry deployment;
 
        public Environment() {
@@ -65,6 +70,7 @@ public class Environment {
                this.tables = Collections.emptyMap();
                this.functions = Collections.emptyMap();
                this.execution = ExecutionEntry.DEFAULT_INSTANCE;
+               this.configuration = ConfigurationEntry.DEFAULT_INSTANCE;
                this.deployment = DeploymentEntry.DEFAULT_INSTANCE;
        }
 
@@ -128,6 +134,14 @@ public class Environment {
                return execution;
        }
 
+       public void setConfiguration(Map<String, Object> config) {
+               this.configuration = ConfigurationEntry.create(config);
+       }
+
+       public ConfigurationEntry getConfiguration() {
+               return configuration;
+       }
+
        public void setDeployment(Map<String, Object> config) {
                this.deployment = DeploymentEntry.create(config);
        }
@@ -156,6 +170,8 @@ public class Environment {
                });
                sb.append("=================== Execution 
====================\n");
                execution.asTopLevelMap().forEach((k, v) -> 
sb.append(k).append(": ").append(v).append('\n'));
+               sb.append("================== Configuration 
=================\n");
+               configuration.asMap().forEach((k, v) -> sb.append(k).append(": 
").append(v).append('\n'));
                sb.append("=================== Deployment 
===================\n");
                deployment.asTopLevelMap().forEach((k, v) -> 
sb.append(k).append(": ").append(v).append('\n'));
                return sb.toString();
@@ -209,6 +225,9 @@ public class Environment {
                // merge execution properties
                mergedEnv.execution = ExecutionEntry.merge(env1.getExecution(), 
env2.getExecution());
 
+               // merge configuration properties
+               mergedEnv.configuration = 
ConfigurationEntry.merge(env1.getConfiguration(), env2.getConfiguration());
+
                // merge deployment properties
                mergedEnv.deployment = 
DeploymentEntry.merge(env1.getDeployment(), env2.getDeployment());
 
@@ -237,6 +256,9 @@ public class Environment {
                // enrich execution properties
                enrichedEnv.execution = ExecutionEntry.enrich(env.execution, 
properties);
 
+               // enrich configuration properties
+               enrichedEnv.configuration = 
ConfigurationEntry.enrich(env.configuration, properties);
+
                // enrich deployment properties
                enrichedEnv.deployment = DeploymentEntry.enrich(env.deployment, 
properties);
 
diff --git 
a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/config/entries/ConfigurationEntry.java
 
b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/config/entries/ConfigurationEntry.java
new file mode 100644
index 0000000..08c0c67
--- /dev/null
+++ 
b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/config/entries/ConfigurationEntry.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.client.config.entries;
+
+import org.apache.flink.table.client.config.ConfigUtil;
+import org.apache.flink.table.descriptors.DescriptorProperties;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static 
org.apache.flink.table.client.config.Environment.CONFIGURATION_ENTRY;
+
+/**
+ * Configuration for configuring {@link 
org.apache.flink.table.api.TableConfig}.
+ */
+public class ConfigurationEntry extends ConfigEntry {
+
+       public static final ConfigurationEntry DEFAULT_INSTANCE =
+               new ConfigurationEntry(new DescriptorProperties(true));
+
+       private ConfigurationEntry(DescriptorProperties properties) {
+               super(properties);
+       }
+
+       @Override
+       protected void validate(DescriptorProperties properties) {
+               // Nothing to validate as the planner will check the options
+       }
+
+       // 
--------------------------------------------------------------------------------------------
+
+       public static ConfigurationEntry create(Map<String, Object> config) {
+               return new ConfigurationEntry(ConfigUtil.normalizeYaml(config));
+       }
+
+       /**
+        * Merges two configuration entries. The properties of the first 
configuration entry might be
+        * overwritten by the second one.
+        */
+       public static ConfigurationEntry merge(ConfigurationEntry 
configuration1, ConfigurationEntry configuration2) {
+               final Map<String, String> mergedProperties = new 
HashMap<>(configuration1.asMap());
+               mergedProperties.putAll(configuration2.asMap());
+
+               final DescriptorProperties properties = new 
DescriptorProperties(true);
+               properties.putProperties(mergedProperties);
+
+               return new ConfigurationEntry(properties);
+       }
+
+       public static ConfigurationEntry enrich(ConfigurationEntry 
configuration, Map<String, String> prefixedProperties) {
+               final Map<String, String> enrichedProperties = new 
HashMap<>(configuration.asMap());
+
+               prefixedProperties.forEach((k, v) -> {
+                       final String normalizedKey = k.toLowerCase();
+                       if (k.startsWith(CONFIGURATION_ENTRY + ".")) {
+                               enrichedProperties.put(normalizedKey, v);
+                       }
+               });
+
+               final DescriptorProperties properties = new 
DescriptorProperties(true);
+               properties.putProperties(enrichedProperties);
+
+               return new ConfigurationEntry(properties);
+       }
+}
diff --git 
a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
 
b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
index f212e98..27c3ee2 100644
--- 
a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
+++ 
b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
@@ -368,6 +368,10 @@ public class ExecutionContext<T> {
                                throw new SqlExecutionException("Unsupported 
execution type specified.");
                        }
 
+                       // set table configuration
+                       mergedEnv.getConfiguration().asMap().forEach((k, v) ->
+                               
tableEnv.getConfig().getConfiguration().setString(k, v));
+
                        // register catalogs
                        catalogs.forEach(tableEnv::registerCatalog);
 
diff --git 
a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
 
b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
index a7fd6b1..3394df7 100644
--- 
a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
+++ 
b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
@@ -187,6 +187,7 @@ public class LocalExecutor implements Executor {
                final Map<String, String> properties = new HashMap<>();
                properties.putAll(env.getExecution().asTopLevelMap());
                properties.putAll(env.getDeployment().asTopLevelMap());
+               properties.putAll(env.getConfiguration().asMap());
                return properties;
        }
 
diff --git 
a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/EnvironmentTest.java
 
b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/EnvironmentTest.java
index 2ad69e4..94d6fa2 100644
--- 
a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/EnvironmentTest.java
+++ 
b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/EnvironmentTest.java
@@ -79,6 +79,11 @@ public class EnvironmentTest {
                assertEquals(tables, merged.getTables().keySet());
                assertTrue(merged.getExecution().inStreamingMode());
                assertEquals(16, merged.getExecution().getMaxParallelism());
+
+               final Map<String, String> configuration = new HashMap<>();
+               configuration.put("table.optimizer.join-reorder-enabled", 
"true");
+
+               assertEquals(configuration, merged.getConfiguration().asMap());
        }
 
        @Test
diff --git 
a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/ExecutionContextTest.java
 
b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/ExecutionContextTest.java
index aaa8321..a3a9ce9 100644
--- 
a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/ExecutionContextTest.java
+++ 
b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/ExecutionContextTest.java
@@ -25,6 +25,8 @@ import org.apache.flink.client.cli.DefaultCLI;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.api.Types;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
+import org.apache.flink.table.api.config.OptimizerConfigOptions;
 import org.apache.flink.table.api.java.StreamTableEnvironment;
 import org.apache.flink.table.catalog.Catalog;
 import org.apache.flink.table.catalog.hive.HiveCatalog;
@@ -60,6 +62,7 @@ public class ExecutionContextTest {
        private static final String DEFAULTS_ENVIRONMENT_FILE = 
"test-sql-client-defaults.yaml";
        private static final String CATALOGS_ENVIRONMENT_FILE = 
"test-sql-client-catalogs.yaml";
        private static final String STREAMING_ENVIRONMENT_FILE = 
"test-sql-client-streaming.yaml";
+       private static final String CONFIGURATION_ENVIRONMENT_FILE = 
"test-sql-client-configuration.yaml";
 
        @Test
        public void testExecutionConfig() throws Exception {
@@ -230,6 +233,42 @@ public class ExecutionContextTest {
                        
tableEnv.scan("TemporalTableUsage").getSchema().getFieldNames());
        }
 
+       @Test
+       public void testConfiguration() throws Exception {
+               final ExecutionContext<?> context = 
createConfigurationExecutionContext();
+               final TableEnvironment tableEnv = 
context.createEnvironmentInstance().getTableEnvironment();
+
+               assertEquals(
+                       100,
+                       tableEnv.getConfig().getConfiguration().getInteger(
+                               
ExecutionConfigOptions.TABLE_EXEC_SORT_DEFAULT_LIMIT));
+               assertTrue(
+                       tableEnv.getConfig().getConfiguration().getBoolean(
+                               
ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_ENABLED));
+               assertEquals(
+                       "128kb",
+                       tableEnv.getConfig().getConfiguration().getString(
+                               
ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_BLOCK_SIZE));
+
+               assertTrue(
+                       tableEnv.getConfig().getConfiguration().getBoolean(
+                               
OptimizerConfigOptions.TABLE_OPTIMIZER_JOIN_REORDER_ENABLED));
+
+               // these options are not modified and should be equal to their 
default value
+               assertEquals(
+                       
ExecutionConfigOptions.TABLE_EXEC_SORT_ASYNC_MERGE_ENABLED.defaultValue(),
+                       tableEnv.getConfig().getConfiguration().getBoolean(
+                               
ExecutionConfigOptions.TABLE_EXEC_SORT_ASYNC_MERGE_ENABLED));
+               assertEquals(
+                       
ExecutionConfigOptions.TABLE_EXEC_SHUFFLE_MODE.defaultValue(),
+                       tableEnv.getConfig().getConfiguration().getString(
+                               
ExecutionConfigOptions.TABLE_EXEC_SHUFFLE_MODE));
+               assertEquals(
+                       
OptimizerConfigOptions.TABLE_OPTIMIZER_BROADCAST_JOIN_THRESHOLD.defaultValue().longValue(),
+                       tableEnv.getConfig().getConfiguration().getLong(
+                               
OptimizerConfigOptions.TABLE_OPTIMIZER_BROADCAST_JOIN_THRESHOLD));
+       }
+
        private <T> ExecutionContext<T> createExecutionContext(String file, 
Map<String, String> replaceVars) throws Exception {
                final Environment env = EnvironmentFileUtil.parseModified(
                        file,
@@ -272,4 +311,8 @@ public class ExecutionContextTest {
                replaceVars.put("$VAR_CONNECTOR_PROPERTY_VALUE", "");
                return createExecutionContext(STREAMING_ENVIRONMENT_FILE, 
replaceVars);
        }
+
+       private <T> ExecutionContext<T> createConfigurationExecutionContext() 
throws Exception {
+               return createExecutionContext(CONFIGURATION_ENVIRONMENT_FILE, 
new HashMap<>());
+       }
 }
diff --git 
a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
 
b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
index 838b95a..963314b 100644
--- 
a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
+++ 
b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
@@ -250,6 +250,7 @@ public class LocalExecutorITCase extends TestLogger {
                
expectedProperties.put("execution.restart-strategy.max-failures-per-interval", 
"10");
                
expectedProperties.put("execution.restart-strategy.failure-rate-interval", 
"99000");
                expectedProperties.put("execution.restart-strategy.delay", 
"1000");
+               expectedProperties.put("table.optimizer.join-reorder-enabled", 
"false");
                expectedProperties.put("deployment.response-timeout", "5000");
 
                assertEquals(expectedProperties, actualProperties);
diff --git 
a/flink-table/flink-sql-client/src/test/resources/test-sql-client-factory.yaml 
b/flink-table/flink-sql-client/src/test/resources/test-sql-client-configuration.yaml
similarity index 59%
copy from 
flink-table/flink-sql-client/src/test/resources/test-sql-client-factory.yaml
copy to 
flink-table/flink-sql-client/src/test/resources/test-sql-client-configuration.yaml
index de60538..2e1b324 100644
--- 
a/flink-table/flink-sql-client/src/test/resources/test-sql-client-factory.yaml
+++ 
b/flink-table/flink-sql-client/src/test/resources/test-sql-client-configuration.yaml
@@ -18,39 +18,16 @@
 
 #==============================================================================
 # TEST ENVIRONMENT FILE
-# Intended for org.apache.flink.table.client.gateway.local.DependencyTest.
+# This test file is to check whether the configuration can be successfully set.
 #==============================================================================
 
-# this file has variables that can be filled with content by replacing $VAR_XXX
-
-tables:
-  - name: TableNumber1
-    type: source-sink-table
-    update-mode: append
-    schema:
-      - name: IntegerField1
-        type: INT
-      - name: StringField1
-        type: VARCHAR
-      - name: rowtimeField
-        type: TIMESTAMP
-        rowtime:
-          timestamps:
-            type: from-field
-            from: rowtimeField
-          watermarks:
-            type: from-source
-    connector:
-      type: "$VAR_CONNECTOR_TYPE"
-      $VAR_CONNECTOR_PROPERTY: "$VAR_CONNECTOR_PROPERTY_VALUE"
-
 execution:
-  type: streaming
-  parallelism: 1
-
-deployment:
-  response-timeout: 5000
+  planner: blink
+  type: batch
+  result-mode: table
 
-catalogs:
-  - name: catalog2
-    type: DependencyTest
+configuration:
+  table.exec.sort.default-limit: 100
+  table.exec.spill-compression.enabled: true
+  table.exec.spill-compression.block-size: 128kb
+  table.optimizer.join-reorder-enabled: true
diff --git 
a/flink-table/flink-sql-client/src/test/resources/test-sql-client-defaults.yaml 
b/flink-table/flink-sql-client/src/test/resources/test-sql-client-defaults.yaml
index d3e917d..9b6b50c 100644
--- 
a/flink-table/flink-sql-client/src/test/resources/test-sql-client-defaults.yaml
+++ 
b/flink-table/flink-sql-client/src/test/resources/test-sql-client-defaults.yaml
@@ -139,5 +139,8 @@ execution:
     failure-rate-interval: 99000
     delay: 1000
 
+configuration:
+  table.optimizer.join-reorder-enabled: false
+
 deployment:
   response-timeout: 5000
diff --git 
a/flink-table/flink-sql-client/src/test/resources/test-sql-client-factory.yaml 
b/flink-table/flink-sql-client/src/test/resources/test-sql-client-factory.yaml
index de60538..6c6ec14 100644
--- 
a/flink-table/flink-sql-client/src/test/resources/test-sql-client-factory.yaml
+++ 
b/flink-table/flink-sql-client/src/test/resources/test-sql-client-factory.yaml
@@ -48,6 +48,9 @@ execution:
   type: streaming
   parallelism: 1
 
+configuration:
+  table.optimizer.join-reorder-enabled: true
+
 deployment:
   response-timeout: 5000
 
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java
index 81b464d..8d14dd6 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java
@@ -27,6 +27,8 @@ import static 
org.apache.flink.configuration.ConfigOptions.key;
  * This class holds configuration constants used by Flink's table module.
  *
  * <p>This is only used for the Blink planner.
+ *
+ * <p>NOTE: All option keys in this class must start with "table.exec".
  */
 public class ExecutionConfigOptions {
 
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/OptimizerConfigOptions.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/OptimizerConfigOptions.java
index c9ad6ef..2256954 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/OptimizerConfigOptions.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/OptimizerConfigOptions.java
@@ -27,6 +27,8 @@ import static 
org.apache.flink.configuration.ConfigOptions.key;
  * This class holds configuration constants used by Flink's table planner 
module.
  *
  * <p>This is only used for the Blink planner.
+ *
+ * <p>NOTE: All option keys in this class must start with "table.optimizer".
  */
 public class OptimizerConfigOptions {
 

Reply via email to