[ 
https://issues.apache.org/jira/browse/FLINK-26453?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Marios Trivyzas updated FLINK-26453:
------------------------------------
    Description: 
* *checkNotAllowedConfigurations()* should be called by  
*StreamContextEnvironment#executeAsync()*
 * Description of the *DeploymentOption* should be more clear, and it's not 
only checked by application mode.
 * When using a config option which is the same as the one in the environment 
*(flink-conf.yaml + CLI options)*  we still throw an exception, and we also 
throwing the exception even if the option is not in the environment, but it's 
the default value of the option anywa. Should we check for those cases, or 
should we at least document them and say explicitly that no config option is 
allowed to be set, if the setAsContext

 

 

 

  was:
* *checkNotAllowedConfigurations()* should be called by  
*StreamContextEnvironment#executeAsync()*
 * Description of the *DeploymentOption* should be more clear, and it's not 
only checked by application mode.
 * When using a configuration which is the same as the one in flink-conf.yaml 

Modified code of StreamSQLExample to pass extra config
{noformat}
final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();

Configuration conf = new Configuration();
conf.set(
        ExecutionConfigOptions.TABLE_EXEC_SINK_NOT_NULL_ENFORCER,
        ExecutionConfigOptions.NotNullEnforcer.DROP);

// set up the Java Table API
final StreamTableEnvironment tableEnv =
        StreamTableEnvironment.create(env, 
EnvironmentSettings.fromConfiguration(conf));

final DataStream<Order> orderA =
        env.fromCollection(
                Arrays.asList(
                        new Order(1L, "beer", 3),
                        new Order(1L, "diaper", 4),
                        new Order(3L, "rubber", 2)));

final DataStream<Order> orderB =
        env.fromCollection(
                Arrays.asList(
                        new Order(2L, "pen", 3),
                        new Order(2L, "rubber", 3),
                        new Order(4L, "beer", 1)));

// convert the first DataStream to a Table object
// it will be used "inline" and is not registered in a catalog
final Table tableA = tableEnv.fromDataStream(orderA);

// convert the second DataStream and register it as a view
// it will be accessible under a name
tableEnv.createTemporaryView("TableB", orderB);

// union the two tables
final Table result =
        tableEnv.sqlQuery(
                "SELECT * FROM "
                        + tableA
                        + " WHERE amount > 2 UNION ALL "
                        + "SELECT * FROM TableB WHERE amount < 2");

// convert the Table back to an insert-only DataStream of type `Order`
tableEnv.toDataStream(result, Order.class).print();

// after the table program is converted to a DataStream program,
// we must use `env.execute()` to submit the job
env.execute();


{noformat}
ExecutionConfigOptions.TABLE_EXEC_SINK_NOT_NULL_ENFORCER is set to ERROR in 
flink-conf.yaml and yet no exception is thrown, that is because in

StreamTableEnvironmentImpl:
{noformat}
public static StreamTableEnvironment create(
        StreamExecutionEnvironment executionEnvironment,
        EnvironmentSettings settings,
        TableConfig tableConfig) {{noformat}
we use the 
{noformat}
public static Executor lookupExecutor(
        ClassLoader classLoader,
        String executorIdentifier,
        StreamExecutionEnvironment executionEnvironment) {{noformat}
so we don't follow the same path to call the 
StreamContextEnvironment#setAsContext

which checks for overriding options depending on the new flag.

 

 

 


> execution.allow-client-job-configurations not checked for executeAsync
> ----------------------------------------------------------------------
>
>                 Key: FLINK-26453
>                 URL: https://issues.apache.org/jira/browse/FLINK-26453
>             Project: Flink
>          Issue Type: Bug
>          Components: API / DataStream
>    Affects Versions: 1.15.0
>            Reporter: Marios Trivyzas
>            Assignee: Fabian Paul
>            Priority: Blocker
>              Labels: pull-request-available
>             Fix For: 1.15.0
>
>
> * *checkNotAllowedConfigurations()* should be called by  
> *StreamContextEnvironment#executeAsync()*
>  * Description of the *DeploymentOption* should be more clear, and it's not 
> only checked by application mode.
>  * When using a config option which is the same as the one in the environment 
> *(flink-conf.yaml + CLI options)*  we still throw an exception, and we also 
> throwing the exception even if the option is not in the environment, but it's 
> the default value of the option anywa. Should we check for those cases, or 
> should we at least document them and say explicitly that no config option is 
> allowed to be set, if the setAsContext
>  
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

Reply via email to