jianchun opened a new issue, #18270:
URL: https://github.com/apache/hudi/issues/18270

   ### Feature Description
   
   **What the feature achieves:**
   - **Pre-commit extension:** Users can supply extra key-value metadata (e.g. 
Kafka offsets, source timestamps) that is persisted with each Hudi commit.
   - **Post-commit extension:** Users can run custom logic after every 
successful commit (e.g. validation, bookkeeping, metrics, notifications) with 
access to commit context (instant, record counts, table meta client).
   - **Single plugin, clear lifecycle:** One hook implementation is loaded from 
configuration; it is initialized once, then invoked per checkpoint for 
pre-commit and post-commit, and closed on shutdown. The same hook instance can 
keep state across both phases (e.g. verify in post-commit using data gathered 
in pre-commit).
   
   **Why this feature is needed:**
   Streaming Hudi clients often need to:
   
   - Persist **source metadata** with commits (e.g. Kafka partition/offset or 
timestamps) for downstream consumers, debugging, or replay.
   - Do **bookkeeping or validation** after a commit (e.g. validate written 
record counts against offset deltas, update external state).
   - Emit **metrics** tied to commits (e.g. event-time latency, offset 
validation success/failure).
   
   Today this requires embedding custom logic in the Flink/Hudi codebase or 
repurposing generic mechanisms. A dedicated hook gives a single extension point 
with explicit pre-commit and post-commit phases and shared context, so an 
implementation can inject metadata at commit time and then run stateful 
post-commit logic (e.g. verify, in post-commit, against information gathered in 
pre-commit).
   
   **Relationship to [Post-commit 
Callback](https://hudi.apache.org/docs/platform_services_post_commit_callback/):**
 The existing callback is stateless and does not participate in building commit 
metadata; it is for “notify after commit” (e.g. HTTP/Kafka). The stream write 
commit hook is stateful and two-phase: it contributes commit metadata and runs 
post-commit with rich context. Use the callback for simple notifications; use 
the hook when you need both metadata injection and stateful post-commit logic 
in one plugin.
   
   
   ### User Experience
   
   **How users will use this feature:**
   - **Configuration changes needed:** two new Flink/Hudi options:
   
   | Option | Description | Required |
   |--------|-------------|----------|
   | `hoodie.stream.write.commit.hook.class` | Fully qualified class name of a 
class implementing `StreamWriteCommitHook`. Loaded via reflection. | No 
(optional). If unset or empty, no hook runs. |
   | `hoodie.stream.write.commit.hook.config` | Opaque config string (e.g. 
JSON) passed to the hook in `init(Configuration)`. Semantics are defined by the 
hook implementation. | No; hook can rely on other existing options if needed. |
   
   Example (e.g. in Flink job config):
   
   ```properties
   hoodie.stream.write.commit.hook.class=com.example.MyStreamWriteCommitHook
   
hoodie.stream.write.commit.hook.config={"serviceName":"my-svc","topicName":"my-topic"}
   ```
   - **API changes**
   - **New public interface:** 
`org.apache.hudi.sink.extensions.StreamWriteCommitHook` (Flink module). No new 
dependencies; interface extends `Closeable`.
   - **New loader:** `StreamWriteCommitHookLoader.load(Configuration)` — 
returns a hook instance or `null` if not configured / load failure (fail-open: 
commit proceeds without hook).
   - **New context class:** `StreamWriteCommitHook.PostCommitContext` — holds 
`HoodieTableMetaClient`, instant, checkpoint ID, total records, total error 
records for use in `postCommit(...)`.
   
   Implementations must:
   
   1. Implement `StreamWriteCommitHook`.
   2. Have a no-arg constructor (reflective load).
   3. In `init(Configuration)`, read `hoodie.stream.write.commit.hook.config` 
and any other options they need.
   4. Return a non-null map from `getCommitExtraMetadata(long checkpointId)` 
(can be empty).
   5. Optionally use `postCommit(PostCommitContext)` and `close()`.
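
   The fail-open behavior described for the loader could be sketched as below. 
This is an illustration only: `StreamWriteCommitHookLoader` does not exist in 
Hudi today, and the hook interface is reduced here to a minimal stand-in so the 
snippet is self-contained (the real loader would take a Flink `Configuration` 
and resolve `hoodie.stream.write.commit.hook.class` from it):

   ```java
   import java.io.Closeable;
   import java.io.IOException;
   import java.util.Collections;
   import java.util.Map;

   public class HookLoaderSketch {

     // Minimal stand-in for the proposed hook interface (illustration only).
     public interface StreamWriteCommitHook extends Closeable {
       default Map<String, String> getCommitExtraMetadata(long checkpointId) {
         return Collections.emptyMap();
       }
       default void close() throws IOException {}
     }

     /**
      * Fail-open reflective load: returns null when the class name is unset,
      * empty, or cannot be instantiated, so the commit proceeds without a hook.
      */
     public static StreamWriteCommitHook load(String className) {
       if (className == null || className.trim().isEmpty()) {
         return null;
       }
       try {
         return (StreamWriteCommitHook) Class.forName(className)
             .getDeclaredConstructor()  // requirement 2: a no-arg constructor
             .newInstance();
       } catch (ReflectiveOperationException | ClassCastException e) {
         return null; // fail-open: commit proceeds without the hook
       }
     }
   }
   ```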
   
   - **Usage examples**
   **1. Minimal hook that adds fixed metadata and logs post-commit**
   
   ```java
   import org.apache.flink.configuration.Configuration;
   import org.apache.hudi.configuration.FlinkOptions;
   import org.slf4j.Logger;
   import org.slf4j.LoggerFactory;

   import java.util.HashMap;
   import java.util.Map;

   public class MyCommitHook implements StreamWriteCommitHook {
     private static final Logger LOG = LoggerFactory.getLogger(MyCommitHook.class);

     private String tableName;

     @Override
     public void init(Configuration conf) {
       tableName = conf.getOptional(FlinkOptions.TABLE_NAME).orElse("unknown");
     }

     @Override
     public Map<String, String> getCommitExtraMetadata(long checkpointId) {
       Map<String, String> extra = new HashMap<>();
       extra.put("source", "flink");
       extra.put("table", tableName);
       extra.put("checkpointId", String.valueOf(checkpointId));
       return extra;
     }

     @Override
     public void postCommit(PostCommitContext context) {
       LOG.info("Committed instant={} checkpointId={} records={}",
           context.getInstant(), context.getCheckpointId(), context.getTotalRecords());
     }

     @Override
     public void close() {}
   }
   ```
   
   **2. Hook that fetches Kafka offsets in pre-commit and validates in 
post-commit (conceptually)**
   
   - In `getCommitExtraMetadata(checkpointId)`: call an external service to get 
Kafka offsets for this checkpoint, encode them, put in the returned map (e.g. 
under a well-known key). Hudi persists this with the commit.
   - In `postCommit(context)`: read last two commits’ metadata, compute 
expected record count from offset deltas, compare to 
`context.getTotalRecords()` + `context.getTotalErrorRecords()`, and emit a 
metric (e.g. validation pass/fail).
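
   The validation arithmetic in that second step could look like the following 
sketch. The commit-metadata plumbing is omitted; the offset maps are assumed to 
have already been decoded from the last two commits' extra metadata, and all 
names are illustrative:

   ```java
   import java.util.Map;

   public class OffsetValidationSketch {

     /** Expected records = sum over partitions of (current offset - previous offset). */
     public static long expectedRecords(Map<String, Long> previousOffsets,
                                        Map<String, Long> currentOffsets) {
       long expected = 0L;
       for (Map.Entry<String, Long> e : currentOffsets.entrySet()) {
         expected += e.getValue() - previousOffsets.getOrDefault(e.getKey(), 0L);
       }
       return expected;
     }

     /** Written records plus error records should account for every consumed offset. */
     public static boolean offsetsAccountedFor(Map<String, Long> previousOffsets,
                                               Map<String, Long> currentOffsets,
                                               long totalRecords,
                                               long totalErrorRecords) {
       return expectedRecords(previousOffsets, currentOffsets)
           == totalRecords + totalErrorRecords;
     }
   }
   ```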
   
   **3. Enabling the hook in the job**
   
   Users set the options when building the Flink stream write pipeline (e.g. on 
the configuration passed to the Hudi sink / `StreamerUtil`):
   
   ```java
   conf.setString("hoodie.stream.write.commit.hook.class", 
"com.mycompany.KafkaOffsetCommitHook");
   conf.setString("hoodie.stream.write.commit.hook.config", 
"{\"topicName\":\"events\",\"cluster\":\"prod\"}");
   ```
   
   Users need no code changes in Hudi core beyond setting the class name and 
optional config string; the hook is loaded and invoked by the existing 
coordinator/stream write path.
   
   ---
   
   ### Hook API (interface shape)
   
   ```java
   package org.apache.hudi.sink.extensions;
   
   import org.apache.hudi.common.table.HoodieTableMetaClient;
   import org.apache.flink.configuration.Configuration;
   
   import java.io.Closeable;
   import java.util.Map;
   
   public interface StreamWriteCommitHook extends Closeable {

     /** Called once after the hook is loaded, before any checkpoint. */
     void init(Configuration conf);

     /** Pre-commit: extra metadata (non-null, may be empty) persisted with this checkpoint's commit. */
     Map<String, String> getCommitExtraMetadata(long checkpointId);

     /** Post-commit: invoked after the commit for this checkpoint succeeds. */
     void postCommit(PostCommitContext context);

     class PostCommitContext {
       private final HoodieTableMetaClient metaClient;
       private final String instant;
       private final long checkpointId;
       private final long totalRecords;
       private final long totalErrorRecords;

       public PostCommitContext(
           HoodieTableMetaClient metaClient,
           String instant,
           long checkpointId,
           long totalRecords,
           long totalErrorRecords) {
         this.metaClient = metaClient;
         this.instant = instant;
         this.checkpointId = checkpointId;
         this.totalRecords = totalRecords;
         this.totalErrorRecords = totalErrorRecords;
       }
   
       public HoodieTableMetaClient getMetaClient() { return metaClient; }
       public String getInstant() { return instant; }
       public long getCheckpointId() { return checkpointId; }
       public long getTotalRecords() { return totalRecords; }
       public long getTotalErrorRecords() { return totalErrorRecords; }
     }
   }
   ```
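
   To make the lifecycle concrete (init once, two phases per checkpoint, close 
on shutdown), the coordinator-side call sequence would conceptually look like 
the sketch below. None of these driver names exist in Hudi today; a recording 
stub stands in for a real hook so the ordering is visible:

   ```java
   import java.util.ArrayList;
   import java.util.List;

   public class HookLifecycleSketch {

     // Stand-in hook that records which lifecycle methods were called, in order.
     static class RecordingHook {
       final List<String> calls = new ArrayList<>();
       void init() { calls.add("init"); }
       void preCommit(long checkpointId) { calls.add("pre:" + checkpointId); }
       void postCommit(long checkpointId) { calls.add("post:" + checkpointId); }
       void close() { calls.add("close"); }
     }

     /** Drives the proposed lifecycle: init once, two phases per checkpoint, close once. */
     static List<String> run(int checkpoints) {
       RecordingHook hook = new RecordingHook();
       hook.init();             // once, at operator start
       for (long cp = 1; cp <= checkpoints; cp++) {
         hook.preCommit(cp);    // getCommitExtraMetadata(cp): result goes into the commit
         hook.postCommit(cp);   // postCommit(context): only after the commit succeeds
       }
       hook.close();            // once, on shutdown
       return hook.calls;
     }
   }
   ```

   Because the same instance spans both phases, state captured in pre-commit 
(e.g. fetched offsets) is naturally available to post-commit validation.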
   
   ### Hudi RFC Requirements
   
   **RFC PR link:** (if applicable)
   
   **Why RFC is/isn't needed:**
   - Does this change public interfaces/APIs? (Yes/No)
   - Does this change storage format? (Yes/No)
   - Justification:
   

