merlimat closed pull request #1271: Message deduplication documentation
URL: https://github.com/apache/incubator-pulsar/pull/1271
This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:

diff --git a/conf/broker.conf b/conf/broker.conf
index d78fbbc02d..d38abf7046 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -86,9 +86,9 @@ brokerDeduplicationEnabled=false
 brokerDeduplicationMaxNumberOfProducers=10000
 
 # Number of entries after which a dedup info snapshot is taken.
-# A bigger interval will lead to less snapshots being taken though it would
-# increase the topic recovery time, when the entries published after the
-# snapshot need to be replayed
+# A larger interval will lead to fewer snapshots being taken, though it would
+# increase the topic recovery time when the entries published after the
+# snapshot need to be replayed.
 brokerDeduplicationEntriesInterval=1000
 
 # Time of inactivity after which the broker will discard the deduplication information
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ProducerBuilder.java b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ProducerBuilder.java
index 16dcd009cb..03d82a7858 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ProducerBuilder.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ProducerBuilder.java
@@ -101,6 +101,8 @@
      * Set the send timeout <i>(default: 30 seconds)</i>
      * <p>
      * If a message is not acknowledged by the server before the sendTimeout expires, an error will be reported.
+     * Setting the timeout to zero, for example <code>setTimeout(0, TimeUnit.SECONDS)</code> will set the timeout
+     * to infinity, which can be useful when using Pulsar's message deduplication feature.
      *
      * @param sendTimeout
      *            the send timeout
diff --git a/site/Gemfile.lock b/site/Gemfile.lock
index 1eb692b4ef..be5c876094 100644
--- a/site/Gemfile.lock
+++ b/site/Gemfile.lock
@@ -47,7 +47,7 @@ GEM
     rouge (3.1.1)
     ruby_dep (1.5.0)
     safe_yaml (1.0.4)
-    sass (3.5.5)
+    sass (3.5.6)
       sass-listen (~> 4.0.0)
     sass-listen (4.0.0)
       rb-fsevent (~> 0.9, >= 0.9.4)
diff --git a/site/_data/cli/pulsar-admin.yaml b/site/_data/cli/pulsar-admin.yaml
index ef56909cda..424c2b41a2 100644
--- a/site/_data/cli/pulsar-admin.yaml
+++ b/site/_data/cli/pulsar-admin.yaml
@@ -326,6 +326,16 @@ commands:
   - name: delete
     description: Deletes a namespace
     argument: property/cluster/namespace
+  - name: set-deduplication
+    description: Enable or disable message deduplication on a namespace
+    argument: property/cluster/namespace
+    options:
+    - flags: --enable, -e
+      description: Enable message deduplication on the specified namespace
+      default: 'false'
+    - flags: --disable, -d
+      description: Disable message deduplication on the specified namespace
+      default: 'false'
   - name: permissions
     description: Get the permissions on a namespace
     argument: property/cluster/namespace
diff --git a/site/_data/config/broker.yaml b/site/_data/config/broker.yaml
index 445b4a6d8e..9cc9fb6d9b 100644
--- a/site/_data/config/broker.yaml
+++ b/site/_data/config/broker.yaml
@@ -47,10 +47,22 @@ configs:
   description: Hostname or IP address the service binds on, default is 0.0.0.0.
 - name: advertisedAddress
   default: ''
-  description: Hostname or IP address the service advertises to the outside world. If not set, the value of InetAddress.getLocalHost().getHostName() is used.
+  description: Hostname or IP address the service advertises to the outside world. If not set, the value of `InetAddress.getLocalHost().getHostName()` is used.
 - name: clusterName
   default: ''
   description: Name of the cluster to which this broker belongs to
+- name: brokerDeduplicationEnabled
+  default: 'false'
+  description: Sets the default behavior for message deduplication in the broker. If enabled, the broker will reject messages that were already stored in the topic. This setting can be overridden on a per-namespace basis.
+- name: brokerDeduplicationMaxNumberOfProducers
+  default: '10000'
+  description: The maximum number of producers for which information will be stored for deduplication purposes.
+- name: brokerDeduplicationEntriesInterval
+  default: '1000'
+  description: The number of entries after which a deduplication informational snapshot is taken. A larger interval will lead to fewer snapshots being taken, though this would also lengthen the topic recovery time (the time required for entries published after the snapshot to be replayed).
+- name: brokerDeduplicationProducerInactivityTimeoutMinutes
+  default: '360'
+  description: The time of inactivity (in minutes) after which the broker will discard deduplication information related to a disconnected producer.
 - name: zooKeeperSessionTimeoutMillis
   default: '30000'
   description: Zookeeper session timeout in milliseconds
diff --git a/site/_data/sidebar.yaml b/site/_data/sidebar.yaml
index 91028b2cf7..a4eeb0ba5c 100644
--- a/site/_data/sidebar.yaml
+++ b/site/_data/sidebar.yaml
@@ -129,9 +129,11 @@ groups:
   - title: Apache Storm
     endpoint: PulsarStorm
 
-- title: Advanced
-  dir: advanced
+- title: Cookbooks
+  dir: cookbooks
   docs:
+  - title: Message deduplication
+    endpoint: message-deduplication
   - title: Partitioned topics
     endpoint: PartitionedTopics
   - title: Retention and expiry
diff --git a/site/docs/latest/admin/ZooKeeperBookKeeper.md b/site/docs/latest/admin/ZooKeeperBookKeeper.md
index e7467aa021..25204eeda8 100644
--- a/site/docs/latest/admin/ZooKeeperBookKeeper.md
+++ b/site/docs/latest/admin/ZooKeeperBookKeeper.md
@@ -59,7 +59,7 @@ Configuration for global ZooKeeper is handled by the [`conf/global-zookeeper.con
 {% popover BookKeeper %} is responsible for all durable message storage in Pulsar. BookKeeper is a distributed [write-ahead log](https://en.wikipedia.org/wiki/Write-ahead_logging) WAL system that guarantees read consistency of independent message logs called {% popover ledgers %}. Individual BookKeeper servers are also called *bookies*.
 
 {% include admonition.html type="info" content="
-For a guide to managing message persistence, retention, and expiry in Pulsar, see [this guide](../../advanced/RetentionExpiry).
+For a guide to managing message persistence, retention, and expiry in Pulsar, see [this cookbook](../../cookbooks/RetentionExpiry).
 " %}
 
 ### Deploying BookKeeper
diff --git a/site/docs/latest/clients/Java.md b/site/docs/latest/clients/Java.md
index 237a00dede..59b00164a5 100644
--- a/site/docs/latest/clients/Java.md
+++ b/site/docs/latest/clients/Java.md
@@ -151,7 +151,7 @@ Producer producer = client.newProducer()
 
 ### Message routing
 
-When using {% popover partitioned topics %}, you can specify the routing mode whenever you publish messages using a {% popover producer %}. For more on specifying a routing mode using the Java client, see the [Partitioned Topics](../../advanced/PartitionedTopics) guide.
+When using {% popover partitioned topics %}, you can specify the routing mode whenever you publish messages using a {% popover producer %}. For more on specifying a routing mode using the Java client, see the [Partitioned Topics](../../cookbooks/PartitionedTopics) cookbook.
 
 ### Async send
 
diff --git a/site/docs/latest/advanced/Encryption.md b/site/docs/latest/cookbooks/Encryption.md
similarity index 100%
rename from site/docs/latest/advanced/Encryption.md
rename to site/docs/latest/cookbooks/Encryption.md
diff --git a/site/docs/latest/advanced/PartitionedTopics.md b/site/docs/latest/cookbooks/PartitionedTopics.md
similarity index 100%
rename from site/docs/latest/advanced/PartitionedTopics.md
rename to site/docs/latest/cookbooks/PartitionedTopics.md
diff --git a/site/docs/latest/advanced/RetentionExpiry.md b/site/docs/latest/cookbooks/RetentionExpiry.md
similarity index 100%
rename from site/docs/latest/advanced/RetentionExpiry.md
rename to site/docs/latest/cookbooks/RetentionExpiry.md
diff --git a/site/docs/latest/cookbooks/message-deduplication.md b/site/docs/latest/cookbooks/message-deduplication.md
new file mode 100644
index 0000000000..3a6a2b8c5c
--- /dev/null
+++ b/site/docs/latest/cookbooks/message-deduplication.md
@@ -0,0 +1,121 @@
+---
+title: Message deduplication
+tags: [admin, deduplication, cookbook]
+new: true
+---
+
+**Message deduplication** is a feature of Pulsar that, when enabled, ensures that each message produced on Pulsar {% popover topics %} is persisted to disk *only once*, even if the message is produced more than once. Message deduplication essentially unburdens Pulsar applications of the responsibility of ensuring deduplication and instead handles it automatically on the server side.
+
+Using message deduplication in Pulsar involves making some [configuration changes](#configuration) to your Pulsar brokers as well as some minor changes to the behavior of Pulsar [clients](#clients).
+
+{% include admonition.html type="info" content="For a more thorough theoretical explanation of message deduplication, see the [Concepts and Architecture](../../getting-started/ConceptsAndArchitecture#message-deduplication) document." %}
+
+## How it works
+
+Message deduplication can be enabled and disabled on a per-{% popover namespace %} basis. By default, it is *disabled* on all namespaces and can enabled in the following ways:
+
+* Using the [`pulsar-admin namespaces`](#enabling) interface
+* As a {% popover broker %}-level [default](#default) for all namespaces
+
+## Configuration for message deduplication {#configuration}
+
+You can configure message deduplication in Pulsar using the [`broker.conf`](../../reference/Configuration#broker) configuration file. The following deduplication-related parameters are available:
+
+Parameter | Description | Default
+:---------|:------------|:-------
+`brokerDeduplicationEnabled` | Sets the default behavior for message deduplication in the Pulsar {% popover broker %}. If set to `true`, message deduplication will be enabled by default on all namespaces; if set to `false` (the default), deduplication will have to be [enabled](#enabling) and [disabled](#disabling) on a per-namespace basis. | `false`
+`brokerDeduplicationMaxNumberOfProducers` | The maximum number of producers for which information will be stored for deduplication purposes. | `10000`
+`brokerDeduplicationEntriesInterval` | The number of entries after which a deduplication informational snapshot is taken. A larger interval will lead to fewer snapshots being taken, though this would also lengthen the topic recovery time (the time required for entries published after the snapshot to be replayed). | `1000`
+`brokerDeduplicationProducerInactivityTimeoutMinutes` | The time of inactivity (in minutes) after which the broker will discard deduplication information related to a disconnected producer. | `360` (6 hours)
+
+Any configuration changes you make won't take effect until you re-start the broker.
+
+### Setting the broker-level default {#default}
+
+By default, message deduplication is *disabled* on all Pulsar namespaces. To enable it by default on all namespaces, set the `brokerDeduplicationEnabled` parameter to `true` and re-start the broker.
+
+Regardless of the value of `brokerDeduplicationEnabled`, [enabling](#enabling) and [disabling](#disabling) via the CLI will override the broker-level default.
+
+### Enabling message deduplication {#enabling}
+
+You can enable message deduplication on specific namespaces, regardless of the the [default](#default) for the broker, using the [`pulsar-admin namespace set-deduplication`](../../CliTools#pulsar-admin-namespace-set-deduplication) command. You can use the `--enable`/`-e` flag and specify the namespace. Here's an example:
+
+```bash
+$ bin/pulsar-admin namespaces set-deduplication \
+  persistent://sample/standalone/ns1/topic-1 \
+  --enable # or just -e
+```
+
+### Disabling message deduplication {#disabling}
+
+You can disable message deduplication on a specific namespace using the same method shown [above](#enabling), except using the `--disable`/`-d` flag instead. Here's an example:
+
+```bash
+$ bin/pulsar-admin namespaces set-deduplication \
+  persistent://sample/standalone/ns1/topic-1 \
+  --disable # or just -d
+```
+
+## Message deduplication and Pulsar clients {#clients}
+
+If you enable message deduplication in your Pulsar {% popover brokers %}, you won't need to make any major changes to your Pulsar clients. There are, however, two settings that you need to provide for your client {% popover producers %}:
+
+1. The producer must be given a name
+1. The message send timeout needs to be set to infinity (i.e. no timeout)
+
+Instructions for [Java](#java), [Python](#python), and [C++](#cpp) clients can be found below.
+
+### Java clients {#java}
+
+To enable message deduplication on a [Java producer](../../clients/Java#producers), set the producer name using the `producerName` setter and set the timeout to 0 using the `sendTimeout` setter. Here's an example:
+
+```java
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import java.util.concurrent.TimeUnit;
+
+PulsarClient pulsarClient = PulsarClient.builder()
+        .serviceUrl("pulsar://localhost:6650")
+        .build();
+Producer producer = pulsarClient.newProducer()
+        .producerName("producer-1")
+        .topic("persistent://sample/standalone/ns1/topic-1")
+        .sendTimeout(0, TimeUnit.SECONDS)
+        .create();
+```
+
+### Python clients {#python}
+
+To enable message deduplication on a [Python producer](../../clients/Python#producers), set the producer name using `producer_name` and the timeout to 0 using `send_timeout_millis`. Here's an example:
+
+```python
+import pulsar
+
+client = pulsar.Client("pulsar://localhost:6650")
+producer = client.create_producer(
+    "persistent://sample/standalone/ns1/topic-1",
+    producer_name="producer-1",
+    send_timeout_millis=0)
+```
+
+## C++ clients {#cpp}
+
+To enable message deduplication on a [C++ producer](../../clients/Cpp#producer), set the producer name using `producer_name` and the timeout to 0 using `send_timeout_millis`. Here's an example:
+
+```cpp
+#include <pulsar/Client.h>
+
+std::string serviceUrl = "pulsar://localhost:6650";
+std::string topic = "persistent://prop/unit/ns1/topic-1";
+std::string producerName = "producer-1";
+
+Client client(serviceUrl);
+
+ProducerConfiguration producerConfig;
+producerConfig.setSendTimeout(0);
+producerConfig.setProducerName(producerName);
+
+Producer producer;
+
+Result result = client.createProducer("persistent://sample/standalone/ns1/my-topic", producerConfig, producer);
+```
\ No newline at end of file
diff --git a/site/docs/latest/getting-started/ConceptsAndArchitecture.md b/site/docs/latest/getting-started/ConceptsAndArchitecture.md
index 7557cdb071..66c7558741 100644
--- a/site/docs/latest/getting-started/ConceptsAndArchitecture.md
+++ b/site/docs/latest/getting-started/ConceptsAndArchitecture.md
@@ -3,6 +3,7 @@ title: Pulsar concepts and architecture
 lead: A high-level overview of Pulsar's moving pieces
 tags:
 - architecture
+- deduplication
 ---
 
 <!--
@@ -315,7 +316,7 @@ Pulsar has two features, however, that enable you to override this default behav
 * Message **retention** enables you to store messages that have been acknowledged by a consumer
 * Message **expiry** enables you to set a time to live (TTL) for messages that have not yet been acknowledged
 
-{% include admonition.html type="info" content='All message retention and expiry is managed at the [namespace](#namespaces) level. For a how-to, see the [Message retention and expiry](../../advanced/RetentionExpiry) admin documentation.' %}
+{% include admonition.html type="info" content='All message retention and expiry is managed at the [namespace](#namespaces) level. For a how-to, see the [Message retention and expiry](../../cookbooks/RetentionExpiry) cookbook.' %}
 
 The diagram below illustrates both concepts:
 
@@ -333,9 +334,35 @@ For an in-depth look at Pulsar Functions, see the [Pulsar Functions overview](..
 
 Pulsar enables messages to be produced and consumed in different geo-locations. For instance, your application may be publishing data in one region or market and you would like to process it for consumption in other regions or markets. [Geo-replication](../../admin/GeoReplication) in Pulsar enables you to do that.
 
+## Message deduplication
+
+Message **duplication** occurs when a message is [persisted](#persistent-storage) by Pulsar more than once. Message ***de*duplication** is an optional Pulsar feature that prevents unnecessary message duplication by processing each message only once, *even if the message is received more than once*.
+
+The following diagram illustrates what happens when message deduplication is disabled vs. enabled:
+
+{% img /img/message-deduplication.png 75 %}
+
+Message deduplication is disabled in the scenario shown at the top. Here, a producer publishes message 1 on a topic; the message reaches a Pulsar {% popover broker %} and is [persisted](#persistent-storage) to BookKeeper. The producer then sends message 1 again (in this case due to some retry logic), and the message is received by the broker and stored in BookKeeper again, which means that duplication has occurred.
+
+In the second scenario at the bottom, the producer publishes message 1, which is received by the broker and persisted, as in the first scenario. When the producer attempts to publish the message again, however, the broker knows that it has already seen message 1 and thus does not persist the message.
+
+{% include admonition.html type="info" content='Message deduplication is handled at the namespace level. For more instructions, see the [message deduplication cookbook](../../cookbooks/message-deduplication).' %}
+
+### Producer idempotency
+
+The other available approach to message deduplication is to ensure that each message is *only produced once*. This approach is typically called **producer idempotency**. The drawback of this approach is that it defers the work of message deduplication to the application. In Pulsar, this is handled at the {% popover broker %} level, which means that you don't need to modify your Pulsar client code. Instead, you only need to make administrative changes (see the [Managing message deduplication](../../cookbooks/message-deduplication) cookbook for a guide).
+
+### Deduplication and effectively-once semantics
+
+Message deduplication makes Pulsar an ideal messaging system to be used in conjunction with stream processing engines (SPEs) and other systems seeking to provide [effectively-once](https://blog.streaml.io/exactly-once/) processing semantics. Messaging systems that don't offer automatic message deduplication require the SPE or other system to guarantee deduplication, which means that strict message ordering comes at the cost of burdening the application with the responsibility of deduplication. With Pulsar, strict ordering guarantees come at no application-level cost.
+
+{% include admonition.html type="info" content='
+More in-depth information can be found in [this post](https://blog.streaml.io/pulsar-effectively-once/) on the [Streamlio blog](https://blog.streaml.io).
+' %}
+
 ## Multi-tenancy
 
-Pulsar was created from the ground up as a {% popover multi-tenant %} system. To support multi-tenancy, Pulsar has a concept of {% popover properties %}. Properties can be spread across {% popover clusters %} and can each have their own [authentication and authorization](../../admin/Authz) scheme applied to them. They are also the administrative unit at which [storage quotas](TODO), [message TTL](../../advanced/RetentionExpiry#time-to-live-ttl), and isolation policies can be managed.
+Pulsar was created from the ground up as a {% popover multi-tenant %} system. To support multi-tenancy, Pulsar has a concept of {% popover properties %}. Properties can be spread across {% popover clusters %} and can each have their own [authentication and authorization](../../admin/Authz) scheme applied to them. They are also the administrative unit at which [storage quotas](TODO), [message TTL](../../cookbooks/RetentionExpiry#time-to-live-ttl), and isolation policies can be managed.
 
 The multi-tenant nature of Pulsar is reflected mostly visibly in topic URLs, which have this structure:
 
@@ -411,7 +438,7 @@ The **reader interface** for Pulsar enables applications to manually manage curs
 * The **latest** available message in the topic
 * Some other message between the earliest and the latest. If you select this option, you'll need to explicitly provide a message ID. Your application will be responsible for "knowing" this message ID in advance, perhaps fetching it from a persistent data store or cache.
 
-The reader interface is helpful for use cases like using Pulsar to provide [effectively-once](https://streaml.io/blog/exactly-once/) processing semantics for a stream processing system. For this use case, it's essential that the stream processing system be able to "rewind" topics to a specific message and begin reading there. The reader interface provides Pulsar clients with the low-level abstraction necessary to "manually position" themselves within a topic.
+The reader interface is helpful for use cases like using Pulsar to provide [effectively-once](https://blog.streaml.io/exactly-once/) processing semantics for a stream processing system. For this use case, it's essential that the stream processing system be able to "rewind" topics to a specific message and begin reading there. The reader interface provides Pulsar clients with the low-level abstraction necessary to "manually position" themselves within a topic.
 
 <img src="/img/pulsar-reader-consumer-interfaces.png" alt="The Pulsar consumer and reader interfaces" width="80%">
 
diff --git a/site/img/message-deduplication.png b/site/img/message-deduplication.png
new file mode 100644
index 0000000000..23e3e6009d
Binary files /dev/null and b/site/img/message-deduplication.png differ
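The Concepts and Architecture text in this diff says the broker "knows that it has already seen message 1", and the new cookbook requires every deduplicating producer to have a name. The Java sketch below illustrates one way such a check can work. It is a simplified, hypothetical model, not Pulsar's broker code. It assumes the broker remembers the highest sequence ID it has persisted for each producer name, treats any message whose sequence ID is not higher as a duplicate, and copies that map into a snapshot every `entriesInterval` persisted entries (standing in for `brokerDeduplicationEntriesInterval`). The class `DedupSketch`, its methods, and the in-memory list that stands in for the topic's stored entries are all invented for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of broker-side deduplication (not Pulsar's actual implementation).
public class DedupSketch {

    // One persisted entry: the producer that sent it and its per-producer sequence ID.
    record Entry(String producerName, long sequenceId) {}

    private final int entriesInterval;                                // stands in for brokerDeduplicationEntriesInterval
    private final List<Entry> log = new ArrayList<>();                // stands in for the topic's stored entries
    private final Map<String, Long> highestSequenceId = new HashMap<>();
    private Map<String, Long> snapshot = new HashMap<>();
    private int snapshotPosition = 0;                                 // number of log entries covered by the snapshot

    public DedupSketch(int entriesInterval) {
        this.entriesInterval = entriesInterval;
    }

    // Returns true if the message is persisted, false if it is dropped as a duplicate.
    public boolean publish(String producerName, long sequenceId) {
        long highest = highestSequenceId.getOrDefault(producerName, -1L);
        if (sequenceId <= highest) {
            return false; // this producer already had this sequence ID persisted
        }
        log.add(new Entry(producerName, sequenceId));
        highestSequenceId.put(producerName, sequenceId);
        // Snapshot the dedup state once entriesInterval entries have been added since the last snapshot.
        if (log.size() - snapshotPosition >= entriesInterval) {
            snapshot = new HashMap<>(highestSequenceId);
            snapshotPosition = log.size();
        }
        return true;
    }

    // Rebuilds dedup state from the last snapshot, replaying only later entries; returns how many were replayed.
    public int recover() {
        highestSequenceId.clear();
        highestSequenceId.putAll(snapshot);
        int replayed = 0;
        for (int i = snapshotPosition; i < log.size(); i++) {
            Entry e = log.get(i);
            highestSequenceId.merge(e.producerName(), e.sequenceId(), Long::max);
            replayed++;
        }
        return replayed;
    }

    public static void main(String[] args) {
        DedupSketch topic = new DedupSketch(3);
        System.out.println(topic.publish("producer-1", 0)); // true: first send
        System.out.println(topic.publish("producer-1", 0)); // false: retry of the same message
        System.out.println(topic.publish("producer-1", 1)); // true
        System.out.println(topic.publish("producer-2", 0)); // true: tracked separately by name; triggers a snapshot
        System.out.println(topic.publish("producer-1", 2)); // true
        System.out.println(topic.recover());                 // 1: only the entry after the snapshot is replayed
    }
}
```

In this model a larger interval means fewer snapshots but potentially more entries to replay on recovery, which mirrors the trade-off described for `brokerDeduplicationEntriesInterval` in the diff.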