This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 17b531b  Updated consumer example to use negative acks (#3761)
17b531b is described below

commit 17b531bd18b2e00714d0b2e4bc8d6cfdc2dc1647
Author: Matteo Merli <mme...@apache.org>
AuthorDate: Wed Mar 6 19:58:47 2019 -0800

    Updated consumer example to use negative acks (#3761)
---
 site2/docs/client-libraries-go.md     | 16 +++++++++++++---
 site2/docs/client-libraries-java.md   | 19 +++++++++++++------
 site2/docs/client-libraries-python.md | 18 ++++++++++++++----
 site2/docs/reference-terminology.md   |  7 ++++++-
 4 files changed, 46 insertions(+), 14 deletions(-)

diff --git a/site2/docs/client-libraries-go.md b/site2/docs/client-libraries-go.md
index 5d7466b..5e22ac6 100644
--- a/site2/docs/client-libraries-go.md
+++ b/site2/docs/client-libraries-go.md
@@ -262,7 +262,9 @@ Method | Description | Return type
 `Receive(context.Context)` | Receives a single message from the topic. This method blocks until a message is available. | `(Message, error)`
 `Ack(Message)` | [Acknowledges](reference-terminology.md#acknowledgment-ack) a message to the Pulsar [broker](reference-terminology.md#broker) | `error`
 `AckID(MessageID)` | [Acknowledges](reference-terminology.md#acknowledgment-ack) a message to the Pulsar [broker](reference-terminology.md#broker) by message ID | `error`
-`AckCumulative(Message)` | [Acknowledges](reference-terminology.md#acknowledgment-ack) *all* the messages in the stream, up to and including the specified message. The `AckCumulative` method will block until the ack has been sent to the broker. After that, the messages will *not* be redelivered to the consumer. Cumulative acking can only be used with a [shared](concepts-messaging.md#shared) subscription type.
+`AckCumulative(Message)` | [Acknowledges](reference-terminology.md#acknowledgment-ack) *all* the messages in the stream, up to and including the specified message. The `AckCumulative` method will block until the ack has been sent to the broker. After that, the messages will *not* be redelivered to the consumer. Cumulative acking can only be used with a [shared](concepts-messaging.md#shared) subscription type. | `error`
+`Nack(Message)` | Acknowledge the failure to process a single message. | `error`
+`NackID(MessageID)` | Acknowledge the failure to process a single message, identified by its message ID. | `error`
 `Close()` | Closes the consumer, disabling its ability to receive messages from the broker | `error`
 `RedeliverUnackedMessages()` | Redelivers *all* unacknowledged messages on the topic. In [failover](concepts-messaging.md#failover) mode, this request is ignored if the consumer isn't active on the specified topic; in [shared](concepts-messaging.md#shared) mode, redelivered messages are distributed across all consumers connected to the topic. **Note**: this is a *non-blocking* operation that doesn't throw an error. |
 
@@ -305,8 +307,15 @@ func main() {
         if err != nil { log.Fatal(err) }
 
         // Do something with the message
-
-        consumer.Ack(msg)
+        err = processMessage(msg)
+
+        if err == nil {
+            // Message processed successfully
+            consumer.Ack(msg)
+        } else {
+            // Failed to process messages
+            consumer.Nack(msg)
+        }
     }
 }
 ```
@@ -319,6 +328,7 @@ Parameter | Description | Default
 `SubscriptionName` | The subscription name for this consumer |
 `Name` | The name of the consumer |
 `AckTimeout` | | 0
+`NackRedeliveryDelay` | The delay after which to redeliver the messages that failed to be processed. Default is 1min. (See `Consumer.Nack()`) | 1 minute
 `SubscriptionType` | Available options are `Exclusive`, `Shared`, and `Failover` | `Exclusive`
 `MessageChannel` | The Go channel used by the consumer. Messages that arrive from the Pulsar topic(s) will be passed to this channel. |
 `ReceiverQueueSize` | Sets the size of the consumer's receiver queue, i.e. the number of messages that can be accumulated by the consumer before the application calls `Receive`. A value higher than the default of 1000 could increase consumer throughput, though at the expense of more memory utilization. | 1000
diff --git a/site2/docs/client-libraries-java.md b/site2/docs/client-libraries-java.md
index b588e0f..b8680d8 100644
--- a/site2/docs/client-libraries-java.md
+++ b/site2/docs/client-libraries-java.md
@@ -186,18 +186,25 @@ Consumer consumer = client.newConsumer()
         .subscribe();
 ```
 
-The `subscribe` method will automatically subscribe the consumer to the specified topic and subscription. One way to make the consumer listen on the topic is to set up a `while` loop. In this example loop, the consumer listens for messages, prints the contents of any message that's received, and then [acknowledges](reference-terminology.md#acknowledgment-ack) that the message has been processed:
+The `subscribe` method will automatically subscribe the consumer to the specified topic and subscription. One way to make the consumer listen on the topic is to set up a `while` loop. In this example loop, the consumer listens for messages, prints the contents of any message that's received, and then [acknowledges](reference-terminology.md#acknowledgment-ack) that the message has been processed. If the processing logic fails, we use [negative acknowledgement](reference-terminology.md#ack [...]
+to have the message redelivered at a later point in time.
 
 ```java
-do {
+while (true) {
   // Wait for a message
   Message msg = consumer.receive();
 
-  System.out.printf("Message received: %s", new String(msg.getData()));
+  try {
+      // Do something with the message
+      System.out.printf("Message received: %s", new String(msg.getData()));
 
-  // Acknowledge the message so that it can be deleted by the message broker
-  consumer.acknowledge(msg);
-} while (true);
+      // Acknowledge the message so that it can be deleted by the message broker
+      consumer.acknowledge(msg);
+  } catch (Exception e) {
+      // Message failed to process, redeliver later
+      consumer.negativeAcknowledge(msg);
+  }
+}
 ```
 
 ### Configuring consumers
diff --git a/site2/docs/client-libraries-python.md b/site2/docs/client-libraries-python.md
index 7b79f72..e1b84b1 100644
--- a/site2/docs/client-libraries-python.md
+++ b/site2/docs/client-libraries-python.md
@@ -71,8 +71,13 @@ consumer = client.subscribe('my-topic', 'my-subscription')
 
 while True:
     msg = consumer.receive()
-    print("Received message '{}' id='{}'".format(msg.data(), msg.message_id()))
-    consumer.acknowledge(msg)
+    try:
+        print("Received message '{}' id='{}'".format(msg.data(), msg.message_id()))
+        # Acknowledge successful processing of the message
+        consumer.acknowledge(msg)
+    except:
+        # Message failed to be processed
+        consumer.negative_acknowledge(msg)
 
 client.close()
 ```
@@ -147,8 +152,13 @@ consumer = client.subscribe(
 while True:
     msg = consumer.receive()
     ex = msg.value()
-    print("Received message a={} b={} c={}".format(ex.a, ex.b, ex.c))
-    consumer.acknowledge(msg)
+    try:
+        print("Received message a={} b={} c={}".format(ex.a, ex.b, ex.c))
+        # Acknowledge successful processing of the message
+        consumer.acknowledge(msg)
+    except:
+        # Message failed to be processed
+        consumer.negative_acknowledge(msg)
 ```
 
 ### Supported schema types
diff --git a/site2/docs/reference-terminology.md b/site2/docs/reference-terminology.md
index fb83976..6b4845c 100644
--- a/site2/docs/reference-terminology.md
+++ b/site2/docs/reference-terminology.md
@@ -76,6 +76,12 @@ A message sent to a Pulsar broker by a [consumer](#consumer) that a message has
 An acknowledgement (ack) is Pulsar's way of knowing that the message can be deleted from the system;
 if no acknowledgement, then the message will be retained until it's processed.
 
+#### Negative Acknowledgment (nack)
+
+When an application fails to process a particular message, it can send a "negative ack" to Pulsar
+to signal that the message should be replayed at a later time. (By default, failed messages are
+replayed after a 1-minute delay.)
+
 #### Unacknowledged
 
 A message that has been delivered to a consumer for processing but not yet confirmed as processed by the consumer.
@@ -158,4 +164,3 @@ An append-only data structure in [BookKeeper](#bookkeeper) that is used to persi
 ### Functions
 
 Pulsar Functions are lightweight functions that can consume messages from Pulsar topics, apply custom processing logic, and, if desired, publish results to topics.
-
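
The ack/nack pattern that the updated examples in this commit follow can be tried without a broker. What follows is only a minimal sketch: `StubConsumer`, `consume_all`, and `make_flaky_processor` are hypothetical names invented for illustration and are not part of any Pulsar client. The stub mirrors the `receive` / `acknowledge` / `negative_acknowledge` calls used in the Python example above, but it requeues a nacked message immediately, whereas the docs in this diff say that failed messages are replayed after a delay (1 minute by default).

```python
from collections import deque


class StubConsumer:
    """Hypothetical in-memory stand-in for a consumer (NOT the real Pulsar client)."""

    def __init__(self, payloads):
        self._pending = deque(payloads)
        self.acked = []
        self.nack_count = 0

    def has_pending(self):
        return bool(self._pending)

    def receive(self):
        # Hand out the next pending message.
        return self._pending.popleft()

    def acknowledge(self, msg):
        # An acknowledged message is finished and is never handed out again.
        self.acked.append(msg)

    def negative_acknowledge(self, msg):
        # Assumption: the stub requeues at once; a real broker waits for the
        # configured redelivery delay before replaying the message.
        self.nack_count += 1
        self._pending.append(msg)


def consume_all(consumer, process):
    """Ack on success, nack on failure, until nothing is pending."""
    while consumer.has_pending():
        msg = consumer.receive()
        try:
            process(msg)
            consumer.acknowledge(msg)
        except Exception:
            # Processing failed: ask for the message to be delivered again.
            consumer.negative_acknowledge(msg)


def make_flaky_processor(fail_once):
    """Return a processor that raises the first time it sees each payload in fail_once."""
    seen = set()

    def process(msg):
        if msg in fail_once and msg not in seen:
            seen.add(msg)
            raise ValueError("transient failure for {!r}".format(msg))

    return process


consumer = StubConsumer([b"a", b"b", b"c"])
consume_all(consumer, make_flaky_processor({b"b"}))
print(consumer.acked, consumer.nack_count)  # [b'a', b'c', b'b'] 1
```

The message `b"b"` fails once, is nacked, and comes back after `b"c"`, so it is acknowledged last. The sketch catches `Exception` rather than using a bare `except:`, so that `KeyboardInterrupt` still stops the loop.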
