[GitHub] jiazhai commented on a change in pull request #2400: PIP-22: Dead Letter Topic

2018-08-21 Thread GitBox
jiazhai commented on a change in pull request #2400: PIP-22: Dead Letter Topic
URL: https://github.com/apache/incubator-pulsar/pull/2400#discussion_r211835151
 
 

 ##
 File path: pulsar-broker/src/main/java/org/apache/pulsar/utils/Quorum.java
 ##
 @@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.utils;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Consumer;
+
+public class Quorum {
 
 Review comment:
   It maybe better to add some comments, such as how to use it, and the meaning 
of each public methods. 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] jiazhai commented on a change in pull request #2400: PIP-22: Dead Letter Topic

2018-08-21 Thread GitBox
jiazhai commented on a change in pull request #2400: PIP-22: Dead Letter Topic
URL: https://github.com/apache/incubator-pulsar/pull/2400#discussion_r211835573
 
 

 ##
 File path: 
pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
 ##
 @@ -2662,4 +2662,108 @@ public void received(Consumer consumer, Message 
message)
 assertEquals(latch.getCount(), 1);
 consumer.close();
 }
+
+@Test
+public void testDeadLetterTopic() throws Exception {
+
+final String topic = 
"persistent://my-property/my-ns/dead-letter-topic";
+
+final int maxRedeliveryCount = 2;
+
+final int sendMessages = 100;
+
+Producer producer = pulsarClient.newProducer(Schema.BYTES)
+.topic(topic)
+.create();
+
+for (int i = 0; i < sendMessages; i++) {
+producer.send("Hello Pulsar!".getBytes());
 
 Review comment:
   How about combine "i" together with the message content? it may be useful a 
little to debug. 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] jiazhai commented on a change in pull request #2400: PIP-22: Dead Letter Topic

2018-08-21 Thread GitBox
jiazhai commented on a change in pull request #2400: PIP-22: Dead Letter Topic
URL: https://github.com/apache/incubator-pulsar/pull/2400#discussion_r211836331
 
 

 ##
 File path: 
pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
 ##
 @@ -2662,4 +2662,108 @@ public void received(Consumer consumer, Message 
message)
 assertEquals(latch.getCount(), 1);
 consumer.close();
 }
+
+@Test
+public void testDeadLetterTopic() throws Exception {
+
+final String topic = 
"persistent://my-property/my-ns/dead-letter-topic";
+
+final int maxRedeliveryCount = 2;
+
+final int sendMessages = 100;
+
+Producer producer = pulsarClient.newProducer(Schema.BYTES)
+.topic(topic)
+.create();
+
+for (int i = 0; i < sendMessages; i++) {
+producer.send("Hello Pulsar!".getBytes());
+}
+
+producer.close();
+
+Consumer consumer = pulsarClient.newConsumer(Schema.BYTES)
+.topic(topic)
+.subscriptionName("my-subscription")
+.subscriptionType(SubscriptionType.Shared)
+.ackTimeout(3, TimeUnit.SECONDS)
+.maxRedeliveryCount(maxRedeliveryCount)
+.receiverQueueSize(100)
+.maxUnackedMessagesPerConsumer(100)
+
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+.subscribe();
+
+Consumer deadLetterConsumer = 
pulsarClient.newConsumer(Schema.BYTES)
+
.topic("persistent://my-property/my-ns/dead-letter-topic-my-subscription-DLQ")
+.subscriptionName("my-subscription")
+.subscribe();
+
+int totalReceived = 0;
+do {
+consumer.receive();
+totalReceived++;
+} while (totalReceived < sendMessages * (maxRedeliveryCount + 1));
+
+int totalInDeadLetter = 0;
+do {
+Message message = deadLetterConsumer.receive();
+consumer.acknowledge(message);
+totalInDeadLetter++;
+} while (totalInDeadLetter < sendMessages);
+
 
 Review comment:
   Could we assert some thing to do a verification? such as messages are all 
received and acked?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] jiazhai commented on a change in pull request #2400: PIP-22: Dead Letter Topic

