This is an automated email from the ASF dual-hosted git repository. sanjeevrk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 2c2e0dd Fixed HTTP redirects with proxy handler (#2670) 2c2e0dd is described below commit 2c2e0ddaad1db5a928df45ed2f44a4e40103638a Author: Matteo Merli <mme...@apache.org> AuthorDate: Thu Sep 27 23:14:26 2018 -0400 Fixed HTTP redirects with proxy handler (#2670) --- .../pulsar/proxy/server/AdminProxyHandler.java | 69 +++++++++++++++++- .../pulsar/tests/integration/proxy/TestProxy.java | 83 ++++++++++++++++++++++ 2 files changed, 149 insertions(+), 3 deletions(-) diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/AdminProxyHandler.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/AdminProxyHandler.java index d6c32df..461c544 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/AdminProxyHandler.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/AdminProxyHandler.java @@ -24,8 +24,10 @@ import java.io.IOException; import java.net.URI; import java.security.cert.X509Certificate; import java.util.Objects; +import java.util.concurrent.Executor; import javax.net.ssl.SSLContext; +import javax.servlet.ServletConfig; import javax.servlet.ServletException; import javax.servlet.http.HttpServletRequest; @@ -37,9 +39,13 @@ import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.common.util.SecurityUtility; import org.apache.pulsar.policies.data.loadbalancer.ServiceLookupData; import org.eclipse.jetty.client.HttpClient; -import org.eclipse.jetty.proxy.AsyncProxyServlet; +import org.eclipse.jetty.client.ProtocolHandlers; +import org.eclipse.jetty.client.RedirectProtocolHandler; import org.eclipse.jetty.client.api.Request; +import org.eclipse.jetty.proxy.AsyncProxyServlet; +import org.eclipse.jetty.util.HttpCookieStore; import org.eclipse.jetty.util.ssl.SslContextFactory; +import org.eclipse.jetty.util.thread.QueuedThreadPool; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -62,9 +68,66 @@ class AdminProxyHandler extends AsyncProxyServlet { @Override protected HttpClient createHttpClient() throws ServletException { - HttpClient client = super.createHttpClient(); + ServletConfig config = getServletConfig(); + + HttpClient client = newHttpClient(); + client.setFollowRedirects(true); - return client; + + // Must not store cookies, otherwise cookies of different clients will mix. + client.setCookieStore(new HttpCookieStore.Empty()); + + Executor executor; + String value = config.getInitParameter("maxThreads"); + if (value == null || "-".equals(value)) { + executor = (Executor) getServletContext().getAttribute("org.eclipse.jetty.server.Executor"); + if (executor == null) + throw new IllegalStateException("No server executor for proxy"); + } else { + QueuedThreadPool qtp = new QueuedThreadPool(Integer.parseInt(value)); + String servletName = config.getServletName(); + int dot = servletName.lastIndexOf('.'); + if (dot >= 0) + servletName = servletName.substring(dot + 1); + qtp.setName(servletName); + executor = qtp; + } + + client.setExecutor(executor); + + value = config.getInitParameter("maxConnections"); + if (value == null) + value = "256"; + client.setMaxConnectionsPerDestination(Integer.parseInt(value)); + + value = config.getInitParameter("idleTimeout"); + if (value == null) + value = "30000"; + client.setIdleTimeout(Long.parseLong(value)); + + value = config.getInitParameter("requestBufferSize"); + if (value != null) + client.setRequestBufferSize(Integer.parseInt(value)); + + value = config.getInitParameter("responseBufferSize"); + if (value != null) + client.setResponseBufferSize(Integer.parseInt(value)); + + try { + client.start(); + + // Content must not be decoded, otherwise the client gets confused. + client.getContentDecoderFactories().clear(); + + // Pass traffic to the client, only intercept what's necessary. + ProtocolHandlers protocolHandlers = client.getProtocolHandlers(); + protocolHandlers.clear(); + protocolHandlers.put(new RedirectProtocolHandler(client)); + + return client; + } catch (Exception x) { + throw new ServletException(x); + } } @Override diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/proxy/TestProxy.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/proxy/TestProxy.java new file mode 100644 index 0000000..aa2a5f6 --- /dev/null +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/proxy/TestProxy.java @@ -0,0 +1,83 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.tests.integration.proxy; + +import static org.testng.Assert.assertEquals; + +import java.util.Collections; + +import lombok.Cleanup; + +import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.common.policies.data.TenantInfo; +import org.apache.pulsar.common.policies.data.TopicStats; +import org.apache.pulsar.tests.integration.suites.PulsarTestSuite; +import org.testng.annotations.Test; + +/** + * Test cases for proxy. + */ +public class TestProxy extends PulsarTestSuite { + + @Test + public void testProxy() throws Exception { + + final String tenant = "compaction-test-cli-" + randomName(4); + final String namespace = tenant + "/ns1"; + final String topic = "persistent://" + namespace + "/topic1"; + + @Cleanup + PulsarAdmin admin = PulsarAdmin.builder() + .serviceHttpUrl(pulsarCluster.getHttpServiceUrl()) + .build(); + + admin.tenants().createTenant(tenant, + new TenantInfo(Collections.emptySet(), Collections.singleton(pulsarCluster.getClusterName()))); + + admin.namespaces().createNamespace(namespace, Collections.singleton(pulsarCluster.getClusterName())); + + @Cleanup + PulsarClient client = PulsarClient.builder() + .serviceUrl(pulsarCluster.getPlainTextServiceUrl()) + .build(); + + client.newConsumer() + .topic(topic) + .subscriptionName("sub1") + .subscribe() + .close(); + + @Cleanup + Producer<String> producer = client.newProducer(Schema.STRING) + .topic(topic) + .create(); + producer.send("content-0"); + producer.send("content-1"); + + for (int i = 0; i < 10; i++) { + // Ensure we can get the stats for the topic irrespective of which broker the proxy decides to connect to + TopicStats stats = admin.topics().getStats(topic); + assertEquals(stats.publishers.size(), 1); + } + } + +}