[jira] [Updated] (FLINK-16919) Parameter 'jobName' does not take effect in BatchTableEnvUtil
[ https://issues.apache.org/jira/browse/FLINK-16919?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Zhanchun Zhang updated FLINK-16919:
-----------------------------------
    Description: 
{code:java}
def collect[T](
    tEnv: TableEnvironment,
    table: Table,
    sink: CollectTableSink[T],
    jobName: Option[String]): Seq[T] = { // jobName was not used
  val typeSerializer = fromDataTypeToLegacyInfo(sink.getConsumedDataType)
    .asInstanceOf[TypeInformation[T]]
    .createSerializer(tEnv.asInstanceOf[TableEnvironmentImpl]
      .getPlanner.asInstanceOf[PlannerBase].getExecEnv.getConfig)
  val id = new AbstractID().toString
  sink.init(typeSerializer.asInstanceOf[TypeSerializer[T]], id)
  val sinkName = UUID.randomUUID().toString
  tEnv.registerTableSink(sinkName, sink)
  tEnv.insertInto(s"`$sinkName`", table)
  val res = tEnv.execute(jobName.getOrElse("test"))
  val accResult: JArrayList[Array[Byte]] = res.getAccumulatorResult(id)
  SerializedListAccumulator.deserializeList(accResult, typeSerializer)
}
{code}
As the code shows, the parameter 'jobName' does not take effect.

  was:
{code:java}
def collect[T](
    tEnv: TableEnvironment,
    table: Table,
    sink: CollectTableSink[T],
    jobName: Option[String]): Seq[T] = { // jobName was not used
  val typeSerializer = fromDataTypeToLegacyInfo(sink.getConsumedDataType)
    .asInstanceOf[TypeInformation[T]]
    .createSerializer(tEnv.asInstanceOf[TableEnvironmentImpl]
      .getPlanner.asInstanceOf[PlannerBase].getExecEnv.getConfig)
  val id = new AbstractID().toString
  sink.init(typeSerializer.asInstanceOf[TypeSerializer[T]], id)
  val sinkName = UUID.randomUUID().toString
  tEnv.registerTableSink(sinkName, sink)
  tEnv.insertInto(s"`$sinkName`", table)
  val res = tEnv.execute(jobName.getOrElse("test"))
  val accResult: JArrayList[Array[Byte]] = res.getAccumulatorResult(id)
  SerializedListAccumulator.deserializeList(accResult, typeSerializer)
}
{code}
As the code shows, the parameter 'jobName' is not used.

    Summary: Parameter 'jobName' does not take effect in BatchTableEnvUtil  (was: Parameter 'jobName' not used in BatchTableEnvUtil)

> Parameter 'jobName' does not take effect in BatchTableEnvUtil
> -------------------------------------------------------------
>
>                 Key: FLINK-16919
>                 URL: https://issues.apache.org/jira/browse/FLINK-16919
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>            Reporter: Zhanchun Zhang
>            Priority: Major
>              Labels: pull-request-available
>          Time Spent: 10m
>  Remaining Estimate: 0h
>
> {code:java}
> def collect[T](
>     tEnv: TableEnvironment,
>     table: Table,
>     sink: CollectTableSink[T],
>     jobName: Option[String]): Seq[T] = { // jobName was not used
>   val typeSerializer = fromDataTypeToLegacyInfo(sink.getConsumedDataType)
>     .asInstanceOf[TypeInformation[T]]
>     .createSerializer(tEnv.asInstanceOf[TableEnvironmentImpl]
>       .getPlanner.asInstanceOf[PlannerBase].getExecEnv.getConfig)
>   val id = new AbstractID().toString
>   sink.init(typeSerializer.asInstanceOf[TypeSerializer[T]], id)
>   val sinkName = UUID.randomUUID().toString
>   tEnv.registerTableSink(sinkName, sink)
>   tEnv.insertInto(s"`$sinkName`", table)
>   val res = tEnv.execute(jobName.getOrElse("test"))
>   val accResult: JArrayList[Array[Byte]] = res.getAccumulatorResult(id)
>   SerializedListAccumulator.deserializeList(accResult, typeSerializer)
> }
> {code}
> As the code shows, the parameter 'jobName' does not take effect.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
[jira] [Created] (FLINK-16919) Parameter 'jobName' not used in BatchTableEnvUtil
Zhanchun Zhang created FLINK-16919:
-----------------------------------

             Summary: Parameter 'jobName' not used in BatchTableEnvUtil
                 Key: FLINK-16919
                 URL: https://issues.apache.org/jira/browse/FLINK-16919
             Project: Flink
          Issue Type: Bug
          Components: Table SQL / Planner
            Reporter: Zhanchun Zhang

{code:java}
def collect[T](
    tEnv: TableEnvironment,
    table: Table,
    sink: CollectTableSink[T],
    jobName: Option[String]): Seq[T] = { // jobName was not used
  val typeSerializer = fromDataTypeToLegacyInfo(sink.getConsumedDataType)
    .asInstanceOf[TypeInformation[T]]
    .createSerializer(tEnv.asInstanceOf[TableEnvironmentImpl]
      .getPlanner.asInstanceOf[PlannerBase].getExecEnv.getConfig)
  val id = new AbstractID().toString
  sink.init(typeSerializer.asInstanceOf[TypeSerializer[T]], id)
  val sinkName = UUID.randomUUID().toString
  tEnv.registerTableSink(sinkName, sink)
  tEnv.insertInto(s"`$sinkName`", table)
  val res = tEnv.execute(jobName.getOrElse("test"))
  val accResult: JArrayList[Array[Byte]] = res.getAccumulatorResult(id)
  SerializedListAccumulator.deserializeList(accResult, typeSerializer)
}
{code}
As the code shows, the parameter 'jobName' is not used.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
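[Editor's note] The intended behavior of the `jobName` argument is just a named job with a fallback default, as in `jobName.getOrElse("test")` in the Scala snippet above. A minimal, self-contained Java analogue (names are illustrative only, not Flink API):

```java
import java.util.Optional;

public class JobNameDemo {
    // Mirrors jobName.getOrElse("test"): use the caller's job name when
    // present, otherwise fall back to the default name "test".
    static String effectiveJobName(Optional<String> jobName) {
        return jobName.orElse("test");
    }

    public static void main(String[] args) {
        System.out.println(effectiveJobName(Optional.of("myBatchJob"))); // myBatchJob
        System.out.println(effectiveJobName(Optional.empty()));          // test
    }
}
```

The bug is simply that this resolved name must actually reach the job submission; if the caller's value is dropped along the way, every job runs under the default name.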
[jira] [Commented] (FLINK-14591) Execute PlannerBase#mergeParameters every time of calling PlannerBase#translate method
[ https://issues.apache.org/jira/browse/FLINK-14591?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16967203#comment-16967203 ]

Zhanchun Zhang commented on FLINK-14591:
----------------------------------------

Hi [~zhongwei] [~jark], I'm willing to fix this issue; could you assign it to me? Thanks!

> Execute PlannerBase#mergeParameters every time of calling
> PlannerBase#translate method
> --------------------------------------------------------------------------------------
>
>                 Key: FLINK-14591
>                 URL: https://issues.apache.org/jira/browse/FLINK-14591
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>            Reporter: Wei Zhong
>            Priority: Minor
>
> In the current implementation of the blink planner, the method
> "PlannerBase#mergeParameters" is called by the "PlannerBase#translate" method
> to merge the configuration inside TableConfig into the global job parameters:
> {code:scala}
> override def translate(
>     modifyOperations: util.List[ModifyOperation]): util.List[Transformation[_]] = {
>   if (modifyOperations.isEmpty) {
>     return List.empty[Transformation[_]]
>   }
>   mergeParameters()
>   val relNodes = modifyOperations.map(translateToRel)
>   val optimizedRelNodes = optimize(relNodes)
>   val execNodes = translateToExecNodePlan(optimizedRelNodes)
>   translateToPlan(execNodes)
> }
> {code}
> This translate method is called at every important moment, e.g. execute,
> toDataStream, insertInto, etc.
> But as shown above, there is a chance that the method returns directly and
> never calls "mergeParameters".
> In fact, if we set some configuration between the "Table#insertInto" call and
> the "TableEnvironment#execute" call, that configuration will not be merged
> into the global job parameters because the "mergeParameters" method is not
> called:
> {code:scala}
> val env = StreamExecutionEnvironment.getExecutionEnvironment
> val tEnv = StreamTableEnvironment.create(env,
>   EnvironmentSettings.newInstance.useBlinkPlanner.build)
> ...
> val result = ...
> val sink = TestSinkUtil.configureSink(result, new TestingAppendTableSink)
> tEnv.registerTableSink("MySink", sink)
> tEnv.getConfig.getConfiguration.setString("jobparam1", "value1")
> result.insertInto("MySink")
>
> // the "jobparam2" configuration will be lost
> tEnv.getConfig.getConfiguration.setString("jobparam2", "value2")
> tEnv.execute("test")
> val jobConfig = env.getConfig.getGlobalJobParameters.toMap
>
> assertTrue(jobConfig.get("jobparam1") == "value1")
> // this assertion will fail:
> assertTrue(jobConfig.get("jobparam2") == "value2")
> {code}
> This may bring some confusion to the user. It would be great if we could fix
> this problem.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
[jira] [Comment Edited] (FLINK-14591) Execute PlannerBase#mergeParameters every time of calling PlannerBase#translate method
[ https://issues.apache.org/jira/browse/FLINK-14591?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16967203#comment-16967203 ]

Zhanchun Zhang edited comment on FLINK-14591 at 11/5/19 3:30 AM:
-----------------------------------------------------------------

Hi [~zhongwei] [~jark], I'm willing to fix this issue; could you assign it to me? Thanks!

was (Author: dillon.):
Hi [~zhongwei][~jark], I'm willing to fix this issue, can you assign it to me. Thanks ~

> Execute PlannerBase#mergeParameters every time of calling
> PlannerBase#translate method
> --------------------------------------------------------------------------------------
>
>                 Key: FLINK-14591
>                 URL: https://issues.apache.org/jira/browse/FLINK-14591
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>            Reporter: Wei Zhong
>            Priority: Minor
>
> In the current implementation of the blink planner, the method
> "PlannerBase#mergeParameters" is called by the "PlannerBase#translate" method
> to merge the configuration inside TableConfig into the global job parameters:
> {code:scala}
> override def translate(
>     modifyOperations: util.List[ModifyOperation]): util.List[Transformation[_]] = {
>   if (modifyOperations.isEmpty) {
>     return List.empty[Transformation[_]]
>   }
>   mergeParameters()
>   val relNodes = modifyOperations.map(translateToRel)
>   val optimizedRelNodes = optimize(relNodes)
>   val execNodes = translateToExecNodePlan(optimizedRelNodes)
>   translateToPlan(execNodes)
> }
> {code}
> This translate method is called at every important moment, e.g. execute,
> toDataStream, insertInto, etc.
> But as shown above, there is a chance that the method returns directly and
> never calls "mergeParameters".
> In fact, if we set some configuration between the "Table#insertInto" call and
> the "TableEnvironment#execute" call, that configuration will not be merged
> into the global job parameters because the "mergeParameters" method is not
> called:
> {code:scala}
> val env = StreamExecutionEnvironment.getExecutionEnvironment
> val tEnv = StreamTableEnvironment.create(env,
>   EnvironmentSettings.newInstance.useBlinkPlanner.build)
> ...
> val result = ...
> val sink = TestSinkUtil.configureSink(result, new TestingAppendTableSink)
> tEnv.registerTableSink("MySink", sink)
> tEnv.getConfig.getConfiguration.setString("jobparam1", "value1")
> result.insertInto("MySink")
>
> // the "jobparam2" configuration will be lost
> tEnv.getConfig.getConfiguration.setString("jobparam2", "value2")
> tEnv.execute("test")
> val jobConfig = env.getConfig.getGlobalJobParameters.toMap
>
> assertTrue(jobConfig.get("jobparam1") == "value1")
> // this assertion will fail:
> assertTrue(jobConfig.get("jobparam2") == "value2")
> {code}
> This may bring some confusion to the user. It would be great if we could fix
> this problem.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
[jira] [Comment Edited] (FLINK-14511) Checking YARN queues should add "root" prefix
[ https://issues.apache.org/jira/browse/FLINK-14511?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16959404#comment-16959404 ]

Zhanchun Zhang edited comment on FLINK-14511 at 10/25/19 10:12 AM:
-------------------------------------------------------------------

Hi [~fly_in_gis]. For the fair scheduler, we can use `-yqu root.a.product` or `-yqu a.product` to submit applications to queue `root.a.product`. We cannot use `-yqu product`, as queue `product` does not exist, and we get an exception like this:

{code:java}
2019-10-25 11:36:02,559 WARN  org.apache.flink.yarn.YarnClusterDescriptor - The specified queue 'product' does not exist. Available queues: root.a, root.a.product, root.default, 
2019-10-25 11:36:03,065 WARN  org.apache.flink.yarn.YarnClusterDescriptor - The configuration directory ('/home/yarn/flink/conf') contains both LOG4J and Logback configuration files. Please delete or rename one of them.
2019-10-25 11:36:04,938 INFO  org.apache.flink.yarn.YarnClusterDescriptor - Submitting application master application_1571906019943_0102
Error while deploying YARN cluster: Couldn't deploy Yarn session cluster
java.lang.RuntimeException: Couldn't deploy Yarn session cluster
    at org.apache.flink.yarn.AbstractYarnClusterDescriptor.deploySessionCluster(AbstractYarnClusterDescriptor.java:389)
    at org.apache.flink.yarn.cli.FlinkYarnSessionCli.run(FlinkYarnSessionCli.java:839)
    at org.apache.flink.yarn.cli.FlinkYarnSessionCli$1.call(FlinkYarnSessionCli.java:627)
    at org.apache.flink.yarn.cli.FlinkYarnSessionCli$1.call(FlinkYarnSessionCli.java:624)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1867)
    at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
    at org.apache.flink.yarn.cli.FlinkYarnSessionCli.main(FlinkYarnSessionCli.java:624)
Caused by: org.apache.hadoop.yarn.exceptions.YarnException: Failed to submit application_1571906019943_0102 to YARN : Application rejected by queue placement policy, queue product does not exist.
    at org.apache.hadoop.yarn.client.api.impl.YarnClientImpl.submitApplication(YarnClientImpl.java:274)
    at org.apache.flink.yarn.AbstractYarnClusterDescriptor.startAppMaster(AbstractYarnClusterDescriptor.java:1013)
    at org.apache.flink.yarn.AbstractYarnClusterDescriptor.deployJm(AbstractYarnClusterDescriptor.java:452)
    at org.apache.flink.yarn.AbstractYarnClusterDescriptor.deployInternal(AbstractYarnClusterDescriptor.java:418)
    at org.apache.flink.yarn.AbstractYarnClusterDescriptor.deploySessionCluster(AbstractYarnClusterDescriptor.java:384)
    ... 8 more
{code}

If we have a `root.a.product` queue in a YARN cluster with the FairScheduler, we can use `-yqu root.a.product` or `-yqu a.product` to submit applications successfully, but we cannot use `product`, as it does not exist. However, submitting with `-yqu a.product` produces the warning below, because the queue names returned by the `yarnClient.getAllQueues()` method carry a `root` prefix under the FairScheduler (e.g. `root.a, root.a.product`) but are leaf queue names under the CapacityScheduler (e.g. `a, product`). So, I think we should try the `root` prefix when checking queues.

{code:java}
2019-10-25 12:04:20,711 WARN  org.apache.flink.yarn.YarnClusterDescriptor - The specified queue 'a.product' does not exist. Available queues: root.a, root.a.product, root.default, ...
{code}

was (Author: dillon.):
For fair scheduler, we can use `-yqu root.a.product` and `-yqu a.product` to submit applications to queue `root.a.product`. We cannot use `-yqu product` to submit applications to queue `root.a.product`, as queue `product` does not exist, and got an exception like this:

{code:java}
2019-10-25 11:36:02,559 WARN  org.apache.flink.yarn.YarnClusterDescriptor - The specified queue 'product' does not exist. Available queues: root.a, root.a.product, root.default, 
2019-10-25 11:36:03,065 WARN  org.apache.flink.yarn.YarnClusterDescriptor - The configuration directory ('/home/yarn/flink/conf') contains both LOG4J and Logback configuration files. Please delete or rename one of them.
2019-10-25 11:36:04,938 INFO  org.apache.flink.yarn.YarnClusterDescriptor - Submitting application master application_1571906019943_0102
Error while deploying YARN cluster: Couldn't deploy Yarn session cluster
java.lang.RuntimeException: Couldn't deploy Yarn session cluster
    at org.apache.flink.yarn.AbstractYarnClusterDescriptor.deploySessionCluster(AbstractYarnClusterDescriptor.java:389)
    at org.apache.flink.yarn.cli.FlinkYarnSessionCli.run(FlinkYarnSessionCli.java:839)
    at org.apache.flink.yarn.cli.FlinkYarnSessionCli$1.call(FlinkYarnSessionCli.java:627)
    at org.apache.flink.yarn.cli.FlinkYarnSessionCli$1.call(FlinkYarnSessionCli.java:624) at
[jira] [Comment Edited] (FLINK-14511) Checking YARN queues should add "root" prefix
[ https://issues.apache.org/jira/browse/FLINK-14511?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16959404#comment-16959404 ]

Zhanchun Zhang edited comment on FLINK-14511 at 10/25/19 4:09 AM:
------------------------------------------------------------------

For the fair scheduler, we can use `-yqu root.a.product` or `-yqu a.product` to submit applications to queue `root.a.product`. We cannot use `-yqu product`, as queue `product` does not exist, and we get an exception like this:

{code:java}
2019-10-25 11:36:02,559 WARN  org.apache.flink.yarn.YarnClusterDescriptor - The specified queue 'product' does not exist. Available queues: root.a, root.a.product, root.default, 
2019-10-25 11:36:03,065 WARN  org.apache.flink.yarn.YarnClusterDescriptor - The configuration directory ('/home/yarn/flink/conf') contains both LOG4J and Logback configuration files. Please delete or rename one of them.
2019-10-25 11:36:04,938 INFO  org.apache.flink.yarn.YarnClusterDescriptor - Submitting application master application_1571906019943_0102
Error while deploying YARN cluster: Couldn't deploy Yarn session cluster
java.lang.RuntimeException: Couldn't deploy Yarn session cluster
    at org.apache.flink.yarn.AbstractYarnClusterDescriptor.deploySessionCluster(AbstractYarnClusterDescriptor.java:389)
    at org.apache.flink.yarn.cli.FlinkYarnSessionCli.run(FlinkYarnSessionCli.java:839)
    at org.apache.flink.yarn.cli.FlinkYarnSessionCli$1.call(FlinkYarnSessionCli.java:627)
    at org.apache.flink.yarn.cli.FlinkYarnSessionCli$1.call(FlinkYarnSessionCli.java:624)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1867)
    at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
    at org.apache.flink.yarn.cli.FlinkYarnSessionCli.main(FlinkYarnSessionCli.java:624)
Caused by: org.apache.hadoop.yarn.exceptions.YarnException: Failed to submit application_1571906019943_0102 to YARN : Application rejected by queue placement policy, queue product does not exist.
    at org.apache.hadoop.yarn.client.api.impl.YarnClientImpl.submitApplication(YarnClientImpl.java:274)
    at org.apache.flink.yarn.AbstractYarnClusterDescriptor.startAppMaster(AbstractYarnClusterDescriptor.java:1013)
    at org.apache.flink.yarn.AbstractYarnClusterDescriptor.deployJm(AbstractYarnClusterDescriptor.java:452)
    at org.apache.flink.yarn.AbstractYarnClusterDescriptor.deployInternal(AbstractYarnClusterDescriptor.java:418)
    at org.apache.flink.yarn.AbstractYarnClusterDescriptor.deploySessionCluster(AbstractYarnClusterDescriptor.java:384)
    ... 8 more
{code}

If we have a `root.a.product` queue in a YARN cluster with the FairScheduler, we can use `-yqu root.a.product` or `-yqu a.product` to submit applications successfully, but we cannot use `product`, as it does not exist. However, submitting with `-yqu a.product` produces the warning below, because the queue names returned by the `yarnClient.getAllQueues()` method carry a `root` prefix under the FairScheduler (e.g. `root.a, root.a.product`) but are leaf queue names under the CapacityScheduler (e.g. `a, product`). So, I think we should try the `root` prefix when checking queues.

{code:java}
2019-10-25 12:04:20,711 WARN  org.apache.flink.yarn.YarnClusterDescriptor - The specified queue 'a.product' does not exist. Available queues: root.a, root.a.product, root.default, ...
{code}

was (Author: dillon.):
For fair scheduler, we can use `-yqu root.a.product` and `-yqu a.product` to submit applications to queue `root.a.product`. We cannot use `-yqu product` to submit applications to queue `root.a.product`, as queue `product` does not exist, and got an exception like this:

{code:java}
2019-10-25 11:36:02,559 WARN  org.apache.flink.yarn.YarnClusterDescriptor - The specified queue 'product' does not exist. Available queues: root.a, root.a.product, root.default, 
2019-10-25 11:36:03,065 WARN  org.apache.flink.yarn.YarnClusterDescriptor - The configuration directory ('/home/yarn/flink/1200-test/flink-1.4.2-1200/conf') contains both LOG4J and Logback configuration files. Please delete or rename one of them.
2019-10-25 11:36:04,938 INFO  org.apache.flink.yarn.YarnClusterDescriptor - Submitting application master application_1571906019943_0102
Error while deploying YARN cluster: Couldn't deploy Yarn session cluster
java.lang.RuntimeException: Couldn't deploy Yarn session cluster
    at org.apache.flink.yarn.AbstractYarnClusterDescriptor.deploySessionCluster(AbstractYarnClusterDescriptor.java:389)
    at org.apache.flink.yarn.cli.FlinkYarnSessionCli.run(FlinkYarnSessionCli.java:839)
    at org.apache.flink.yarn.cli.FlinkYarnSessionCli$1.call(FlinkYarnSessionCli.java:627)
    at org.apache.flink.yarn.cli.FlinkYarnSessionCli$1.call(FlinkYarnSessionCli.jav
[jira] [Commented] (FLINK-14511) Checking YARN queues should add "root" prefix
[ https://issues.apache.org/jira/browse/FLINK-14511?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16959404#comment-16959404 ]

Zhanchun Zhang commented on FLINK-14511:
----------------------------------------

For the fair scheduler, we can use `-yqu root.a.product` or `-yqu a.product` to submit applications to queue `root.a.product`. We cannot use `-yqu product`, as queue `product` does not exist, and we get an exception like this:

{code:java}
2019-10-25 11:36:02,559 WARN  org.apache.flink.yarn.YarnClusterDescriptor - The specified queue 'product' does not exist. Available queues: root.a, root.a.product, root.default, 
2019-10-25 11:36:03,065 WARN  org.apache.flink.yarn.YarnClusterDescriptor - The configuration directory ('/home/yarn/flink/1200-test/flink-1.4.2-1200/conf') contains both LOG4J and Logback configuration files. Please delete or rename one of them.
2019-10-25 11:36:04,938 INFO  org.apache.flink.yarn.YarnClusterDescriptor - Submitting application master application_1571906019943_0102
Error while deploying YARN cluster: Couldn't deploy Yarn session cluster
java.lang.RuntimeException: Couldn't deploy Yarn session cluster
    at org.apache.flink.yarn.AbstractYarnClusterDescriptor.deploySessionCluster(AbstractYarnClusterDescriptor.java:389)
    at org.apache.flink.yarn.cli.FlinkYarnSessionCli.run(FlinkYarnSessionCli.java:839)
    at org.apache.flink.yarn.cli.FlinkYarnSessionCli$1.call(FlinkYarnSessionCli.java:627)
    at org.apache.flink.yarn.cli.FlinkYarnSessionCli$1.call(FlinkYarnSessionCli.java:624)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1867)
    at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
    at org.apache.flink.yarn.cli.FlinkYarnSessionCli.main(FlinkYarnSessionCli.java:624)
Caused by: org.apache.hadoop.yarn.exceptions.YarnException: Failed to submit application_1571906019943_0102 to YARN : Application rejected by queue placement policy, queue product does not exist.
    at org.apache.hadoop.yarn.client.api.impl.YarnClientImpl.submitApplication(YarnClientImpl.java:274)
    at org.apache.flink.yarn.AbstractYarnClusterDescriptor.startAppMaster(AbstractYarnClusterDescriptor.java:1013)
    at org.apache.flink.yarn.AbstractYarnClusterDescriptor.deployJm(AbstractYarnClusterDescriptor.java:452)
    at org.apache.flink.yarn.AbstractYarnClusterDescriptor.deployInternal(AbstractYarnClusterDescriptor.java:418)
    at org.apache.flink.yarn.AbstractYarnClusterDescriptor.deploySessionCluster(AbstractYarnClusterDescriptor.java:384)
    ... 8 more
{code}

If we have a `root.a.product` queue in a YARN cluster with the FairScheduler, we can use `-yqu root.a.product` or `-yqu a.product` to submit applications successfully, but we cannot use `product`, as it does not exist. However, submitting with `-yqu a.product` produces the warning below, because the queue names returned by the `yarnClient.getAllQueues()` method carry a `root` prefix under the FairScheduler (e.g. `root.a, root.a.product`) but are leaf queue names under the CapacityScheduler (e.g. `a, product`). So, I think we should try the `root` prefix when checking queues.

{code:java}
2019-10-25 12:04:20,711 WARN  org.apache.flink.yarn.YarnClusterDescriptor - The specified queue 'a.product' does not exist. Available queues: root.a, root.a.product, root.default, ...
{code}

> Checking YARN queues should add "root" prefix
> ---------------------------------------------
>
>                 Key: FLINK-14511
>                 URL: https://issues.apache.org/jira/browse/FLINK-14511
>             Project: Flink
>          Issue Type: Bug
>          Components: Deployment / YARN
>            Reporter: Zhanchun Zhang
>            Assignee: Zhanchun Zhang
>            Priority: Major
>              Labels: pull-request-available
>          Time Spent: 20m
>  Remaining Estimate: 0h
>
> As we all know, all queues in the YARN cluster are children of the "root"
> queue. While submitting an application to the "root.product" queue with the
> -qu product parameter, the client logs "The specified queue 'product' does
> not exist. Available queues: ...". But this queue does exist and we can still
> submit applications to the YARN cluster, which is confusing for users. So I
> think the queue check should add a "root." prefix to the queue name.
> {code:java}
> List<QueueInfo> queues = yarnClient.getAllQueues();
> if (queues.size() > 0 && this.yarnQueue != null) { // check only if there are queues configured in yarn and for this session.
>   boolean queueFound = false;
>   for (QueueInfo queue : queues) {
>     if (queue.getQueueName().equals(this.yarnQueue)) {
>       queueFound = true;
>       break;
>     }
>   }
>
[jira] [Comment Edited] (FLINK-14511) Checking YARN queues should add "root" prefix
[ https://issues.apache.org/jira/browse/FLINK-14511?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16958879#comment-16958879 ]

Zhanchun Zhang edited comment on FLINK-14511 at 10/24/19 1:11 PM:
------------------------------------------------------------------

Hi [~fly_in_gis], thanks for your reply. You are right. For the capacity scheduler, we can only use a unique leaf queue name to submit applications, such as 'product'. For the fair scheduler, we can use not only the full queue name 'root.product' but also 'product' to submit applications, as the fair scheduler adds the 'root' prefix while scheduling applications.

The problem is that, if we submit applications with queue name 'product' to a YARN cluster running the fair scheduler, the YarnClusterDescriptor#checkYarnQueues method logs a warning saying that the 'product' queue does not exist. But we can still submit applications to YARN successfully, which is confusing for users. As we don't know which scheduler users use, I think we should check queues for both schedulers. YarnClusterDescriptor#checkYarnQueues works well for the capacity scheduler, but not for the fair scheduler. If we have a queue named 'root.product' in YARN with the fair scheduler, we can use `-yqu root.product` or `-yqu product` to submit applications to this queue. So, I think we should also try a 'root' prefix before checking queues for the fair scheduler.

was (Author: dillon.):
Hi [~fly_in_gis], thanks for your reply. You are right. For capacity scheduler, we could only use a unique leaf queue to submit applications, such as: 'product'. For Fair scheduler, we could not only use a full queue name 'root.product', but also 'product' to submit applications, as Fair scheduler will add 'root' prefix while scheduling applications.

The problem is that, if we submit applications with queue name 'product' to a YARN cluster with Fair scheduler, YarnClusterDescriptor#checkYarnQueues method will give a warning information, informs that the 'product' queue does not exist. But we still can submit applications to YARN successfully, which is confusing for users. As we don't know what kind of scheduler user uses, I think that we should check queues for both scheduler. YarnClusterDescriptor#checkYarnQueues works well for capacity scheduler, but not works fine for fair scheduler. If we have a queue named 'root.product' in YARN with fair scheduler, we can use `-yqu root.product` or `-yqu product` to submit applications to this queue. So, I think we should add a 'root' prefix for Fair scheduler.

> Checking YARN queues should add "root" prefix
> ---------------------------------------------
>
>                 Key: FLINK-14511
>                 URL: https://issues.apache.org/jira/browse/FLINK-14511
>             Project: Flink
>          Issue Type: Bug
>          Components: Deployment / YARN
>            Reporter: Zhanchun Zhang
>            Assignee: Zhanchun Zhang
>            Priority: Major
>              Labels: pull-request-available
>          Time Spent: 20m
>  Remaining Estimate: 0h
>
> As we all know, all queues in the YARN cluster are children of the "root"
> queue. While submitting an application to the "root.product" queue with the
> -qu product parameter, the client logs "The specified queue 'product' does
> not exist. Available queues: ...". But this queue does exist and we can still
> submit applications to the YARN cluster, which is confusing for users. So I
> think the queue check should add a "root." prefix to the queue name.
> {code:java}
> List<QueueInfo> queues = yarnClient.getAllQueues();
> if (queues.size() > 0 && this.yarnQueue != null) { // check only if there are queues configured in yarn and for this session.
>   boolean queueFound = false;
>   for (QueueInfo queue : queues) {
>     if (queue.getQueueName().equals(this.yarnQueue)) {
>       queueFound = true;
>       break;
>     }
>   }
>   if (!queueFound) {
>     String queueNames = "";
>     for (QueueInfo queue : queues) {
>       queueNames += queue.getQueueName() + ", ";
>     }
>     LOG.warn("The specified queue '" + this.yarnQueue + "' does not exist. " +
>         "Available queues: " + queueNames);
>   }
> {code}

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
[jira] [Commented] (FLINK-14511) Checking YARN queues should add "root" prefix
[ https://issues.apache.org/jira/browse/FLINK-14511?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16958879#comment-16958879 ]

Zhanchun Zhang commented on FLINK-14511:
----------------------------------------

Hi [~fly_in_gis], thanks for your reply. You are right. For the capacity scheduler, we can only use a unique leaf queue name to submit applications, such as 'product'. For the fair scheduler, we can use not only the full queue name 'root.product' but also 'product' to submit applications, as the fair scheduler adds the 'root' prefix while scheduling applications.

The problem is that, if we submit applications with queue name 'product' to a YARN cluster running the fair scheduler, the YarnClusterDescriptor#checkYarnQueues method logs a warning saying that the 'product' queue does not exist. But we can still submit applications to YARN successfully, which is confusing for users. As we don't know which scheduler the user uses, I think we should check queues for both schedulers. YarnClusterDescriptor#checkYarnQueues works well for the capacity scheduler, but not for the fair scheduler. If we have a queue named 'root.product' in YARN with the fair scheduler, we can use `-yqu root.product` or `-yqu product` to submit applications to this queue. So, I think we should add a 'root' prefix for the fair scheduler.

> Checking YARN queues should add "root" prefix
> ---------------------------------------------
>
>                 Key: FLINK-14511
>                 URL: https://issues.apache.org/jira/browse/FLINK-14511
>             Project: Flink
>          Issue Type: Bug
>          Components: Deployment / YARN
>            Reporter: Zhanchun Zhang
>            Assignee: Zhanchun Zhang
>            Priority: Major
>              Labels: pull-request-available
>          Time Spent: 20m
>  Remaining Estimate: 0h
>
> As we all know, all queues in the YARN cluster are children of the "root"
> queue. While submitting an application to the "root.product" queue with the
> -qu product parameter, the client logs "The specified queue 'product' does
> not exist. Available queues: ...". But this queue does exist and we can still
> submit applications to the YARN cluster, which is confusing for users. So I
> think the queue check should add a "root." prefix to the queue name.
> {code:java}
> List<QueueInfo> queues = yarnClient.getAllQueues();
> if (queues.size() > 0 && this.yarnQueue != null) { // check only if there are queues configured in yarn and for this session.
>   boolean queueFound = false;
>   for (QueueInfo queue : queues) {
>     if (queue.getQueueName().equals(this.yarnQueue)) {
>       queueFound = true;
>       break;
>     }
>   }
>   if (!queueFound) {
>     String queueNames = "";
>     for (QueueInfo queue : queues) {
>       queueNames += queue.getQueueName() + ", ";
>     }
>     LOG.warn("The specified queue '" + this.yarnQueue + "' does not exist. " +
>         "Available queues: " + queueNames);
>   }
> {code}

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
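[Editor's note] The fix proposed in the comments above can be sketched as a small, self-contained Java check (names and types are illustrative, not Flink's actual code): accept the requested queue either as given or with a "root." prefix, so that FairScheduler's fully qualified names ("root.a.product") match what the user passed via -yqu, while CapacityScheduler's leaf names still match exactly.

```java
import java.util.Arrays;
import java.util.List;

public class QueueCheckDemo {
    // Returns true if the requested queue matches an available queue name
    // either exactly (CapacityScheduler leaf names) or after adding the
    // "root." prefix (FairScheduler fully qualified names).
    static boolean queueExists(String requested, List<String> available) {
        for (String queue : available) {
            if (queue.equals(requested) || queue.equals("root." + requested)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        // FairScheduler-style fully qualified queue names, as in the logs above
        List<String> fair = Arrays.asList("root.a", "root.a.product", "root.default");
        System.out.println(queueExists("a.product", fair));      // true (via root. prefix)
        System.out.println(queueExists("root.a.product", fair)); // true (exact match)
        System.out.println(queueExists("product", fair));        // false (leaf alone is ambiguous/absent)
    }
}
```

With this check, `-yqu a.product` no longer triggers the spurious "does not exist" warning on a FairScheduler cluster, while a genuinely missing queue like `product` is still reported.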
[jira] [Updated] (FLINK-14511) Checking YARN queues should add "root" prefix
[ https://issues.apache.org/jira/browse/FLINK-14511?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhanchun Zhang updated FLINK-14511: --- Description: All queues in a YARN cluster are children of the "root" queue. When an application is submitted to the "root.product" queue with the -qu product parameter, the client logs "The specified queue 'product' does not exist. Available queues: ...". But the queue does exist, and the application is still submitted to the YARN cluster, which is confusing for users. So the queue check should also try the queue name with the "root." prefix.

{code:java}
List<QueueInfo> queues = yarnClient.getAllQueues();
if (queues.size() > 0 && this.yarnQueue != null) { // check only if there are queues configured in YARN and for this session
	boolean queueFound = false;
	for (QueueInfo queue : queues) {
		if (queue.getQueueName().equals(this.yarnQueue)) {
			queueFound = true;
			break;
		}
	}
	if (!queueFound) {
		String queueNames = "";
		for (QueueInfo queue : queues) {
			queueNames += queue.getQueueName() + ", ";
		}
		LOG.warn("The specified queue '" + this.yarnQueue + "' does not exist. " +
			"Available queues: " + queueNames);
	}
} else {
{code}

was: As we all know, all queues in the YARN cluster are children of the "root" queue. While submitting an application to "root.product" queue with -qu product parameter, the client logs that "The specified queue 'product' does not exist. Available queues". But this is exist and we can still submit application to YARN cluster, which is confusing for users. So I think that when checking queues should add "root." prefix to the queue name.

> Checking YARN queues should add "root" prefix
> ---------------------------------------------
>
> Key: FLINK-14511
> URL: https://issues.apache.org/jira/browse/FLINK-14511
> Project: Flink
> Issue Type: Bug
> Components: Deployment / YARN
> Reporter: Zhanchun Zhang
> Priority: Major
>

-- This message was sent by Atlassian Jira (v8.3.4#803005)
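The fix proposed above can be sketched as a small standalone check. This is a minimal illustration, not Flink code: `isQueueKnown` is a hypothetical helper, and it assumes the names returned by `yarnClient.getAllQueues()` come without the "root." prefix, so a request for "root.product" should still match the reported queue "product".

```java
import java.util.Arrays;
import java.util.List;

public class QueueCheck {
    // Hypothetical helper: the requested queue counts as found if it matches a
    // reported YARN queue name directly, or once the "root." prefix is accounted for.
    static boolean isQueueKnown(List<String> yarnQueueNames, String requested) {
        for (String name : yarnQueueNames) {
            if (name.equals(requested) || ("root." + name).equals(requested)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        List<String> queues = Arrays.asList("default", "product");
        System.out.println(isQueueKnown(queues, "product"));      // true
        System.out.println(isQueueKnown(queues, "root.product")); // true, so no spurious warning
        System.out.println(isQueueKnown(queues, "marketing"));    // false, warning is justified
    }
}
```

With a check like this, the warning in the snippet above would only fire for queues that genuinely do not exist under any parent.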
[jira] [Updated] (FLINK-14511) Checking YARN queues should add "root" prefix
[ https://issues.apache.org/jira/browse/FLINK-14511?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhanchun Zhang updated FLINK-14511: --- Environment: (was: As we all know, all queues in the YARN cluster are children of the "root" queue. While submitting an application to "root.product" queue with -qu product parameter, the client logs that "The specified queue 'product' does not exist. Available queues". But this is exist and we can still submit application to YARN cluster, which is confusing for users. So I think that when checking queues should add "root." prefix to the queue name.)

> Checking YARN queues should add "root" prefix
> ---------------------------------------------
>
> Key: FLINK-14511
> URL: https://issues.apache.org/jira/browse/FLINK-14511
> Project: Flink
> Issue Type: Bug
> Components: Deployment / YARN
> Reporter: Zhanchun Zhang
> Priority: Major
>

-- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-14511) Checking YARN queues should add "root" prefix
Zhanchun Zhang created FLINK-14511: -- Summary: Checking YARN queues should add "root" prefix Key: FLINK-14511 URL: https://issues.apache.org/jira/browse/FLINK-14511 Project: Flink Issue Type: Bug Components: Deployment / YARN Environment: All queues in a YARN cluster are children of the "root" queue. When an application is submitted to the "root.product" queue with the -qu product parameter, the client logs "The specified queue 'product' does not exist. Available queues: ...". But the queue does exist, and the application is still submitted to the YARN cluster, which is confusing for users. So the queue check should also try the queue name with the "root." prefix.

{code:java}
List<QueueInfo> queues = yarnClient.getAllQueues();
if (queues.size() > 0 && this.yarnQueue != null) { // check only if there are queues configured in YARN and for this session
	boolean queueFound = false;
	for (QueueInfo queue : queues) {
		if (queue.getQueueName().equals(this.yarnQueue)) {
			queueFound = true;
			break;
		}
	}
	if (!queueFound) {
		String queueNames = "";
		for (QueueInfo queue : queues) {
			queueNames += queue.getQueueName() + ", ";
		}
		LOG.warn("The specified queue '" + this.yarnQueue + "' does not exist. " +
			"Available queues: " + queueNames);
	}
} else {
{code}

Reporter: Zhanchun Zhang

-- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-12451) [Bitwise Functions] Add BIT_XOR function supported in Table API and SQL
Zhanchun Zhang created FLINK-12451: -- Summary: [Bitwise Functions] Add BIT_XOR function supported in Table API and SQL Key: FLINK-12451 URL: https://issues.apache.org/jira/browse/FLINK-12451 Project: Flink Issue Type: Sub-task Reporter: Zhanchun Zhang Assignee: Zhanchun Zhang Bitwise XOR returns an unsigned 64-bit integer. E.g.: SELECT 1 ^ 1 returns 0; SELECT 1 ^ 0 returns 1; SELECT 11 ^ 3 returns 8. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
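The expected XOR results listed above can be reproduced with Java's built-in `^` operator. This is only an illustration of the intended semantics, not the Flink implementation of BIT_XOR:

```java
public class BitXorDemo {
    // Mirrors the expected BIT_XOR results using Java's bitwise XOR operator.
    static long bitXor(long a, long b) {
        return a ^ b;
    }

    public static void main(String[] args) {
        System.out.println(bitXor(1, 1));  // 0
        System.out.println(bitXor(1, 0));  // 1
        System.out.println(bitXor(11, 3)); // 8 (binary: 1011 ^ 0011 = 1000)
    }
}
```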
[jira] [Created] (FLINK-12450) [Bitwise Functions] Add BIT_LSHIFT, BIT_RSHIFT functions supported in Table API and SQL
Zhanchun Zhang created FLINK-12450: -- Summary: [Bitwise Functions] Add BIT_LSHIFT, BIT_RSHIFT functions supported in Table API and SQL Key: FLINK-12450 URL: https://issues.apache.org/jira/browse/FLINK-12450 Project: Flink Issue Type: Sub-task Reporter: Zhanchun Zhang Assignee: Zhanchun Zhang BIT_LSHIFT shifts a long number to the left; BIT_RSHIFT shifts a long number to the right. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
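The shift semantics can be sketched with Java's shift operators. Note this is an assumption about the intended behavior: the sketch uses Java's arithmetic right shift (`>>`), whereas the issue does not specify whether BIT_RSHIFT should be arithmetic or logical for negative inputs.

```java
public class BitShiftDemo {
    // Shift a long value n bits to the left.
    static long bitLshift(long v, int n) {
        return v << n;
    }

    // Shift a long value n bits to the right (arithmetic shift here).
    static long bitRshift(long v, int n) {
        return v >> n;
    }

    public static void main(String[] args) {
        System.out.println(bitLshift(1, 3));  // 8
        System.out.println(bitRshift(16, 2)); // 4
    }
}
```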
[jira] [Created] (FLINK-12449) [Bitwise Functions] Add BIT_AND, BIT_OR functions supported in Table API and SQL
Zhanchun Zhang created FLINK-12449: -- Summary: [Bitwise Functions] Add BIT_AND, BIT_OR functions supported in Table API and SQL Key: FLINK-12449 URL: https://issues.apache.org/jira/browse/FLINK-12449 Project: Flink Issue Type: Sub-task Components: Table SQL / API Reporter: Zhanchun Zhang Assignee: Zhanchun Zhang Bitwise AND, e.g. SELECT BIT_AND(29, 15) returns 13. Bitwise OR, e.g. SELECT BIT_OR(29, 15) returns 31. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
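The AND/OR examples above check out against Java's bitwise operators; the sketch below only illustrates the expected results, not the Flink implementation:

```java
public class BitAndOrDemo {
    // Bitwise AND of two long values.
    static long bitAnd(long a, long b) {
        return a & b;
    }

    // Bitwise OR of two long values.
    static long bitOr(long a, long b) {
        return a | b;
    }

    public static void main(String[] args) {
        System.out.println(bitAnd(29, 15)); // 13 (11101 & 01111 = 01101)
        System.out.println(bitOr(29, 15));  // 31 (11101 | 01111 = 11111)
    }
}
```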
[jira] [Created] (FLINK-12354) Add Reverse function supported in Table API and SQL
Zhanchun Zhang created FLINK-12354: -- Summary: Add Reverse function supported in Table API and SQL Key: FLINK-12354 URL: https://issues.apache.org/jira/browse/FLINK-12354 Project: Flink Issue Type: Sub-task Components: Table SQL / API Reporter: Zhanchun Zhang Assignee: Zhanchun Zhang Returns the string _{{str}}_ with the order of the characters reversed, e.g. SELECT REVERSE('abc') returns 'cba'. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
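The REVERSE semantics can be sketched in plain Java via `StringBuilder.reverse()`. This is only an illustration of the expected result; note that `StringBuilder.reverse()` operates on UTF-16 code units (it keeps surrogate pairs together but not combining sequences), which may or may not match the eventual SQL function's behavior:

```java
public class ReverseDemo {
    // Return the input string with its characters in reverse order.
    static String reverse(String s) {
        return new StringBuilder(s).reverse().toString();
    }

    public static void main(String[] args) {
        System.out.println(reverse("abc")); // cba
    }
}
```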
[jira] [Assigned] (FLINK-6976) Add STR_TO_DATE supported in TableAPI
[ https://issues.apache.org/jira/browse/FLINK-6976?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhanchun Zhang reassigned FLINK-6976: - Assignee: Zhanchun Zhang > Add STR_TO_DATE supported in TableAPI > - > > Key: FLINK-6976 > URL: https://issues.apache.org/jira/browse/FLINK-6976 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / API >Affects Versions: 1.4.0 >Reporter: sunjincheng >Assignee: Zhanchun Zhang >Priority: Major > Labels: starter > > See FLINK-6895 for detail. -- This message was sent by Atlassian JIRA (v7.6.3#76005)