2018-08-23 Thread GitBox
jiazhai commented on a change in pull request #2400: PIP-22: Dead Letter Topic
URL: https://github.com/apache/incubator-pulsar/pull/2400#discussion_r212513063
 
 

 ##
 File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
 ##
 @@ -88,20 +100,32 @@
 private final ServiceConfiguration serviceConfig;
 private DispatchRateLimiter dispatchRateLimiter;
 
+protected volatile int maxRedeliveryCount;
+protected volatile String deadLetterTopic;
+protected RedeliveryTracker redeliveryTracker;
+private volatile ProducerImpl deadLetterTopicProducer;
+
 enum ReadType {
 Normal, Replay
 }
 
-public PersistentDispatcherMultipleConsumers(PersistentTopic topic, 
ManagedCursor cursor) {
+public PersistentDispatcherMultipleConsumers(PersistentTopic topic, 
ManagedCursor cursor, int maxRedeliveryCount,
+ String deadLetterTopic) {
 this.cursor = cursor;
 this.name = topic.getName() + " / " + Codec.decode(cursor.getName());
 this.topic = topic;
 this.messagesToReplay = new ConcurrentLongPairSet(512, 2);
+this.messagesToDeadLetter = new HashSet<>(8);
 
 Review comment:
   +1, @codelipenghui, we may need following the way  of `messagesToReplay`, 
which store ledgerId and entryId as primitive value, and then compose a 
Position from(ledgerId + entryId) when using it.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] jiazhai commented on a change in pull request #2400: PIP-22: Dead Letter Topic

2018-08-23 Thread GitBox
jiazhai commented on a change in pull request #2400: PIP-22: Dead Letter Topic
URL: https://github.com/apache/incubator-pulsar/pull/2400#discussion_r212514520
 
 

 ##
 File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
 ##
 @@ -132,11 +156,44 @@ public synchronized void addConsumer(Consumer consumer) 
throws BrokerServiceExce
 throw new ConsumerBusyException("Subscription reached max 
consumers limit");
 }
 
+deadLetterTopicProducer = newDeadLetterProducer();
+
 consumerList.add(consumer);
 consumerList.sort((c1, c2) -> c1.getPriorityLevel() - 
c2.getPriorityLevel());
 consumerSet.add(consumer);
 }
 
+private ProducerImpl newDeadLetterProducer() throws 
BrokerServiceException {
+if (maxRedeliveryCount > 0 && deadLetterTopicProducer == null) {
+try {
+if (maxRedeliveryCount > 0 && 
StringUtils.isBlank(deadLetterTopic)) {
+deadLetterTopic = String.format("%s-%s-DLQ", 
topic.getName(), Codec.decode(cursor.getName()));
 
 Review comment:
   Seems it is up to application to decide.  Application can choose its own 
name, and only use “-DLQ” when the application doesn’t specify.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] jiazhai commented on a change in pull request #2400: PIP-22: Dead Letter Topic

2018-08-23 Thread GitBox
jiazhai commented on a change in pull request #2400: PIP-22: Dead Letter Topic
URL: https://github.com/apache/incubator-pulsar/pull/2400#discussion_r212514520
 
 

 ##
 File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
 ##
 @@ -132,11 +156,44 @@ public synchronized void addConsumer(Consumer consumer) 
throws BrokerServiceExce
 throw new ConsumerBusyException("Subscription reached max 
consumers limit");
 }
 
+deadLetterTopicProducer = newDeadLetterProducer();
+
 consumerList.add(consumer);
 consumerList.sort((c1, c2) -> c1.getPriorityLevel() - 
c2.getPriorityLevel());
 consumerSet.add(consumer);
 }
 
+private ProducerImpl newDeadLetterProducer() throws 
BrokerServiceException {
+if (maxRedeliveryCount > 0 && deadLetterTopicProducer == null) {
+try {
+if (maxRedeliveryCount > 0 && 
StringUtils.isBlank(deadLetterTopic)) {
+deadLetterTopic = String.format("%s-%s-DLQ", 
topic.getName(), Codec.decode(cursor.getName()));
 
 Review comment:
   Seems it is up to application to decide.  In this change, application can 
choose its own name, and only use “-DLQ” when the application doesn’t specify.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services