[ https://issues.apache.org/jira/browse/FLINK-14591?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16967212#comment-16967212 ]
Jark Wu commented on FLINK-14591:
---------------------------------

Hi [~Dillon.], I'm not sure about the right fix yet.

Hi [~godfreyhe], what do you think about this problem?

> Execute PlannerBase#mergeParameters every time of calling
> PlannerBase#translate method
> ---------------------------------------------------------------------------------------
>
>                 Key: FLINK-14591
>                 URL: https://issues.apache.org/jira/browse/FLINK-14591
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>            Reporter: Wei Zhong
>            Priority: Minor
>
> In the current implementation of the blink planner, the method
> "PlannerBase#mergeParameters" is called by the "PlannerBase#translate" method
> to merge the configuration inside TableConfig into the global job parameters:
> {code:scala}
> override def translate(
>     modifyOperations: util.List[ModifyOperation]): util.List[Transformation[_]] = {
>   if (modifyOperations.isEmpty) {
>     return List.empty[Transformation[_]]
>   }
>   mergeParameters()
>   val relNodes = modifyOperations.map(translateToRel)
>   val optimizedRelNodes = optimize(relNodes)
>   val execNodes = translateToExecNodePlan(optimizedRelNodes)
>   translateToPlan(execNodes)
> }
> {code}
> This translate method is called at every important point, e.g. execute,
> toDataStream, insertInto, etc.
> But as shown above, when "modifyOperations" is empty the method returns early
> without calling "mergeParameters".
> As a result, if we set some configurations between the "Table#insertInto" call
> and the "TableEnvironment#execute" call, these configurations will not be
> merged into the global job parameters, because "mergeParameters" is not
> called:
> {code:scala}
> val env = StreamExecutionEnvironment.getExecutionEnvironment
> val tEnv = StreamTableEnvironment.create(env,
>   EnvironmentSettings.newInstance.useBlinkPlanner.build)
> ...
> ...
> val result = ...
> val sink = TestSinkUtil.configureSink(result, new TestingAppendTableSink)
> tEnv.registerTableSink("MySink", sink)
> tEnv.getConfig.getConfiguration.setString("jobparam1", "value1")
> result.insertInto("MySink")
>
> // the "jobparam2" configuration will be lost
> tEnv.getConfig.getConfiguration.setString("jobparam2", "value2")
> tEnv.execute("test")
> val jobConfig = env.getConfig.getGlobalJobParameters.toMap
>
> assertTrue(jobConfig.get("jobparam1")=="value1")
> // this assertion will fail:
> assertTrue(jobConfig.get("jobparam2")=="value2")
> {code}
> This may confuse users. It would be great if we could fix this problem.

--
This message was sent by Atlassian Jira (v8.3.4#803005)