[jira] [Commented] (FLINK-14591) Execute PlannerBase#mergeParameters every time of calling PlannerBase#translate method
[ https://issues.apache.org/jira/browse/FLINK-14591?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17329651#comment-17329651 ] Flink Jira Bot commented on FLINK-14591: This issue has been labeled "stale-minor" for 7 days. It is closed now. If you are still affected by this or would like to raise the priority of this ticket please re-open, removing the label "auto-closed" and raise the ticket priority accordingly. > Execute PlannerBase#mergeParameters every time of calling > PlannerBase#translate method > --- > > Key: FLINK-14591 > URL: https://issues.apache.org/jira/browse/FLINK-14591 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner >Reporter: Wei Zhong >Priority: Minor > Labels: stale-minor > > In current implementation of blink planner, the method > "PlannerBase#mergeParameter" will be called by "PlannerBase#translate" method > to merge the configuration inside TableConfig into global job parameters: > {code:scala} > override def translate( > modifyOperations: util.List[ModifyOperation]): > util.List[Transformation[_]] = { > if (modifyOperations.isEmpty) { > return List.empty[Transformation[_]] > } > mergeParameters() > val relNodes = modifyOperations.map(translateToRel) > val optimizedRelNodes = optimize(relNodes) > val execNodes = translateToExecNodePlan(optimizedRelNodes) > translateToPlan(execNodes) > } > {code} > This translate method is called in every important moment, e.g. execute, > toDataStream, insertInto, etc. > But as shown above, there is a chance that the method return directly and not > call the "mergeParameters". > In fact if we set some configurations between the "Table#insertInto" method > and "TableEnvironment#execute" method, these configurations will not be > merged into global job parameters because the "mergeParameters" method is not > called: > {code:scala} > val env = StreamExecutionEnvironment.getExecutionEnvironment > val tEnv = StreamTableEnvironment.create(env, > EnvironmentSettings.newInstance.useBlinkPlanner.build) > ... > ... > val result = ... > val sink = TestSinkUtil.configureSink(result, new TestingAppendTableSink) > tEnv.registerTableSink("MySink", sink) > tEnv.getConfig.getConfiguration.setString("jobparam1", "value1") > result.insertInto("MySink") > > // the "jobparam2" configuration will loss > tEnv.getConfig.getConfiguration.setString("jobparam2", "value2") > tEnv.execute("test") > val jobConfig = env.getConfig.getGlobalJobParameters.toMap > > assertTrue(jobConfig.get("jobparam1")=="value1") > // this assertion will fail: > assertTrue(jobConfig.get("jobparam2")=="value2"){code} > This may bring some confusion to the user. It will be great if we can fix > this problem. > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-14591) Execute PlannerBase#mergeParameters every time of calling PlannerBase#translate method
[ https://issues.apache.org/jira/browse/FLINK-14591?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17321445#comment-17321445 ] Flink Jira Bot commented on FLINK-14591: This issue and all of its Sub-Tasks have not been updated for 180 days. So, it has been labeled "stale-minor". If you are still affected by this bug or are still interested in this issue, please give an update and remove the label. In 7 days the issue will be closed automatically. > Execute PlannerBase#mergeParameters every time of calling > PlannerBase#translate method > --- > > Key: FLINK-14591 > URL: https://issues.apache.org/jira/browse/FLINK-14591 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner >Reporter: Wei Zhong >Priority: Minor > Labels: stale-minor > > In current implementation of blink planner, the method > "PlannerBase#mergeParameter" will be called by "PlannerBase#translate" method > to merge the configuration inside TableConfig into global job parameters: > {code:scala} > override def translate( > modifyOperations: util.List[ModifyOperation]): > util.List[Transformation[_]] = { > if (modifyOperations.isEmpty) { > return List.empty[Transformation[_]] > } > mergeParameters() > val relNodes = modifyOperations.map(translateToRel) > val optimizedRelNodes = optimize(relNodes) > val execNodes = translateToExecNodePlan(optimizedRelNodes) > translateToPlan(execNodes) > } > {code} > This translate method is called in every important moment, e.g. execute, > toDataStream, insertInto, etc. > But as shown above, there is a chance that the method return directly and not > call the "mergeParameters". > In fact if we set some configurations between the "Table#insertInto" method > and "TableEnvironment#execute" method, these configurations will not be > merged into global job parameters because the "mergeParameters" method is not > called: > {code:scala} > val env = StreamExecutionEnvironment.getExecutionEnvironment > val tEnv = StreamTableEnvironment.create(env, > EnvironmentSettings.newInstance.useBlinkPlanner.build) > ... > ... > val result = ... > val sink = TestSinkUtil.configureSink(result, new TestingAppendTableSink) > tEnv.registerTableSink("MySink", sink) > tEnv.getConfig.getConfiguration.setString("jobparam1", "value1") > result.insertInto("MySink") > > // the "jobparam2" configuration will loss > tEnv.getConfig.getConfiguration.setString("jobparam2", "value2") > tEnv.execute("test") > val jobConfig = env.getConfig.getGlobalJobParameters.toMap > > assertTrue(jobConfig.get("jobparam1")=="value1") > // this assertion will fail: > assertTrue(jobConfig.get("jobparam2")=="value2"){code} > This may bring some confusion to the user. It will be great if we can fix > this problem. > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-14591) Execute PlannerBase#mergeParameters every time of calling PlannerBase#translate method
[ https://issues.apache.org/jira/browse/FLINK-14591?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16967212#comment-16967212 ] Jark Wu commented on FLINK-14591: - Hi [~Dillon.], I'm not sure the fixing approach yet. Hi [~godfreyhe], what do you think about this problem? > Execute PlannerBase#mergeParameters every time of calling > PlannerBase#translate method > --- > > Key: FLINK-14591 > URL: https://issues.apache.org/jira/browse/FLINK-14591 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner >Reporter: Wei Zhong >Priority: Minor > > In current implementation of blink planner, the method > "PlannerBase#mergeParameter" will be called by "PlannerBase#translate" method > to merge the configuration inside TableConfig into global job parameters: > {code:scala} > override def translate( > modifyOperations: util.List[ModifyOperation]): > util.List[Transformation[_]] = { > if (modifyOperations.isEmpty) { > return List.empty[Transformation[_]] > } > mergeParameters() > val relNodes = modifyOperations.map(translateToRel) > val optimizedRelNodes = optimize(relNodes) > val execNodes = translateToExecNodePlan(optimizedRelNodes) > translateToPlan(execNodes) > } > {code} > This translate method is called in every important moment, e.g. execute, > toDataStream, insertInto, etc. > But as shown above, there is a chance that the method return directly and not > call the "mergeParameters". > In fact if we set some configurations between the "Table#insertInto" method > and "TableEnvironment#execute" method, these configurations will not be > merged into global job parameters because the "mergeParameters" method is not > called: > {code:scala} > val env = StreamExecutionEnvironment.getExecutionEnvironment > val tEnv = StreamTableEnvironment.create(env, > EnvironmentSettings.newInstance.useBlinkPlanner.build) > ... > ... > val result = ... > val sink = TestSinkUtil.configureSink(result, new TestingAppendTableSink) > tEnv.registerTableSink("MySink", sink) > tEnv.getConfig.getConfiguration.setString("jobparam1", "value1") > result.insertInto("MySink") > > // the "jobparam2" configuration will loss > tEnv.getConfig.getConfiguration.setString("jobparam2", "value2") > tEnv.execute("test") > val jobConfig = env.getConfig.getGlobalJobParameters.toMap > > assertTrue(jobConfig.get("jobparam1")=="value1") > // this assertion will fail: > assertTrue(jobConfig.get("jobparam2")=="value2"){code} > This may bring some confusion to the user. It will be great if we can fix > this problem. > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-14591) Execute PlannerBase#mergeParameters every time of calling PlannerBase#translate method
[ https://issues.apache.org/jira/browse/FLINK-14591?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16967203#comment-16967203 ] Zhanchun Zhang commented on FLINK-14591: Hi [~zhongwei][~jark], I'm willing to fix this issue, can you assign it to me. Thanks ~ > Execute PlannerBase#mergeParameters every time of calling > PlannerBase#translate method > --- > > Key: FLINK-14591 > URL: https://issues.apache.org/jira/browse/FLINK-14591 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner >Reporter: Wei Zhong >Priority: Minor > > In current implementation of blink planner, the method > "PlannerBase#mergeParameter" will be called by "PlannerBase#translate" method > to merge the configuration inside TableConfig into global job parameters: > {code:scala} > override def translate( > modifyOperations: util.List[ModifyOperation]): > util.List[Transformation[_]] = { > if (modifyOperations.isEmpty) { > return List.empty[Transformation[_]] > } > mergeParameters() > val relNodes = modifyOperations.map(translateToRel) > val optimizedRelNodes = optimize(relNodes) > val execNodes = translateToExecNodePlan(optimizedRelNodes) > translateToPlan(execNodes) > } > {code} > This translate method is called in every important moment, e.g. execute, > toDataStream, insertInto, etc. > But as shown above, there is a chance that the method return directly and not > call the "mergeParameters". > In fact if we set some configurations between the "Table#insertInto" method > and "TableEnvironment#execute" method, these configurations will not be > merged into global job parameters because the "mergeParameters" method is not > called: > {code:scala} > val env = StreamExecutionEnvironment.getExecutionEnvironment > val tEnv = StreamTableEnvironment.create(env, > EnvironmentSettings.newInstance.useBlinkPlanner.build) > ... > ... > val result = ... > val sink = TestSinkUtil.configureSink(result, new TestingAppendTableSink) > tEnv.registerTableSink("MySink", sink) > tEnv.getConfig.getConfiguration.setString("jobparam1", "value1") > result.insertInto("MySink") > > // the "jobparam2" configuration will loss > tEnv.getConfig.getConfiguration.setString("jobparam2", "value2") > tEnv.execute("test") > val jobConfig = env.getConfig.getGlobalJobParameters.toMap > > assertTrue(jobConfig.get("jobparam1")=="value1") > // this assertion will fail: > assertTrue(jobConfig.get("jobparam2")=="value2"){code} > This may bring some confusion to the user. It will be great if we can fix > this problem. > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-14591) Execute PlannerBase#mergeParameters every time of calling PlannerBase#translate method
[ https://issues.apache.org/jira/browse/FLINK-14591?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16964784#comment-16964784 ] Jark Wu commented on FLINK-14591: - I think we can put the merge logic in {{StreamExecutor#execute()}}. This is the last step to submit a job. > Execute PlannerBase#mergeParameters every time of calling > PlannerBase#translate method > --- > > Key: FLINK-14591 > URL: https://issues.apache.org/jira/browse/FLINK-14591 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner >Reporter: Wei Zhong >Priority: Minor > > In current implementation of blink planner, the method > "PlannerBase#mergeParameter" will be called by "PlannerBase#translate" method > to merge the configuration inside TableConfig into global job parameters: > {code:scala} > override def translate( > modifyOperations: util.List[ModifyOperation]): > util.List[Transformation[_]] = { > if (modifyOperations.isEmpty) { > return List.empty[Transformation[_]] > } > mergeParameters() > val relNodes = modifyOperations.map(translateToRel) > val optimizedRelNodes = optimize(relNodes) > val execNodes = translateToExecNodePlan(optimizedRelNodes) > translateToPlan(execNodes) > } > {code} > This translate method is called in every important moment, e.g. execute, > toDataStream, insertInto, etc. > But as shown above, there is a chance that the method return directly and not > call the "mergeParameters". > In fact if we set some configurations between the "Table#insertInto" method > and "TableEnvironment#execute" method, these configurations will not be > merged into global job parameters because the "mergeParameters" method is not > called: > {code:scala} > val env = StreamExecutionEnvironment.getExecutionEnvironment > val tEnv = StreamTableEnvironment.create(env, > EnvironmentSettings.newInstance.useBlinkPlanner.build) > ... > ... > val result = ... > val sink = TestSinkUtil.configureSink(result, new TestingAppendTableSink) > tEnv.registerTableSink("MySink", sink) > tEnv.getConfig.getConfiguration.setString("jobparam1", "value1") > result.insertInto("MySink") > > // the "jobparam2" configuration will loss > tEnv.getConfig.getConfiguration.setString("jobparam2", "value2") > tEnv.execute("test") > val jobConfig = env.getConfig.getGlobalJobParameters.toMap > > assertTrue(jobConfig.get("jobparam1")=="value1") > // this assertion will fail: > assertTrue(jobConfig.get("jobparam2")=="value2"){code} > This may bring some confusion to the user. It will be great if we can fix > this problem. > -- This message was sent by Atlassian Jira (v8.3.4#803005)