EronWright opened a new issue #12267: URL: https://github.com/apache/pulsar/issues/12267
- **Status:** Draft
- **Author:** Eron Wright
- **Pull Request:**
- **Mailing List discussion:** (discussion) (vote)
- **Release:**

## Motivation

Unified stream processing frameworks such as Apache Flink rely on event-time watermarks to perform time-based computations. Watermarks facilitate forward progress in the processing of unbounded streams. They may be understood as an assertion made by the producer about the progression of event time.

Pulsar has a limited concept of event time today: the `eventTime` field of a message. Consumers sometimes use this field (or a timestamp extracted from the message body) to generate watermarks, e.g. with the bounded out-of-orderness heuristic. This approach is highly sensitive to Pulsar’s system internals, especially in historical processing and when working with partitioned topics.

## Goal

The proposal is to enhance Pulsar so that it brokers watermarks from producers to consumers, as an alternative to heuristics-based watermarks. Watermarks are a new type of message with special semantics for routing and dispatching:

- producers generate them,
- brokers aggregate them, and
- subscriptions deliver them.

The intention is that all routing and subscription modes be supported. Numerous producers may therefore emit watermarks for a given topic, and each watermark is broadcast to all partitions of the topic. The broker aggregates these watermarks. For the current position in the topic, it dispatches to subscribers a watermark that is the minimum across all producers. The watermark should advance as messages are _acknowledged_, to support consumer failover and the various subscription modes. Transactional messaging should be fully supported.

## API Changes

### Producer API

The producer emits watermarks via a new method on the `Producer` interface. By sending a watermark, the producer asserts that the event time of any subsequent message is at least the specified time (compare the `eventTime` method of `TypedMessageBuilder<T>`).
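The producer's assertion can be illustrated with a small, self-contained sketch. `WatermarkAssertion` is a hypothetical helper and not part of the proposed API. It remembers the most recent watermark a producer has asserted and checks whether a later message's event time honors that assertion. A message that fails the check is what a consumer would see as a true late message.

```java
import java.util.OptionalLong;

/**
 * Hypothetical helper (not part of the proposed Pulsar API) that models the
 * producer-side watermark contract: after asserting watermark W, every
 * subsequent message is expected to carry an event time of at least W.
 */
class WatermarkAssertion {
    // Most recently asserted watermark; empty until the first assertion.
    private OptionalLong lastWatermark = OptionalLong.empty();

    /** Record that the producer has asserted a new watermark. */
    void assertWatermark(long eventTime) {
        lastWatermark = OptionalLong.of(eventTime);
    }

    /**
     * Returns true if a message with the given event time honors the most
     * recent assertion, or if no watermark has been asserted yet.
     */
    boolean honors(long messageEventTime) {
        return !lastWatermark.isPresent() || messageEventTime >= lastWatermark.getAsLong();
    }

    public static void main(String[] args) {
        WatermarkAssertion producer = new WatermarkAssertion();
        System.out.println(producer.honors(50L));   // true: nothing asserted yet
        producer.assertWatermark(100L);
        System.out.println(producer.honors(120L));  // true: 120 >= 100
        System.out.println(producer.honors(90L));   // false: a true late message
    }
}
```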
```java
package org.apache.pulsar.client.api;

import java.io.Closeable;

public interface Producer<T> extends Closeable {
    /**
     * Create a new watermark builder.
     *
     * @return a watermark builder that can be used to construct the watermark to be sent through this producer.
     */
    WatermarkBuilder newWatermark();
}
```

Watermarks may also be sent transactionally:

```java
import org.apache.pulsar.client.api.transaction.Transaction;

public interface Producer<T> extends Closeable {
    /**
     * Create a new watermark builder with transaction.
     *
     * @param txn the transaction within which the watermark is sent
     * @return a watermark builder bound to the given transaction
     */
    WatermarkBuilder newWatermark(Transaction txn);
}
```

The builder lets the producer set an event time on the watermark. It also lets the producer mark itself idle, which takes it out of consideration for watermarking purposes.

```java
package org.apache.pulsar.client.api;

import java.io.Serializable;
import java.util.concurrent.CompletableFuture;

public interface WatermarkBuilder extends Serializable {
    /**
     * Set the event time for a given watermark.
     *
     * @param timestamp the event time asserted by this watermark
     * @return the watermark builder instance
     */
    WatermarkBuilder eventTime(long timestamp);

    /**
     * Marks the producer as idle.
     *
     * @return the watermark builder instance
     */
    WatermarkBuilder markIdle();

    /** Send the watermark asynchronously. */
    CompletableFuture<WatermarkId> sendAsync();

    /** Send the watermark synchronously. */
    WatermarkId send() throws PulsarClientException;
}
```

### Consumer API

The consumer opts into watermarking using a new method on `ConsumerBuilder`. Opting in is required because watermarking makes minor semantic changes to the receive methods.

```java
public interface ConsumerBuilder<T> extends Cloneable {
    /**
     * Enable or disable receiving watermarks.
     *
     * @param watermarkingEnabled true if watermarks should be delivered.
     * @return the consumer builder instance
     */
    ConsumerBuilder<T> enableWatermarking(boolean watermarkingEnabled);
}
```

The consumer receives watermarks through a new method on the `MessageListener` interface.

```java
package org.apache.pulsar.client.api;

import java.io.Serializable;

public interface MessageListener<T> extends Serializable {
    /**
     * This method is called whenever a new watermark is received.
     *
     * <p>Watermarks are guaranteed to be delivered in order (with respect
     * to messages) and from the same thread for a single consumer.
     *
     * <p>This method will only be called once for each watermark.
     *
     * @param consumer
     *            the consumer that received the watermark
     * @param watermark
     *            the watermark object
     */
    default void receivedWatermark(Consumer<T> consumer, Watermark watermark) {
        // no-op by default, so existing listener implementations are unaffected
    }
}
```

The consumer may also obtain the latest watermark via the `Consumer` interface. Call this method after `receive` or `receiveAsync` completes. To accelerate the receipt of watermarks, any outstanding `receiveAsync` is automatically completed with a `null` message. Applications that prefer the synchronous approach should use `receive` with a timeout, so that they can observe new watermarks even when no messages arrive.

```java
public interface Consumer<T> extends Closeable {
    /**
     * @return the latest watermark, or null if watermarking is not enabled or a watermark has not been received.
     */
    Watermark getLastWatermark();
}
```

The consumer receives watermarks on the same thread as ordinary messages. The consumer may expect every subsequent message to have an event timestamp of at least the watermark value. If a message violates this expectation, the cause is a false assertion made by a producer. The application should treat such messages as true late messages.

## Implementation

### Protocol

Watermarks sent to the broker are persisted into the managed ledger as “marker” messages, using the existing `markerType` metadata field. Several marker variants are defined below. Marker messages aren't vended directly to consumers.

Watermarks sent by the broker to the consumer aren’t messages: they are neither acknowledged nor assigned a `MessageId`.

#### Marker: Set Watermark

A message with marker type `W_SET` records a given producer’s watermark assertion. The corresponding timestamp is stored in the `eventTime` metadata field, and the producer name is stored in the `producerName` metadata field.
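The aggregation the broker performs over these markers can be sketched as a small in-memory model. `WatermarkAggregator` and its methods are hypothetical names chosen for illustration. The sketch makes the following assumptions:

- Markers are fed in ledger order.
- A `W_SET` replaces that producer's previous watermark.
- A `W_IDLE` removes the producer from the active set.
- Transactional watermarks are held per transaction until commit. Discarding them on abort is an additional assumption.

The effective watermark is the minimum across active producers, as stated in the proposal. The two maps mirror the state the proposal describes for snapshots: the latest watermark keyed by producer name, and a per-transaction table of the same shape.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.OptionalLong;

/**
 * Hypothetical in-memory model of aggregating watermark markers.
 * Illustrative only; not the proposed broker implementation.
 */
class WatermarkAggregator {
    // Latest committed watermark keyed by producer name (the active producers).
    private final Map<String, Long> committed = new HashMap<>();
    // Uncommitted watermarks: transaction ID -> (producer name -> watermark).
    private final Map<String, Map<String, Long>> pendingByTxn = new HashMap<>();

    /** Non-transactional W_SET: the producer joins (or stays in) the active set. */
    void onSet(String producerName, long eventTime) {
        committed.put(producerName, eventTime);
    }

    /** Transactional W_SET: held until the transaction commits. */
    void onSet(String txnId, String producerName, long eventTime) {
        pendingByTxn.computeIfAbsent(txnId, k -> new HashMap<>()).put(producerName, eventTime);
    }

    /** W_IDLE: the producer is no longer considered. */
    void onIdle(String producerName) {
        committed.remove(producerName);
    }

    /** Commit: the transaction's watermarks become each producer's latest. */
    void onCommit(String txnId) {
        Map<String, Long> pending = pendingByTxn.remove(txnId);
        if (pending != null) {
            committed.putAll(pending);
        }
    }

    /** Abort (assumed behavior): the transaction's watermarks are discarded. */
    void onAbort(String txnId) {
        pendingByTxn.remove(txnId);
    }

    /** Effective watermark: minimum over active producers, empty if there are none. */
    OptionalLong current() {
        return committed.values().stream().mapToLong(Long::longValue).min();
    }

    public static void main(String[] args) {
        WatermarkAggregator agg = new WatermarkAggregator();
        agg.onSet("p1", 100L);
        agg.onSet("p2", 80L);
        System.out.println(agg.current());   // OptionalLong[80]
        agg.onIdle("p2");
        System.out.println(agg.current());   // OptionalLong[100]
        agg.onSet("txn-1", "p1", 150L);
        System.out.println(agg.current());   // OptionalLong[100]: not yet committed
        agg.onCommit("txn-1");
        System.out.println(agg.current());   // OptionalLong[150]
    }
}
```

Because an idle or absent producer simply has no entry, the minimum is taken only over producers that have announced themselves. This is why the proposal recommends that producers send an initial watermark eagerly.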
Once a watermark from a given producer is stored in the ledger, the producer becomes an active member of the set of producers considered when materializing the overall watermark (the minimum across active producers). Sending an initial watermark is the only way for a producer to join the set. To maximize correctness, producers should therefore eagerly send an initial watermark, e.g. during an initialization phase. Until a producer does so, its messages aren't reflected in the overall watermark, which may advance past their event times.

#### Marker: Idle Producer

A message with marker type `W_IDLE` removes a given producer from the set of producers considered when materializing the overall watermark. The producer name is stored in the `producerName` metadata field.

#### Marker: Watermark Snapshot

A message with marker type `W_SNAPSHOT` records a snapshot of the latest watermark information across all active producers. Because watermarks are transactional, the snapshot also stores uncommitted watermarks, organized by transaction ID.

#### Command: SendWatermark

The dispatcher uses a new command, `SendWatermark`, to send a watermark to a consumer. The watermark simply consists of an `eventTime` field. The dispatcher sends watermark commands in two cases:

- when a consumer first attaches to a subscription, if a watermark is available, and
- whenever the watermark changes.

### Broker

The broker materializes watermarks for consumers from the watermark messages in the ledger. It aggregates the watermarks sent by the producers to determine the effective watermark for a given offset in a topic or partition.

#### Watermark Cursor

Consumers use subscriptions to coordinate message delivery according to a subscription mode and to track message acknowledgement. For each subscription, the broker uses a secondary managed cursor to materialize the effective (i.e. minimum) watermark across all producers. The watermark tracks _acknowledged messages_, so that the watermark doesn't advance prematurely for any one consumer.
This allows the system to work well with all subscription types. For example, in failover mode, un-acked messages from one consumer may be redirected to another consumer.

Implementation-wise, the watermark cursor tracks the _mark-delete point_ of the subscription's main cursor. The watermark cursor is durable or non-durable, matching the durability of the main cursor. The mark-delete point of the watermark cursor itself tracks the latest watermark snapshot.

#### Watermark Snapshots

The broker must be able to efficiently materialize a watermark from a given topic position. This is needed for subscriptions with a specified initial position, for seeking, and similar cases. Scanning the ledger from the beginning to gather the watermark messages would be impractical. Moreover, watermark messages aren’t retained indefinitely.

To meet this requirement, the broker periodically writes a snapshot message to the ledger. The snapshot contains two parts:

- the latest watermark, keyed by producer name, and
- a table of outstanding transactions, each holding that transaction's latest watermark keyed by producer name.

New policy elements are defined to control snapshotting, e.g. limiting the amount of message data between snapshots.

#### Watermark Generator

Within the broker, the watermark cursor is encapsulated in a `WatermarkGenerator` that accepts tracking updates from the subscription.

```java
package org.apache.pulsar.broker.service.eventtime;

import java.util.concurrent.CompletableFuture;
import org.apache.bookkeeper.mledger.Position;

public interface WatermarkGenerator {
    /**
     * Get the current watermark.
     *
     * @return the current watermark, or null if none is available
     */
    Long getWatermark();

    /**
     * Advance the tracking position of the watermark generator.
     *
     * @param position the new tracking position
     * @return a future that completes when the seek has been applied
     */
    CompletableFuture<Void> seek(Position position);

    /**
     * Register a listener for changes to the watermark.
     */
    void setListener(WatermarkGeneratorListener listener);
}
```

The generator materializes a watermark corresponding to a given tracking position. Its semantic is that the watermark reflects the minimum event timestamp of any message subsequent to that position in the ledger.
A managed cursor is used for this purpose. The generator reads the ledger from the earliest available message or from the latest watermark snapshot. As it observes watermark messages, it vends watermarks to the registered listener. The generator is careful not to read past the tracking position; as the tracking position advances, reading continues. The generator is also responsible for generating snapshots. It advances the mark-delete position of the watermark cursor such that the latest snapshot is retained. When a subscription is created or seeked, the generator must read backwards from the tracking position to locate the most recent snapshot.

The generator supports transactions. If it encounters an uncommitted watermark, it holds the watermark in state until the transaction is committed. This state is also snapshotted.

#### Dispatcher

The dispatcher for a given subscription listens to the watermark generator. It forwards watermarks to all consumers and sends the latest watermark to any new consumer. The latest watermark may be dispatched without considering unacknowledged messages, and may use broadcast semantics in all cases.

## Rejected Alternatives