This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 1708933303a [improve][pip] PIP-439: Adding Transaction Support to Pulsar Functions Through Managed Transaction Wrapping (#24704)
1708933303a is described below

commit 1708933303ab6cc101b109e79fd328aef66a8a84
Author: wzhramc <[email protected]>
AuthorDate: Wed Jan 14 09:56:30 2026 +0100

    [improve][pip] PIP-439: Adding Transaction Support to Pulsar Functions Through Managed Transaction Wrapping (#24704)
---
 pip/pip-439.md | 413 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 413 insertions(+)

diff --git a/pip/pip-439.md b/pip/pip-439.md
new file mode 100644
index 00000000000..569411d8234
--- /dev/null
+++ b/pip/pip-439.md
@@ -0,0 +1,413 @@
+# PIP-439: Adding Transaction Support to Pulsar Functions Through Managed Transaction Wrapping
+
+# Background knowledge
+
+Apache Pulsar transactions enable atomic operations across multiple topics, allowing producers to send messages and consumers to acknowledge messages as a single unit of work. This provides the foundation for exactly-once processing semantics in streaming applications.
+
+## Transaction Architecture
+
+Pulsar's transaction system consists of four key components:
+
+1. **Transaction Coordinator (TC)**: A broker module that manages transaction lifecycles, allocates transaction IDs, and orchestrates the commit/abort process.
+
+2. **Transaction Log**: A persistent topic storing transaction metadata and state changes, enabling recovery after failures.
+
+3. **Transaction Buffer**: Temporarily stores messages produced within transactions, making them visible to consumers only after commit.
+
+4. **Pending Acknowledge State**: Tracks message acknowledgments within transactions, preventing conflicts between competing transactions.
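The visibility rule enforced by the Transaction Buffer can be illustrated with a toy in-memory model. `ToyTopic` and its methods are hypothetical names, and this is not how the broker implements the buffer. The model shows only two things: messages published under a transaction ID stay hidden until that transaction commits, and they are discarded if it aborts.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of one topic with a transaction buffer (hypothetical, not broker code).
class ToyTopic {
    // Messages that consumers can read.
    private final List<String> committed = new ArrayList<>();
    // Messages held back per transaction ID until the transaction ends.
    private final Map<Long, List<String>> pending = new HashMap<>();

    void publish(long txnId, String message) {
        pending.computeIfAbsent(txnId, id -> new ArrayList<>()).add(message);
    }

    void commit(long txnId) {
        List<String> messages = pending.remove(txnId);
        if (messages != null) {
            committed.addAll(messages); // all messages of the transaction become visible together
        }
    }

    void abort(long txnId) {
        pending.remove(txnId); // aborted messages never become visible
    }

    List<String> visibleMessages() {
        return List.copyOf(committed);
    }
}
```

Consumers of `ToyTopic` only ever see `visibleMessages()`. A message published under transaction 2 is therefore invisible until `commit(2)` runs, and it is gone for good after `abort(2)`.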
+
+## Transaction Lifecycle
+
+Transactions follow a defined lifecycle:
+
+1. **OPEN**: The client obtains a transaction ID from the Transaction Coordinator.
+2. **PRODUCING/ACKNOWLEDGING**: The client registers topic partitions/subscriptions with the TC, then produces/acknowledges messages within the transaction.
+3. **COMMITTING/ABORTING**: The client requests to end the transaction, and the TC begins the two-phase commit.
+4. **COMMITTED/ABORTED**: After all registered partitions and subscriptions have been processed, the TC finalizes the transaction state.
+5. **TIMED_OUT**: Transactions that exceed their timeout are automatically aborted.
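The lifecycle above can be read as a small state machine. The following is a hypothetical Java model, not the coordinator's actual status type. It makes two simplifications: the PRODUCING/ACKNOWLEDGING phase is folded into `OPEN`, and a timed-out transaction is assumed to then go through the normal abort path.

```java
// Hypothetical model of the transaction lifecycle; not the coordinator's status type.
enum TxnState {
    OPEN, COMMITTING, ABORTING, COMMITTED, ABORTED, TIMED_OUT;

    // Returns true if the lifecycle allows moving from this state to `next`.
    boolean canMoveTo(TxnState next) {
        switch (this) {
            case OPEN:
                return next == COMMITTING || next == ABORTING || next == TIMED_OUT;
            case COMMITTING:
                return next == COMMITTED;
            case ABORTING:
                return next == ABORTED;
            case TIMED_OUT:
                return next == ABORTING; // assumption: a timeout triggers the abort path
            default:
                return false; // COMMITTED and ABORTED are terminal
        }
    }
}

// Tracks one transaction and rejects transitions the lifecycle does not allow.
class ToyTransaction {
    private TxnState state = TxnState.OPEN;

    void moveTo(TxnState next) {
        if (!state.canMoveTo(next)) {
            throw new IllegalStateException(state + " -> " + next + " is not allowed");
        }
        state = next;
    }

    TxnState state() {
        return state;
    }
}
```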
+
+## Transaction Guarantees
+
+Pulsar transactions provide:
+- Atomic writes across multiple topics
+- Conditional acknowledgment to prevent duplicate processing by "zombie" instances
+- Visibility control ensuring consumers only see committed transaction messages
+- Support for exactly-once processing in consume-transform-produce patterns
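Conditional acknowledgment can be pictured with a toy model of the Pending Acknowledge State. `ToyPendingAcks` is a hypothetical illustration, not the broker implementation. Suppose one open transaction holds an acknowledgment for a message. Another transaction (for example, one started by a zombie instance) then tries to acknowledge the same message, and that attempt is rejected. Once the holding transaction aborts, the message can be acknowledged again.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Toy model of pending-ack state for one subscription (hypothetical, not broker code).
class ToyPendingAcks {
    private final Map<String, Long> pendingOwner = new HashMap<>(); // messageId -> txnId holding the ack
    private final Set<String> acknowledged = new HashSet<>();       // acks made durable by a commit

    // Returns false if the message is already acknowledged or another transaction holds its ack.
    boolean ack(String messageId, long txnId) {
        if (acknowledged.contains(messageId)) {
            return false;
        }
        Long owner = pendingOwner.putIfAbsent(messageId, txnId);
        return owner == null || owner == txnId;
    }

    // Commit turns every pending ack held by this transaction into a durable ack.
    void commit(long txnId) {
        pendingOwner.entrySet().removeIf(entry -> {
            boolean owned = entry.getValue() == txnId;
            if (owned) {
                acknowledged.add(entry.getKey());
            }
            return owned;
        });
    }

    // Abort releases the pending acks so the messages can be redelivered and acked again.
    void abort(long txnId) {
        pendingOwner.values().removeIf(owner -> owner == txnId);
    }
}
```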
+
+## Pulsar Functions
+
+Pulsar Functions is a lightweight compute framework integrated with Apache Pulsar that enables stream processing without managing infrastructure. Key characteristics include:
+ - Simple Programming Model: Functions receive messages, process them, and optionally produce output
+ - Processing Patterns: Supports both synchronous and asynchronous message processing
+ - Context Object: Provides access to message metadata, output production, and state storage
+ - Integration: Natively integrated with Pulsar's pub-sub messaging system
+ - Deployment: Managed by Pulsar's function workers, which schedule function instances and restart them on failure
+
+Functions operate on a per-message basis, which makes them well suited to stream processing with exactly-once semantics when combined with transactions.
+
+# Motivation
+
+Currently, Pulsar Functions cannot publish to multiple topics transactionally, which is a significant limitation for use cases requiring atomic multi-topic publishing. For instance, if a function processes an input message and needs to publish related updates to several output topics, there is no guarantee that either all of these operations succeed or none do.
+
+This limitation prevents building robust stream processing applications that require exactly-once semantics across multiple input and output topics. Without transaction support in Functions, developers must implement their own error handling and retry mechanisms, which can be complex and error-prone.
+
+Adding transaction support to Pulsar Functions would make message processing atomic: a function's output messages and the acknowledgment of its input would either all take effect or none would.
+
+# Goals
+
+## In Scope
+
+1. Enable automatic transaction support for Pulsar Functions through configuration
+2. Allow Functions to publish messages to multiple topics within a single transaction
+3. Support transactional acknowledgment of input messages
+4. Ensure transactions are committed only if message processing completes successfully
+5. Provide transaction timeout configuration for Functions
+6. Add transaction support for async functions
+7. Process multiple input messages within a single transaction (transaction batching) to improve performance, added in a later phase of implementation
+
+## Out of Scope
+
+1. Exposing explicit transaction management APIs in the Functions interface
+2. Supporting multi-function transactions (a single transaction spanning invocations of different functions)
+3. Adding transaction support to Pulsar IO connectors
+4. Changes to the Function interface itself
+
+# High Level Design
+
+The proposed solution introduces managed transaction wrapping for Pulsar Functions through configuration settings. When enabled, each function execution will be automatically wrapped in a transaction without requiring code changes to the function implementation.
+
+The general flow will be:
+1. Function is configured with `transactionMode: MANAGED`
+2. When a message arrives, the function runtime creates a new transaction
+3. The function processes the message with an enhanced Context that uses the transaction
+4. Any output messages are published using the transaction
+5. Input message acknowledgment is performed within the transaction
+6. If the function completes successfully, the transaction is committed
+7. If the function throws an exception, the transaction is aborted
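The seven steps above can be sketched as a small wrapper around a single function call. This is a minimal, hypothetical sketch. `SketchTxn`, `TxnFunction`, and `ManagedTxnRunner` are illustrative names that stand in for the Pulsar client transaction, the function, and the runtime, so the example runs without a broker. It leaves out transaction batching and the handling of a failed commit.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.Supplier;

// Hypothetical stand-in for a client transaction handle, so the sketch runs without a broker.
interface SketchTxn {
    void commit();
    void abort();
}

// Hypothetical function shape: `txn` stands in for the transaction that the
// enhanced Context would attach to every output message.
@FunctionalInterface
interface TxnFunction<I, O> {
    O apply(I input, SketchTxn txn) throws Exception;
}

final class ManagedTxnRunner {
    private ManagedTxnRunner() {}

    // Steps 2-7 of the flow above for one input message.
    static <I, O> O runInTransaction(I input,
                                     Supplier<SketchTxn> newTxn,
                                     TxnFunction<I, O> function,
                                     BiConsumer<I, SketchTxn> ackInput) throws Exception {
        SketchTxn txn = newTxn.get();              // 2. create a transaction for this message
        O output;
        try {
            output = function.apply(input, txn);   // 3-4. process and publish outputs with txn
            ackInput.accept(input, txn);           // 5. acknowledge the input within txn
        } catch (Exception e) {
            txn.abort();                           // 7. any failure aborts outputs and ack
            throw e;
        }
        txn.commit();                              // 6. success commits outputs and ack together
        return output;
    }
}

// Test double that records which terminal call was made.
class RecordingTxn implements SketchTxn {
    final List<String> events = new ArrayList<>();
    @Override public void commit() { events.add("commit"); }
    @Override public void abort() { events.add("abort"); }
}
```

The key design choice is the ordering. The input is acknowledged inside the same transaction as the outputs. A crash before the commit therefore leaves neither the outputs nor the acknowledgment visible, and the input message is redelivered.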
+
+This approach provides transaction support in a way that is transparent to function implementers, requiring only configuration changes rather than code changes.
+
+# Detailed Design
+
+## Design & Implementation Details
+
+### Configuration Classes
+
+We will extend `FunctionConfig` with transaction-related settings through a new `TransactionConfig` class:
+
+```java
+public enum TransactionMode {
+  OFF,
+  MANAGED
+}
+
+public class TransactionConfig {
+  private TransactionMode transactionMode = TransactionMode.OFF; // OFF keeps today's behavior
+  private Long transactionTimeoutMs = 60000L;                     // 60 s per transaction
+  private Integer transactionBatchingMaxEntries = 1;              // entries per transaction
+  private Long transactionBatchingQuietPeriodMs = 1L;             // commit after 1 ms of inactivity
+
+  // Getters and setters...
+}
+
+public class FunctionConfig {
+  // Existing fields...
+
+  private TransactionConfig transaction = new TransactionConfig();
+
+  // Getter and setter ...
+}
+```
+
+We also need to update the protobuf definition for `FunctionDetails` to include these fields:
+
+```protobuf
+message TransactionSpec {
+  enum TransactionMode {
+      OFF = 0;
+      MANAGED = 1;
+  }
+  TransactionMode transactionMode = 1;
+  int64 transactionTimeoutMs = 2;
+  int64 transactionBatchingMaxEntries = 3;
+  int64 transactionBatchingQuietPeriodMs = 4;
+}
+
+message FunctionDetails {
+  // Other existing fields...
+  TransactionSpec transaction = 24;
+}
+```
+
+### Modifications to ContextImpl
+
+```java
+class ContextImpl implements Context, SinkContext, SourceContext, AutoCloseable {
+    // Existing fields...
+
+    // Transaction set by the function runtime for the current invocation
+    private Transaction currentTransaction;
+
+    // Finds the proper transaction to tie to the current function execution (sync/async)
+    private Transaction getManagedTransaction() {
+        // implementation...
+    }
+
+    // Existing methods...
+
+    public void setCurrentTransaction(Transaction transaction) {
+        this.currentTransaction = transaction;
+    }
+
+    @Override
+    public <T> TypedMessageBuilder<T> newOutputMessage(String topicName, Schema<T> schema)
+            throws PulsarClientException {
+        MessageBuilderImpl<T> messageBuilder = new MessageBuilderImpl<>();
+        TypedMessageBuilder<T> typedMessageBuilder;
+        Producer<T> producer = getProducer(topicName, schema);
+        Transaction managedTransaction = getManagedTransaction();
+
+        if (managedTransaction != null) {
+            // Publish within the managed transaction so the output commits or aborts with it
+            if (schema != null) {
+                // Uses the new API that supports both schema and transaction
+                typedMessageBuilder = producer.newMessage(schema, managedTransaction);
+            } else {
+                typedMessageBuilder = producer.newMessage(managedTransaction);
+            }
+        } else if (schema != null) {
+            typedMessageBuilder = producer.newMessage(schema);
+        } else {
+            typedMessageBuilder = producer.newMessage();
+        }
+
+        messageBuilder.setUnderlyingBuilder(typedMessageBuilder);
+        return messageBuilder;
+    }
+}
+```
+
+## Asynchronous Functions Support
+
+Pulsar Functions supports asynchronous processing, where functions can return `CompletableFuture` objects. This proposal ensures that transaction support works with both synchronous and asynchronous functions.
+
+For asynchronous functions:
+1. The transaction is created at the beginning of message processing, just like for synchronous functions
+2. When the function returns a `CompletableFuture`, the transaction is kept open until the future completes
+   - Any Context operations performed inside the returned `CompletableFuture` are tied to the correct transaction
+3. When the future completes successfully, the transaction is committed
+4. If the future completes exceptionally, the transaction is aborted
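Steps 2-4 can be sketched as a minimal wrapper. It assumes a hypothetical `AsyncTxn` handle whose `commit()` and `abort()` return `CompletableFuture<Void>`, the same shape as the Pulsar client's `Transaction` interface. The sketch does not show how Context operations inside the future find the right transaction; it only shows when the transaction ends relative to the function's future.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical async transaction handle; commit()/abort() return futures.
interface AsyncTxn {
    CompletableFuture<Void> commit();
    CompletableFuture<Void> abort();
}

final class AsyncManagedTxn {
    private AsyncManagedTxn() {}

    // Keeps the transaction open until the function's future completes, then commits on
    // success or aborts on failure. The returned future completes only after commit/abort
    // has finished, carrying the original result or error.
    static <O> CompletableFuture<O> wrap(AsyncTxn txn, CompletableFuture<O> functionResult) {
        CompletableFuture<CompletableFuture<O>> staged = functionResult.handle(
            (result, error) -> error == null
                ? txn.commit().thenApply(v -> result)
                : txn.abort().thenCompose(v -> CompletableFuture.<O>failedFuture(error)));
        return staged.thenCompose(f -> f);
    }
}

// Test double that records the terminal call and completes immediately.
class RecordingAsyncTxn implements AsyncTxn {
    final List<String> events = new ArrayList<>();

    @Override public CompletableFuture<Void> commit() {
        events.add("commit");
        return CompletableFuture.completedFuture(null);
    }

    @Override public CompletableFuture<Void> abort() {
        events.add("abort");
        return CompletableFuture.completedFuture(null);
    }
}
```

Because the wrapper chains on the function's future instead of blocking on it, the runtime thread remains free while the transaction stays open.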
+
+## Batch Processing of Transactions
+
+To optimize performance and reduce the overhead on the Transaction Coordinator, this proposal introduces transaction batching. Transaction batching allows multiple incoming messages to be processed within the same transaction, reducing the total number of transactions created.
+
+## Transaction Batching Concept
+
+Transaction batching is distinct from Pulsar's message batching. While message batching combines multiple messages into a single "batch message" for efficient network transfer, transaction batching processes multiple incoming messages (or batch messages) within the scope of a single transaction.
+
+Key benefits of transaction batching include:
+1. **Reduced Load on Transaction Coordinator**: Fewer transactions mean less coordination overhead
+2. **Improved Throughput**: Higher message processing capacity with lower per-message overhead
+3. **Optimized Resource Usage**: Better utilization of transaction resources
+4. **Consistent Performance at Scale**: Maintains performance characteristics under high load
+
+### Transaction Batching Parameters
+
+Transaction batching is controlled by two main parameters:
+
+1. **`transactionBatchingMaxEntries`**: The maximum number of entries (incoming messages or batch messages) to process within a single transaction before committing it and starting a new one.
+   - An "entry" is an incoming message or batch message, so a single entry may itself contain multiple individual messages
+   - Default: 1 (the recommended minimum, which ensures that batch index acknowledgment state doesn't span transactions)
+   - Setting this to `0` disables transaction batching
+   - Higher values increase throughput but may increase latency
+
+2. **`transactionBatchingQuietPeriodMs`**: The maximum amount of time to wait for additional messages before committing a transaction if `transactionBatchingMaxEntries` has not been reached.
+   - Default: 1ms (matching the Pulsar Client producer's default `batchingMaxPublishDelay` value)
+   - This parameter handles scenarios where message flow isn't continuous
+   - When the quiet period elapses without new messages arriving, the current transaction is committed
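The interaction of the two parameters can be sketched as a small state machine. `TxnBatcher` below is a hypothetical helper, not the proposed runtime code. It assumes `transactionBatchingMaxEntries >= 1` and does not model the `0` case. It also takes the current time as an argument instead of using a real scheduler. The commit reasons it returns reuse the `reason` label values listed for `pulsar_function_txn_batch_commit_reason` in the Metrics section.

```java
// Hypothetical model of the two batching parameters (not the proposed runtime code).
// Time is passed in explicitly so that the behavior is deterministic.
final class TxnBatcher {
    private final int maxEntries;       // transactionBatchingMaxEntries, assumed >= 1 here
    private final long quietPeriodMs;   // transactionBatchingQuietPeriodMs
    private int entriesInTxn = 0;       // entries processed in the currently open transaction
    private long lastEntryAtMs = 0;     // arrival time of the most recent entry

    TxnBatcher(int maxEntries, long quietPeriodMs) {
        if (maxEntries < 1) {
            throw new IllegalArgumentException("this sketch does not model maxEntries = 0");
        }
        this.maxEntries = maxEntries;
        this.quietPeriodMs = quietPeriodMs;
    }

    // Called for every incoming entry; returns a commit reason, or null to keep the transaction open.
    String onEntry(long nowMs) {
        entriesInTxn++;
        lastEntryAtMs = nowMs;
        return entriesInTxn >= maxEntries ? commit("max_entries") : null;
    }

    // Called periodically by a timer; commits if no entry arrived during the quiet period.
    String onTick(long nowMs) {
        if (entriesInTxn > 0 && nowMs - lastEntryAtMs >= quietPeriodMs) {
            return commit("quiet_period");
        }
        return null;
    }

    // Called when the function instance closes; commits whatever is still open.
    String onClose() {
        return entriesInTxn > 0 ? commit("function_close") : null;
    }

    // A real runtime would commit the transaction here and open a new one for the next entry.
    private String commit(String reason) {
        entriesInTxn = 0;
        return reason;
    }
}
```

For example, with `transactionBatchingMaxEntries = 10`, a steady stream of 1,000 entries results in 100 transaction commits instead of 1,000.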
+
+
+# Public-facing Changes
+
+## Configuration
+
+Transaction support will be configured using the new `TransactionConfig` class:
+
+```java
+TransactionConfig txnConfig = new TransactionConfig();
+txnConfig.setTransactionMode(TransactionMode.MANAGED);
+txnConfig.setTransactionTimeoutMs(30000L); // 30 seconds
+txnConfig.setTransactionBatchingMaxEntries(10); // Process up to 10 entries per transaction
+txnConfig.setTransactionBatchingQuietPeriodMs(1L); // Commit after 1ms of inactivity
+
+FunctionConfig functionConfig = new FunctionConfig();
+functionConfig.setTransaction(txnConfig);
+// Other configuration...
+```
+
+The Functions worker configuration will include a setting to enable or disable the `pulsar_function_txn_latency` metric:
+
+```yaml
+# Transaction metrics configuration
+transactionMetrics:
+  # Enable/disable specific transaction metrics individually
+  txnLatency: false  # Transaction duration histograms (high cardinality)
+  # ... disable any future metric deemed too expensive to enable by default
+```
+
+## CLI
+
+The Pulsar Admin CLI will be updated to support these new configuration options:
+
+```bash
+$ pulsar-admin functions create \
+  --auto-transactions-enabled true \
+  --transaction-timeout-ms 30000 \
+  --transaction-batching-max-entries 10 \
+  --transaction-batching-quiet-period-ms 1 \
+  ... other options ...
+```
+
+Similarly, `pulsar-admin functions update` will support updating these options.
+
+# Metrics
+
+The following new metrics will be added to track transaction usage in functions:
+
+1. `pulsar_function_txn_created_total`: Counter tracking the total number of transactions created by functions
+   - Labels: `tenant`, `namespace`, `name` (function name), `instance_id`, `cluster`
+   - Unit: Count
+
+2. `pulsar_function_txn_committed_total`: Counter tracking successfully committed transactions
+   - Labels: `tenant`, `namespace`, `name` (function name), `instance_id`, `cluster`
+   - Unit: Count
+
+3. `pulsar_function_txn_aborted_total`: Counter tracking aborted transactions
+   - Labels: `tenant`, `namespace`, `name` (function name), `instance_id`, `cluster`
+   - Unit: Count
+
+4. `pulsar_function_txn_timeout_total`: Counter tracking transactions that timed out
+   - Labels: `tenant`, `namespace`, `name` (function name), `instance_id`, `cluster`
+   - Unit: Count
+
+5. `pulsar_function_txn_latency`: Histogram of transaction duration from creation to commit/abort
+   - Labels: `tenant`, `namespace`, `name` (function name), `instance_id`, `cluster`
+   - Unit: Milliseconds
+   - Disabled by default to avoid the extra volume of a high-cardinality histogram; it can be enabled with `transactionMetrics.txnLatency` in the Functions worker configuration
+
+6. `pulsar_function_txn_batch_size`: Histogram of transaction batch sizes (number of entries processed per transaction)
+   - Labels: `tenant`, `namespace`, `name` (function name), `instance_id`, `cluster`
+   - Unit: Count
+
+7. `pulsar_function_txn_batch_commit_reason`: Counter tracking transaction batch commits by reason
+   - Labels: `tenant`, `namespace`, `name` (function name), `instance_id`, `cluster`, `reason` (`max_entries`, `quiet_period`, `function_close`)
+   - Unit: Count
+
+8. `pulsar_function_txn_entries_per_second`: Gauge tracking the rate of entries processed in transactions
+   - Labels: `tenant`, `namespace`, `name` (function name), `instance_id`, `cluster`
+   - Unit: Entries/second
+
+# Monitoring
+
+To monitor the transaction functionality in Pulsar Functions, users should:
+
+1. Monitor the transaction metrics listed above to track transaction usage and success/failure rates.
+2. Set up alerts for high transaction abort rates or timeouts, which may indicate issues with function processing or transaction configuration.
+3. Monitor function processing latency metrics to ensure that transactions don't introduce unacceptable overhead.
+4. Set up alerts for transaction timeouts, which may indicate that the configured timeout is too low for the function's processing time.
+5. Configure appropriate logging levels to capture transaction-related errors for debugging.
+
+Example alert thresholds:
+- Transaction abort rate > 5% over 5 minutes
+- Transaction timeout rate > 1% over 5 minutes
+- Transaction duration approaching the timeout value (e.g., > 80% of the configured timeout); this requires `pulsar_function_txn_latency` to be enabled
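The example thresholds can be evaluated from two samples of the counters, taken at the start and end of the 5-minute window. The helper below is hypothetical. In particular, it assumes that rates are computed relative to the number of transactions created in the window (from `pulsar_function_txn_created_total`), a denominator the thresholds above do not define explicitly.

```java
// Hypothetical helper for the example alert thresholds.
final class TxnAlerts {
    static final double MAX_ABORT_RATE = 0.05;    // abort rate > 5% over 5 minutes
    static final double MAX_TIMEOUT_RATE = 0.01;  // timeout rate > 1% over 5 minutes
    static final double TIMEOUT_HEADROOM = 0.80;  // duration > 80% of the configured timeout

    private TxnAlerts() {}

    // deltaEvents and deltaCreated are counter increases over the same window, e.g.
    // aborted_total(end) - aborted_total(start). Assumption: rate = events / created.
    static boolean rateExceeds(long deltaEvents, long deltaCreated, double maxRate) {
        if (deltaCreated <= 0) {
            return false; // no transactions created in the window
        }
        return (double) deltaEvents / deltaCreated > maxRate;
    }

    // durationMs would come from pulsar_function_txn_latency, e.g. a high percentile.
    static boolean nearTimeout(double durationMs, long transactionTimeoutMs) {
        return durationMs > TIMEOUT_HEADROOM * transactionTimeoutMs;
    }
}
```

For example, with `transactionTimeoutMs = 30000` the duration alert fires once observed transaction durations exceed 24,000 ms. With 100 transactions created in the window, 6 aborts trip the 5% abort threshold and 5 aborts do not.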
+
+# Security Considerations
+
+The proposed transaction support doesn't introduce new security concerns because it builds on top of existing Pulsar transaction mechanisms. All security aspects of transactions, including authentication and authorization, are inherited from the Pulsar client's transaction implementation.
+
+Functions will only be able to create transactions and operate on topics they already have permission to access. No additional permissions are required beyond what's already needed for the function to operate.
+
+# Backward & Forward Compatibility
+
+## Upgrade
+
+The proposed changes are backward compatible with existing Pulsar Functions:
+
+1. The new transaction mode defaults to `OFF`, so existing functions will continue to operate without transaction support.
+2. Existing functions can enable transaction support by updating their configuration.
+3. The Function interface remains unchanged, so existing function implementations will work with transaction support once it is enabled.
+
+To enable transaction support for existing functions:
+
+1. Ensure transactions are enabled at the broker level.
+2. Update the function configuration to set `transactionMode: MANAGED` and optionally configure `transactionTimeoutMs`.
+3. Restart or update the function to apply the new configuration.
+
+## Downgrade / Rollback
+
+When downgrading to a version that doesn't support the transaction features, several steps need to be taken to ensure compatibility:
+
+1. Configuration Reset:
+   - Update all function configurations to set `transactionMode: OFF` before downgrading
+   - If the downgrade target doesn't recognize the transaction configuration fields, explicitly remove these fields from the function configuration to prevent errors
+2. Function Updates:
+   - After downgrading Pulsar, any functions that were using managed transactions must be updated or redeployed
+   - This ensures that the runtime configurations are compatible with the older version
+
+## Pulsar Upgrade & Downgrade/Rollback Considerations
+
+1. Transactions created by functions in one cluster do not span other clusters; they are local to the cluster where the function executes.
+2. Functions running in different clusters must each be configured for transaction support independently.
+3. During rolling upgrades, ensure that transaction-enabled functions are only deployed to clusters that support transactions.
+4. When downgrading, first disable transactions in function configurations before downgrading the clusters.
+
+# Alternatives
+
+## Explicit Transaction API
+
+Instead of automatic transaction wrapping, we could expose explicit transaction APIs in the Context interface:
+
+```java
+Transaction newTransaction();
+void commitTransaction(Transaction txn);
+void abortTransaction(Transaction txn);
+```
+
+This approach gives function authors more control but requires code changes to use transactions. We rejected this approach to keep the programming model simpler and avoid breaking the Function interface.
+
+# General Notes
+
+The implementation of transaction support in Pulsar Functions should be considered a stepping stone toward a more comprehensive exactly-once processing model for Pulsar's stream processing capabilities. Future work may include transaction support in Pulsar IO connectors.
+
+# Links
+
+- https://github.com/apache/pulsar/issues/24588
+- [Pulsar transactions](https://pulsar.apache.org/docs/txn-why/)
+- [Mailing List Kickoff Discussion Thread](https://lists.apache.org/thread/rll8qyovpd7t9v5yxth25qo44zksbgkn)
+- [Mailing List PIP Discussion Thread](https://lists.apache.org/thread/ztvxg5d2w526ov0w9c9l66tgpfogtvph)
+- [Mailing List PIP Voting Thread](https://lists.apache.org/thread/8g43xgv8d0p3m918c9zrpwn81p4lh9d4)
