1996fanrui commented on code in PR #1031:
URL:
https://github.com/apache/flink-kubernetes-operator/pull/1031#discussion_r2399385455
##########
flink-autoscaler-standalone/README.md:
##########
@@ -69,7 +69,9 @@ Please click [here](../flink-autoscaler/README.md) to check out extensibility of
`JobAutoScalerContext` of the job. It has a control loop that periodically calls
`JobListFetcher#fetch` to fetch the job list and scale these jobs.
-Currently `FlinkClusterJobListFetcher` is the only implementation of the `JobListFetcher`
-interface, that's why `Flink Autoscaler Standalone` only supports a single Flink cluster so far.
-We will implement `YarnJobListFetcher` in the future, `Flink Autoscaler Standalone` will call
-`YarnJobListFetcher#fetch` to fetch job list from yarn cluster periodically.
+Currently `FlinkClusterJobListFetcher` and `YarnJobListFetcher` are implementations of the
+`JobListFetcher` interface. that's why `Flink Autoscaler Standalone` only supports a single Flink cluster so far.
Review Comment:
> that's why `Flink Autoscaler Standalone` only supports a single Flink cluster so far.
This sentence no longer makes sense after adding YARN support. It should either be removed or rewritten to explain that each fetcher instance still monitors a single Flink cluster or YARN deployment.
##########
docs/content/docs/custom-resource/autoscaler.md:
##########
@@ -275,6 +275,14 @@ org.apache.flink.autoscaler.standalone.StandaloneAutoscalerEntrypoint \
Updating the `autoscaler.standalone.fetcher.flink-cluster.host` and `autoscaler.standalone.fetcher.flink-cluster.port`
based on your flink cluster. In general, the host and port are the same as Flink WebUI.
+To select the job fetcher use:
+
+```
+--autoscaler.standalone.fetcher.type FLINK_CLUSTER|YARN
+```
+
+When running against Flink-on-YARN (`YARN`), set the host/port to the YARN web proxy endpoint that exposes the JobManager REST API.
Review Comment:
> set the host/port to the YARN web proxy endpoint
Do you mean `autoscaler.standalone.fetcher.flink-cluster.host` and `autoscaler.standalone.fetcher.flink-cluster.port`?
If so, that does not make sense, because all config options with the `autoscaler.standalone.fetcher.flink-cluster` prefix are related to the Flink cluster fetcher. It would be better to introduce YARN-specific config options.
##########
flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/config/AutoscalerStandaloneOptions.java:
##########
@@ -49,6 +49,18 @@ private static ConfigOptions.OptionBuilder autoscalerStandaloneConfig(String key
.defaultValue(100)
.withDescription("The parallelism of autoscaler standalone control loop.");
+ public enum FetcherType {
+ FLINK_CLUSTER,
+ YARN
+ }
+
+ public static final ConfigOption<FetcherType> FETCHER_TYPE =
+ autoscalerStandaloneConfig("fetcher.type")
+ .enumType(FetcherType.class)
+ .defaultValue(FetcherType.FLINK_CLUSTER)
+ .withDescription(
+ "The job list fetcher type to use. Supported values: FLINK_CLUSTER, YARN.");
Review Comment:
https://github.com/apache/flink-kubernetes-operator/blob/main/docs/README.md
Please generate the docs as described in the README above. Also, IIRC, there is no need to list the supported values in the description, because the doc tooling lists all enum values by default.
##########
flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/StandaloneAutoscalerEntrypoint.java:
##########
@@ -73,15 +76,24 @@ JobListFetcher<KEY, Context> createJobListFetcher(Configuration conf) {
var port = conf.get(FETCHER_FLINK_CLUSTER_PORT);
var restServerAddress = String.format("http://%s:%s", host, port);
- return (JobListFetcher<KEY, Context>)
- new FlinkClusterJobListFetcher(
- configuration ->
- new RestClusterClient<>(
- configuration,
- "clusterId",
- (c, e) ->
- new StandaloneClientHAServices(restServerAddress)),
- conf.get(FLINK_CLIENT_TIMEOUT));
+ var fetcherType = conf.get(AutoscalerStandaloneOptions.FETCHER_TYPE);
+ FunctionWithException<Configuration, RestClusterClient<String>, Exception> clientSupplier =
+ configuration ->
+ new RestClusterClient<>(
+ configuration,
+ "clusterId",
+ (c, e) -> new StandaloneClientHAServices(restServerAddress));
+
+ switch (fetcherType) {
+ case YARN:
+ return (JobListFetcher<KEY, Context>)
+ new YarnJobListFetcher(clientSupplier, conf.get(FLINK_CLIENT_TIMEOUT));
+ case FLINK_CLUSTER:
+ default:
+ return (JobListFetcher<KEY, Context>)
+ new FlinkClusterJobListFetcher(
+ clientSupplier, conf.get(FLINK_CLIENT_TIMEOUT));
Review Comment:
The default value of `AutoscalerStandaloneOptions.FETCHER_TYPE` is already `FLINK_CLUSTER`, so a `default` branch that falls back to `FLINK_CLUSTER` here does not make sense: it silently treats any unhandled fetcher type as `FLINK_CLUSTER`. Throwing an exception for unknown fetcher types is better, because it prevents bugs when a new type is introduced in the future.
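For illustration, a minimal sketch of the suggested shape. `FetcherType` mirrors the PR's enum, while `JobListFetcher`, `FlinkClusterFetcher`, `YarnFetcher` and `FetcherFactory` are hypothetical, dependency-free stand-ins for the real classes:

```java
import java.time.Duration;

// Hypothetical stand-ins for the PR's types; only the shape of the switch matters here.
enum FetcherType {
    FLINK_CLUSTER,
    YARN
}

interface JobListFetcher {}

final class FlinkClusterFetcher implements JobListFetcher {
    final Duration timeout;

    FlinkClusterFetcher(Duration timeout) {
        this.timeout = timeout;
    }
}

final class YarnFetcher implements JobListFetcher {
    final Duration timeout;

    YarnFetcher(Duration timeout) {
        this.timeout = timeout;
    }
}

final class FetcherFactory {
    private FetcherFactory() {}

    /** Maps every known type explicitly; anything unhandled is reported instead of defaulted. */
    static JobListFetcher create(FetcherType type, Duration timeout) {
        if (type == null) {
            throw new IllegalArgumentException("Fetcher type must not be null");
        }
        switch (type) {
            case FLINK_CLUSTER:
                return new FlinkClusterFetcher(timeout);
            case YARN:
                return new YarnFetcher(timeout);
            default:
                // Only reachable if a constant is added to FetcherType without a case here.
                throw new IllegalArgumentException("Unsupported fetcher type: " + type);
        }
    }
}
```

If the module's Java level allows it (Java 14+), a `switch` expression that lists every constant without a `default` branch goes one step further: it stops compiling once a new `FetcherType` constant is added, so the gap shows up at build time instead of at runtime.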
##########
flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/yarn/YarnJobListFetcher.java:
##########
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler.standalone.yarn;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.autoscaler.JobAutoScalerContext;
+import org.apache.flink.autoscaler.standalone.JobListFetcher;
+import org.apache.flink.autoscaler.utils.JobStatusUtils;
+import org.apache.flink.client.program.rest.RestClusterClient;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+import org.apache.flink.runtime.client.JobStatusMessage;
+import org.apache.flink.runtime.highavailability.nonha.standalone.StandaloneClientHAServices;
+import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.JobMessageParameters;
+import org.apache.flink.runtime.rest.messages.JobsOverviewHeaders;
+import org.apache.flink.runtime.rest.messages.job.JobManagerJobConfigurationHeaders;
+import org.apache.flink.util.function.FunctionWithException;
+
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.autoscaler.standalone.config.AutoscalerStandaloneOptions.FETCHER_FLINK_CLUSTER_HOST;
+import static org.apache.flink.autoscaler.standalone.config.AutoscalerStandaloneOptions.FETCHER_FLINK_CLUSTER_PORT;
+
+/** Fetch JobAutoScalerContext based on Flink on YARN cluster. */
+public class YarnJobListFetcher implements JobListFetcher<JobID, JobAutoScalerContext<JobID>> {
+
+ private final FunctionWithException<Configuration, RestClusterClient<String>, Exception>
+ restClientGetter;
+ private final Duration restClientTimeout;
+
+ public YarnJobListFetcher(
+ FunctionWithException<Configuration, RestClusterClient<String>, Exception>
+ restClientGetter,
+ Duration restClientTimeout) {
+ this.restClientGetter = restClientGetter;
+ this.restClientTimeout = restClientTimeout;
+ }
+
+ @Override
+ public Collection<JobAutoScalerContext<JobID>> fetch(Configuration baseConf) throws Exception {
+
+ List<JobAutoScalerContext<JobID>> discovered = tryFetchFromFirstRunningYarnApp(baseConf);
+ if (!discovered.isEmpty()) {
+ return discovered;
+ }
+
+ // use supplied client factory (may point to direct JM or a reverse proxy)
+ try (var restClusterClient = restClientGetter.apply(new Configuration())) {
+ return restClusterClient
+ .sendRequest(
+ JobsOverviewHeaders.getInstance(),
+ EmptyMessageParameters.getInstance(),
+ EmptyRequestBody.getInstance())
+ .thenApply(JobStatusUtils::toJobStatusMessage)
+ .get(restClientTimeout.toSeconds(), TimeUnit.SECONDS)
+ .stream()
+ .map(
+ jobStatusMessage -> {
+ try {
+ return generateJobContext(
+ baseConf, restClusterClient, jobStatusMessage);
+ } catch (Throwable e) {
+ throw new RuntimeException(
+ "generateJobContext throw exception", e);
+ }
+ })
+ .collect(Collectors.toList());
+ }
+ }
+
+ private List<JobAutoScalerContext<JobID>> tryFetchFromFirstRunningYarnApp(
+ Configuration baseConf) {
+ List<JobAutoScalerContext<JobID>> contexts = new ArrayList<>();
+ YarnClient yarnClient = null;
+ try {
+ yarnClient = YarnClient.createYarnClient();
+ org.apache.hadoop.conf.Configuration yarnConf =
+ new org.apache.hadoop.conf.Configuration();
+ yarnClient.init(yarnConf);
+ yarnClient.start();
Review Comment:
I am not sure whether creating a `YarnClient` without any Hadoop configuration works. Generally, it needs Hadoop configuration files such as `core-site.xml` or `yarn-site.xml`, which are only picked up if they happen to be on the classpath.
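One possible approach, sketched under assumptions: locate the client configuration explicitly instead of relying on the classpath. `HadoopConfLocator` is hypothetical; it only checks the `HADOOP_CONF_DIR` and `YARN_CONF_DIR` environment variables (passed in as a map so the helper is testable) for `core-site.xml` and `yarn-site.xml`:

```java
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Hypothetical helper: finds Hadoop client config files referenced by environment variables. */
final class HadoopConfLocator {
    private static final List<String> CONF_DIR_VARS = List.of("HADOOP_CONF_DIR", "YARN_CONF_DIR");
    private static final List<String> CONF_FILES = List.of("core-site.xml", "yarn-site.xml");

    private HadoopConfLocator() {}

    static List<Path> locate(Map<String, String> env) {
        List<Path> found = new ArrayList<>();
        for (String var : CONF_DIR_VARS) {
            String dir = env.get(var);
            if (dir == null || dir.isBlank()) {
                continue; // variable not set, nothing to look at
            }
            for (String name : CONF_FILES) {
                Path candidate = Path.of(dir, name);
                // Skip missing files and avoid duplicates when both variables point to one dir.
                if (Files.isRegularFile(candidate) && !found.contains(candidate)) {
                    found.add(candidate);
                }
            }
        }
        return found;
    }
}
```

Each returned path could then be registered with Hadoop's `Configuration#addResource(org.apache.hadoop.fs.Path)` before `yarnClient.init(...)`. Starting from `new YarnConfiguration()` instead of a plain Hadoop `Configuration` would additionally pick up a `yarn-site.xml` that is on the classpath.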
##########
flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/yarn/YarnJobListFetcher.java:
##########
@@ -0,0 +1,229 @@
+ @Override
+ public Collection<JobAutoScalerContext<JobID>> fetch(Configuration baseConf) throws Exception {
+
+ List<JobAutoScalerContext<JobID>> discovered = tryFetchFromFirstRunningYarnApp(baseConf);
+ if (!discovered.isEmpty()) {
+ return discovered;
+ }
+
+ // use supplied client factory (may point to direct JM or a reverse proxy)
Review Comment:
Why fall back to the JM or Flink cluster here? If that is what the user expects, why would they choose the YARN fetcher instead of the Flink cluster fetcher?
##########
docs/content/docs/custom-resource/autoscaler.md:
##########
@@ -275,6 +275,14 @@
+--autoscaler.standalone.fetcher.type FLINK_CLUSTER|YARN
Review Comment:
How about adding a complete demo for YARN mode?
##########
flink-autoscaler-standalone/src/test/java/org/apache/flink/autoscaler/standalone/StandaloneAutoscalerEntrypointTest.java:
##########
@@ -36,17 +41,39 @@ public void testLoadConfiguration() {
assertNotNull(conf);
assertEquals(Duration.ofMinutes(1), conf.get(CONTROL_LOOP_INTERVAL));
assertEquals(20, conf.get(CONTROL_LOOP_PARALLELISM));
+ assertEquals(AutoscalerStandaloneOptions.FetcherType.FLINK_CLUSTER, conf.get(FETCHER_TYPE));
// Test for args override
String[] args =
new String[] {
"--autoscaler.standalone.control-loop.interval",
"2min",
"--autoscaler" + ".standalone.control-loop.parallelism",
- "10"
+ "10",
+ "--autoscaler.standalone.fetcher.type",
+ "YARN"
};
Configuration confOverride = StandaloneAutoscalerEntrypoint.loadConfiguration(args);
assertNotNull(confOverride);
assertEquals(Duration.ofMinutes(2), confOverride.get(CONTROL_LOOP_INTERVAL));
assertEquals(10, confOverride.get(CONTROL_LOOP_PARALLELISM));
+ assertEquals(AutoscalerStandaloneOptions.FetcherType.YARN, confOverride.get(FETCHER_TYPE));
+ }
+
+ @Test
+ public void testFetcherTypeSelection() throws Exception {
+ // Default should select FlinkClusterJobListFetcher
+ Configuration confDefault = new Configuration();
+ var method =
+ StandaloneAutoscalerEntrypoint.class.getDeclaredMethod(
+ "createJobListFetcher", Configuration.class);
+ method.setAccessible(true);
+ Object fetcherDefault = method.invoke(null, confDefault);
Review Comment:
Using reflection to test the private `createJobListFetcher` method does not make sense. A better approach would be to make the method package-private and mark it with the `@VisibleForTesting` annotation.
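A minimal sketch of the package-private alternative. All names are hypothetical stand-ins; in the PR the method would return `JobListFetcher<KEY, Context>` and could carry Flink's `org.apache.flink.annotation.VisibleForTesting`, which documents intent but does not change visibility by itself:

```java
// Hypothetical stand-ins: in the PR the factory would return JobListFetcher<KEY, Context>.
interface Fetcher {}

final class ClusterFetcher implements Fetcher {}

final class YarnAppFetcher implements Fetcher {}

final class Entrypoint {
    private Entrypoint() {}

    // No access modifier = package-private: a test in the same package can call this
    // directly, so getDeclaredMethod(...) and setAccessible(true) are no longer needed.
    static Fetcher createJobListFetcher(String fetcherType) {
        switch (fetcherType) {
            case "FLINK_CLUSTER":
                return new ClusterFetcher();
            case "YARN":
                return new YarnAppFetcher();
            default:
                throw new IllegalArgumentException("Unsupported fetcher type: " + fetcherType);
        }
    }
}
```

A test in the same package can then call `Entrypoint.createJobListFetcher(...)` and assert on the returned type with a plain `instanceof` check.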
##########
flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/yarn/YarnJobListFetcher.java:
##########
@@ -0,0 +1,229 @@
+ @Override
+ public Collection<JobAutoScalerContext<JobID>> fetch(Configuration baseConf) throws Exception {
+
+ List<JobAutoScalerContext<JobID>> discovered = tryFetchFromFirstRunningYarnApp(baseConf);
+ if (!discovered.isEmpty()) {
+ return discovered;
Review Comment:
Sorry, I do not understand this part. Why fetch Flink jobs only from the first YARN application?
IIUC, if the YARN cluster runs multiple applications, the autoscaler should work for all Flink jobs on the cluster, shouldn't it?
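To illustrate the expected behaviour, a hedged sketch of iterating over every running application instead of breaking after the first one. `AppInfo` is a hypothetical, simplified stand-in for YARN's `ApplicationReport`, and the `jobsForApp` function stands in for the per-application REST call through the proxy:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
import java.util.logging.Level;
import java.util.logging.Logger;

/** Hypothetical, simplified view of a YARN ApplicationReport. */
final class AppInfo {
    final String appId;
    final boolean running;

    AppInfo(String appId, boolean running) {
        this.appId = appId;
        this.running = running;
    }
}

final class AllAppsCollector {
    private static final Logger LOG = Logger.getLogger(AllAppsCollector.class.getName());

    private AllAppsCollector() {}

    /** Collects jobs from every RUNNING application; one failing application does not hide the rest. */
    static List<String> collect(List<AppInfo> apps, Function<String, List<String>> jobsForApp) {
        List<String> jobs = new ArrayList<>();
        for (AppInfo app : apps) {
            if (!app.running) {
                continue;
            }
            try {
                jobs.addAll(jobsForApp.apply(app.appId));
            } catch (RuntimeException e) {
                LOG.log(Level.WARNING, "Failed to list jobs of YARN application " + app.appId, e);
            }
        }
        return jobs;
    }
}
```

Isolating failures per application is a design choice: an unreachable application proxy then costs only its own jobs, not the whole discovery round.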
##########
flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/yarn/YarnJobListFetcher.java:
##########
@@ -0,0 +1,229 @@
+ private List<JobAutoScalerContext<JobID>> tryFetchFromFirstRunningYarnApp(
+ Configuration baseConf) {
+ List<JobAutoScalerContext<JobID>> contexts = new ArrayList<>();
+ YarnClient yarnClient = null;
+ try {
+ yarnClient = YarnClient.createYarnClient();
+ org.apache.hadoop.conf.Configuration yarnConf =
+ new org.apache.hadoop.conf.Configuration();
+ yarnClient.init(yarnConf);
+ yarnClient.start();
+
+ Set<String> appTypes = new HashSet<>();
+ appTypes.add("Apache Flink");
+ List<ApplicationReport> apps = yarnClient.getApplications(appTypes);
+
+ String rmBase =
+ String.format(
+ "http://%s:%s",
+ baseConf.get(FETCHER_FLINK_CLUSTER_HOST),
+ baseConf.get(FETCHER_FLINK_CLUSTER_PORT));
+
+ for (ApplicationReport app : apps) {
+ if (app.getYarnApplicationState() != YarnApplicationState.RUNNING) {
+ continue;
+ }
+ String appId = app.getApplicationId().toString();
+ String proxyBase = rmBase + "/proxy/" + appId;
+
+ try (var client =
+ new RestClusterClient<>(
+ new Configuration(),
+ "clusterId",
+ (c, e) -> new StandaloneClientHAServices(proxyBase))) {
+ var fetched =
+ client
+ .sendRequest(
+ JobsOverviewHeaders.getInstance(),
+ EmptyMessageParameters.getInstance(),
+ EmptyRequestBody.getInstance())
+ .thenApply(JobStatusUtils::toJobStatusMessage)
+ .get(restClientTimeout.toSeconds(), TimeUnit.SECONDS)
+ .stream()
+ .map(
+ jobStatusMessage -> {
+ try {
+ return generateJobContextForEndpoint(
+ baseConf, proxyBase, jobStatusMessage);
+ } catch (Throwable e) {
+ throw new RuntimeException(
+ "generateJobContext throw exception",
+ e);
+ }
+ })
+ .collect(Collectors.toList());
+ contexts.addAll(fetched);
+ }
+ break;
+ }
+ } catch (Throwable ignore) {
+ // Ignore
Review Comment:
It suppresses all throwables, including critical ones such as `OutOfMemoryError`, without any logging. That makes it impossible to diagnose why YARN-based job discovery failed; for example, we cannot tell whether there are configuration issues, network problems, or authentication failures.
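A minimal sketch of the alternative, assuming the caller should see the failure: catch `Exception` rather than `Throwable`, log it with its cause, and rethrow, so `Error`s such as `OutOfMemoryError` are never swallowed. `YarnDiscovery` is hypothetical, and `java.util.logging` is used only to keep the example dependency-free; the module itself would typically log via SLF4J:

```java
import java.util.List;
import java.util.concurrent.Callable;
import java.util.logging.Level;
import java.util.logging.Logger;

final class YarnDiscovery {
    private static final Logger LOG = Logger.getLogger(YarnDiscovery.class.getName());

    private YarnDiscovery() {}

    /**
     * Runs one discovery step. Exceptions are logged with their cause and rethrown;
     * Errors are not caught at all, so they propagate unchanged.
     */
    static <T> List<T> discover(Callable<List<T>> step) throws Exception {
        try {
            return step.call();
        } catch (Exception e) {
            LOG.log(Level.WARNING, "YARN job discovery failed", e);
            throw e;
        }
    }
}
```

If a failed round should be skipped instead, returning an empty list after the log statement also works, as long as the cause is still logged.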
##########
flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/yarn/YarnJobListFetcher.java:
##########
@@ -0,0 +1,229 @@
+ @Override
+ public Collection<JobAutoScalerContext<JobID>> fetch(Configuration baseConf) throws Exception {
+
+ List<JobAutoScalerContext<JobID>> discovered = tryFetchFromFirstRunningYarnApp(baseConf);
+ if (!discovered.isEmpty()) {
+ return discovered;
+ }
+
+ // use supplied client factory (may point to direct JM or a reverse proxy)
+ try (var restClusterClient = restClientGetter.apply(new Configuration())) {
Review Comment:
Why use an empty conf here? It discards all `baseConf` options that the REST client setup might need, such as timeouts.
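A dependency-free sketch of the alternative: start from a copy of `baseConf` and apply only per-client overrides. A `Map<String, String>` stands in for Flink's `Configuration` (with the real class the copy would be `new Configuration(baseConf)`), `deriveClientConf` is a hypothetical name, and the keys used in the check are only sample data:

```java
import java.util.HashMap;
import java.util.Map;

final class ClientConfigs {
    private ClientConfigs() {}

    /**
     * Hypothetical helper: builds the configuration for one REST client from the base
     * configuration, so options such as timeouts are kept, then applies per-client overrides.
     */
    static Map<String, String> deriveClientConf(
            Map<String, String> baseConf, Map<String, String> overrides) {
        Map<String, String> conf = new HashMap<>(baseConf); // copy: the base stays untouched
        conf.putAll(overrides); // per-client values win over the base ones
        return conf;
    }
}
```

In the PR this would mean calling `restClientGetter.apply(...)` with a configuration derived from `baseConf` rather than with `new Configuration()`.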
##########
flink-autoscaler-standalone/pom.xml:
##########
@@ -101,6 +101,12 @@ under the License.
</exclusions>
</dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-yarn</artifactId>
+ <version>${flink.version}</version>
+ </dependency>
Review Comment:
Is it possible to minimize the dependencies? For example, add only `yarn-client` here instead of the whole `flink-yarn` module.
Also, do we need to exclude some transitive dependencies to avoid dependency conflicts?
##########
flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/yarn/YarnJobListFetcher.java:
##########
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler.standalone.yarn;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.autoscaler.JobAutoScalerContext;
+import org.apache.flink.autoscaler.standalone.JobListFetcher;
+import org.apache.flink.autoscaler.utils.JobStatusUtils;
+import org.apache.flink.client.program.rest.RestClusterClient;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+import org.apache.flink.runtime.client.JobStatusMessage;
+import
org.apache.flink.runtime.highavailability.nonha.standalone.StandaloneClientHAServices;
+import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.JobMessageParameters;
+import org.apache.flink.runtime.rest.messages.JobsOverviewHeaders;
+import
org.apache.flink.runtime.rest.messages.job.JobManagerJobConfigurationHeaders;
+import org.apache.flink.util.function.FunctionWithException;
+
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.autoscaler.standalone.config.AutoscalerStandaloneOptions.FETCHER_FLINK_CLUSTER_HOST;
+import static org.apache.flink.autoscaler.standalone.config.AutoscalerStandaloneOptions.FETCHER_FLINK_CLUSTER_PORT;
+
+/** Fetch JobAutoScalerContext based on Flink on YARN cluster. */
+public class YarnJobListFetcher implements JobListFetcher<JobID, JobAutoScalerContext<JobID>> {
+
+    private final FunctionWithException<Configuration, RestClusterClient<String>, Exception>
+            restClientGetter;
+    private final Duration restClientTimeout;
+
+    public YarnJobListFetcher(
+            FunctionWithException<Configuration, RestClusterClient<String>, Exception>
+                    restClientGetter,
+            Duration restClientTimeout) {
+        this.restClientGetter = restClientGetter;
+        this.restClientTimeout = restClientTimeout;
+    }
+
+    @Override
+    public Collection<JobAutoScalerContext<JobID>> fetch(Configuration baseConf) throws Exception {
+
+        List<JobAutoScalerContext<JobID>> discovered = tryFetchFromFirstRunningYarnApp(baseConf);
+        if (!discovered.isEmpty()) {
+            return discovered;
+        }
+
+        // use supplied client factory (may point to direct JM or a reverse proxy)
+        try (var restClusterClient = restClientGetter.apply(new Configuration())) {
+            return restClusterClient
+                    .sendRequest(
+                            JobsOverviewHeaders.getInstance(),
+                            EmptyMessageParameters.getInstance(),
+                            EmptyRequestBody.getInstance())
+                    .thenApply(JobStatusUtils::toJobStatusMessage)
+                    .get(restClientTimeout.toSeconds(), TimeUnit.SECONDS)
+                    .stream()
+                    .map(
+                            jobStatusMessage -> {
+                                try {
+                                    return generateJobContext(
+                                            baseConf, restClusterClient, jobStatusMessage);
+                                } catch (Throwable e) {
+                                    throw new RuntimeException(
+                                            "generateJobContext throw exception", e);
+                                }
+                            })
+                    .collect(Collectors.toList());
+        }
+    }
+
+    private List<JobAutoScalerContext<JobID>> tryFetchFromFirstRunningYarnApp(
+            Configuration baseConf) {
+        List<JobAutoScalerContext<JobID>> contexts = new ArrayList<>();
+        YarnClient yarnClient = null;
+        try {
+            yarnClient = YarnClient.createYarnClient();
+            org.apache.hadoop.conf.Configuration yarnConf =
+                    new org.apache.hadoop.conf.Configuration();
+            yarnClient.init(yarnConf);
+            yarnClient.start();
+
+            Set<String> appTypes = new HashSet<>();
+            appTypes.add("Apache Flink");
+            List<ApplicationReport> apps = yarnClient.getApplications(appTypes);
+
+            String rmBase =
+                    String.format(
+                            "http://%s:%s",
+                            baseConf.get(FETCHER_FLINK_CLUSTER_HOST),
+                            baseConf.get(FETCHER_FLINK_CLUSTER_PORT));
+
+            for (ApplicationReport app : apps) {
+                if (app.getYarnApplicationState() != YarnApplicationState.RUNNING) {
+                    continue;
+                }
+                String appId = app.getApplicationId().toString();
+                String proxyBase = rmBase + "/proxy/" + appId;
+
+                try (var client =
+                        new RestClusterClient<>(
+                                new Configuration(),
+                                "clusterId",
+                                (c, e) -> new StandaloneClientHAServices(proxyBase))) {
+                    var fetched =
+                            client
+                                    .sendRequest(
+                                            JobsOverviewHeaders.getInstance(),
+                                            EmptyMessageParameters.getInstance(),
+                                            EmptyRequestBody.getInstance())
+                                    .thenApply(JobStatusUtils::toJobStatusMessage)
+                                    .get(restClientTimeout.toSeconds(), TimeUnit.SECONDS)
+                                    .stream()
+                                    .map(
+                                            jobStatusMessage -> {
+                                                try {
+                                                    return generateJobContextForEndpoint(
+                                                            baseConf, proxyBase, jobStatusMessage);
+                                                } catch (Throwable e) {
+                                                    throw new RuntimeException(
+                                                            "generateJobContext throw exception",
+                                                            e);
+                                                }
+                                            })
+                                    .collect(Collectors.toList());
+                    contexts.addAll(fetched);
+                }
+                break;
+            }
+        } catch (Throwable ignore) {
Review Comment:
This catch does not provide fault isolation among jobs or YARN applications. If one job is stuck on GC or hits some other problem, the autoscaler won't work for any of the applications.
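Below is a minimal sketch of the per-application and per-job isolation this comment is asking for. It is plain Java so that it stands alone. `IsolatedFetchSketch`, `fetchAll`, `listJobs` and `buildContext` are hypothetical names and are not part of this PR or of Flink. In `YarnJobListFetcher`, `listJobs` would presumably wrap the `JobsOverviewHeaders` request sent through the YARN proxy, and `buildContext` would wrap `generateJobContextForEndpoint`. Each call gets its own deadline and its own `catch`. A failing or hung application (or job) is therefore recorded and skipped instead of aborting the whole fetch.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.BiFunction;
import java.util.function.Function;

/** Hypothetical sketch: poll many applications without letting one failure hide the rest. */
final class IsolatedFetchSketch {

    /** Contexts that were built, plus the failures that were skipped (keyed by app or app/job). */
    static final class FetchResult<C> {
        final List<C> contexts = new ArrayList<>();
        final Map<String, Throwable> failures = new LinkedHashMap<>();
    }

    /**
     * @param appIds       running application ids (stand-in for the YARN application reports)
     * @param listJobs     hypothetical per-application job listing (e.g. a jobs overview request)
     * @param buildContext hypothetical per-job context builder
     * @param timeout      deadline applied to each individual call
     */
    static <J, C> FetchResult<C> fetchAll(
            List<String> appIds,
            Function<String, CompletableFuture<List<J>>> listJobs,
            BiFunction<String, J, CompletableFuture<C>> buildContext,
            Duration timeout) {
        FetchResult<C> result = new FetchResult<>();
        for (String appId : appIds) {
            List<J> jobs;
            try {
                jobs = await(listJobs.apply(appId), timeout);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt and stop early
                return result;
            } catch (Exception e) {
                result.failures.put(appId, e); // skip this application, keep polling the others
                continue;
            }
            for (J job : jobs) {
                try {
                    result.contexts.add(await(buildContext.apply(appId, job), timeout));
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return result;
                } catch (Exception e) {
                    result.failures.put(appId + "/" + job, e); // only this job is skipped
                }
            }
        }
        return result;
    }

    /** Waits for one call, unwrapping ExecutionException so the real cause is recorded. */
    private static <V> V await(CompletableFuture<V> future, Duration timeout) throws Exception {
        try {
            return future.get(timeout.toMillis(), TimeUnit.MILLISECONDS);
        } catch (ExecutionException e) {
            Throwable cause = e.getCause();
            throw cause instanceof Exception ? (Exception) cause : e;
        } catch (TimeoutException e) {
            // Stop waiting; cancel() on a CompletableFuture does not interrupt the underlying work.
            future.cancel(true);
            throw e;
        }
    }
}
```

Polling sequentially keeps only one request in flight, but each hung endpoint still costs up to one `timeout` per call. Fanning the futures out in parallel (for example with `CompletableFuture.allOf`) would bound the total latency of each control-loop tick, at the cost of more concurrent requests against the ResourceManager proxy. Logging the `failures` map, rather than using `catch (Throwable ignore)`, also keeps the skipped applications visible.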
--
This is an automated message from the Apache Git Service.