This is an automated email from the ASF dual-hosted git repository.

zhaijia pushed a commit to branch branch-2.5
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 3c89abbdb1af5f3b144a7620187d67732359a427
Author: Addison Higham <[email protected]>
AuthorDate: Fri Mar 6 16:06:11 2020 -0700

    [proxy] Fix proxy routing to functions worker (#6486)
    
    ### Motivation
    
    Currently, the proxy only works to proxy v1/v2 functions routes to the
    function worker.
    
    ### Modifications
    
    This changes this code to proxy all routes for the function worker when
    those routes match. At the moment this is still a static list of
    prefixes, but in the future it may be possible to have this list of
    prefixes be dynamically fetched from the REST routes.
    
    ### Verifying this change
    - added some tests to ensure the routing works as expected
    (cherry picked from commit 329e2310069b61e25ce3f87f2828fab78f97187a)
---
 .../pulsar/proxy/server/AdminProxyHandler.java     | 26 ++++++++-
 .../proxy/server/FunctionWorkerRoutingTest.java    | 66 ++++++++++++++++++++++
 2 files changed, 89 insertions(+), 3 deletions(-)

diff --git 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/AdminProxyHandler.java
 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/AdminProxyHandler.java
index ca44c8f..56a933b 100644
--- 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/AdminProxyHandler.java
+++ 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/AdminProxyHandler.java
@@ -26,9 +26,12 @@ import java.io.InputStream;
 import java.net.URI;
 import java.nio.ByteBuffer;
 import java.security.cert.X509Certificate;
+import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.Objects;
+import java.util.Set;
 import java.util.concurrent.Executor;
 
 import javax.net.ssl.SSLContext;
@@ -60,6 +63,21 @@ import org.slf4j.LoggerFactory;
 
 class AdminProxyHandler extends ProxyServlet {
     private static final Logger LOG = 
LoggerFactory.getLogger(AdminProxyHandler.class);
+    private static final Set<String> functionRoutes = new 
HashSet<>(Arrays.asList(
+        "/admin/v3/function",
+        "/admin/v2/function",
+        "/admin/function",
+        "/admin/v3/source",
+        "/admin/v2/source",
+        "/admin/source",
+        "/admin/v3/sink",
+        "/admin/v2/sink",
+        "/admin/sink",
+        "/admin/v2/worker",
+        "/admin/v2/worker-stats",
+        "/admin/worker",
+        "/admin/worker-stats"
+    ));
 
     private final ProxyConfiguration config;
     private final BrokerDiscoveryProvider discoveryProvider;
@@ -260,9 +278,11 @@ class AdminProxyHandler extends ProxyServlet {
 
         boolean isFunctionsRestRequest = false;
         String requestUri = request.getRequestURI();
-        if (requestUri.startsWith("/admin/v2/functions")
-            || requestUri.startsWith("/admin/functions")) {
-            isFunctionsRestRequest = true;
+        for (String routePrefix: functionRoutes) {
+            if (requestUri.startsWith(routePrefix)) {
+                isFunctionsRestRequest = true;
+                break;
+            }
         }
 
         if (isFunctionsRestRequest && !isBlank(functionWorkerWebServiceUrl)) {
diff --git 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/FunctionWorkerRoutingTest.java
 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/FunctionWorkerRoutingTest.java
new file mode 100644
index 0000000..b5d89cc
--- /dev/null
+++ 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/FunctionWorkerRoutingTest.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.proxy.server;
+
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import javax.servlet.http.HttpServletRequest;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class FunctionWorkerRoutingTest {
+
+    @Test
+    public void testFunctionWorkerRedirect() throws Exception {
+        String functionWorkerUrl = "http://function";;
+        String brokerUrl = "http://broker";;
+
+        ProxyConfiguration proxyConfig = new ProxyConfiguration();
+        proxyConfig.setBrokerWebServiceURL(brokerUrl);
+        proxyConfig.setFunctionWorkerWebServiceURL(functionWorkerUrl);
+
+        BrokerDiscoveryProvider discoveryProvider = 
mock(BrokerDiscoveryProvider.class);
+        AdminProxyHandler handler = new AdminProxyHandler(proxyConfig, 
discoveryProvider);
+
+        String funcUrl = 
handler.rewriteTarget(buildRequest("/admin/v3/functions/test/test"));
+        Assert.assertEquals(funcUrl, 
String.format("%s/admin/v3/functions/%s/%s",
+                functionWorkerUrl, "test", "test"));
+
+        String sourceUrl = 
handler.rewriteTarget(buildRequest("/admin/v3/sources/test/test"));
+        Assert.assertEquals(sourceUrl, 
String.format("%s/admin/v3/sources/%s/%s",
+                functionWorkerUrl, "test", "test"));
+
+        String sinkUrl = 
handler.rewriteTarget(buildRequest("/admin/v3/sinks/test/test"));
+        Assert.assertEquals(sinkUrl, String.format("%s/admin/v3/sinks/%s/%s",
+                functionWorkerUrl, "test", "test"));
+
+        String tenantUrl = 
handler.rewriteTarget(buildRequest("/admin/v2/tenants/test"));
+        Assert.assertEquals(tenantUrl, String.format("%s/admin/v2/tenants/%s",
+                brokerUrl, "test"));
+    }
+
+    static HttpServletRequest buildRequest(String url) {
+        HttpServletRequest mockReq = mock(HttpServletRequest.class);
+        when(mockReq.getRequestURI()).thenReturn(url);
+        return mockReq;
+    }
+
+}

Reply via email to