This is an automated email from the ASF dual-hosted git repository. mmerli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
     new 17b531b  Updated consumer example to use negative acks (#3761)
17b531b is described below

commit 17b531bd18b2e00714d0b2e4bc8d6cfdc2dc1647
Author: Matteo Merli <mme...@apache.org>
AuthorDate: Wed Mar 6 19:58:47 2019 -0800

    Updated consumer example to use negative acks (#3761)
---
 site2/docs/client-libraries-go.md     | 16 +++++++++++++---
 site2/docs/client-libraries-java.md   | 19 +++++++++++++------
 site2/docs/client-libraries-python.md | 18 ++++++++++++++----
 site2/docs/reference-terminology.md   |  7 ++++++-
 4 files changed, 46 insertions(+), 14 deletions(-)

diff --git a/site2/docs/client-libraries-go.md b/site2/docs/client-libraries-go.md
index 5d7466b..5e22ac6 100644
--- a/site2/docs/client-libraries-go.md
+++ b/site2/docs/client-libraries-go.md
@@ -262,7 +262,9 @@ Method | Description | Return type
 `Receive(context.Context)` | Receives a single message from the topic. This method blocks until a message is available. | `(Message, error)`
 `Ack(Message)` | [Acknowledges](reference-terminology.md#acknowledgment-ack) a message to the Pulsar [broker](reference-terminology.md#broker) | `error`
 `AckID(MessageID)` | [Acknowledges](reference-terminology.md#acknowledgment-ack) a message to the Pulsar [broker](reference-terminology.md#broker) by message ID | `error`
-`AckCumulative(Message)` | [Acknowledges](reference-terminology.md#acknowledgment-ack) *all* the messages in the stream, up to and including the specified message. The `AckCumulative` method will block until the ack has been sent to the broker. After that, the messages will *not* be redelivered to the consumer. Cumulative acking can only be used with a [shared](concepts-messaging.md#shared) subscription type.
+`AckCumulative(Message)` | [Acknowledges](reference-terminology.md#acknowledgment-ack) *all* the messages in the stream, up to and including the specified message. The `AckCumulative` method will block until the ack has been sent to the broker. After that, the messages will *not* be redelivered to the consumer. Cumulative acking can only be used with a [shared](concepts-messaging.md#shared) subscription type. | `error`
+`Nack(Message)` | Acknowledge the failure to process a single message. | `error`
+`NackID(MessageID)` | Acknowledge the failure to process a single message. | `error`
 `Close()` | Closes the consumer, disabling its ability to receive messages from the broker | `error`
 `RedeliverUnackedMessages()` | Redelivers *all* unacknowledged messages on the topic. In [failover](concepts-messaging.md#failover) mode, this request is ignored if the consumer isn't active on the specified topic; in [shared](concepts-messaging.md#shared) mode, redelivered messages are distributed across all consumers connected to the topic. **Note**: this is a *non-blocking* operation that doesn't throw an error. |
 
@@ -305,8 +307,15 @@ func main() {
         if err != nil { log.Fatal(err) }
 
         // Do something with the message
-
-        consumer.Ack(msg)
+        err = processMessage(msg)
+
+        if err == nil {
+            // Message processed successfully
+            consumer.Ack(msg)
+        } else {
+            // Failed to process messages
+            consumer.Nack(msg)
+        }
     }
 }
 ```
@@ -319,6 +328,7 @@ Parameter | Description | Default
 `SubscriptionName` | The subscription name for this consumer |
 `Name` | The name of the consumer |
 `AckTimeout` | | 0
+`NackRedeliveryDelay` | The delay after which to redeliver the messages that failed to be processed. Default is 1min. (See `Consumer.Nack()`) | 1 minute
 `SubscriptionType` | Available options are `Exclusive`, `Shared`, and `Failover` | `Exclusive`
 `MessageChannel` | The Go channel used by the consumer. Messages that arrive from the Pulsar topic(s) will be passed to this channel. |
 `ReceiverQueueSize` | Sets the size of the consumer's receiver queue, i.e. the number of messages that can be accumulated by the consumer before the application calls `Receive`. A value higher than the default of 1000 could increase consumer throughput, though at the expense of more memory utilization. | 1000
diff --git a/site2/docs/client-libraries-java.md b/site2/docs/client-libraries-java.md
index b588e0f..b8680d8 100644
--- a/site2/docs/client-libraries-java.md
+++ b/site2/docs/client-libraries-java.md
@@ -186,18 +186,25 @@ Consumer consumer = client.newConsumer()
         .subscribe();
 ```
 
-The `subscribe` method will automatically subscribe the consumer to the specified topic and subscription. One way to make the consumer listen on the topic is to set up a `while` loop. In this example loop, the consumer listens for messages, prints the contents of any message that's received, and then [acknowledges](reference-terminology.md#acknowledgment-ack) that the message has been processed:
+The `subscribe` method will automatically subscribe the consumer to the specified topic and subscription. One way to make the consumer listen on the topic is to set up a `while` loop. In this example loop, the consumer listens for messages, prints the contents of any message that's received, and then [acknowledges](reference-terminology.md#acknowledgment-ack) that the message has been processed. If the processing logic fails, we use [negative acknowledgement](reference-terminology.md#ack [...]
+to have the message redelivered at a later point in time.
 
 ```java
-do {
+while (true) {
   // Wait for a message
   Message msg = consumer.receive();
 
-  System.out.printf("Message received: %s", new String(msg.getData()));
+  try {
+      // Do something with the message
+      System.out.printf("Message received: %s", new String(msg.getData()));
 
-  // Acknowledge the message so that it can be deleted by the message broker
-  consumer.acknowledge(msg);
-} while (true);
+      // Acknowledge the message so that it can be deleted by the message broker
+      consumer.acknowledge(msg);
+  } catch (Exception e) {
+      // Message failed to process, redeliver later
+      consumer.negativeAcknowledge(msg);
+  }
+}
 ```
 
 ### Configuring consumers
diff --git a/site2/docs/client-libraries-python.md b/site2/docs/client-libraries-python.md
index 7b79f72..e1b84b1 100644
--- a/site2/docs/client-libraries-python.md
+++ b/site2/docs/client-libraries-python.md
@@ -71,8 +71,13 @@ consumer = client.subscribe('my-topic', 'my-subscription')
 
 while True:
     msg = consumer.receive()
-    print("Received message '{}' id='{}'".format(msg.data(), msg.message_id()))
-    consumer.acknowledge(msg)
+    try:
+        print("Received message '{}' id='{}'".format(msg.data(), msg.message_id()))
+        # Acknowledge successful processing of the message
+        consumer.acknowledge(msg)
+    except:
+        # Message failed to be processed
+        consumer.negative_acknowledge(msg)
 
 client.close()
 ```
@@ -147,8 +152,13 @@ consumer = client.subscribe(
 while True:
     msg = consumer.receive()
     ex = msg.value()
-    print("Received message a={} b={} c={}".format(ex.a, ex.b, ex.c))
-    consumer.acknowledge(msg)
+    try:
+        print("Received message a={} b={} c={}".format(ex.a, ex.b, ex.c))
+        # Acknowledge successful processing of the message
+        consumer.acknowledge(msg)
+    except:
+        # Message failed to be processed
+        consumer.negative_acknowledge(msg)
 ```
 
 ### Supported schema types
diff --git a/site2/docs/reference-terminology.md b/site2/docs/reference-terminology.md
index fb83976..6b4845c 100644
--- a/site2/docs/reference-terminology.md
+++ b/site2/docs/reference-terminology.md
@@ -76,6 +76,12 @@ A message sent to a Pulsar broker by a [consumer](#consumer) that a message has
 
 An acknowledgement (ack) is Pulsar's way of knowing that the message can be deleted from the system; if no acknowledgement, then the message will be retained until it's processed.
 
+#### Negative Acknowledgment (nack)
+
+When an application fails to process a particular message, it can sends a "negative ack" to Pulsar
+to signal that the message should be replayed at a later timer. (By default, failed messages are
+replayed after a 1 minute delay)
+
 #### Unacknowledged
 
 A message that has been delivered to a consumer for processing but not yet confirmed as processed by the consumer.
@@ -158,4 +164,3 @@ An append-only data structure in [BookKeeper](#bookkeeper) that is used to persi
 ### Functions
 
 Pulsar Functions are lightweight functions that can consume messages from Pulsar topics, apply custom processing logic, and, if desired, publish results to topics.
-
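
The negative-ack behavior this commit documents (ack on success, nack on failure, replay after a delay that defaults to 1 minute) can be illustrated with a small in-memory model. This is only a sketch under stated assumptions: `NackTracker`, `consume`, and `fail_on_odd` are hypothetical names made up for the illustration and are not part of any Pulsar client, and time is passed in explicitly instead of being read from a clock. It also catches `Exception` where the Python doc example above uses a bare `except:`, so that interrupts are not swallowed.

```python
import heapq


class NackTracker:
    """Toy model of negative-ack redelivery; NOT the Pulsar client API."""

    def __init__(self, redelivery_delay_s=60.0):
        # 60 seconds mirrors the 1 minute default described in the commit.
        self.redelivery_delay_s = redelivery_delay_s
        self._pending = []  # min-heap of (redeliver_at, message_id)

    def nack(self, message_id, now):
        """Record a failed message; it becomes due once the delay has passed."""
        heapq.heappush(self._pending, (now + self.redelivery_delay_s, message_id))

    def due(self, now):
        """Pop and return the ids whose redelivery time has been reached."""
        ready = []
        while self._pending and self._pending[0][0] <= now:
            ready.append(heapq.heappop(self._pending)[1])
        return ready


def consume(messages, process, tracker, now=0.0):
    """Ack each message on success, nack it on failure; return the acked ids."""
    acked = []
    for message_id, payload in messages:
        try:
            process(payload)
            acked.append(message_id)
        except Exception:  # narrower than a bare `except:`
            tracker.nack(message_id, now)
    return acked


def fail_on_odd(payload):
    """Hypothetical processing step that rejects odd payloads."""
    if payload % 2:
        raise ValueError("cannot process odd payload")


tracker = NackTracker(redelivery_delay_s=60.0)
acked = consume([(1, 10), (2, 11), (3, 12)], fail_on_odd, tracker, now=0.0)
early = tracker.due(now=59.0)  # before the delay has elapsed
late = tracker.due(now=60.0)   # once the delay has elapsed
```

In this model only message 2 fails, so it is the only one scheduled for replay, and it becomes due at the 60 second mark rather than before it. A real client tracks this per consumer and asks the broker to redeliver; the sketch only shows the timing rule.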