[
https://issues.apache.org/jira/browse/FLINK-15527?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17014006#comment-17014006
]
Xintong Song edited comment on FLINK-15527 at 1/13/20 3:58 AM:
---------------------------------------------------------------
{quote}> Hive setting too high job parallelism
It is not a problem; that is how it should be. This is not a bug.
I agree that we can introduce a max-containers option in 1.11.
For 1.10, users can cap their Hive source parallelism with
"table.exec.hive.infer-source-parallelism.max". But that does not work well for
complex jobs: if a job has many stages, it is hard for users to control the
total parallelism in the SQL world.
{quote}
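For reference, the option mentioned in the quote can be set in flink-conf.yaml (or programmatically on the TableConfig). A minimal sketch, assuming the Flink 1.10 option name quoted above; the value 4 is an illustrative choice, not taken from this issue:

```yaml
# flink-conf.yaml sketch: cap the parallelism Flink infers for Hive sources.
# Option name as quoted in the comment above; 4 is an example value.
table.exec.hive.infer-source-parallelism.max: 4
```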
Just trying to understand the problem. If this is not a bug, you mean the job is
supposed to have a very high parallelism, but these parallel tasks do not all
need to run at the same time. The user would prefer to use a smaller set of YARN
cluster resources to run these tasks in a pipelined (one after another) manner.
Am I understanding it correctly? [~lzljs3620320]
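To make the numbers concrete, here is a rough back-of-the-envelope sketch (plain Java, not Flink code). Both the formula (one TaskManager per `-ys` slots, containers ≈ ceil(maxParallelism / slotsPerTaskManager)) and the inferred source parallelism of 1500 are my assumptions, chosen only to illustrate how "500+ containers" can arise from `-ys 3` when every task is scheduled eagerly:

```java
// Hypothetical estimate of YARN containers requested by a Flink 1.10 job.
// Assumption: TaskManagers are requested on demand, one per `-ys` slots,
// so containers ~= ceil(maxJobParallelism / slotsPerTaskManager).
public class ContainerEstimate {

    // Ceiling division: number of TaskManagers needed to host all slots.
    static int containers(int maxParallelism, int slotsPerTaskManager) {
        return (maxParallelism + slotsPerTaskManager - 1) / slotsPerTaskManager;
    }

    public static void main(String[] args) {
        // If the Hive source parallelism is inferred as 1500 (assumed figure)
        // and each TaskManager has 3 slots (-ys 3), ~500 containers result,
        // in the ballpark of the "500+" reported in this issue.
        System.out.println(containers(1500, 3)); // prints 500
    }
}
```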
If that is indeed the case, one can also try to limit the job's resource
consumption by configuring a dedicated YARN queue with a limited resource quota
for the job. Of course, that would be much less convenient than controlling it
with Flink directly, and it may not always be doable (say, if the Flink user does
not have admin access to the YARN cluster). I'm just trying to suggest a
potential workaround for Flink 1.10 as it is.
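For completeness, a sketch of what such a dedicated queue could look like with the YARN Capacity Scheduler. The queue name "bi" matches the `-yqu bi` flag in this issue's submit command; the 20% capacity figures are illustrative assumptions, not values from this issue:

```xml
<!-- capacity-scheduler.xml sketch (illustrative values, cluster-admin side). -->
<property>
  <name>yarn.scheduler.capacity.root.queues</name>
  <value>default,bi</value>
</property>
<property>
  <!-- Give the "bi" queue 20% of cluster capacity... -->
  <name>yarn.scheduler.capacity.root.bi.capacity</name>
  <value>20</value>
</property>
<property>
  <!-- ...and hard-cap it so a single job cannot grab more than that. -->
  <name>yarn.scheduler.capacity.root.bi.maximum-capacity</name>
  <value>20</value>
</property>
```

The Flink job is then submitted with `-yqu bi` as in the issue's script, and YARN itself bounds how many containers the job can hold concurrently.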
> can not control the number of container on yarn single job module
> -----------------------------------------------------------------
>
> Key: FLINK-15527
> URL: https://issues.apache.org/jira/browse/FLINK-15527
> Project: Flink
> Issue Type: Bug
> Components: Connectors / Hive
> Affects Versions: 1.10.0
> Reporter: chenchencc
> Priority: Major
> Fix For: 1.10.0
>
> Attachments: flink-conf.yaml, image-2020-01-09-14-30-46-973.png,
> yarn_application.png
>
>
> When running a single job on YARN, the job uses 500+ containers even though parallelism is set to 4.
> *scripts:*
> ./bin/flink run -m yarn-cluster -ys 3 -p 4 -yjm 1024m -ytm 4096m -yqu bi -c
> com.cc.test.HiveTest2 ./cc_jars/hive-1.0-SNAPSHOT.jar 11.txt test61 6
> _notes_: 1.9.1 had the CLI parameter -yn to control the number of containers;
> it was removed in 1.10.
> *result:*
> the number of containers is 500+
>
> *code use:*
> query the table and save it to the hdfs text
>
> the storage of the table is 200 GB+
>
> *code:*
> com.cc.test.HiveTest2
> public static void main(String[] args) throws Exception {
>     EnvironmentSettings settings = EnvironmentSettings.newInstance()
>         .useBlinkPlanner().inStreamingMode().build();
>     StreamExecutionEnvironment settings2 =
>         StreamExecutionEnvironment.getExecutionEnvironment();
>     settings2.setParallelism(Integer.valueOf(args[2]));
>     StreamTableEnvironment tableEnv = StreamTableEnvironment.create(settings2, settings);
>     String name = "myhive";
>     String defaultDatabase = "test";
>     String hiveConfDir = "/etc/hive/conf";
>     String version = "1.2.1"; // or 2.3.4
>     HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir, version);
>     tableEnv.registerCatalog("myhive", hive);
>     tableEnv.useCatalog("myhive");
>     tableEnv.listTables();
>     Table table = tableEnv.sqlQuery(
>         "select id from orderparent_test2 where id = 'A000021204170176'");
>     tableEnv.toAppendStream(table, Row.class).print();
>     tableEnv.toAppendStream(table, Row.class)
>         .writeAsText("hdfs:///user/chenchao1/" + args[0], FileSystem.WriteMode.OVERWRITE);
>     tableEnv.execute(args[1]);
> }
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)