[jira] [Commented] (FLINK-14591) Execute PlannerBase#mergeParameters every time of calling PlannerBase#translate method

2021-04-22 Thread Flink Jira Bot (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14591?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17329651#comment-17329651
 ] 

Flink Jira Bot commented on FLINK-14591:


This issue has been labeled "stale-minor" for 7 days. It is closed now. If you 
are still affected by this or would like to raise the priority of this ticket, 
please re-open it, remove the label "auto-closed", and raise the ticket 
priority accordingly.

>  Execute PlannerBase#mergeParameters every time of calling 
> PlannerBase#translate method
> ---
>
> Key: FLINK-14591
> URL: https://issues.apache.org/jira/browse/FLINK-14591
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Planner
> Reporter: Wei Zhong
> Priority: Minor
> Labels: stale-minor
>
> In the current implementation of the blink planner, the method 
> "PlannerBase#mergeParameters" is called by the "PlannerBase#translate" method 
> to merge the configuration inside TableConfig into the global job parameters:
> {code:scala}
> override def translate(
>     modifyOperations: util.List[ModifyOperation]): util.List[Transformation[_]] = {
>   if (modifyOperations.isEmpty) {
>     // early return: mergeParameters() below is never reached
>     return List.empty[Transformation[_]]
>   }
>   mergeParameters()
>   val relNodes = modifyOperations.map(translateToRel)
>   val optimizedRelNodes = optimize(relNodes)
>   val execNodes = translateToExecNodePlan(optimizedRelNodes)
>   translateToPlan(execNodes)
> }
> {code}
> This translate method is called at every important point, e.g. execute, 
> toDataStream, insertInto, etc.
> But as shown above, the method may return early without calling 
> "mergeParameters".
> In fact, if we set some configuration options between the "Table#insertInto" 
> call and the "TableEnvironment#execute" call, they will not be merged into 
> the global job parameters, because "mergeParameters" is not called:
> {code:scala}
> val env = StreamExecutionEnvironment.getExecutionEnvironment
> val tEnv = StreamTableEnvironment.create(
>   env, EnvironmentSettings.newInstance.useBlinkPlanner.build)
> ...
> ...
> val result = ...
> val sink = TestSinkUtil.configureSink(result, new TestingAppendTableSink)
> tEnv.registerTableSink("MySink", sink)
> tEnv.getConfig.getConfiguration.setString("jobparam1", "value1")
> result.insertInto("MySink")
> 
> // the "jobparam2" configuration will be lost
> tEnv.getConfig.getConfiguration.setString("jobparam2", "value2")
> tEnv.execute("test")
> val jobConfig = env.getConfig.getGlobalJobParameters.toMap
> 
> assertTrue(jobConfig.get("jobparam1") == "value1")
> // this assertion will fail:
> assertTrue(jobConfig.get("jobparam2") == "value2")
> {code}
> This may confuse users. It would be great if we could fix this problem.
>  
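
The early-return behavior described in the issue can be reproduced without Flink. The following is a minimal sketch, not the planner's actual code: `ToyPlanner`, `MergeDemo`, and the `mergeFirst` flag are hypothetical names introduced only for illustration, and plain mutable maps stand in for TableConfig and the global job parameters. With `mergeFirst = false` the merge sits after the empty-list check, as in the quoted `translate` method; with `mergeFirst = true` it runs before the check, which is one possible reading of the issue title.

```scala
import scala.collection.mutable

// Toy stand-in for the planner; hypothetical, not Flink's PlannerBase.
class ToyPlanner(
    tableConfig: mutable.Map[String, String],
    jobParams: mutable.Map[String, String],
    mergeFirst: Boolean) {

  // Copies every table-level option into the job-level parameters.
  private def mergeParameters(): Unit = jobParams ++= tableConfig

  def translate(ops: List[String]): List[String] = {
    if (mergeFirst) mergeParameters()   // variant: merge on every call
    if (ops.isEmpty) return List.empty  // early return, as in the issue
    if (!mergeFirst) mergeParameters()  // original order: skipped when ops is empty
    ops.map(op => s"transformation($op)")
  }
}

object MergeDemo {
  def run(mergeFirst: Boolean): Map[String, String] = {
    val tableConfig = mutable.Map.empty[String, String]
    val jobParams = mutable.Map.empty[String, String]
    val planner = new ToyPlanner(tableConfig, jobParams, mergeFirst)

    tableConfig("jobparam1") = "value1"
    planner.translate(List("insertInto(MySink)")) // models Table#insertInto

    tableConfig("jobparam2") = "value2"
    planner.translate(List.empty)                 // models execute with nothing left to translate

    jobParams.toMap
  }
}
```

In this model, `MergeDemo.run(mergeFirst = false)` contains only `jobparam1`, while `MergeDemo.run(mergeFirst = true)` contains both keys.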



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-14591) Execute PlannerBase#mergeParameters every time of calling PlannerBase#translate method

2021-04-14 Thread Flink Jira Bot (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14591?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17321445#comment-17321445
 ] 

Flink Jira Bot commented on FLINK-14591:


This issue and all of its Sub-Tasks have not been updated for 180 days. So, it 
has been labeled "stale-minor". If you are still affected by this bug or are 
still interested in this issue, please give an update and remove the label. In 
7 days the issue will be closed automatically.



[jira] [Commented] (FLINK-14591) Execute PlannerBase#mergeParameters every time of calling PlannerBase#translate method

2019-11-04 Thread Jark Wu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14591?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16967212#comment-16967212
 ] 

Jark Wu commented on FLINK-14591:

Hi [~Dillon.], I'm not sure about the approach to fixing this yet. 
Hi [~godfreyhe], what do you think about this problem? 



[jira] [Commented] (FLINK-14591) Execute PlannerBase#mergeParameters every time of calling PlannerBase#translate method

2019-11-04 Thread Zhanchun Zhang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14591?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16967203#comment-16967203
 ] 

Zhanchun Zhang commented on FLINK-14591:


Hi [~zhongwei] [~jark], I'm willing to fix this issue. Could you assign it to 
me? Thanks.



[jira] [Commented] (FLINK-14591) Execute PlannerBase#mergeParameters every time of calling PlannerBase#translate method

2019-11-01 Thread Jark Wu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14591?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16964784#comment-16964784
 ] 

Jark Wu commented on FLINK-14591:

I think we can put the merge logic in {{StreamExecutor#execute()}}. This is the 
last step before a job is submitted. 
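
The idea of merging at execution time can be sketched as follows. This is a toy model under stated assumptions: `ToyTableConfig`, `ToyExecutor`, and `ExecuteMergeDemo` are hypothetical classes invented for illustration and do not reflect the real `StreamExecutor` or `TableConfig` APIs. It only shows that a merge performed at submission sees every option set beforehand, however many translate calls came earlier.

```scala
import scala.collection.mutable

// Toy model only: these classes are hypothetical and are not Flink's
// TableConfig or StreamExecutor.
final class ToyTableConfig {
  val configuration: mutable.Map[String, String] = mutable.Map.empty
}

final class ToyExecutor(tableConfig: ToyTableConfig) {
  private val globalJobParameters = mutable.Map.empty[String, String]

  // Merge right before submission, so every option set up to this point
  // is included no matter what happened during earlier translation steps.
  def execute(jobName: String): Map[String, String] = {
    globalJobParameters ++= tableConfig.configuration
    globalJobParameters.toMap // stands in for the submitted job's parameters
  }
}

object ExecuteMergeDemo {
  def run(): Map[String, String] = {
    val config = new ToyTableConfig
    val executor = new ToyExecutor(config)
    config.configuration("jobparam1") = "value1"
    // a sink registration / insertInto step would happen here
    config.configuration("jobparam2") = "value2"
    executor.execute("test")
  }
}
```

Here `ExecuteMergeDemo.run()` returns both `jobparam1` and `jobparam2`, because the merge happens after the last option is set.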
