This is an automated email from the ASF dual-hosted git repository. mmerli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push: new adbf818 Change HTTP status code which WebSocket proxy returns to producer whe… (#1242) adbf818 is described below commit adbf818626988f4cd3cf008462d173b440d4c1ba Author: massakam <massa...@yahoo-corp.jp> AuthorDate: Sat Feb 24 07:54:07 2018 +0900 Change HTTP status code which WebSocket proxy returns to producer whe… (#1242) * Change HTTP status code which WebSocket proxy returns to producer when backlog exceeds threshold * Fix WebSocket proxy to return exception message to client * Add some tests for WebSocket --- .../websocket/proxy/ProxyPublishConsumeTest.java | 206 +++++++++++++++++---- .../pulsar/websocket/AbstractWebSocketHandler.java | 5 +- .../apache/pulsar/websocket/ConsumerHandler.java | 39 ++-- .../apache/pulsar/websocket/ProducerHandler.java | 6 +- .../org/apache/pulsar/websocket/ReaderHandler.java | 5 +- 5 files changed, 207 insertions(+), 54 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java index c25c947..3c40173 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java @@ -30,6 +30,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import javax.servlet.http.HttpServletResponse; import javax.ws.rs.client.Client; import javax.ws.rs.client.ClientBuilder; import javax.ws.rs.client.Invocation; @@ -39,6 +40,7 @@ import javax.ws.rs.core.Response; import org.apache.bookkeeper.test.PortManager; import org.apache.pulsar.client.api.ProducerConsumerBase; +import org.apache.pulsar.common.policies.data.BacklogQuota; import org.apache.pulsar.common.stats.Metrics; import org.apache.pulsar.websocket.WebSocketService; import org.apache.pulsar.websocket.service.ProxyServer; @@ -70,8 +72,12 @@ public class ProxyPublishConsumeTest extends ProducerConsumerBase { private ProxyServer proxyServer; private WebSocketService service; + private static final int TIME_TO_CHECK_BACKLOG_QUOTA = 5; + @BeforeMethod public void setup() throws Exception { + conf.setBacklogQuotaCheckIntervalInSeconds(TIME_TO_CHECK_BACKLOG_QUOTA); + super.internalSetup(); super.producerBaseSetup(); @@ -89,6 +95,7 @@ public class ProxyPublishConsumeTest extends ProducerConsumerBase { @AfterMethod protected void cleanup() throws Exception { + super.resetConfig(); super.internalCleanup(); service.close(); proxyServer.stop(); @@ -97,7 +104,7 @@ public class ProxyPublishConsumeTest extends ProducerConsumerBase { @Test(timeOut = 10000) public void socketTest() throws Exception { - String consumerUri = "ws://localhost:" + port + final String consumerUri = "ws://localhost:" + port + "/ws/consumer/persistent/my-property/use/my-ns/my-topic1/my-sub1?subscriptionType=Failover"; String readerUri = "ws://localhost:" + port + "/ws/reader/persistent/my-property/use/my-ns/my-topic1"; String producerUri = "ws://localhost:" + port + "/ws/producer/persistent/my-property/use/my-ns/my-topic1/"; @@ -167,32 +174,16 @@ public class ProxyPublishConsumeTest extends ProducerConsumerBase { } Assert.assertEquals(produceSocket.getBuffer(), readSocket.getBuffer()); } finally { - ExecutorService executor = newFixedThreadPool(1); - try { - executor.submit(() -> { - try { - consumeClient1.stop(); - consumeClient2.stop(); - readClient.stop(); - produceClient.stop(); - log.info("proxy clients are stopped successfully"); - } catch (Exception e) { - log.error(e.getMessage()); - } - }).get(2, TimeUnit.SECONDS); - } catch (Exception e) { - log.error("failed to close clients ", e); - } - executor.shutdownNow(); + stopWebSocketClient(consumeClient1, consumeClient2, readClient, produceClient); } } @Test(timeOut = 10000) - public void badConsumerTest() throws Exception { + public void emptySubcriptionConsumerTest() throws Exception { // Empty subcription name - String consumerUri = "ws://localhost:" + port - + "/ws/consumer/persistent/my-property/use/my-ns/my-topic1/?subscriptionType=Exclusive"; + final String consumerUri = "ws://localhost:" + port + + "/ws/consumer/persistent/my-property/use/my-ns/my-topic2/?subscriptionType=Exclusive"; URI consumeUri = URI.create(consumerUri); WebSocketClient consumeClient1 = new WebSocketClient(); @@ -207,21 +198,148 @@ public class ProxyPublishConsumeTest extends ProducerConsumerBase { } catch (Exception e) { // Expected Assert.assertTrue(e.getCause() instanceof UpgradeException); + Assert.assertEquals(((UpgradeException) e.getCause()).getResponseStatusCode(), + HttpServletResponse.SC_BAD_REQUEST); } finally { - ExecutorService executor = newFixedThreadPool(1); + stopWebSocketClient(consumeClient1); + } + } + + @Test(timeOut = 10000) + public void conflictingConsumerTest() throws Exception { + final String consumerUri = "ws://localhost:" + port + + "/ws/consumer/persistent/my-property/use/my-ns/my-topic3/sub1?subscriptionType=Exclusive"; + URI consumeUri = URI.create(consumerUri); + + WebSocketClient consumeClient1 = new WebSocketClient(); + WebSocketClient consumeClient2 = new WebSocketClient(); + SimpleConsumerSocket consumeSocket1 = new SimpleConsumerSocket(); + SimpleConsumerSocket consumeSocket2 = new SimpleConsumerSocket(); + + try { + consumeClient1.start(); + ClientUpgradeRequest consumeRequest1 = new ClientUpgradeRequest(); + Future<Session> consumerFuture1 = consumeClient1.connect(consumeSocket1, consumeUri, consumeRequest1); + consumerFuture1.get(); + try { - executor.submit(() -> { - try { - consumeClient1.stop(); - log.info("proxy clients are stopped successfully"); - } catch (Exception e) { - log.error(e.getMessage()); - } - }).get(2, TimeUnit.SECONDS); + consumeClient2.start(); + ClientUpgradeRequest consumeRequest2 = new ClientUpgradeRequest(); + Future<Session> consumerFuture2 = consumeClient2.connect(consumeSocket2, consumeUri, consumeRequest2); + consumerFuture2.get(); + Assert.fail("should fail: conflicting subscription name"); } catch (Exception e) { - log.error("failed to close clients ", e); + // Expected + Assert.assertTrue(e.getCause() instanceof UpgradeException); + Assert.assertEquals(((UpgradeException) e.getCause()).getResponseStatusCode(), + HttpServletResponse.SC_CONFLICT); + } finally { + stopWebSocketClient(consumeClient2); } - executor.shutdownNow(); + } finally { + stopWebSocketClient(consumeClient1); + } + } + + @Test(timeOut = 10000) + public void conflictingProducerTest() throws Exception { + final String producerUri = "ws://localhost:" + port + + "/ws/producer/persistent/my-property/use/my-ns/my-topic4?producerName=my-producer"; + URI produceUri = URI.create(producerUri); + + WebSocketClient produceClient1 = new WebSocketClient(); + WebSocketClient produceClient2 = new WebSocketClient(); + SimpleProducerSocket produceSocket1 = new SimpleProducerSocket(); + SimpleProducerSocket produceSocket2 = new SimpleProducerSocket(); + + try { + produceClient1.start(); + ClientUpgradeRequest produceRequest1 = new ClientUpgradeRequest(); + Future<Session> producerFuture1 = produceClient1.connect(produceSocket1, produceUri, produceRequest1); + producerFuture1.get(); + + try { + produceClient2.start(); + ClientUpgradeRequest produceRequest2 = new ClientUpgradeRequest(); + Future<Session> producerFuture2 = produceClient2.connect(produceSocket2, produceUri, produceRequest2); + producerFuture2.get(); + Assert.fail("should fail: conflicting producer name"); + } catch (Exception e) { + // Expected + Assert.assertTrue(e.getCause() instanceof UpgradeException); + Assert.assertEquals(((UpgradeException) e.getCause()).getResponseStatusCode(), + HttpServletResponse.SC_CONFLICT); + } finally { + stopWebSocketClient(produceClient2); + } + } finally { + stopWebSocketClient(produceClient1); + } + } + + @Test(timeOut = 30000) + public void producerBacklogQuotaExceededTest() throws Exception { + admin.namespaces().createNamespace("my-property/use/ns-ws-quota"); + admin.namespaces().setBacklogQuota("my-property/use/ns-ws-quota", + new BacklogQuota(10, BacklogQuota.RetentionPolicy.producer_request_hold)); + + final String topic = "my-property/use/ns-ws-quota/my-topic5"; + final String subscription = "my-sub"; + final String consumerUri = "ws://localhost:" + port + "/ws/consumer/persistent/" + topic + "/" + subscription; + final String producerUri = "ws://localhost:" + port + "/ws/producer/persistent/" + topic; + + URI consumeUri = URI.create(consumerUri); + URI produceUri = URI.create(producerUri); + + WebSocketClient consumeClient = new WebSocketClient(); + WebSocketClient produceClient1 = new WebSocketClient(); + WebSocketClient produceClient2 = new WebSocketClient(); + + SimpleConsumerSocket consumeSocket = new SimpleConsumerSocket(); + SimpleProducerSocket produceSocket1 = new SimpleProducerSocket(); + SimpleProducerSocket produceSocket2 = new SimpleProducerSocket(); + + // Create subscription + try { + consumeClient.start(); + ClientUpgradeRequest consumeRequest = new ClientUpgradeRequest(); + Future<Session> consumerFuture = consumeClient.connect(consumeSocket, consumeUri, consumeRequest); + consumerFuture.get(); + } finally { + stopWebSocketClient(consumeClient); + } + + // Fill the backlog + try { + produceClient1.start(); + ClientUpgradeRequest produceRequest = new ClientUpgradeRequest(); + Future<Session> producerFuture = produceClient1.connect(produceSocket1, produceUri, produceRequest); + producerFuture.get(); + produceSocket1.sendMessage(100); + } finally { + stopWebSocketClient(produceClient1); + } + + Thread.sleep((TIME_TO_CHECK_BACKLOG_QUOTA + 1) * 1000); + + // New producer fails to connect + try { + produceClient2.start(); + ClientUpgradeRequest produceRequest = new ClientUpgradeRequest(); + Future<Session> producerFuture = produceClient2.connect(produceSocket2, produceUri, produceRequest); + producerFuture.get(); + Assert.fail("should fail: backlog quota exceeded"); + } catch (Exception e) { + // Expected + Assert.assertTrue(e.getCause() instanceof UpgradeException); + Assert.assertEquals(((UpgradeException) e.getCause()).getResponseStatusCode(), + HttpServletResponse.SC_SERVICE_UNAVAILABLE); + } finally { + stopWebSocketClient(produceClient2); + admin.persistentTopics().skipAllMessages("persistent://" + topic, subscription); + admin.persistentTopics().delete("persistent://" + topic); + admin.namespaces().removeBacklogQuota("my-property/use/ns-ws-quota"); + admin.namespaces().deleteNamespace("my-property/use/ns-ws-quota"); } } @@ -232,7 +350,7 @@ public class ProxyPublishConsumeTest extends ProducerConsumerBase { */ @Test(timeOut = 10000) public void testProxyStats() throws Exception { - final String topic = "my-property/use/my-ns/my-topic2"; + final String topic = "my-property/use/my-ns/my-topic6"; final String consumerUri = "ws://localhost:" + port + "/ws/consumer/persistent/" + topic + "/my-sub?subscriptionType=Failover"; final String producerUri = "ws://localhost:" + port + "/ws/producer/persistent/" + topic + "/"; @@ -299,9 +417,7 @@ public class ProxyPublishConsumeTest extends ProducerConsumerBase { verifyTopicStat(client, baseUrl, topic); } finally { - consumeClient1.stop(); - produceClient.stop(); - log.info("proxy clients are stopped successfully"); + stopWebSocketClient(consumeClient1, produceClient); } } @@ -361,5 +477,23 @@ public class ProxyPublishConsumeTest extends ProducerConsumerBase { Assert.assertNotNull(producerStats.remoteConnection); } + private void stopWebSocketClient(WebSocketClient... clients) { + ExecutorService executor = newFixedThreadPool(1); + try { + executor.submit(() -> { + try { + for (WebSocketClient client : clients) { + client.stop(); + } + log.info("proxy clients are stopped successfully"); + } catch (Exception e) { + log.error(e.getMessage()); + } + }).get(2, TimeUnit.SECONDS); + } catch (Exception e) { + log.error("failed to close proxy clients", e); + } + } + private static final Logger log = LoggerFactory.getLogger(ProxyPublishConsumeTest.class); } diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/AbstractWebSocketHandler.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/AbstractWebSocketHandler.java index 91e62b7..ad6f910 100644 --- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/AbstractWebSocketHandler.java +++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/AbstractWebSocketHandler.java @@ -48,7 +48,6 @@ public abstract class AbstractWebSocketHandler extends WebSocketAdapter implemen protected final String topic; protected final Map<String, String> queryParams; - protected final boolean authResult; public AbstractWebSocketHandler(WebSocketService service, HttpServletRequest request, ServletUpgradeResponse response) { this.service = service; @@ -59,11 +58,9 @@ public abstract class AbstractWebSocketHandler extends WebSocketAdapter implemen request.getParameterMap().forEach((key, values) -> { queryParams.put(key, values[0]); }); - - authResult = checkAuth(response); } - private boolean checkAuth(ServletUpgradeResponse response) { + protected boolean checkAuth(ServletUpgradeResponse response) { String authRole = "<none>"; AuthenticationDataSource authenticationData = new AuthenticationDataHttps(request); if (service.isAuthenticationEnabled()) { diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java index b392e0c..6edcf2a 100644 --- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java +++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java @@ -35,6 +35,7 @@ import org.apache.pulsar.broker.authentication.AuthenticationDataSource; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.ConsumerConfiguration; import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.PulsarClientException.ConsumerBusyException; import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.common.naming.DestinationName; import org.apache.pulsar.common.util.DateFormatter; @@ -64,7 +65,7 @@ import com.google.common.base.Splitter; */ public class ConsumerHandler extends AbstractWebSocketHandler { - private final String subscription; + private String subscription = null; private final ConsumerConfiguration conf; private Consumer consumer; @@ -80,18 +81,19 @@ public class ConsumerHandler extends AbstractWebSocketHandler { public ConsumerHandler(WebSocketService service, HttpServletRequest request, ServletUpgradeResponse response) { super(service, request, response); - this.subscription = extractSubscription(request); this.conf = getConsumerConfiguration(); this.maxPendingMessages = (conf.getReceiverQueueSize() == 0) ? 1 : conf.getReceiverQueueSize(); this.numMsgsDelivered = new LongAdder(); this.numBytesDelivered = new LongAdder(); this.numMsgsAcked = new LongAdder(); - if (!authResult) { - return; - } - try { + // checkAuth() should be called after assigning a value to this.subscription + this.subscription = extractSubscription(request); + if (!checkAuth(response)) { + return; + } + this.consumer = service.getPulsarClient().subscribe(topic, subscription, conf); if (!this.service.addConsumer(this)) { log.warn("[{}:{}] Failed to add consumer handler for topic {}", request.getRemoteAddr(), @@ -100,12 +102,9 @@ public class ConsumerHandler extends AbstractWebSocketHandler { } catch (Exception e) { log.warn("[{}:{}] Failed in creating subscription {} on topic {}", request.getRemoteAddr(), request.getRemotePort(), subscription, topic, e); - boolean configError = e instanceof IllegalArgumentException; - int errorCode = configError ? HttpServletResponse.SC_BAD_REQUEST - : HttpServletResponse.SC_INTERNAL_SERVER_ERROR; - String errorMsg = configError ? "Invalid query-param " + e.getMessage() : "Failed to subscribe"; + try { - response.sendError(errorCode, errorMsg); + response.sendError(getErrorCode(e), getErrorMessage(e)); } catch (IOException e1) { log.warn("[{}:{}] Failed to send error: {}", request.getRemoteAddr(), request.getRemotePort(), e1.getMessage(), e1); @@ -113,6 +112,24 @@ public class ConsumerHandler extends AbstractWebSocketHandler { } } + private static int getErrorCode(Exception e) { + if (e instanceof IllegalArgumentException) { + return HttpServletResponse.SC_BAD_REQUEST; + } else if (e instanceof ConsumerBusyException) { + return HttpServletResponse.SC_CONFLICT; + } else { + return HttpServletResponse.SC_INTERNAL_SERVER_ERROR; + } + } + + private static String getErrorMessage(Exception e) { + if (e instanceof IllegalArgumentException) { + return "Invalid query params: " + e.getMessage(); + } else { + return "Failed to subscribe: " + e.getMessage(); + } + } + private void receiveMessage() { if (log.isDebugEnabled()) { log.debug("[{}:{}] [{}] [{}] Receive next message", request.getRemoteAddr(), request.getRemotePort(), topic, subscription); diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ProducerHandler.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ProducerHandler.java index cb2e288..7df14a5 100644 --- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ProducerHandler.java +++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ProducerHandler.java @@ -41,6 +41,8 @@ import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.ProducerConfiguration; import org.apache.pulsar.client.api.ProducerConfiguration.HashingScheme; import org.apache.pulsar.client.api.ProducerConfiguration.MessageRoutingMode; +import org.apache.pulsar.client.api.PulsarClientException.ProducerBlockedQuotaExceededError; +import org.apache.pulsar.client.api.PulsarClientException.ProducerBlockedQuotaExceededException; import org.apache.pulsar.client.api.PulsarClientException.ProducerBusyException; import org.apache.pulsar.common.naming.DestinationName; import org.apache.pulsar.common.util.ObjectMapperFactory; @@ -85,7 +87,7 @@ public class ProducerHandler extends AbstractWebSocketHandler { this.numMsgsFailed = new LongAdder(); this.publishLatencyStatsUSec = new StatsBuckets(ENTRY_LATENCY_BUCKETS_USEC); - if (!authResult) { + if (!checkAuth(response)) { return; } @@ -114,6 +116,8 @@ public class ProducerHandler extends AbstractWebSocketHandler { return HttpServletResponse.SC_BAD_REQUEST; } else if (e instanceof ProducerBusyException) { return HttpServletResponse.SC_CONFLICT; + } else if (e instanceof ProducerBlockedQuotaExceededError || e instanceof ProducerBlockedQuotaExceededException) { + return HttpServletResponse.SC_SERVICE_UNAVAILABLE; } else { return HttpServletResponse.SC_INTERNAL_SERVER_ERROR; } diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ReaderHandler.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ReaderHandler.java index 4d6c271..d643df2 100644 --- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ReaderHandler.java +++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ReaderHandler.java @@ -82,7 +82,7 @@ public class ReaderHandler extends AbstractWebSocketHandler { this.numMsgsDelivered = new LongAdder(); this.numBytesDelivered = new LongAdder(); - if (!authResult) { + if (!checkAuth(response)) { return; } @@ -97,7 +97,8 @@ public class ReaderHandler extends AbstractWebSocketHandler { log.warn("[{}:{}] Failed in creating reader {} on topic {}", request.getRemoteAddr(), request.getRemotePort(), subscription, topic, e); try { - response.sendError(HttpServletResponse.SC_INTERNAL_SERVER_ERROR, "Failed to create reader"); + response.sendError(HttpServletResponse.SC_INTERNAL_SERVER_ERROR, + "Failed to create reader: " + e.getMessage()); } catch (IOException e1) { log.warn("[{}:{}] Failed to send error: {}", request.getRemoteAddr(), request.getRemotePort(), e1.getMessage(), e1); -- To stop receiving notification emails like this one, please contact mme...@apache.org.