YARN-8299. Added CLI and REST API to query container status. Contributed by Chandni Singh
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/121865c3 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/121865c3 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/121865c3 Branch: refs/heads/HDFS-12943 Commit: 121865c3f96166e2190ed54b433ebcf8d053b91c Parents: efb4e27 Author: Eric Yang <ey...@apache.org> Authored: Mon Jul 16 17:41:23 2018 -0400 Committer: Eric Yang <ey...@apache.org> Committed: Mon Jul 16 17:41:23 2018 -0400 ---------------------------------------------------------------------- .../yarn/service/client/ApiServiceClient.java | 74 ++++++--- .../hadoop/yarn/service/webapp/ApiServer.java | 67 ++++++-- .../hadoop/yarn/service/ClientAMProtocol.java | 5 + .../hadoop/yarn/service/ClientAMService.java | 14 ++ .../yarn/service/client/ServiceClient.java | 47 ++++++ .../component/instance/ComponentInstance.java | 41 ++++- .../yarn/service/conf/RestApiConstants.java | 5 +- .../pb/client/ClientAMProtocolPBClientImpl.java | 13 ++ .../service/ClientAMProtocolPBServiceImpl.java | 13 ++ .../hadoop/yarn/service/utils/FilterUtils.java | 81 ++++++++++ .../yarn/service/utils/ServiceApiUtil.java | 9 ++ .../src/main/proto/ClientAMProtocol.proto | 12 ++ .../yarn/service/MockRunningServiceContext.java | 154 +++++++++++++++++++ .../yarn/service/client/TestServiceCLI.java | 25 ++- .../yarn/service/client/TestServiceClient.java | 54 ++++++- .../yarn/service/component/TestComponent.java | 133 +--------------- .../instance/TestComponentInstance.java | 46 +++--- .../yarn/service/utils/TestFilterUtils.java | 102 ++++++++++++ .../hadoop/yarn/client/cli/ApplicationCLI.java | 68 +++++++- .../hadoop/yarn/client/cli/TestYarnCLI.java | 6 +- .../hadoop/yarn/client/api/AppAdminClient.java | 6 + 21 files changed, 773 insertions(+), 202 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/121865c3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/client/ApiServiceClient.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/client/ApiServiceClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/client/ApiServiceClient.java index 9232fc8..f5162e9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/client/ApiServiceClient.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/client/ApiServiceClient.java @@ -25,8 +25,10 @@ import java.util.List; import java.util.Map; import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.UriBuilder; import com.google.common.base.Preconditions; +import com.google.common.base.Strings; import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; @@ -48,10 +50,8 @@ import org.apache.hadoop.yarn.service.api.records.Service; import org.apache.hadoop.yarn.service.api.records.ServiceState; import org.apache.hadoop.yarn.service.api.records.ServiceStatus; import org.apache.hadoop.yarn.service.conf.RestApiConstants; -import org.apache.hadoop.yarn.service.utils.JsonSerDeser; import org.apache.hadoop.yarn.service.utils.ServiceApiUtil; import org.apache.hadoop.yarn.util.RMHAUtils; -import org.codehaus.jackson.map.PropertyNamingStrategy; import org.eclipse.jetty.util.UrlEncoded; import org.slf4j.Logger; import 
org.slf4j.LoggerFactory; @@ -147,11 +147,7 @@ public class ApiServiceClient extends AppAdminClient { api.append("/"); api.append(appName); } - Configuration conf = getConfig(); - if (conf.get("hadoop.http.authentication.type").equalsIgnoreCase("simple")) { - api.append("?user.name=" + UrlEncoded - .encodeString(System.getProperty("user.name"))); - } + appendUserNameIfRequired(api); return api.toString(); } @@ -162,15 +158,27 @@ public class ApiServiceClient extends AppAdminClient { api.append(url); api.append("/app/v1/services/").append(appName).append("/") .append(RestApiConstants.COMP_INSTANCES); - Configuration conf = getConfig(); - if (conf.get("hadoop.http.authentication.type").equalsIgnoreCase( - "simple")) { - api.append("?user.name=" + UrlEncoded - .encodeString(System.getProperty("user.name"))); - } + appendUserNameIfRequired(api); return api.toString(); } + private String getInstancePath(String appName, List<String> components, + String version, List<String> containerStates) throws IOException { + UriBuilder builder = UriBuilder.fromUri(getInstancesPath(appName)); + if (components != null && !components.isEmpty()) { + components.forEach(compName -> + builder.queryParam(RestApiConstants.PARAM_COMP_NAME, compName)); + } + if (!Strings.isNullOrEmpty(version)){ + builder.queryParam(RestApiConstants.PARAM_VERSION, version); + } + if (containerStates != null && !containerStates.isEmpty()){ + containerStates.forEach(state -> + builder.queryParam(RestApiConstants.PARAM_CONTAINER_STATE, state)); + } + return builder.build().toString(); + } + private String getComponentsPath(String appName) throws IOException { Preconditions.checkNotNull(appName); String url = getRMWebAddress(); @@ -178,13 +186,17 @@ public class ApiServiceClient extends AppAdminClient { api.append(url); api.append("/app/v1/services/").append(appName).append("/") .append(RestApiConstants.COMPONENTS); + appendUserNameIfRequired(api); + return api.toString(); + } + + private void 
appendUserNameIfRequired(StringBuilder builder) { Configuration conf = getConfig(); if (conf.get("hadoop.http.authentication.type").equalsIgnoreCase( "simple")) { - api.append("?user.name=" + UrlEncoded + builder.append("?user.name=").append(UrlEncoded .encodeString(System.getProperty("user.name"))); } - return api.toString(); } private Builder getApiClient() throws IOException { @@ -553,7 +565,7 @@ public class ApiServiceClient extends AppAdminClient { container.setState(ContainerState.UPGRADING); toUpgrade[idx++] = container; } - String buffer = CONTAINER_JSON_SERDE.toJson(toUpgrade); + String buffer = ServiceApiUtil.CONTAINER_JSON_SERDE.toJson(toUpgrade); ClientResponse response = getApiClient(getInstancesPath(appName)) .put(ClientResponse.class, buffer); result = processResponse(response); @@ -577,7 +589,7 @@ public class ApiServiceClient extends AppAdminClient { component.setState(ComponentState.UPGRADING); toUpgrade[idx++] = component; } - String buffer = COMP_JSON_SERDE.toJson(toUpgrade); + String buffer = ServiceApiUtil.COMP_JSON_SERDE.toJson(toUpgrade); ClientResponse response = getApiClient(getComponentsPath(appName)) .put(ClientResponse.class, buffer); result = processResponse(response); @@ -599,11 +611,25 @@ public class ApiServiceClient extends AppAdminClient { return result; } - private static final JsonSerDeser<Container[]> CONTAINER_JSON_SERDE = - new JsonSerDeser<>(Container[].class, - PropertyNamingStrategy.CAMEL_CASE_TO_LOWER_CASE_WITH_UNDERSCORES); - - private static final JsonSerDeser<Component[]> COMP_JSON_SERDE = - new JsonSerDeser<>(Component[].class, - PropertyNamingStrategy.CAMEL_CASE_TO_LOWER_CASE_WITH_UNDERSCORES); + @Override + public String getInstances(String appName, List<String> components, + String version, List<String> containerStates) throws IOException, + YarnException { + try { + String uri = getInstancePath(appName, components, version, + containerStates); + ClientResponse response = 
getApiClient(uri).get(ClientResponse.class); + if (response.getStatus() != 200) { + StringBuilder sb = new StringBuilder(); + sb.append("Failed: HTTP error code: "); + sb.append(response.getStatus()); + sb.append(" ErrorMsg: ").append(response.getEntity(String.class)); + return sb.toString(); + } + return response.getEntity(String.class); + } catch (Exception e) { + LOG.error("Fail to get containers {}", e); + } + return null; + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/121865c3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/webapp/ApiServer.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/webapp/ApiServer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/webapp/ApiServer.java index 82fadae..4db0ac8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/webapp/ApiServer.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/webapp/ApiServer.java @@ -44,14 +44,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import javax.servlet.http.HttpServletRequest; -import javax.ws.rs.Consumes; -import javax.ws.rs.DELETE; -import javax.ws.rs.GET; -import javax.ws.rs.POST; -import javax.ws.rs.PUT; -import javax.ws.rs.Path; -import javax.ws.rs.PathParam; -import javax.ws.rs.Produces; +import javax.ws.rs.*; import javax.ws.rs.core.Context; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; @@ -61,13 +54,7 @@ import java.io.FileNotFoundException; 
import java.io.IOException; import java.lang.reflect.UndeclaredThrowableException; import java.security.PrivilegedExceptionAction; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; +import java.util.*; import java.util.stream.Collectors; import static org.apache.hadoop.yarn.service.api.records.ServiceState.ACCEPTED; @@ -582,6 +569,40 @@ public class ApiServer { return Response.status(Status.NO_CONTENT).build(); } + @GET + @Path(COMP_INSTANCES_PATH) + @Produces({RestApiConstants.MEDIA_TYPE_JSON_UTF8}) + public Response getComponentInstances(@Context HttpServletRequest request, + @PathParam(SERVICE_NAME) String serviceName, + @QueryParam(PARAM_COMP_NAME) List<String> componentNames, + @QueryParam(PARAM_VERSION) String version, + @QueryParam(PARAM_CONTAINER_STATE) List<String> containerStates) { + try { + UserGroupInformation ugi = getProxyUser(request); + LOG.info("GET: component instances for service = {}, compNames in {}, " + + "version = {}, containerStates in {}, user = {}", serviceName, + Objects.toString(componentNames, "[]"), Objects.toString(version, ""), + Objects.toString(containerStates, "[]"), ugi); + + List<ContainerState> containerStatesDe = containerStates.stream().map( + ContainerState::valueOf).collect(Collectors.toList()); + + return Response.ok(getContainers(ugi, serviceName, componentNames, + version, containerStatesDe)).build(); + } catch (IllegalArgumentException iae) { + return formatResponse(Status.BAD_REQUEST, "valid container states are: " + + Arrays.toString(ContainerState.values())); + } catch (AccessControlException e) { + return formatResponse(Response.Status.FORBIDDEN, e.getMessage()); + } catch (IOException | InterruptedException e) { + return formatResponse(Response.Status.INTERNAL_SERVER_ERROR, + e.getMessage()); + } catch (UndeclaredThrowableException e) { + return 
formatResponse(Response.Status.INTERNAL_SERVER_ERROR, + e.getCause().getMessage()); + } + } + private Response flexService(Service service, UserGroupInformation ugi) throws IOException, InterruptedException { String appName = service.getName(); @@ -752,6 +773,22 @@ public class ApiServer { }); } + private Container[] getContainers(UserGroupInformation ugi, + String serviceName, List<String> componentNames, String version, + List<ContainerState> containerStates) throws IOException, + InterruptedException { + return ugi.doAs((PrivilegedExceptionAction<Container[]>) () -> { + Container[] result; + ServiceClient sc = getServiceClient(); + sc.init(YARN_CONFIG); + sc.start(); + result = sc.getContainers(serviceName, componentNames, version, + containerStates); + sc.close(); + return result; + }); + } + /** * Used by negative test case. * http://git-wip-us.apache.org/repos/asf/hadoop/blob/121865c3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ClientAMProtocol.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ClientAMProtocol.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ClientAMProtocol.java index 45ff98a..652a314 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ClientAMProtocol.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ClientAMProtocol.java @@ -23,6 +23,8 @@ import org.apache.hadoop.yarn.proto.ClientAMProtocol.CompInstancesUpgradeRespons import 
org.apache.hadoop.yarn.proto.ClientAMProtocol.CompInstancesUpgradeRequestProto; import org.apache.hadoop.yarn.proto.ClientAMProtocol.FlexComponentsRequestProto; import org.apache.hadoop.yarn.proto.ClientAMProtocol.FlexComponentsResponseProto; +import org.apache.hadoop.yarn.proto.ClientAMProtocol.GetCompInstancesRequestProto; +import org.apache.hadoop.yarn.proto.ClientAMProtocol.GetCompInstancesResponseProto; import org.apache.hadoop.yarn.proto.ClientAMProtocol.GetStatusResponseProto; import org.apache.hadoop.yarn.proto.ClientAMProtocol.GetStatusRequestProto; import org.apache.hadoop.yarn.proto.ClientAMProtocol.RestartServiceRequestProto; @@ -55,4 +57,7 @@ public interface ClientAMProtocol { CompInstancesUpgradeResponseProto upgrade( CompInstancesUpgradeRequestProto request) throws IOException, YarnException; + + GetCompInstancesResponseProto getCompInstances( + GetCompInstancesRequestProto request) throws IOException, YarnException; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/121865c3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ClientAMService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ClientAMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ClientAMService.java index e97c3d6..5bf1833 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ClientAMService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ClientAMService.java @@ -35,6 
+35,8 @@ import org.apache.hadoop.yarn.proto.ClientAMProtocol.CompInstancesUpgradeRespons import org.apache.hadoop.yarn.proto.ClientAMProtocol.ComponentCountProto; import org.apache.hadoop.yarn.proto.ClientAMProtocol.FlexComponentsRequestProto; import org.apache.hadoop.yarn.proto.ClientAMProtocol.FlexComponentsResponseProto; +import org.apache.hadoop.yarn.proto.ClientAMProtocol.GetCompInstancesRequestProto; +import org.apache.hadoop.yarn.proto.ClientAMProtocol.GetCompInstancesResponseProto; import org.apache.hadoop.yarn.proto.ClientAMProtocol.GetStatusRequestProto; import org.apache.hadoop.yarn.proto.ClientAMProtocol.GetStatusResponseProto; import org.apache.hadoop.yarn.proto.ClientAMProtocol.RestartServiceRequestProto; @@ -43,15 +45,18 @@ import org.apache.hadoop.yarn.proto.ClientAMProtocol.StopRequestProto; import org.apache.hadoop.yarn.proto.ClientAMProtocol.StopResponseProto; import org.apache.hadoop.yarn.proto.ClientAMProtocol.UpgradeServiceRequestProto; import org.apache.hadoop.yarn.proto.ClientAMProtocol.UpgradeServiceResponseProto; +import org.apache.hadoop.yarn.service.api.records.Container; import org.apache.hadoop.yarn.service.component.ComponentEvent; import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEvent; import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEventType; +import org.apache.hadoop.yarn.service.utils.FilterUtils; import org.apache.hadoop.yarn.service.utils.ServiceApiUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; import java.net.InetSocketAddress; +import java.util.List; import static org.apache.hadoop.yarn.service.component.ComponentEventType.FLEX; @@ -194,4 +199,13 @@ public class ClientAMService extends AbstractService } return CompInstancesUpgradeResponseProto.newBuilder().build(); } + + @Override + public GetCompInstancesResponseProto getCompInstances( + GetCompInstancesRequestProto request) throws IOException { + List<Container> containers = 
FilterUtils.filterInstances(context, request); + return GetCompInstancesResponseProto.newBuilder().setCompInstances( + ServiceApiUtil.CONTAINER_JSON_SERDE.toJson(containers.toArray( + new Container[containers.size()]))).build(); + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/121865c3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/client/ServiceClient.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/client/ServiceClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/client/ServiceClient.java index 699a4e5..4b67998 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/client/ServiceClient.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/client/ServiceClient.java @@ -57,6 +57,8 @@ import org.apache.hadoop.yarn.ipc.YarnRPC; import org.apache.hadoop.yarn.proto.ClientAMProtocol.CompInstancesUpgradeRequestProto; import org.apache.hadoop.yarn.proto.ClientAMProtocol.ComponentCountProto; import org.apache.hadoop.yarn.proto.ClientAMProtocol.FlexComponentsRequestProto; +import org.apache.hadoop.yarn.proto.ClientAMProtocol.GetCompInstancesRequestProto; +import org.apache.hadoop.yarn.proto.ClientAMProtocol.GetCompInstancesResponseProto; import org.apache.hadoop.yarn.proto.ClientAMProtocol.GetStatusRequestProto; import org.apache.hadoop.yarn.proto.ClientAMProtocol.GetStatusResponseProto; import 
org.apache.hadoop.yarn.proto.ClientAMProtocol.RestartServiceRequestProto; @@ -66,6 +68,7 @@ import org.apache.hadoop.yarn.proto.ClientAMProtocol.UpgradeServiceResponseProto import org.apache.hadoop.yarn.service.ClientAMProtocol; import org.apache.hadoop.yarn.service.ServiceMaster; import org.apache.hadoop.yarn.service.api.records.Container; +import org.apache.hadoop.yarn.service.api.records.ContainerState; import org.apache.hadoop.yarn.service.api.records.Component; import org.apache.hadoop.yarn.service.api.records.Service; import org.apache.hadoop.yarn.service.api.records.ServiceState; @@ -100,6 +103,7 @@ import java.nio.ByteBuffer; import java.text.MessageFormat; import java.util.*; import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; import static org.apache.hadoop.yarn.api.records.YarnApplicationState.*; import static org.apache.hadoop.yarn.service.conf.YarnServiceConf.*; @@ -318,6 +322,49 @@ public class ServiceClient extends AppAdminClient implements SliderExitCodes, } } + @Override + public String getInstances(String appName, + List<String> components, String version, List<String> containerStates) + throws IOException, YarnException { + GetCompInstancesResponseProto result = filterContainers(appName, components, + version, containerStates); + return result.getCompInstances(); + } + + public Container[] getContainers(String appName, List<String> components, + String version, List<ContainerState> containerStates) + throws IOException, YarnException { + GetCompInstancesResponseProto result = filterContainers(appName, components, + version, containerStates != null ? 
containerStates.stream() + .map(Enum::toString).collect(Collectors.toList()) : null); + + return ServiceApiUtil.CONTAINER_JSON_SERDE.fromJson( + result.getCompInstances()); + } + + private GetCompInstancesResponseProto filterContainers(String appName, + List<String> components, String version, + List<String> containerStates) throws IOException, YarnException { + ApplicationReport appReport = yarnClient.getApplicationReport(getAppId( + appName)); + if (StringUtils.isEmpty(appReport.getHost())) { + throw new YarnException(appName + " AM hostname is empty."); + } + ClientAMProtocol proxy = createAMProxy(appName, appReport); + GetCompInstancesRequestProto.Builder req = GetCompInstancesRequestProto + .newBuilder(); + if (components != null && !components.isEmpty()) { + req.addAllComponentNames(components); + } + if (version != null) { + req.setVersion(version); + } + if (containerStates != null && !containerStates.isEmpty()){ + req.addAllContainerStates(containerStates); + } + return proxy.getCompInstances(req.build()); + } + public int actionUpgrade(Service service, List<Container> compInstances) throws IOException, YarnException { ApplicationReport appReport = http://git-wip-us.apache.org/repos/asf/hadoop/blob/121865c3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java index 529596d..64f35d3 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java @@ -97,6 +97,7 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>, private long containerStartedTime = 0; // This container object is used for rest API query private org.apache.hadoop.yarn.service.api.records.Container containerSpec; + private String serviceVersion; private static final StateMachineFactory<ComponentInstance, @@ -194,6 +195,8 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>, compInstance.getCompSpec().addContainer(container); compInstance.containerStartedTime = containerStartTime; compInstance.component.incRunningContainers(); + compInstance.serviceVersion = compInstance.scheduler.getApp() + .getVersion(); if (compInstance.timelineServiceEnabled) { compInstance.serviceTimelinePublisher @@ -210,6 +213,8 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>, if (compInstance.getState().equals(ComponentInstanceState.UPGRADING)) { compInstance.component.incContainersReady(false); compInstance.component.decContainersThatNeedUpgrade(); + compInstance.serviceVersion = compInstance.component.getUpgradeEvent() + .getUpgradeVersion(); ComponentEvent checkState = new ComponentEvent( compInstance.component.getName(), ComponentEventType.CHECK_STABLE); compInstance.scheduler.getDispatcher().getEventHandler().handle( @@ -382,6 +387,30 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>, } } + /** + * Returns the version of service at which the instance is at. 
+ */ + public String getServiceVersion() { + this.readLock.lock(); + try { + return this.serviceVersion; + } finally { + this.readLock.unlock(); + } + } + + /** + * Returns the state of the container in the container spec. + */ + public ContainerState getContainerState() { + this.readLock.lock(); + try { + return this.containerSpec.getState(); + } finally { + this.readLock.unlock(); + } + } + @Override public void handle(ComponentInstanceEvent event) { try { @@ -667,8 +696,16 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>, return result; } - @VisibleForTesting public org.apache.hadoop.yarn.service.api.records + /** + * Returns container spec. + */ + public org.apache.hadoop.yarn.service.api.records .Container getContainerSpec() { - return containerSpec; + readLock.lock(); + try { + return containerSpec; + } finally { + readLock.unlock(); + } } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/121865c3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/conf/RestApiConstants.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/conf/RestApiConstants.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/conf/RestApiConstants.java index 2d7db32..45ad7e4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/conf/RestApiConstants.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/conf/RestApiConstants.java @@ -37,11 +37,14 @@ public interface 
RestApiConstants { String COMPONENTS = "components"; String COMPONENTS_PATH = SERVICE_PATH + "/" + COMPONENTS; - // Query param String SERVICE_NAME = "service_name"; String COMPONENT_NAME = "component_name"; String COMP_INSTANCE_NAME = "component_instance_name"; + String PARAM_COMP_NAME = "componentName"; + String PARAM_VERSION = "version"; + String PARAM_CONTAINER_STATE = "containerState"; + String MEDIA_TYPE_JSON_UTF8 = MediaType.APPLICATION_JSON + ";charset=utf-8"; Long DEFAULT_UNLIMITED_LIFETIME = -1l; http://git-wip-us.apache.org/repos/asf/hadoop/blob/121865c3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/impl/pb/client/ClientAMProtocolPBClientImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/impl/pb/client/ClientAMProtocolPBClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/impl/pb/client/ClientAMProtocolPBClientImpl.java index e82181e..49ecd2e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/impl/pb/client/ClientAMProtocolPBClientImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/impl/pb/client/ClientAMProtocolPBClientImpl.java @@ -34,6 +34,8 @@ import org.apache.hadoop.yarn.proto.ClientAMProtocol.CompInstancesUpgradeRespons import org.apache.hadoop.yarn.proto.ClientAMProtocol.CompInstancesUpgradeRequestProto; import org.apache.hadoop.yarn.proto.ClientAMProtocol.FlexComponentsRequestProto; import 
org.apache.hadoop.yarn.proto.ClientAMProtocol.FlexComponentsResponseProto; +import org.apache.hadoop.yarn.proto.ClientAMProtocol.GetCompInstancesRequestProto; +import org.apache.hadoop.yarn.proto.ClientAMProtocol.GetCompInstancesResponseProto; import org.apache.hadoop.yarn.proto.ClientAMProtocol.GetStatusRequestProto; import org.apache.hadoop.yarn.proto.ClientAMProtocol.GetStatusResponseProto; import org.apache.hadoop.yarn.service.impl.pb.service.ClientAMProtocolPB; @@ -128,4 +130,15 @@ public class ClientAMProtocolPBClientImpl } return null; } + + @Override + public GetCompInstancesResponseProto getCompInstances( + GetCompInstancesRequestProto request) throws IOException, YarnException { + try { + return proxy.getCompInstances(null, request); + } catch (ServiceException e) { + RPCUtil.unwrapAndThrowException(e); + } + return null; + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/121865c3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/impl/pb/service/ClientAMProtocolPBServiceImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/impl/pb/service/ClientAMProtocolPBServiceImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/impl/pb/service/ClientAMProtocolPBServiceImpl.java index 50a678b..eab3f9f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/impl/pb/service/ClientAMProtocolPBServiceImpl.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/impl/pb/service/ClientAMProtocolPBServiceImpl.java @@ -25,6 +25,8 @@ import org.apache.hadoop.yarn.proto.ClientAMProtocol.CompInstancesUpgradeRequest import org.apache.hadoop.yarn.proto.ClientAMProtocol.CompInstancesUpgradeResponseProto; import org.apache.hadoop.yarn.proto.ClientAMProtocol.FlexComponentsRequestProto; import org.apache.hadoop.yarn.proto.ClientAMProtocol.FlexComponentsResponseProto; +import org.apache.hadoop.yarn.proto.ClientAMProtocol.GetCompInstancesRequestProto; +import org.apache.hadoop.yarn.proto.ClientAMProtocol.GetCompInstancesResponseProto; import org.apache.hadoop.yarn.proto.ClientAMProtocol.GetStatusRequestProto; import org.apache.hadoop.yarn.proto.ClientAMProtocol.GetStatusResponseProto; import org.apache.hadoop.yarn.proto.ClientAMProtocol.RestartServiceRequestProto; @@ -103,4 +105,15 @@ public class ClientAMProtocolPBServiceImpl implements ClientAMProtocolPB { throw new ServiceException(e); } } + + @Override + public GetCompInstancesResponseProto getCompInstances( + RpcController controller, GetCompInstancesRequestProto request) + throws ServiceException { + try { + return real.getCompInstances(request); + } catch (IOException | YarnException e) { + throw new ServiceException(e); + } + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/121865c3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/utils/FilterUtils.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/utils/FilterUtils.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/utils/FilterUtils.java new file mode 100644 index 0000000..10f9fea --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/utils/FilterUtils.java @@ -0,0 +1,81 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.service.utils; + +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.proto.ClientAMProtocol; +import org.apache.hadoop.yarn.service.ServiceContext; +import org.apache.hadoop.yarn.service.api.records.Container; +import org.apache.hadoop.yarn.service.component.instance.ComponentInstance; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +public class FilterUtils { + + /** + * Returns containers filtered by requested fields. 
+ * + * @param context service context + * @param filterReq filter request + */ + public static List<Container> filterInstances(ServiceContext context, + ClientAMProtocol.GetCompInstancesRequestProto filterReq) { + List<Container> results = new ArrayList<>(); + Map<ContainerId, ComponentInstance> instances = + context.scheduler.getLiveInstances(); + + instances.forEach(((containerId, instance) -> { + boolean include = true; + if (filterReq.getComponentNamesList() != null && + !filterReq.getComponentNamesList().isEmpty()) { + // filter by component name + if (!filterReq.getComponentNamesList().contains( + instance.getComponent().getName())) { + include = false; + } + } + + if (filterReq.getVersion() != null && !filterReq.getVersion().isEmpty()) { + // filter by version + String instanceServiceVersion = instance.getServiceVersion(); + if (instanceServiceVersion == null || !instanceServiceVersion.equals( + filterReq.getVersion())) { + include = false; + } + } + + if (filterReq.getContainerStatesList() != null && + !filterReq.getContainerStatesList().isEmpty()) { + // filter by state + if (!filterReq.getContainerStatesList().contains( + instance.getContainerState().toString())) { + include = false; + } + } + + if (include) { + results.add(instance.getContainerSpec()); + } + })); + + return results; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/121865c3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/utils/ServiceApiUtil.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/utils/ServiceApiUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/utils/ServiceApiUtil.java index 
705e040..447250f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/utils/ServiceApiUtil.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/utils/ServiceApiUtil.java @@ -72,6 +72,15 @@ public class ServiceApiUtil { public static JsonSerDeser<Service> jsonSerDeser = new JsonSerDeser<>(Service.class, PropertyNamingStrategy.CAMEL_CASE_TO_LOWER_CASE_WITH_UNDERSCORES); + + public static final JsonSerDeser<Container[]> CONTAINER_JSON_SERDE = + new JsonSerDeser<>(Container[].class, + PropertyNamingStrategy.CAMEL_CASE_TO_LOWER_CASE_WITH_UNDERSCORES); + + public static final JsonSerDeser<Component[]> COMP_JSON_SERDE = + new JsonSerDeser<>(Component[].class, + PropertyNamingStrategy.CAMEL_CASE_TO_LOWER_CASE_WITH_UNDERSCORES); + private static final PatternValidator namePattern = new PatternValidator("[a-z][a-z0-9-]*"); http://git-wip-us.apache.org/repos/asf/hadoop/blob/121865c3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/proto/ClientAMProtocol.proto ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/proto/ClientAMProtocol.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/proto/ClientAMProtocol.proto index 91721b0..6166ded 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/proto/ClientAMProtocol.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/proto/ClientAMProtocol.proto @@ -32,6 +32,8 @@ service ClientAMProtocolService { returns 
(RestartServiceResponseProto); rpc upgrade(CompInstancesUpgradeRequestProto) returns (CompInstancesUpgradeResponseProto); + rpc getCompInstances(GetCompInstancesRequestProto) returns + (GetCompInstancesResponseProto); } message FlexComponentsRequestProto { @@ -81,4 +83,14 @@ message CompInstancesUpgradeRequestProto { } message CompInstancesUpgradeResponseProto { +} + +message GetCompInstancesRequestProto { + repeated string componentNames = 1; + optional string version = 2; + repeated string containerStates = 3; +} + +message GetCompInstancesResponseProto { + optional string compInstances = 1; } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/121865c3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/MockRunningServiceContext.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/MockRunningServiceContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/MockRunningServiceContext.java new file mode 100644 index 0000000..89888c5 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/MockRunningServiceContext.java @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.service; + +import org.apache.hadoop.registry.client.api.RegistryOperations; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.client.api.NMClient; +import org.apache.hadoop.yarn.client.api.async.NMClientAsync; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.service.api.records.Service; +import org.apache.hadoop.yarn.service.component.Component; +import org.apache.hadoop.yarn.service.component.ComponentEvent; +import org.apache.hadoop.yarn.service.component.ComponentEventType; +import org.apache.hadoop.yarn.service.component.instance.ComponentInstance; +import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEvent; +import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEventType; +import org.apache.hadoop.yarn.service.containerlaunch.ContainerLaunchService; +import org.apache.hadoop.yarn.service.registry.YarnRegistryViewForProviders; +import org.mockito.stubbing.Answer; + +import java.io.IOException; +import java.util.Map; + +import static org.mockito.Matchers.anyObject; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +/** + * Mocked 
service context for a running service. + */ +public class MockRunningServiceContext extends ServiceContext { + + public MockRunningServiceContext(ServiceTestUtils.ServiceFSWatcher fsWatcher, + Service serviceDef) throws Exception { + super(); + this.service = serviceDef; + this.fs = fsWatcher.getFs(); + + ContainerLaunchService mockLaunchService = mock( + ContainerLaunchService.class); + + this.scheduler = new ServiceScheduler(this) { + @Override + protected YarnRegistryViewForProviders + createYarnRegistryOperations( + ServiceContext context, RegistryOperations registryClient) { + return mock(YarnRegistryViewForProviders.class); + } + + @Override + public NMClientAsync createNMClient() { + NMClientAsync nmClientAsync = super.createNMClient(); + NMClient nmClient = mock(NMClient.class); + try { + when(nmClient.getContainerStatus(anyObject(), anyObject())) + .thenAnswer( + (Answer<ContainerStatus>) invocation -> ContainerStatus + .newInstance((ContainerId) invocation.getArguments()[0], + org.apache.hadoop.yarn.api.records.ContainerState + .RUNNING, + "", 0)); + } catch (YarnException | IOException e) { + throw new RuntimeException(e); + } + nmClientAsync.setClient(nmClient); + return nmClientAsync; + } + + @Override + public ContainerLaunchService getContainerLaunchService() { + return mockLaunchService; + } + }; + this.scheduler.init(fsWatcher.getConf()); + + ServiceTestUtils.createServiceManager(this); + + doNothing().when(mockLaunchService). 
+ reInitCompInstance(anyObject(), anyObject(), anyObject(), anyObject()); + stabilizeComponents(this); + } + + private void stabilizeComponents(ServiceContext context) { + + ApplicationId appId = ApplicationId.fromString(context.service.getId()); + ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(appId, 1); + context.attemptId = attemptId; + Map<String, Component> + componentState = context.scheduler.getAllComponents(); + + int counter = 0; + for (org.apache.hadoop.yarn.service.api.records.Component componentSpec : + context.service.getComponents()) { + Component component = new org.apache.hadoop.yarn.service.component. + Component(componentSpec, 1L, context); + componentState.put(component.getName(), component); + component.handle(new ComponentEvent(component.getName(), + ComponentEventType.FLEX)); + + for (int i = 0; i < componentSpec.getNumberOfContainers(); i++) { + counter++; + assignNewContainer(attemptId, counter, component); + } + + component.handle(new ComponentEvent(component.getName(), + ComponentEventType.CHECK_STABLE)); + } + } + + public void assignNewContainer(ApplicationAttemptId attemptId, + long containerNum, Component component) { + + Container container = org.apache.hadoop.yarn.api.records.Container + .newInstance(ContainerId.newContainerId(attemptId, containerNum), + NODE_ID, "localhost", null, null, + null); + component.handle(new ComponentEvent(component.getName(), + ComponentEventType.CONTAINER_ALLOCATED) + .setContainer(container).setContainerId(container.getId())); + ComponentInstance instance = this.scheduler.getLiveInstances().get( + container.getId()); + ComponentInstanceEvent startEvent = new ComponentInstanceEvent( + container.getId(), ComponentInstanceEventType.START); + instance.handle(startEvent); + + ComponentInstanceEvent readyEvent = new ComponentInstanceEvent( + container.getId(), ComponentInstanceEventType.BECOME_READY); + instance.handle(readyEvent); + } + + private static final NodeId NODE_ID = 
NodeId.fromString("localhost:0"); +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/121865c3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/client/TestServiceCLI.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/client/TestServiceCLI.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/client/TestServiceCLI.java index 363fe91..0e047c2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/client/TestServiceCLI.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/client/TestServiceCLI.java @@ -166,7 +166,7 @@ public class TestServiceCLI { checkApp(serviceName, "master", 1L, 1000L, "qname"); } - @Test (timeout = 180000) + @Test public void testInitiateServiceUpgrade() throws Exception { String[] args = {"app", "-upgrade", "app-1", "-initiate", ExampleAppJson.resourceName(ExampleAppJson.APP_JSON), @@ -185,7 +185,7 @@ public class TestServiceCLI { Assert.assertEquals(result, 0); } - @Test (timeout = 180000) + @Test public void testUpgradeInstances() throws Exception { conf.set(YARN_APP_ADMIN_CLIENT_PREFIX + DUMMY_APP_TYPE, DummyServiceClient.class.getName()); @@ -197,7 +197,7 @@ public class TestServiceCLI { Assert.assertEquals(result, 0); } - @Test (timeout = 180000) + @Test public void testUpgradeComponents() throws Exception { conf.set(YARN_APP_ADMIN_CLIENT_PREFIX + DUMMY_APP_TYPE, DummyServiceClient.class.getName()); @@ -209,6 +209,18 @@ public class TestServiceCLI 
{ Assert.assertEquals(result, 0); } + @Test + public void testGetInstances() throws Exception { + conf.set(YARN_APP_ADMIN_CLIENT_PREFIX + DUMMY_APP_TYPE, + DummyServiceClient.class.getName()); + cli.setConf(conf); + String[] args = {"container", "-list", "app-1", + "-components", "comp1,comp2", + "-appTypes", DUMMY_APP_TYPE}; + int result = cli.run(ApplicationCLI.preProcessArgs(args)); + Assert.assertEquals(result, 0); + } + @Test (timeout = 180000) public void testEnableFastLaunch() throws Exception { fs.getFileSystem().create(new Path(basedir.getAbsolutePath(), "test.jar")) @@ -313,5 +325,12 @@ public class TestServiceCLI { throws IOException, YarnException { return 0; } + + @Override + public String getInstances(String appName, List<String> components, + String version, List<String> containerStates) + throws IOException, YarnException { + return ""; + } } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/121865c3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/client/TestServiceClient.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/client/TestServiceClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/client/TestServiceClient.java index d3664ea..700655c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/client/TestServiceClient.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/client/TestServiceClient.java @@ -18,6 +18,7 @@ package 
org.apache.hadoop.yarn.service.client; +import com.google.common.collect.Lists; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; @@ -32,8 +33,12 @@ import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.service.ClientAMProtocol; import org.apache.hadoop.yarn.proto.ClientAMProtocol.CompInstancesUpgradeRequestProto; import org.apache.hadoop.yarn.proto.ClientAMProtocol.CompInstancesUpgradeResponseProto; +import org.apache.hadoop.yarn.proto.ClientAMProtocol.GetCompInstancesRequestProto; +import org.apache.hadoop.yarn.proto.ClientAMProtocol.GetCompInstancesResponseProto; import org.apache.hadoop.yarn.proto.ClientAMProtocol.UpgradeServiceRequestProto; import org.apache.hadoop.yarn.proto.ClientAMProtocol.UpgradeServiceResponseProto; +import org.apache.hadoop.yarn.service.MockRunningServiceContext; +import org.apache.hadoop.yarn.service.ServiceContext; import org.apache.hadoop.yarn.service.ServiceTestUtils; import org.apache.hadoop.yarn.service.api.records.Component; import org.apache.hadoop.yarn.service.api.records.Container; @@ -41,6 +46,7 @@ import org.apache.hadoop.yarn.service.api.records.Service; import org.apache.hadoop.yarn.service.api.records.ServiceState; import org.apache.hadoop.yarn.service.conf.YarnServiceConf; import org.apache.hadoop.yarn.service.exceptions.ErrorStrings; +import org.apache.hadoop.yarn.service.utils.FilterUtils; import org.apache.hadoop.yarn.service.utils.ServiceApiUtil; import org.junit.Assert; import org.junit.Rule; @@ -52,6 +58,7 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.ArrayList; +import java.util.List; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -122,6 +129,26 @@ public class TestServiceClient { client.stop(); } + @Test + public void testGetCompInstances() throws Exception { + Service service = 
createService(); + MockServiceClient client = MockServiceClient.create(rule, service, true); + + //upgrade the service + service.setVersion("v2"); + client.initiateUpgrade(service); + + //add containers to the component that needs to be upgraded. + Component comp = service.getComponents().iterator().next(); + ContainerId containerId = ContainerId.newContainerId(client.attemptId, 1L); + comp.addContainer(new Container().id(containerId.toString())); + + Container[] containers = client.getContainers(service.getName(), + Lists.newArrayList("compa"), "v1", null); + Assert.assertEquals("num containers", 2, containers.length); + client.stop(); + } + private Service createService() throws IOException, YarnException { Service service = ServiceTestUtils.createExampleApplication(); @@ -137,6 +164,7 @@ public class TestServiceClient { private final ClientAMProtocol amProxy; private Object proxyResponse; private Service service; + private ServiceContext context; private MockServiceClient() { amProxy = mock(ClientAMProtocol.class); @@ -147,8 +175,12 @@ public class TestServiceClient { static MockServiceClient create(ServiceTestUtils.ServiceFSWatcher rule, Service service, boolean enableUpgrade) - throws IOException, YarnException { + throws Exception { MockServiceClient client = new MockServiceClient(); + ApplicationId applicationId = ApplicationId.newInstance( + System.currentTimeMillis(), 1); + service.setId(applicationId.toString()); + client.context = new MockRunningServiceContext(rule, service); YarnClient yarnClient = createMockYarnClient(); ApplicationReport appReport = mock(ApplicationReport.class); @@ -175,10 +207,28 @@ public class TestServiceClient { CompInstancesUpgradeRequestProto.class))).thenAnswer( (Answer<CompInstancesUpgradeResponseProto>) invocation -> { CompInstancesUpgradeResponseProto response = - CompInstancesUpgradeResponseProto.newBuilder().build(); + CompInstancesUpgradeResponseProto.newBuilder().build(); client.proxyResponse = response; return 
response; }); + + when(client.amProxy.getCompInstances(Matchers.any( + GetCompInstancesRequestProto.class))).thenAnswer( + (Answer<GetCompInstancesResponseProto>) invocation -> { + + GetCompInstancesRequestProto req = (GetCompInstancesRequestProto) + invocation.getArguments()[0]; + List<Container> containers = FilterUtils.filterInstances( + client.context, req); + GetCompInstancesResponseProto response = + GetCompInstancesResponseProto.newBuilder().setCompInstances( + ServiceApiUtil.CONTAINER_JSON_SERDE.toJson( + containers.toArray(new Container[containers.size()]))) + .build(); + client.proxyResponse = response; + return response; + }); + client.setFileSystem(rule.getFs()); client.setYarnClient(yarnClient); client.service = service; http://git-wip-us.apache.org/repos/asf/hadoop/blob/121865c3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/component/TestComponent.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/component/TestComponent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/component/TestComponent.java index d7c15ec..d5fb941 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/component/TestComponent.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/component/TestComponent.java @@ -18,19 +18,10 @@ package org.apache.hadoop.yarn.service.component; -import org.apache.hadoop.registry.client.api.RegistryOperations; -import 
org.apache.hadoop.yarn.api.records.ApplicationAttemptId; -import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerExitStatus; -import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerStatus; -import org.apache.hadoop.yarn.api.records.NodeId; -import org.apache.hadoop.yarn.client.api.NMClient; -import org.apache.hadoop.yarn.client.api.async.NMClientAsync; -import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.service.ServiceContext; -import org.apache.hadoop.yarn.service.ServiceScheduler; import org.apache.hadoop.yarn.service.ServiceTestUtils; import org.apache.hadoop.yarn.service.TestServiceManager; import org.apache.hadoop.yarn.service.api.records.ComponentState; @@ -38,23 +29,15 @@ import org.apache.hadoop.yarn.service.api.records.Service; import org.apache.hadoop.yarn.service.component.instance.ComponentInstance; import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEvent; import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEventType; - -import org.apache.hadoop.yarn.service.containerlaunch.ContainerLaunchService; -import org.apache.hadoop.yarn.service.registry.YarnRegistryViewForProviders; +import org.apache.hadoop.yarn.service.MockRunningServiceContext; import org.apache.log4j.Logger; import org.junit.Assert; import org.junit.Rule; import org.junit.Test; -import org.mockito.stubbing.Answer; -import java.io.IOException; import java.util.Iterator; -import java.util.Map; import static org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEventType.STOP; - -import static org.mockito.Matchers.anyObject; -import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -63,7 +46,6 @@ import static org.mockito.Mockito.when; */ public class TestComponent { - private 
static final int WAIT_MS_PER_LOOP = 1000; static final Logger LOG = Logger.getLogger(TestComponent.class); @Rule @@ -115,7 +97,7 @@ public class TestComponent { @Test public void testContainerCompletedWhenUpgrading() throws Exception { String serviceName = "testContainerComplete"; - ServiceContext context = createTestContext(rule, serviceName); + MockRunningServiceContext context = createTestContext(rule, serviceName); Component comp = context.scheduler.getAllComponents().entrySet().iterator() .next().getValue(); @@ -148,7 +130,7 @@ public class TestComponent { ComponentState.FLEXING, comp.getComponentSpec().getState()); // new container get allocated - assignNewContainer(context.attemptId, 10, context, comp); + context.assignNewContainer(context.attemptId, 10, comp); // second instance finished upgrading ComponentInstance instance2 = instanceIter.next(); @@ -174,7 +156,7 @@ public class TestComponent { serviceName); TestServiceManager.createDef(serviceName, testService); - ServiceContext context = createTestContext(rule, testService); + ServiceContext context = new MockRunningServiceContext(rule, testService); for (Component comp : context.scheduler.getAllComponents().values()) { @@ -225,114 +207,11 @@ public class TestComponent { return spec; } - public static ServiceContext createTestContext( + public static MockRunningServiceContext createTestContext( ServiceTestUtils.ServiceFSWatcher fsWatcher, String serviceName) throws Exception { - return createTestContext(fsWatcher, + return new MockRunningServiceContext(fsWatcher, TestServiceManager.createBaseDef(serviceName)); } - - public static ServiceContext createTestContext( - ServiceTestUtils.ServiceFSWatcher fsWatcher, Service serviceDef) - throws Exception { - ServiceContext context = new ServiceContext(); - context.service = serviceDef; - context.fs = fsWatcher.getFs(); - - ContainerLaunchService mockLaunchService = mock( - ContainerLaunchService.class); - - context.scheduler = new ServiceScheduler(context) { - 
@Override protected YarnRegistryViewForProviders - createYarnRegistryOperations( - ServiceContext context, RegistryOperations registryClient) { - return mock(YarnRegistryViewForProviders.class); - } - - @Override public NMClientAsync createNMClient() { - NMClientAsync nmClientAsync = super.createNMClient(); - NMClient nmClient = mock(NMClient.class); - try { - when(nmClient.getContainerStatus(anyObject(), anyObject())) - .thenAnswer( - (Answer<ContainerStatus>) invocation -> ContainerStatus - .newInstance((ContainerId) invocation.getArguments()[0], - org.apache.hadoop.yarn.api.records.ContainerState - .RUNNING, - "", 0)); - } catch (YarnException | IOException e) { - throw new RuntimeException(e); - } - nmClientAsync.setClient(nmClient); - return nmClientAsync; - } - - @Override public ContainerLaunchService getContainerLaunchService() { - return mockLaunchService; - } - }; - context.scheduler.init(fsWatcher.getConf()); - - ServiceTestUtils.createServiceManager(context); - - doNothing().when(mockLaunchService). - reInitCompInstance(anyObject(), anyObject(), anyObject(), anyObject()); - stabilizeComponents(context); - - return context; - } - - private static void stabilizeComponents(ServiceContext context) { - - ApplicationId appId = ApplicationId.fromString(context.service.getId()); - ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(appId, 1); - context.attemptId = attemptId; - Map<String, Component> - componentState = context.scheduler.getAllComponents(); - - int counter = 0; - for (org.apache.hadoop.yarn.service.api.records.Component componentSpec : - context.service.getComponents()) { - Component component = new org.apache.hadoop.yarn.service.component. 
- Component(componentSpec, 1L, context); - componentState.put(component.getName(), component); - component.handle(new ComponentEvent(component.getName(), - ComponentEventType.FLEX)); - - for (int i = 0; i < componentSpec.getNumberOfContainers(); i++) { - counter++; - assignNewContainer(attemptId, counter, context, component); - } - - component.handle(new ComponentEvent(component.getName(), - ComponentEventType.CHECK_STABLE)); - } - } - - private static void assignNewContainer( - ApplicationAttemptId attemptId, long containerNum, - ServiceContext context, Component component) { - - - Container container = org.apache.hadoop.yarn.api.records.Container - .newInstance(ContainerId.newContainerId(attemptId, containerNum), - NODE_ID, "localhost", null, null, - null); - component.handle(new ComponentEvent(component.getName(), - ComponentEventType.CONTAINER_ALLOCATED) - .setContainer(container).setContainerId(container.getId())); - ComponentInstance instance = context.scheduler.getLiveInstances().get( - container.getId()); - ComponentInstanceEvent startEvent = new ComponentInstanceEvent( - container.getId(), ComponentInstanceEventType.START); - instance.handle(startEvent); - - ComponentInstanceEvent readyEvent = new ComponentInstanceEvent( - container.getId(), ComponentInstanceEventType.BECOME_READY); - instance.handle(readyEvent); - } - - private static final NodeId NODE_ID = NodeId.fromString("localhost:0"); - } http://git-wip-us.apache.org/repos/asf/hadoop/blob/121865c3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/component/instance/TestComponentInstance.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/component/instance/TestComponentInstance.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/component/instance/TestComponentInstance.java index 26e8c93..0e7816c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/component/instance/TestComponentInstance.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/component/instance/TestComponentInstance.java @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> + * + * http://www.apache.org/licenses/LICENSE-2.0 + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
@@ -60,19 +60,20 @@ import static org.mockito.Mockito.when; */ public class TestComponentInstance { - @Rule public ServiceTestUtils.ServiceFSWatcher rule = + @Rule + public ServiceTestUtils.ServiceFSWatcher rule = new ServiceTestUtils.ServiceFSWatcher(); - @Test public void testContainerUpgrade() throws Exception { + @Test + public void testContainerUpgrade() throws Exception { ServiceContext context = TestComponent.createTestContext(rule, "testContainerUpgrade"); - Component component = - context.scheduler.getAllComponents().entrySet().iterator().next() - .getValue(); + Component component = context.scheduler.getAllComponents().entrySet() + .iterator().next().getValue(); upgradeComponent(component); - ComponentInstance instance = - component.getAllComponentInstances().iterator().next(); + ComponentInstance instance = component.getAllComponentInstances().iterator() + .next(); ComponentInstanceEvent instanceEvent = new ComponentInstanceEvent( instance.getContainer().getId(), ComponentInstanceEventType.UPGRADE); instance.handle(instanceEvent); @@ -82,16 +83,16 @@ public class TestComponentInstance { containerSpec.getState()); } - @Test public void testContainerReadyAfterUpgrade() throws Exception { + @Test + public void testContainerReadyAfterUpgrade() throws Exception { ServiceContext context = TestComponent.createTestContext(rule, "testContainerStarted"); - Component component = - context.scheduler.getAllComponents().entrySet().iterator().next() - .getValue(); + Component component = context.scheduler.getAllComponents().entrySet() + .iterator().next().getValue(); upgradeComponent(component); - ComponentInstance instance = - component.getAllComponentInstances().iterator().next(); + ComponentInstance instance = component.getAllComponentInstances().iterator() + .next(); ComponentInstanceEvent instanceEvent = new ComponentInstanceEvent( instance.getContainer().getId(), ComponentInstanceEventType.UPGRADE); @@ -100,9 +101,8 @@ public class TestComponentInstance { 
instance.handle(new ComponentInstanceEvent(instance.getContainer().getId(), ComponentInstanceEventType.BECOME_READY)); Assert.assertEquals("instance not ready", ContainerState.READY, - instance.getCompSpec() - .getContainer(instance.getContainer().getId().toString()) - .getState()); + instance.getCompSpec().getContainer( + instance.getContainer().getId().toString()).getState()); } private void upgradeComponent(Component component) { @@ -113,9 +113,8 @@ public class TestComponentInstance { private Component createComponent(ServiceScheduler scheduler, org.apache.hadoop.yarn.service.api.records.Component.RestartPolicyEnum - restartPolicy, - int nSucceededInstances, int nFailedInstances, int totalAsk, - int componentId) { + restartPolicy, int nSucceededInstances, int nFailedInstances, + int totalAsk, int componentId) { assert (nSucceededInstances + nFailedInstances) <= totalAsk; @@ -214,7 +213,8 @@ public class TestComponentInstance { return componentInstance; } - @Test public void testComponentRestartPolicy() { + @Test + public void testComponentRestartPolicy() { Map<String, Component> allComponents = new HashMap<>(); Service mockService = mock(Service.class); http://git-wip-us.apache.org/repos/asf/hadoop/blob/121865c3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/utils/TestFilterUtils.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/utils/TestFilterUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/utils/TestFilterUtils.java new file mode 100644 index 0000000..065c37a --- /dev/null +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/utils/TestFilterUtils.java @@ -0,0 +1,102 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.service.utils; + +import com.google.common.collect.Lists; +import org.apache.hadoop.yarn.proto.ClientAMProtocol.GetCompInstancesRequestProto; +import org.apache.hadoop.yarn.service.ServiceContext; +import org.apache.hadoop.yarn.service.ServiceTestUtils; +import org.apache.hadoop.yarn.service.TestServiceManager; +import org.apache.hadoop.yarn.service.api.records.Container; +import org.apache.hadoop.yarn.service.MockRunningServiceContext; +import org.apache.hadoop.yarn.service.api.records.ContainerState; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; + +import java.util.List; + +public class TestFilterUtils { + + @Rule + public ServiceTestUtils.ServiceFSWatcher rule = + new ServiceTestUtils.ServiceFSWatcher(); + + @Test + public void testNoFilter() throws Exception { + GetCompInstancesRequestProto req = GetCompInstancesRequestProto.newBuilder() + .build(); + List<Container> containers = FilterUtils.filterInstances( + new MockRunningServiceContext(rule, + TestServiceManager.createBaseDef("service")), req); + Assert.assertEquals("num containers", 4, containers.size()); + } + + @Test + public void testFilterWithComp() throws Exception { + GetCompInstancesRequestProto req = GetCompInstancesRequestProto.newBuilder() + .addAllComponentNames(Lists.newArrayList("compa")).build(); + List<Container> containers = FilterUtils.filterInstances( + new MockRunningServiceContext(rule, + TestServiceManager.createBaseDef("service")), req); + Assert.assertEquals("num containers", 2, containers.size()); + } + + @Test + public void testFilterWithVersion() throws Exception { + ServiceContext sc = new MockRunningServiceContext(rule, + TestServiceManager.createBaseDef("service")); + GetCompInstancesRequestProto.Builder reqBuilder = + GetCompInstancesRequestProto.newBuilder(); + + reqBuilder.setVersion("v2"); + Assert.assertEquals("num containers", 0, + FilterUtils.filterInstances(sc, reqBuilder.build()).size()); + + 
reqBuilder.addAllComponentNames(Lists.newArrayList("compa")) + .setVersion("v1").build(); + + Assert.assertEquals("num containers", 2, + FilterUtils.filterInstances(sc, reqBuilder.build()).size()); + + reqBuilder.setVersion("v2").build(); + Assert.assertEquals("num containers", 0, + FilterUtils.filterInstances(sc, reqBuilder.build()).size()); + } + + @Test + public void testFilterWithState() throws Exception { + ServiceContext sc = new MockRunningServiceContext(rule, + TestServiceManager.createBaseDef("service")); + GetCompInstancesRequestProto.Builder reqBuilder = + GetCompInstancesRequestProto.newBuilder(); + + reqBuilder.addAllContainerStates(Lists.newArrayList( + ContainerState.READY.toString())); + Assert.assertEquals("num containers", 4, + FilterUtils.filterInstances(sc, reqBuilder.build()).size()); + + reqBuilder.clearContainerStates(); + reqBuilder.addAllContainerStates(Lists.newArrayList( + ContainerState.STOPPED.toString())); + Assert.assertEquals("num containers", 0, + FilterUtils.filterInstances(sc, reqBuilder.build()).size()); + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/121865c3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/ApplicationCLI.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/ApplicationCLI.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/ApplicationCLI.java index 1d26a96..14710a4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/ApplicationCLI.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/ApplicationCLI.java @@ -105,6 +105,8 @@ public class ApplicationCLI extends YarnCLI { public static final String UPGRADE_FINALIZE = "finalize"; public static final String 
COMPONENT_INSTS = "instances"; public static final String COMPONENTS = "components"; + public static final String VERSION = "version"; + public static final String STATES = "states"; private static String firstArg = null; @@ -294,10 +296,39 @@ public class ApplicationCLI extends YarnCLI { opts.addOption(STATUS_CMD, true, "Prints the status of the container."); opts.addOption(LIST_CMD, true, - "List containers for application attempt."); + "List containers for application attempt when application " + + "attempt ID is provided. When application name is provided, " + + "then it finds the instances of the application based on app's " + + "own implementation, and -appTypes option must be specified " + + "unless it is the default yarn-service type. With app name, it " + + "supports optional use of -version to filter instances based on " + + "app version, -components to filter instances based on component " + + "names, -states to filter instances based on instance state."); opts.addOption(HELP_CMD, false, "Displays help for all commands."); opts.getOption(STATUS_CMD).setArgName("Container ID"); - opts.getOption(LIST_CMD).setArgName("Application Attempt ID"); + opts.getOption(LIST_CMD).setArgName("Application Name or Attempt ID"); + opts.addOption(APP_TYPE_CMD, true, "Works with -list to " + + "specify the app type when application name is provided."); + opts.getOption(APP_TYPE_CMD).setValueSeparator(','); + opts.getOption(APP_TYPE_CMD).setArgs(Option.UNLIMITED_VALUES); + opts.getOption(APP_TYPE_CMD).setArgName("Types"); + + opts.addOption(VERSION, true, "Works with -list " + + "to filter instances based on input application version."); + opts.getOption(VERSION).setArgs(1); + + opts.addOption(COMPONENTS, true, "Works with -list to " + + "filter instances based on input comma-separated list of " + + "component names."); + opts.getOption(COMPONENTS).setValueSeparator(','); + opts.getOption(COMPONENTS).setArgs(Option.UNLIMITED_VALUES); + + opts.addOption(STATES, true, "Works 
with -list to " + + "filter instances based on input comma-separated list of " + + "instance states."); + opts.getOption(STATES).setValueSeparator(','); + opts.getOption(STATES).setArgs(Option.UNLIMITED_VALUES); + opts.addOption(SIGNAL_CMD, true, "Signal the container. The available signal commands are " + java.util.Arrays.asList(SignalContainerCommand.values()) + @@ -426,11 +457,40 @@ public class ApplicationCLI extends YarnCLI { } listApplicationAttempts(cliParser.getOptionValue(LIST_CMD)); } else if (title.equalsIgnoreCase(CONTAINER)) { - if (hasAnyOtherCLIOptions(cliParser, opts, LIST_CMD)) { + if (hasAnyOtherCLIOptions(cliParser, opts, LIST_CMD, APP_TYPE_CMD, + VERSION, COMPONENTS, STATES)) { printUsage(title, opts); return exitCode; } - listContainers(cliParser.getOptionValue(LIST_CMD)); + String appAttemptIdOrName = cliParser.getOptionValue(LIST_CMD); + try { + // try parsing attempt id, if it succeeds, it means it's an appAttemptId + ApplicationAttemptId.fromString(appAttemptIdOrName); + listContainers(appAttemptIdOrName); + } catch (IllegalArgumentException e) { + // not appAttemptId format, it could be appName. If app-type is not + // provided, assume it is yarn-service type. + AppAdminClient client = AppAdminClient + .createAppAdminClient(getSingleAppTypeFromCLI(cliParser), + getConf()); + String version = cliParser.getOptionValue(VERSION); + String[] components = cliParser.getOptionValues(COMPONENTS); + String[] instanceStates = cliParser.getOptionValues(STATES); + try { + sysout.println(client.getInstances(appAttemptIdOrName, + components == null ? null : Arrays.asList(components), + version, instanceStates == null ? 
null : + Arrays.asList(instanceStates))); + return 0; + } catch (ApplicationNotFoundException exception) { + System.err.println("Application with name '" + appAttemptIdOrName + + "' doesn't exist in RM or Timeline Server."); + return -1; + } catch (Exception ex) { + System.err.println(ex.getMessage()); + return -1; + } + } } } else if (cliParser.hasOption(KILL_CMD)) { if (hasAnyOtherCLIOptions(cliParser, opts, KILL_CMD)) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/121865c3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestYarnCLI.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestYarnCLI.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestYarnCLI.java index 518cd1c..6b823b2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestYarnCLI.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestYarnCLI.java @@ -2280,13 +2280,17 @@ public class TestYarnCLI { ByteArrayOutputStream baos = new ByteArrayOutputStream(); PrintWriter pw = new PrintWriter(baos); pw.println("usage: container"); + pw.println(" -appTypes <Types> Works with -list to specify the app type when application name is provided."); + pw.println(" -components <arg> Works with -list to filter instances based on input comma-separated list of component names."); pw.println(" -help Displays help for all commands."); - pw.println(" -list <Application Attempt ID> List containers for application attempt."); + pw.println(" -list <Application Name or Attempt ID> List containers for application attempt when application attempt ID is provided. 
When application name is provided, then it finds the instances of the application based on app's own implementation, and -appTypes option must be specified unless it is the default yarn-service type. With app name, it supports optional use of -version to filter instances based on app version, -components to filter instances based on component names, -states to filter instances based on instance state."); pw.println(" -signal <container ID [signal command]> Signal the container."); pw.println("The available signal commands are "); pw.println(java.util.Arrays.asList(SignalContainerCommand.values())); pw.println(" Default command is OUTPUT_THREAD_DUMP."); + pw.println(" -states <arg> Works with -list to filter instances based on input comma-separated list of instance states."); pw.println(" -status <Container ID> Prints the status of the container."); + pw.println(" -version <arg> Works with -list to filter instances based on input application version. "); pw.close(); try { return normalize(baos.toString("UTF-8")); http://git-wip-us.apache.org/repos/asf/hadoop/blob/121865c3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/AppAdminClient.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/AppAdminClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/AppAdminClient.java index 3cd1a78..3fb4778 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/AppAdminClient.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/AppAdminClient.java @@ -282,4 +282,10 @@ public abstract class AppAdminClient extends CompositeService { public abstract int actionCleanUp(String appName, String userName) throws IOException, YarnException; + 
@Public + @Unstable + public abstract String getInstances(String appName, + List<String> components, String version, List<String> containerStates) + throws IOException, YarnException; + } --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org