EronWright opened a new issue #12267:
URL: https://github.com/apache/pulsar/issues/12267


   - **Status:** Draft
   - **Author:** Eron Wright
   - **Pull Request:**
   - **Mailing List discussion:** (discussion) (vote)
   - **Release:**
   
   ## Motivation
   
   Unified stream processing frameworks such as Apache Flink rely on event-time 
watermarks to perform time-based computations. Watermarks facilitate forward 
progress in the processing of unbounded streams, and may be understood as an 
assertion made by the producer about the progression of event time.
   
   Pulsar has a limited concept of event time today: the `eventTime` field of a 
message. This field (or a timestamp extracted from the message body) is 
sometimes used by the consumer to generate watermarks, e.g. using the bounded 
out-of-orderness heuristic. This approach is highly sensitive to Pulsar’s 
system internals, especially in historical processing and when working with 
partitioned topics.
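
   For reference, the bounded out-of-orderness heuristic mentioned above can be sketched roughly as follows. This is a minimal illustration of the general technique, not Pulsar or Flink code; the class `BoundedOutOfOrderness` and its method names are hypothetical.

   ```java
   // Hypothetical consumer-side heuristic; names are illustrative only.
   final class BoundedOutOfOrderness {
       private final long maxOutOfOrdernessMillis;
       // Highest event time seen so far; Long.MIN_VALUE means "no message yet".
       private long maxEventTime = Long.MIN_VALUE;

       BoundedOutOfOrderness(long maxOutOfOrdernessMillis) {
           this.maxOutOfOrdernessMillis = maxOutOfOrdernessMillis;
       }

       // Track the highest event time among the messages received.
       void onMessage(long eventTime) {
           maxEventTime = Math.max(maxEventTime, eventTime);
       }

       // The watermark trails the highest event time seen by the configured bound.
       long currentWatermark() {
           if (maxEventTime == Long.MIN_VALUE) {
               return Long.MIN_VALUE;
           }
           return maxEventTime - maxOutOfOrdernessMillis;
       }
   }
   ```

   The estimate depends only on the order in which messages happen to arrive at the consumer, which is why it is sensitive to delivery order across partitions.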
   
   ## Goal
   
   The proposal is to enhance Pulsar to broker watermarks from producers to 
consumers as an alternative to heuristics-based watermarks.  Watermarks are a 
new type of message, generated by producers, aggregated by brokers, and 
delivered by subscriptions, with special semantics for routing and dispatching.
   
   The intention is that all routing and subscription modes be supported.  This 
means that numerous producers may emit watermarks for a given topic, and that 
each watermark is broadcast to all partitions of the topic.  The broker aggregates these 
watermarks and dispatches to subscribers a watermark that is the minimum across 
all producers for the current position in the topic.  The watermark should 
advance as messages are _acknowledged_, to support consumer failover and 
various subscription modes.  Transactional messaging should be fully supported.
   
   ## API Changes
   
   ### Producer API
   The producer emits watermarks via a new method on the `Producer` interface.  
The producer is making an assertion that the minimum event time for any 
subsequent message is at least the specified time (see the `eventTime` method of 
`TypedMessageBuilder<T>`).
   
   ```java
   package org.apache.pulsar.client.api;
   
   public interface Producer<T> extends Closeable {
       /**
        * Create a new watermark builder.
        *
        * @return a watermark builder that can be used to construct the 
watermark to be sent through this producer.
        */
       WatermarkBuilder newWatermark();
   }
   ```
   
   Watermarks may also be sent transactionally:
   ```java
   public interface Producer<T> extends Closeable {
       /**
        * Create a new watermark builder with transaction.
        */
       WatermarkBuilder newWatermark(Transaction txn);
   }
   ```
   
   The builder allows for an event time to be set on the watermark, and/or for 
the producer to be idled (taken out of consideration for watermarking purposes).
   ```java
   public interface WatermarkBuilder extends Serializable {
       /**
        * Set the event time for a given watermark.
        *
        * @return the watermark builder instance
        */
       WatermarkBuilder eventTime(long timestamp);
       
       /**
        * Marks the producer as idle.
        *
        * @return the watermark builder instance
        */
       WatermarkBuilder markIdle();
   
       CompletableFuture<WatermarkId> sendAsync();
       
       WatermarkId send() throws PulsarClientException;
   }
   ```
   
   ### Consumer API
   The consumer opts into watermarking using a new method on the 
`ConsumerBuilder`. An explicit opt-in is warranted because watermarking makes 
minor semantic changes to the receive methods.
   ```java
   public interface ConsumerBuilder<T> extends Cloneable {
       /**
        * Enable or disable receiving watermarks.
        * @param watermarkingEnabled true if watermarks should be delivered.
        * @return the consumer builder instance
        */
       ConsumerBuilder<T> enableWatermarking(boolean watermarkingEnabled);
   }
   ```
   
   The consumer receives watermarks using a new method on the `MessageListener` 
interface.
   ```java
   package org.apache.pulsar.client.api;
   
   public interface MessageListener<T> extends Serializable {
       /**
        * This method is called whenever a new watermark is received.
        *
        * <p>Watermarks are guaranteed to be delivered in order (with respect
        * to messages) and from the same thread for a single consumer.
        *
        * <p>This method will only be called once for each watermark.
        *
        * @param consumer
        *            the consumer that received the watermark
        * @param watermark
        *            the watermark object
        */
       default void receivedWatermark(Consumer<T> consumer, Watermark 
watermark) {
           // no-op
       }
   }
   ```
   
   The consumer may also obtain the latest watermark via the `Consumer` 
interface. This should be called after `receive` or `receiveAsync` completes. To 
accelerate the receipt of watermarks, any outstanding async receive is 
automatically completed with a null message.  Applications that prefer the 
synchronous approach should use `receive` with a timeout.
   ```java
   public interface Consumer<T> extends Closeable {
       /**
        * @return The latest watermark, or null if watermarking is not enabled 
or a watermark has not been received.
        */
       Watermark getLastWatermark();
   }
   ```
   
   The consumer receives watermarks on the same thread as ordinary messages. 
The consumer may expect that any subsequent message will have an event 
timestamp of at least the watermark value. If the expectation is violated, it 
is due to a false assertion made by a producer. The application should treat 
such messages as true late messages.
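
   The late-message rule above can be sketched as follows. This is a minimal illustration and not part of the proposed API: `LatenessCheck` and its methods are hypothetical, the watermark is modelled as a plain `long` rather than the proposed `Watermark` type, and keeping only the highest watermark seen is an assumption of this sketch.

   ```java
   // Hypothetical consumer-side helper; not part of the proposed API.
   final class LatenessCheck {
       // Latest watermark observed; Long.MIN_VALUE means "none received yet".
       private long watermark = Long.MIN_VALUE;

       // Record a watermark. Assumption: the application keeps the highest
       // value seen, so a lower value is ignored.
       void onWatermark(long eventTime) {
           watermark = Math.max(watermark, eventTime);
       }

       // A message is late when its event time is below the current watermark.
       boolean isLate(long messageEventTime) {
           return messageEventTime < watermark;
       }
   }
   ```

   A message whose event time equals the watermark is on time, since the producer's assertion is "at least" the watermark value.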
   
   ## Implementation
   
   ### Protocol
   Watermarks sent to the broker are sent and persisted into the managed ledger 
as “marker” messages (using the existing `markerType` metadata field).  A 
number of variants are defined below.  Note that marker messages aren't vended 
directly to consumers.
   
   Watermarks sent by the broker to the consumer aren’t messages; they are 
neither acknowledged nor assigned a `MessageId`.
   
   #### Marker: Set Watermark
   A message with marker type `W_SET` records a given producer’s watermark 
assertion, with the corresponding timestamp stored in the `eventTime` metadata 
field, and producer name stored in the `producerName` metadata field.
   
   Once a watermark from a given producer is stored in the ledger, the producer 
becomes an active member of the set of producers to consider when materializing 
an overall watermark (based on the minimum across active producers). There is 
no explicit way for a producer to join the set aside from sending an initial 
watermark. To maximize correctness, producers should eagerly send an initial 
watermark (e.g. during an initialization phase).
   
   #### Marker: Idle Producer
   A message with marker type `W_IDLE` removes a given producer from the set of 
producers to consider when materializing an overall watermark, with producer 
name stored in the `producerName` metadata field.
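
   The effect of the `W_SET` and `W_IDLE` markers on the set of active producers can be sketched as below. This is a simplified in-memory model under stated assumptions, not broker code: `ProducerWatermarks` and its method names are hypothetical, and transactions and snapshots are ignored.

   ```java
   import java.util.HashMap;
   import java.util.Map;
   import java.util.OptionalLong;

   // Hypothetical in-memory model of the active-producer set.
   final class ProducerWatermarks {
       // Latest watermark per active producer, keyed by producer name.
       private final Map<String, Long> active = new HashMap<>();

       // W_SET: the producer joins (or stays in) the active set with this event time.
       void onSet(String producerName, long eventTime) {
           active.put(producerName, eventTime);
       }

       // W_IDLE: the producer is removed from consideration.
       void onIdle(String producerName) {
           active.remove(producerName);
       }

       // Effective watermark: the minimum across active producers, empty if none.
       OptionalLong effective() {
           return active.values().stream().mapToLong(Long::longValue).min();
       }
   }
   ```

   In this model, idling the slowest producer lets the effective watermark jump forward to the next-lowest value.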
   
   #### Marker: Watermark Snapshot
   A message with marker type `W_SNAPSHOT` records a snapshot of the latest 
watermark information across all active producers.  Note that watermarks are 
transactional, thus the snapshot also stores uncommitted watermarks (organized 
by transaction ID). 
   
   #### Command: SendWatermark
   The dispatcher uses a new command, `SendWatermark`, to send a watermark to a 
consumer.  The watermark simply consists of an `eventTime` field.
   
   Watermark commands are sent when a consumer first attaches to a subscription 
(if a watermark is available), and when the watermark changes.
   
   ### Broker
   The broker materializes watermarks for consumers based on watermark messages 
in the ledger. The broker aggregates the watermarks sent by the producers to 
determine the effective watermark for a given offset in a topic or partition.
   
   #### Watermark Cursor
   Consumers use subscriptions to coordinate message delivery according to a 
subscription mode and to track message acknowledgement.  For each subscription, 
the broker uses a secondary managed cursor to materialize the effective (i.e. 
minimum) watermark across all producers. The watermark tracks _acknowledged 
messages_, so that the watermark doesn't advance prematurely for any one 
consumer. This allows the system to work well with all subscription types, for 
example in failover mode where un-acked messages from one consumer may be 
redirected to another consumer.  Implementation-wise, the watermark cursor 
tracks the _mark-delete point_ of the subscription's main cursor.
   
   The watermark cursor is durable or non-durable, consistent with the 
durability of the main cursor.
   
   The mark-delete point of the watermark cursor tracks the latest watermark 
snapshot.
   
   #### Watermark Snapshots
   The broker must be able to efficiently materialize a watermark from a given 
topic position, to support subscriptions where the initial position is 
specified, to support seeking, etc.  It would be impractical to scan the ledger 
from the beginning to gather the watermark messages.  Also, watermark messages 
aren’t retained indefinitely.
   
   To meet this requirement, the broker periodically writes a snapshot message 
to the ledger.  The snapshot consists of the latest watermark keyed by producer 
name, and a table of outstanding transactions, with each transaction consisting 
of the latest watermark keyed by producer name.
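
   One possible in-memory shape for such a snapshot is sketched below. The class name `WatermarkSnapshot`, the field names, and the use of `String` transaction IDs are assumptions made for illustration; the proposal does not define a wire format.

   ```java
   import java.util.Collections;
   import java.util.HashMap;
   import java.util.Map;

   // Hypothetical snapshot payload; the field layout is an assumption.
   final class WatermarkSnapshot {
       // Latest committed watermark keyed by producer name.
       final Map<String, Long> latestByProducer;
       // Outstanding transactions: transaction ID -> (producer name -> watermark).
       final Map<String, Map<String, Long>> pendingByTxn;

       WatermarkSnapshot(Map<String, Long> latestByProducer,
                         Map<String, Map<String, Long>> pendingByTxn) {
           // Deep-copy so later updates to the live state cannot alter the snapshot.
           this.latestByProducer =
                   Collections.unmodifiableMap(new HashMap<>(latestByProducer));
           Map<String, Map<String, Long>> txns = new HashMap<>();
           for (Map.Entry<String, Map<String, Long>> e : pendingByTxn.entrySet()) {
               txns.put(e.getKey(),
                       Collections.unmodifiableMap(new HashMap<>(e.getValue())));
           }
           this.pendingByTxn = Collections.unmodifiableMap(txns);
       }
   }
   ```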
   
   New policy elements are defined to control snapshotting, e.g. limiting the 
amount of message data between snapshots.
   
   #### Watermark Generator
   Within the broker, the watermark cursor is encapsulated in a 
`WatermarkGenerator` that accepts tracking updates from the subscription.
   
   ```java
   package org.apache.pulsar.broker.service.eventtime;
   
   public interface WatermarkGenerator {
       /**
        * Get the current watermark.
        * @return the current watermark (may be null if none has been materialized)
        */
       Long getWatermark();
   
       /**
        * Advance the tracking position of the watermark generator.
        */
       CompletableFuture<Void> seek(Position position);
       
       /**
        * Register a listener for changes to the watermark.
        */
       void setListener(WatermarkGeneratorListener listener);
   }
   ```
   
   The generator materializes a watermark corresponding to a given tracking 
position, with the semantic that the watermark reflects the minimum event 
timestamp of any message subsequent to that position in the ledger.  A managed 
cursor is used for this purpose.  The generator reads the ledger from the 
earliest available message or from the latest watermark snapshot, vending 
watermarks to the registered listener as watermark messages are observed.
   
   The generator is careful not to read past the tracking position.  As the 
tracking position advances, reading continues.
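
   The bounded read loop described above might look roughly like this. It is a simplified sketch, not broker code: positions are modelled as list indexes, `MarkerEntry` and `BoundedReplay` are hypothetical stand-ins, the `emitted` list stands in for the registered listener, reading up to and including the tracking position is an assumption, and snapshots and transactions are left out.

   ```java
   import java.util.ArrayList;
   import java.util.Collections;
   import java.util.HashMap;
   import java.util.List;
   import java.util.Map;

   // Hypothetical ledger entry carrying only the fields this sketch needs.
   final class MarkerEntry {
       final String type;      // "W_SET", "W_IDLE", or anything else for ordinary messages
       final String producer;
       final long eventTime;

       MarkerEntry(String type, String producer, long eventTime) {
           this.type = type;
           this.producer = producer;
           this.eventTime = eventTime;
       }
   }

   final class BoundedReplay {
       private final List<MarkerEntry> ledger;
       private final Map<String, Long> active = new HashMap<>();
       private final List<Long> emitted = new ArrayList<>(); // stands in for the listener
       private int nextToRead = 0;
       private Long current = null;

       BoundedReplay(List<MarkerEntry> ledger) {
           this.ledger = ledger;
       }

       // Advance the tracking position and read entries up to and including it,
       // never beyond.
       void advanceTo(int trackingPosition) {
           int limit = Math.min(trackingPosition, ledger.size() - 1);
           while (nextToRead <= limit) {
               MarkerEntry e = ledger.get(nextToRead++);
               if (e.type.equals("W_SET")) {
                   active.put(e.producer, e.eventTime);
               } else if (e.type.equals("W_IDLE")) {
                   active.remove(e.producer);
               } else {
                   continue; // ordinary messages do not change the watermark
               }
               if (active.isEmpty()) {
                   continue; // no active producers: nothing to report
               }
               Long min = Collections.min(active.values());
               if (!min.equals(current)) {
                   current = min;
                   emitted.add(min); // notify the listener of the new watermark
               }
           }
       }

       List<Long> emitted() {
           return emitted;
       }
   }
   ```

   Because the loop stops at the tracking position, a watermark written later in the ledger is not surfaced until the subscription's position has moved past it.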
   
   The generator is also responsible for generating snapshots, and advancing 
the mark-delete position of the watermark cursor such that the latest snapshot 
is retained.  When a subscription is created or repositioned by a seek, the 
generator must read backwards from the tracking position to locate the most 
recent snapshot.
   
   The generator supports transactions. If an uncommitted watermark is 
encountered, the generator holds it in state until the transaction is 
committed.  This state is also snapshotted.
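
   The handling of uncommitted watermarks could be modelled along these lines. This is a sketch under assumptions: `TxnWatermarkState` and its methods are hypothetical, transaction IDs are plain strings, and discarding held watermarks on abort is an assumption, since the text above only describes the commit path.

   ```java
   import java.util.HashMap;
   import java.util.Map;

   // Hypothetical model of transactional watermark state; not broker code.
   final class TxnWatermarkState {
       // Watermarks visible for aggregation, keyed by producer name.
       private final Map<String, Long> committed = new HashMap<>();
       // Held watermarks: transaction ID -> (producer name -> watermark).
       private final Map<String, Map<String, Long>> pending = new HashMap<>();

       // An uncommitted watermark is held aside, keyed by transaction and producer.
       void onUncommitted(String txnId, String producer, long eventTime) {
           pending.computeIfAbsent(txnId, k -> new HashMap<>()).put(producer, eventTime);
       }

       // On commit, the held watermarks become visible.
       void onCommit(String txnId) {
           Map<String, Long> held = pending.remove(txnId);
           if (held != null) {
               committed.putAll(held);
           }
       }

       // Assumption: on abort, the held watermarks are simply discarded.
       void onAbort(String txnId) {
           pending.remove(txnId);
       }

       // Returns the committed watermark for a producer, or null if none.
       Long committedFor(String producer) {
           return committed.get(producer);
       }

       int pendingTxnCount() {
           return pending.size();
       }
   }
   ```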
   
   #### Dispatcher
   The dispatcher for a given subscription listens to the watermark generator. 
The dispatcher forwards watermarks to all consumers and handles sending the 
latest watermark to any new consumer.  The latest watermark may be dispatched 
without considering unacknowledged messages, and may use broadcast semantics in 
all cases.
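
   The dispatcher behaviour described above (broadcast on change, plus an initial watermark for each new consumer) can be sketched as follows. `WatermarkDispatcherSketch` is a hypothetical name, and each consumer is modelled as a `LongConsumer` callback rather than a real connection receiving a `SendWatermark` command.

   ```java
   import java.util.ArrayList;
   import java.util.List;
   import java.util.function.LongConsumer;

   // Hypothetical dispatcher model; consumers are modelled as callbacks.
   final class WatermarkDispatcherSketch {
       private final List<LongConsumer> consumers = new ArrayList<>();
       // Latest watermark reported by the generator; null until the first one.
       private Long latest = null;

       // A new consumer immediately receives the latest watermark, if one exists.
       void addConsumer(LongConsumer consumer) {
           consumers.add(consumer);
           if (latest != null) {
               consumer.accept(latest);
           }
       }

       // Called when the generator reports a watermark; broadcast only on change.
       void onWatermarkChanged(long eventTime) {
           if (latest != null && latest == eventTime) {
               return; // unchanged, nothing to send
           }
           latest = eventTime;
           for (LongConsumer c : consumers) {
               c.accept(eventTime);
           }
       }
   }
   ```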
   
   ## Rejected Alternatives
   
   

