jianchun opened a new issue, #18270: URL: https://github.com/apache/hudi/issues/18270
### Feature Description

**What the feature achieves:**

- **Pre-commit extension:** Users can supply extra key-value metadata (e.g. Kafka offsets, source timestamps) that is persisted with each Hudi commit.
- **Post-commit extension:** Users can run custom logic after every successful commit (e.g. validation, bookkeeping, metrics, notifications) with access to commit context (instant, record counts, table meta client).
- **Single plugin, clear lifecycle:** One hook implementation is loaded from configuration; it is initialized once, invoked per checkpoint for pre-commit and post-commit, and closed on shutdown. The same hook instance can keep state across both phases (e.g. verify in post-commit using data gathered in pre-commit).

**Why this feature is needed:**

Streaming Hudi clients often need to:

- Persist **source metadata** with commits (e.g. Kafka partition/offset or timestamps) for downstream consumers, debugging, or replay.
- Do **bookkeeping or validation** after a commit (e.g. validate written record counts against offset deltas, update external state).
- Emit **metrics** tied to commits (e.g. event-time latency, offset validation success/failure).

Today this requires embedding logic in the Flink/Hudi codebase or relying on generic mechanisms. A dedicated hook provides a single extension point with explicit pre-commit and post-commit phases and shared context, so implementations can inject metadata and then run stateful post-commit logic (e.g. verify in post-commit against information gathered in pre-commit).

**Relationship to [Post-commit Callback](https://hudi.apache.org/docs/platform_services_post_commit_callback/):**

The existing callback is stateless and does not participate in building commit metadata; it is for "notify after commit" (e.g. HTTP/Kafka). The stream write commit hook is stateful and two-phase: it contributes commit metadata and runs post-commit with rich context.
Use the callback for simple notifications; use the hook when you need both metadata injection and stateful post-commit logic in one plugin.

### User Experience

**How users will use this feature:**

- Configuration changes needed

Two new Flink/Hudi options:

| Option | Description | Required |
|--------|-------------|----------|
| `hoodie.stream.write.commit.hook.class` | Fully qualified class name of a class implementing `StreamWriteCommitHook`. Loaded via reflection. | No (optional). If unset or empty, no hook runs. |
| `hoodie.stream.write.commit.hook.config` | Opaque config string (e.g. JSON) passed to the hook in `init(Configuration)`. Semantics are defined by the hook implementation. | No; the hook can rely on other existing options if needed. |

Example (e.g. in Flink job config):

```properties
hoodie.stream.write.commit.hook.class=com.example.MyStreamWriteCommitHook
hoodie.stream.write.commit.hook.config={"serviceName":"my-svc","topicName":"my-topic"}
```

- API changes
  - **New public interface:** `org.apache.hudi.sink.extensions.StreamWriteCommitHook` (Flink module). No new dependencies; the interface extends `Closeable`.
  - **New loader:** `StreamWriteCommitHookLoader.load(Configuration)` returns a hook instance, or `null` if not configured or on load failure (fail-open: the commit proceeds without the hook).
  - **New context class:** `StreamWriteCommitHook.PostCommitContext` holds the `HoodieTableMetaClient`, instant, checkpoint ID, total records, and total error records for use in `postCommit(...)`.

Implementations must:

1. Implement `StreamWriteCommitHook`.
2. Have a no-arg constructor (reflective load).
3. In `init(Configuration)`, read `hoodie.stream.write.commit.hook.config` and any other options they need.
4. Return a non-null map from `getCommitExtraMetadata(long checkpointId)` (it can be empty).
5. Optionally use `postCommit(PostCommitContext)` and `close()`.

- Usage examples

**1.
Minimal hook that adds fixed metadata and logs post-commit**

```java
public class MyCommitHook implements StreamWriteCommitHook {
  // slf4j logger; the original sketch referenced LOG without declaring it
  private static final Logger LOG = LoggerFactory.getLogger(MyCommitHook.class);

  private String tableName;

  @Override
  public void init(Configuration conf) {
    tableName = conf.getOptional(FlinkOptions.TABLE_NAME).orElse("unknown");
  }

  @Override
  public Map<String, String> getCommitExtraMetadata(long checkpointId) {
    Map<String, String> extra = new HashMap<>();
    extra.put("source", "flink");
    extra.put("checkpointId", String.valueOf(checkpointId));
    return extra;
  }

  @Override
  public void postCommit(PostCommitContext context) {
    LOG.info("Committed instant={} checkpointId={} records={}",
        context.getInstant(), context.getCheckpointId(), context.getTotalRecords());
  }

  @Override
  public void close() {}
}
```

**2. Hook that fetches Kafka offsets in pre-commit and validates in post-commit (conceptually)**

- In `getCommitExtraMetadata(checkpointId)`: call an external service to get the Kafka offsets for this checkpoint, encode them, and put them in the returned map (e.g. under a well-known key). Hudi persists this map with the commit.
- In `postCommit(context)`: read the last two commits' metadata, compute the expected record count from the offset deltas, compare it to `context.getTotalRecords()` + `context.getTotalErrorRecords()`, and emit a metric (e.g. validation pass/fail).

**3. Enabling the hook in the job**

Users set the options when building the Flink stream write pipeline (e.g. on the configuration passed to the Hudi sink / `StreamerUtil`):

```java
conf.setString("hoodie.stream.write.commit.hook.class", "com.mycompany.KafkaOffsetCommitHook");
conf.setString("hoodie.stream.write.commit.hook.config", "{\"topicName\":\"events\",\"cluster\":\"prod\"}");
```

No code changes are needed in Hudi core beyond configuring the class name and optional config; the hook is loaded and invoked by the existing coordinator/stream write path.
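Example 2 above is described only conceptually; the heart of its post-commit check is simple offset arithmetic. Below is a hedged, self-contained sketch of that arithmetic in plain Java (no Hudi/Flink types). The `Map<partition, endOffset>` shape and the class/method names are assumptions for illustration, not a format defined by this proposal:

```java
import java.util.Map;

// Sketch of the validation step from usage example 2: the expected record
// count for a commit is the sum of per-partition offset deltas between the
// offsets persisted with the previous commit and those of the current one.
final class OffsetValidationSketch {

  // Sum of (current end offset - previous end offset) over all partitions.
  static long expectedRecords(Map<Integer, Long> prevOffsets, Map<Integer, Long> currOffsets) {
    long expected = 0L;
    for (Map.Entry<Integer, Long> e : currOffsets.entrySet()) {
      long prev = prevOffsets.getOrDefault(e.getKey(), 0L);
      expected += e.getValue() - prev; // records consumed from this partition
    }
    return expected;
  }

  // Written records plus errored records should account for every consumed offset.
  static boolean validate(Map<Integer, Long> prevOffsets, Map<Integer, Long> currOffsets,
                          long totalRecords, long totalErrorRecords) {
    return expectedRecords(prevOffsets, currOffsets) == totalRecords + totalErrorRecords;
  }
}
```

In a real hook, the two offset maps would be decoded from the extra metadata of the last two commits (read via `context.getMetaClient()`), the counts would come from `context.getTotalRecords()` and `context.getTotalErrorRecords()`, and the boolean would drive a pass/fail metric.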
---

### Hook API (interface shape)

```java
package org.apache.hudi.sink.extensions;

import org.apache.hudi.common.table.HoodieTableMetaClient;

import org.apache.flink.configuration.Configuration;

import java.io.Closeable;
import java.util.Map;

public interface StreamWriteCommitHook extends Closeable {

  void init(Configuration conf);

  Map<String, String> getCommitExtraMetadata(long checkpointId);

  void postCommit(PostCommitContext context);

  class PostCommitContext {
    private final HoodieTableMetaClient metaClient;
    private final String instant;
    private final long checkpointId;
    private final long totalRecords;
    private final long totalErrorRecords;

    public PostCommitContext(
        HoodieTableMetaClient metaClient,
        String instant,
        long checkpointId,
        long totalRecords,
        long totalErrorRecords) {
      ...
    }

    public HoodieTableMetaClient getMetaClient() { return metaClient; }
    public String getInstant() { return instant; }
    public long getCheckpointId() { return checkpointId; }
    public long getTotalRecords() { return totalRecords; }
    public long getTotalErrorRecords() { return totalErrorRecords; }
  }
}
```

### Hudi RFC Requirements

**RFC PR link:** (if applicable)

**Why RFC is/isn't needed:**

- Does this change public interfaces/APIs? (Yes/No)
- Does this change storage format? (Yes/No)
- Justification:

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
