[GitHub] [pulsar] codelipenghui commented on a change in pull request #8249: Nack support in WS

2021-01-20 Thread GitBox


codelipenghui commented on a change in pull request #8249:
URL: https://github.com/apache/pulsar/pull/8249#discussion_r560965409



##
File path: pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/SimpleConsumerMessageHandler.java
##
@@ -0,0 +1,7 @@
+package org.apache.pulsar.websocket.proxy;

Review comment:
   The license header is missing here.
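
   For reference, a sketch of what the top of the file would look like with the standard ASF source header in place. The header wording is the usual Apache License 2.0 source header; the exact comment style (`/**` versus `/*`) is an assumption and should be copied from a neighbouring file in the same module so the license check accepts it.

```java
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.pulsar.websocket.proxy;
```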





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [pulsar] codelipenghui commented on a change in pull request #8249: Nack support in WS

2020-11-16 Thread GitBox


codelipenghui commented on a change in pull request #8249:
URL: https://github.com/apache/pulsar/pull/8249#discussion_r524858103



##
File path: pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java
##
@@ -564,6 +566,76 @@ public void socketPullModeTest() throws Exception {
 }
 }
 
+@Test(timeOut = 1)
+public void nackMessageTest() throws Exception {
+final String subscription = "my-sub";
+final String dlqTopic = "my-property/my-ns/my-topic10";
+final String consumerTopic = "my-property/my-ns/my-topic9";
+
+final String dlqUri = "ws://localhost:" + proxyServer.getListenPortHTTP().get() +
+  "/ws/v2/consumer/persistent/" +
+  dlqTopic + "/" + subscription +
+  "?subscriptionType=Shared";
+
+final String consumerUri = "ws://localhost:" + proxyServer.getListenPortHTTP().get() +
+  "/ws/v2/consumer/persistent/" +
+  consumerTopic + "/" + subscription +
+  "?deadLetterTopic=" + dlqTopic +
+  "&maxRedeliverCount=0&subscriptionType=Shared&ackTimeoutMillis=1000&negativeAckRedeliveryDelay=1000";
+
+final String producerUri = "ws://localhost:" + proxyServer.getListenPortHTTP().get() +
+  "/ws/v2/producer/persistent/" + consumerTopic;
+
+WebSocketClient consumeClient1 = new WebSocketClient();
+SimpleConsumerSocket consumeSocket1 = new SimpleConsumerSocket();
+WebSocketClient consumeClient2 = new WebSocketClient();
+SimpleConsumerSocket consumeSocket2 = new SimpleConsumerSocket();
+WebSocketClient produceClient = new WebSocketClient();
+SimpleProducerSocket produceSocket = new SimpleProducerSocket();
+
+consumeSocket1.setMessageHandler((id, data) -> {
+JsonObject nack = new JsonObject();
+nack.add("messageId", new JsonPrimitive(id));
+nack.add("type", new JsonPrimitive("negativeAcknowledge"));
+return nack.toString();
+});
+
+try {
+consumeClient1.start();
+consumeClient2.start();
+ClientUpgradeRequest consumeRequest1 = new ClientUpgradeRequest();
+ClientUpgradeRequest consumeRequest2 = new ClientUpgradeRequest();
+Future<Session> consumerFuture1 = consumeClient1.connect(consumeSocket1, URI.create(consumerUri), consumeRequest1);
+Future<Session> consumerFuture2 = consumeClient2.connect(consumeSocket2, URI.create(dlqUri), consumeRequest2);
+
+assertTrue(consumerFuture1.get().isOpen());
+assertTrue(consumerFuture2.get().isOpen());
+
+ClientUpgradeRequest produceRequest = new ClientUpgradeRequest();
+produceClient.start();
+Future<Session> producerFuture = produceClient.connect(produceSocket, URI.create(producerUri), produceRequest);
+assertTrue(producerFuture.get().isOpen());
+
+assertEquals(consumeSocket1.getReceivedMessagesCount(), 0);
+assertEquals(consumeSocket2.getReceivedMessagesCount(), 0);
+
+produceSocket.sendMessage(1);
+
+Thread.sleep(500);
+
+//assertEquals(consumeSocket1.getReceivedMessagesCount(), 1);

Review comment:
   I think you can refine the test the way
https://github.com/apache/pulsar/pull/8557 does. Relying on `Thread.sleep` also
tends to make tests flaky.
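
   To illustrate the concern about sleeping: one common alternative is to poll for the expected state with a deadline, so the test proceeds as soon as the condition holds and only fails after a bounded wait. The sketch below is a generic helper, not necessarily what #8557 does; `PollingWait` and `waitUntil` are hypothetical names introduced here for illustration.

```java
import java.util.function.BooleanSupplier;

/** Hypothetical test helper: poll a condition instead of sleeping for a fixed time. */
public final class PollingWait {
    private PollingWait() {
    }

    /**
     * Re-checks the condition every pollMillis until it is true or timeoutMillis
     * has elapsed. Returns true if the condition was observed to hold.
     */
    public static boolean waitUntil(BooleanSupplier condition, long timeoutMillis, long pollMillis)
            throws InterruptedException {
        final long deadline = System.nanoTime() + timeoutMillis * 1_000_000L;
        while (!condition.getAsBoolean()) {
            if (System.nanoTime() - deadline >= 0) {
                // Timed out: check one last time so a late success is not missed.
                return condition.getAsBoolean();
            }
            Thread.sleep(pollMillis);
        }
        return true;
    }
}
```

   In the test above, the fixed `Thread.sleep(500)` could then become something like `assertTrue(PollingWait.waitUntil(() -> consumeSocket1.getReceivedMessagesCount() == 1, 5000, 50));`, where the 5000 ms timeout and 50 ms poll interval are arbitrary values chosen for the example.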




