This is an automated email from the ASF dual-hosted git repository.

slfan1989 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 136111314de YARN-11154. Make router support proxy server. (#5946)
136111314de is described below

commit 136111314deb0b14d77f25ca4ece62191aac8c89
Author: zhengchenyu <zhengcheny...@gmail.com>
AuthorDate: Sat Aug 19 10:29:26 2023 +0800

    YARN-11154. Make router support proxy server. (#5946)
---
 .../apache/hadoop/yarn/conf/YarnConfiguration.java |   3 +
 .../src/main/resources/yarn-default.xml            |   8 +
 .../apache/hadoop/yarn/server/router/Router.java   |  31 +++
 .../yarn/server/router/RouterServerUtil.java       |   5 +
 .../router/clientrm/RouterClientRMService.java     |  43 ++-
 .../router/webapp/TestRouterWebAppProxy.java       | 298 +++++++++++++++++++++
 6 files changed, 387 insertions(+), 1 deletion(-)

diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index a753cda7908..e252590ea31 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -4401,6 +4401,9 @@ public class YarnConfiguration extends Configuration {
   public static final boolean DEFAULT_ROUTER_WEBAPP_PARTIAL_RESULTS_ENABLED =
       false;
 
+  public static final String ROUTER_WEBAPP_PROXY_ENABLE = ROUTER_WEBAPP_PREFIX + "proxy.enable";
+  public static final boolean DEFAULT_ROUTER_WEBAPP_PROXY_ENABLE = true;
+
   private static final String FEDERATION_GPG_PREFIX = FEDERATION_PREFIX + "gpg.";
 
   // The number of threads to use for the GPG scheduled executor service
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index a201f4b2345..408295b8f48 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -5264,6 +5264,14 @@
     <value>false</value>
   </property>
 
+  <property>
+    <description>
      Whether to enable the proxy service in the router. Default is true.
+    </description>
+    <name>yarn.router.webapp.proxy.enable</name>
+    <value>true</value>
+  </property>
+
   <property>
     <description>
       The number of threads to use for the GPG scheduled executor service.
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/Router.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/Router.java
index 77abf18bd5f..601e4595558 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/Router.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/Router.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.router;
 
 import java.io.IOException;
 import java.net.InetAddress;
+import java.net.InetSocketAddress;
 import java.net.UnknownHostException;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
@@ -44,6 +45,10 @@ import org.apache.hadoop.yarn.server.router.cleaner.SubClusterCleaner;
 import org.apache.hadoop.yarn.server.router.clientrm.RouterClientRMService;
 import org.apache.hadoop.yarn.server.router.rmadmin.RouterRMAdminService;
 import org.apache.hadoop.yarn.server.router.webapp.RouterWebApp;
+import org.apache.hadoop.yarn.server.webproxy.FedAppReportFetcher;
+import org.apache.hadoop.yarn.server.webproxy.ProxyUriUtils;
+import org.apache.hadoop.yarn.server.webproxy.WebAppProxy;
+import org.apache.hadoop.yarn.server.webproxy.WebAppProxyServlet;
 import org.apache.hadoop.yarn.webapp.WebApp;
 import org.apache.hadoop.yarn.webapp.WebApps;
 import org.apache.hadoop.yarn.webapp.WebApps.Builder;
@@ -91,6 +96,7 @@ public class Router extends CompositeService {
   @VisibleForTesting
   protected String webAppAddress;
   private static long clusterTimeStamp = System.currentTimeMillis();
+  private FedAppReportFetcher fetcher = null;
 
   /**
    * Priority of the Router shutdown hook.
@@ -209,9 +215,29 @@ public class Router extends CompositeService {
 
     Builder<Object> builder =
         WebApps.$for("cluster", null, null, "ws").with(conf).at(webAppAddress);
+    if (RouterServerUtil.isRouterWebProxyEnable(conf)) {
+      fetcher = new FedAppReportFetcher(conf);
+      builder.withServlet(ProxyUriUtils.PROXY_SERVLET_NAME, ProxyUriUtils.PROXY_PATH_SPEC,
+          WebAppProxyServlet.class);
+      builder.withAttribute(WebAppProxy.FETCHER_ATTRIBUTE, fetcher);
+      String proxyHostAndPort = getProxyHostAndPort(conf);
+      String[] proxyParts = proxyHostAndPort.split(":");
+      builder.withAttribute(WebAppProxy.PROXY_HOST_ATTRIBUTE, proxyParts[0]);
+    }
     webApp = builder.start(new RouterWebApp(this));
   }
 
+  public static String getProxyHostAndPort(Configuration conf) {
+    String addr = conf.get(YarnConfiguration.PROXY_ADDRESS);
+    if(addr == null || addr.isEmpty()) {
+      InetSocketAddress address = conf.getSocketAddr(YarnConfiguration.ROUTER_WEBAPP_ADDRESS,
+          YarnConfiguration.DEFAULT_ROUTER_WEBAPP_ADDRESS,
+          YarnConfiguration.DEFAULT_ROUTER_WEBAPP_PORT);
+      addr = WebAppUtils.getResolvedAddress(address);
+    }
+    return addr;
+  }
+
   public static void main(String[] argv) {
     Configuration conf = new YarnConfiguration();
     Thread
@@ -267,4 +293,9 @@ public class Router extends CompositeService {
   public static long getClusterTimeStamp() {
     return clusterTimeStamp;
   }
+
+  @VisibleForTesting
+  public FedAppReportFetcher getFetcher() {
+    return fetcher;
+  }
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterServerUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterServerUtil.java
index dff135bdb05..94dbcdce1ae 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterServerUtil.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterServerUtil.java
@@ -801,4 +801,9 @@ public final class RouterServerUtil {
 
     return trimmedContext;
   }
+
+  public static boolean isRouterWebProxyEnable(Configuration conf) {
+    return conf.getBoolean(YarnConfiguration.ROUTER_WEBAPP_PROXY_ENABLE,
+        YarnConfiguration.DEFAULT_ROUTER_WEBAPP_PROXY_ENABLE);
+  }
 }
\ No newline at end of file
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/RouterClientRMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/RouterClientRMService.java
index e3e84079b71..dd87bee0d31 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/RouterClientRMService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/RouterClientRMService.java
@@ -19,11 +19,14 @@
 package org.apache.hadoop.yarn.server.router.clientrm;
 
 import java.io.IOException;
+import java.net.InetAddress;
 import java.net.InetSocketAddress;
+import java.net.URL;
 import java.util.Collections;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
@@ -104,11 +107,13 @@ import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsReque
 import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsResponse;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.ipc.YarnRPC;
 import org.apache.hadoop.yarn.server.router.RouterServerUtil;
 import org.apache.hadoop.yarn.server.router.security.RouterDelegationTokenSecretManager;
 import org.apache.hadoop.yarn.server.router.security.authorize.RouterPolicyProvider;
 import org.apache.hadoop.yarn.util.LRUCacheHashMap;
+import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -138,6 +143,7 @@ public class RouterClientRMService extends AbstractService
   // and remove the oldest used ones.
   private Map<String, RequestInterceptorChainWrapper> userPipelineMap;
 
+  private URL redirectURL;
   private RouterDelegationTokenSecretManager routerDTSecretManager;
 
   public RouterClientRMService() {
@@ -157,6 +163,10 @@ public class RouterClientRMService extends AbstractService
             YarnConfiguration.DEFAULT_ROUTER_CLIENTRM_ADDRESS,
             YarnConfiguration.DEFAULT_ROUTER_CLIENTRM_PORT);
 
+    if (RouterServerUtil.isRouterWebProxyEnable(conf)) {
+      redirectURL = getRedirectURL();
+    }
+
     int maxCacheSize =
         conf.getInt(YarnConfiguration.ROUTER_PIPELINE_CACHE_MAX_SIZE,
             YarnConfiguration.DEFAULT_ROUTER_PIPELINE_CACHE_MAX_SIZE);
@@ -318,7 +328,22 @@ public class RouterClientRMService extends AbstractService
   public GetApplicationReportResponse getApplicationReport(
       GetApplicationReportRequest request) throws YarnException, IOException {
     RequestInterceptorChainWrapper pipeline = getInterceptorChain();
-    return pipeline.getRootInterceptor().getApplicationReport(request);
+    GetApplicationReportResponse response = pipeline.getRootInterceptor()
+        .getApplicationReport(request);
+    if (RouterServerUtil.isRouterWebProxyEnable(getConfig())) {
+      // After rewriting the url, the tracking url in the application report
+      // will redirect to the embedded proxy server of the router.
+      URL url = new URL(response.getApplicationReport().getTrackingUrl());
+      String redirectUrl = new URL(redirectURL.getProtocol(),
+          redirectURL.getHost(), redirectURL.getPort(), url.getFile())
+          .toString();
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("The tracking url of application {} is redirect from {} to {}",
+            response.getApplicationReport().getApplicationId(), url, redirectUrl);
+      }
+      response.getApplicationReport().setTrackingUrl(redirectUrl);
+    }
+    return response;
   }
 
   @Override
@@ -623,4 +648,20 @@ public class RouterClientRMService extends AbstractService
         YarnConfiguration.DEFAULT_ROUTER_PIPELINE_CACHE_MAX_SIZE);
     this.userPipelineMap = Collections.synchronizedMap(new LRUCacheHashMap<>(maxCacheSize, true));
   }
+
+  private URL getRedirectURL() throws Exception {
+    Configuration conf = getConfig();
+    String webAppAddress = WebAppUtils.getWebAppBindURL(conf, YarnConfiguration.ROUTER_BIND_HOST,
+        WebAppUtils.getRouterWebAppURLWithoutScheme(conf));
+    String[] hostPort = StringUtils.split(webAppAddress, ':');
+    if (hostPort.length != 2) {
+      throw new YarnRuntimeException("Router can't get valid redirect proxy url");
+    }
+    String host = hostPort[0];
+    int port = Integer.parseInt(hostPort[1]);
+    if (StringUtils.isBlank(host) || host.equals("0.0.0.0")) {
+      host = InetAddress.getLocalHost().getCanonicalHostName();
+    }
+    return new URL(YarnConfiguration.useHttps(this.getConfig()) ? "https" : "http", host, port, "");
+  }
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestRouterWebAppProxy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestRouterWebAppProxy.java
new file mode 100644
index 00000000000..244ed132410
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestRouterWebAppProxy.java
@@ -0,0 +1,298 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.router.webapp;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
+import org.apache.hadoop.yarn.api.ApplicationHistoryProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException;
+import org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterRegisterRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState;
+import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
+import org.apache.hadoop.yarn.server.router.Router;
+import org.apache.hadoop.yarn.server.router.clientrm.FederationClientInterceptor;
+import org.apache.hadoop.yarn.server.router.clientrm.RouterClientRMService.RequestInterceptorChainWrapper;
+import org.apache.hadoop.yarn.server.webproxy.FedAppReportFetcher;
+import org.eclipse.jetty.server.Server;
+import org.eclipse.jetty.server.ServerConnector;
+import org.eclipse.jetty.servlet.ServletContextHandler;
+import org.eclipse.jetty.servlet.ServletHolder;
+import org.eclipse.jetty.util.thread.QueuedThreadPool;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.HttpURLConnection;
+import java.net.URL;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+
+public class TestRouterWebAppProxy {
+
+  private static final Logger LOG = LoggerFactory.getLogger(TestRouterWebAppProxy.class);
+
+  public static final String AM_PREFIX = "AM";
+  public static final String RM_PREFIX = "RM";
+  public static final String AHS_PREFIX = "AHS";
+
+  /*
+   * Mocked server is used for simulating the web of AppMaster,
+   * ResourceManager or TimelineServer.
+   */
+  private static Server mockServer;
+  private static int mockServerPort = 0;
+
+  /**
+   * Simple http server. The server should answer with status 200.
+   */
+  @BeforeClass
+  public static void setUp() throws Exception {
+    mockServer = new Server(0);
+    ((QueuedThreadPool) mockServer.getThreadPool()).setMaxThreads(20);
+    ServletContextHandler context = new ServletContextHandler();
+    context.setContextPath("/");
+    context.addServlet(new ServletHolder(new MockWebServlet(AM_PREFIX)), "/amweb/*");
+    context.addServlet(new ServletHolder(new MockWebServlet(RM_PREFIX)), "/cluster/app/*");
+    context.addServlet(new ServletHolder(new MockWebServlet(AHS_PREFIX)),
+        "/applicationhistory/app/*");
+    mockServer.setHandler(context);
+    ((ServerConnector) mockServer.getConnectors()[0]).setHost("localhost");
+    mockServer.start();
+    mockServerPort = ((ServerConnector) mockServer.getConnectors()[0]).getLocalPort();
+    LOG.info("Running embedded servlet container at: http://localhost:" + mockServerPort);
+  }
+
+  @Test(timeout=10000)
+  public void testRouterWebAppProxyFed() throws Exception {
+
+    Configuration conf = new Configuration();
+    conf.set(YarnConfiguration.ROUTER_WEBAPP_ADDRESS, "localhost:9090");
+    conf.setBoolean(YarnConfiguration.FEDERATION_ENABLED, true);
+    conf.setBoolean(YarnConfiguration.APPLICATION_HISTORY_ENABLED, true);
+    conf.set(YarnConfiguration.TIMELINE_SERVICE_WEBAPP_ADDRESS, "localhost:" + mockServerPort);
+    // overriding num of web server threads, see HttpServer.HTTP_MAXTHREADS
+    conf.setInt("hadoop.http.max.threads", 10);
+
+    // Create sub cluster information.
+    SubClusterId subClusterId1 = SubClusterId.newInstance("scid1");
+    SubClusterId subClusterId2 = SubClusterId.newInstance("scid2");
+    SubClusterInfo subClusterInfo1 = SubClusterInfo.newInstance(subClusterId1, "10.0.0.1:1",
+        "10.0.0.1:1", "10.0.0.1:1", "localhost:" + mockServerPort, SubClusterState.SC_RUNNING, 0,
+        "");
+    SubClusterInfo subClusterInfo2 = SubClusterInfo.newInstance(subClusterId2, "10.0.0.2:1",
+        "10.0.0.2:1", "10.0.0.2:1", "10.0.0.2:1", SubClusterState.SC_RUNNING, 0, "");
+
+    // App1 and App2 are running applications.
+    ApplicationId appId1 = ApplicationId.newInstance(0, 1);
+    ApplicationId appId2 = ApplicationId.newInstance(0, 2);
+    String appUrl1 = "http://localhost:" + mockServerPort + "/amweb/" + appId1;
+    String proxyAppUrl1 = "http://localhost:" + mockServerPort + "/proxy/" + appId1;
+    String appUrl2 = "http://localhost:" + mockServerPort + "/amweb/" + appId2;
+    String proxyAppUrl2 = "http://localhost:" + mockServerPort + "/proxy/" + appId2;
+    // App3 is an accepted application that has not registered an original url to the am yet.
+    ApplicationId appId3 = ApplicationId.newInstance(0, 3);
+    String proxyAppUrl3 = "http://localhost:" + mockServerPort + "/proxy/" + appId3;
+    // App4 is a finished application that has been removed from the rm, but not from the timeline server.
+    ApplicationId appId4 = ApplicationId.newInstance(0, 4);
+    String proxyAppUrl4 = "http://localhost:" + mockServerPort + "/proxy/" + appId4;
+
+    // Mock for application
+    ApplicationClientProtocol appManager1 = mock(ApplicationClientProtocol.class);
+    Mockito.when(appManager1.getApplicationReport(GetApplicationReportRequest.newInstance(appId1)))
+        .thenReturn(GetApplicationReportResponse.newInstance(
+            newApplicationReport(appId1, YarnApplicationState.RUNNING, proxyAppUrl1, appUrl1)));
+    Mockito.when(appManager1.getApplicationReport(GetApplicationReportRequest.newInstance(appId3)))
+        .thenReturn(GetApplicationReportResponse.newInstance(
+            newApplicationReport(appId3, YarnApplicationState.ACCEPTED, proxyAppUrl2, null)));
+
+    ApplicationClientProtocol appManager2 = mock(ApplicationClientProtocol.class);
+    Mockito.when(appManager2.getApplicationReport(GetApplicationReportRequest.newInstance(appId2)))
+        .thenReturn(GetApplicationReportResponse.newInstance(
+            newApplicationReport(appId2, YarnApplicationState.RUNNING, proxyAppUrl3, appUrl2)));
+    Mockito.when(appManager2.getApplicationReport(GetApplicationReportRequest.newInstance(appId4)))
+        .thenThrow(new ApplicationNotFoundException("APP NOT FOUND"));
+
+    ApplicationHistoryProtocol historyManager = mock(ApplicationHistoryProtocol.class);
+    Mockito.when(
+            historyManager.getApplicationReport(GetApplicationReportRequest.newInstance(appId4)))
+        .thenReturn(GetApplicationReportResponse.newInstance(
+            newApplicationReport(appId4, YarnApplicationState.FINISHED, proxyAppUrl4, null)));
+
+    // Initialize the federation store.
+    FederationStateStoreFacade facade = FederationStateStoreFacade.getInstance();
+    facade.getStateStore()
+        .registerSubCluster(SubClusterRegisterRequest.newInstance(subClusterInfo1));
+    facade.getStateStore()
+        .registerSubCluster(SubClusterRegisterRequest.newInstance(subClusterInfo2));
+    facade.addApplicationHomeSubCluster(
+        ApplicationHomeSubCluster.newInstance(appId1, subClusterId1));
+    facade.addApplicationHomeSubCluster(
+        ApplicationHomeSubCluster.newInstance(appId2, subClusterId2));
+    facade.addApplicationHomeSubCluster(
+        ApplicationHomeSubCluster.newInstance(appId3, subClusterId1));
+    facade.addApplicationHomeSubCluster(
+        ApplicationHomeSubCluster.newInstance(appId4, subClusterId2));
+
+    // Start router for test
+    Router router = new Router();
+    router.init(conf);
+    router.start();
+    String user = UserGroupInformation.getCurrentUser().getUserName();
+    RequestInterceptorChainWrapper wrapper = mock(RequestInterceptorChainWrapper.class);
+    FederationClientInterceptor interceptor = mock(FederationClientInterceptor.class);
+    Mockito.when(interceptor.getApplicationReport(GetApplicationReportRequest.newInstance(appId1)))
+        .thenReturn(GetApplicationReportResponse.newInstance(
+            newApplicationReport(appId1, YarnApplicationState.RUNNING, proxyAppUrl1, appUrl1)));
+    Mockito.when(interceptor.getApplicationReport(GetApplicationReportRequest.newInstance(appId2)))
+        .thenReturn(GetApplicationReportResponse.newInstance(
+            newApplicationReport(appId2, YarnApplicationState.RUNNING, proxyAppUrl2, appUrl2)));
+    Mockito.when(interceptor.getApplicationReport(GetApplicationReportRequest.newInstance(appId3)))
+        .thenReturn(GetApplicationReportResponse.newInstance(
+            newApplicationReport(appId3, YarnApplicationState.ACCEPTED, proxyAppUrl3, null)));
+    Mockito.when(interceptor.getApplicationReport(GetApplicationReportRequest.newInstance(appId4)))
+        .thenReturn(GetApplicationReportResponse.newInstance(
+            newApplicationReport(appId4, YarnApplicationState.FINISHED, proxyAppUrl4, null)));
+    Mockito.when(wrapper.getRootInterceptor()).thenReturn(interceptor);
+    router.getClientRMProxyService().getUserPipelineMap().put(user, wrapper);
+    try {
+      // Set mocked rm and timeline server.
+      FedAppReportFetcher appReportFetcher = router.getFetcher();
+      appReportFetcher.registerSubCluster(subClusterInfo1, appManager1);
+      appReportFetcher.registerSubCluster(subClusterInfo2, appManager2);
+      appReportFetcher.setHistoryManager(historyManager);
+
+      // App1 is running in subcluster1, and original url is registered in rm of subCluster1.
+      // So router will get original url from rm by getApplicationReport. Then router
+      // will fetch the webapp directly.
+      GetApplicationReportResponse response = router.getClientRMProxyService()
+          .getApplicationReport(GetApplicationReportRequest.newInstance(appId1));
+      URL url = new URL(response.getApplicationReport().getTrackingUrl());
+      HttpURLConnection conn = (HttpURLConnection) url.openConnection();
+      conn.connect();
+      assertEquals(HttpURLConnection.HTTP_OK, conn.getResponseCode());
+      assertEquals(AM_PREFIX + "/" + appId1, readResponse(conn));
+      conn.disconnect();
+
+      // App2 is running in subcluster2, and original url is registered
+      // in rm of subCluster2. So router will get original url from rm by
+      // getApplicationReport. Then router will fetch the webapp directly.
+      response = router.getClientRMProxyService()
+          .getApplicationReport(GetApplicationReportRequest.newInstance(appId2));
+      url = new URL(response.getApplicationReport().getTrackingUrl());
+      conn = (HttpURLConnection) url.openConnection();
+      conn.connect();
+      assertEquals(HttpURLConnection.HTTP_OK, conn.getResponseCode());
+      assertEquals(AM_PREFIX + "/" + appId2, readResponse(conn));
+      conn.disconnect();
+
+      // App3 is accepted in subcluster1, and original url is not registered
+      // yet. So router will fetch the application web from rm.
+      response = router.getClientRMProxyService()
+          .getApplicationReport(GetApplicationReportRequest.newInstance(appId3));
+      url = new URL(response.getApplicationReport().getTrackingUrl());
+      conn = (HttpURLConnection) url.openConnection();
+      conn.connect();
+      assertEquals(HttpURLConnection.HTTP_OK, conn.getResponseCode());
+      assertEquals(RM_PREFIX + "/" + appId3, readResponse(conn));
+      conn.disconnect();
+
+      // App4 is finished in subcluster2, and has been removed from rm, but not
+      // removed from timeline server. So router will fetch the
+      // application web from timeline server.
+      response = router.getClientRMProxyService()
+          .getApplicationReport(GetApplicationReportRequest.newInstance(appId4));
+      url = new URL(response.getApplicationReport().getTrackingUrl());
+      conn = (HttpURLConnection) url.openConnection();
+      conn.connect();
+      assertEquals(HttpURLConnection.HTTP_OK, conn.getResponseCode());
+      assertEquals(AHS_PREFIX + "/" + appId4, readResponse(conn));
+      conn.disconnect();
+    } finally {
+      router.close();
+    }
+  }
+
+  private ApplicationReport newApplicationReport(ApplicationId appId, YarnApplicationState state,
+                                                 String trackingUrl, String origTrackingUrl) {
+    return ApplicationReport.newInstance(appId, null, "testuser", null, null, null, 0, null, state,
+        null, trackingUrl, 0, 0, 0, null, null, origTrackingUrl, 0f, null, null);
+  }
+
+  private String readResponse(HttpURLConnection conn) throws IOException {
+    InputStream input = conn.getInputStream();
+    byte[] bytes = new byte[input.available()];
+    input.read(bytes);
+    return new String(bytes);
+  }
+
+  /*
+   * This servlet is used for simulating the web of AppMaster, ResourceManager,
+   * TimelineServer and so on.
+   */
+  public static class MockWebServlet extends HttpServlet {
+
+    private String role;
+
+    public MockWebServlet(String role) {
+      this.role = role;
+    }
+
+    @Override
+    protected void doGet(HttpServletRequest req, HttpServletResponse resp)
+        throws IOException {
+      if (req.getPathInfo() != null) {
+        resp.getWriter().write(role + req.getPathInfo());
+      }
+      resp.setStatus(HttpServletResponse.SC_OK);
+    }
+
+    @Override
+    protected void doPost(HttpServletRequest req, HttpServletResponse resp)
+        throws IOException {
+      InputStream is = req.getInputStream();
+      OutputStream os = resp.getOutputStream();
+      int c = is.read();
+      while (c > -1) {
+        os.write(c);
+        c = is.read();
+      }
+      is.close();
+      os.close();
+      resp.setStatus(HttpServletResponse.SC_OK);
+    }
+  }
+}
\ No newline at end of file
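
For reference, the proxy behavior added above is controlled by the new property
yarn.router.webapp.proxy.enable (default true), introduced in yarn-default.xml by this
commit. A minimal yarn-site.xml sketch for disabling the Router's embedded proxy; the
snippet is illustrative only and not part of the patch:

  <property>
    <name>yarn.router.webapp.proxy.enable</name>
    <value>false</value>
  </property>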


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org
