hudi-agent commented on code in PR #18988:
URL: https://github.com/apache/hudi/pull/18988#discussion_r3408505156
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java:
##########
@@ -639,7 +626,8 @@ public O postWrite(HoodieWriteMetadata<O> result, String instantTime, HoodieTabl
boolean postCommitStatus = true;
HoodieTimer postCommitTimer = HoodieTimer.start();
try {
- postCommit(hoodieTable, result.getCommitMetadata().get(), instantTime, Option.empty());
+ postCommit(hoodieTable, result.getCommitMetadata().get(), instantTime,
+ hoodieTable.getMetaClient().getCommitActionType(), Option.empty());
Review Comment:
🤖 Could this report the wrong action type for auto-committed replacecommit
operations? `hoodieTable.getMetaClient().getCommitActionType()` returns
`commit`/`deltacommit` (operation-agnostic), but
`SparkPartitionTTLActionExecutor` goes through `SparkAutoCommitExecutor` and
commits with `REPLACE_COMMIT_ACTION` — the callback would advertise
`commit`/`deltacommit` for that path. Using
`CommitUtils.getCommitActionType(getOperationType(),
hoodieTable.getMetaClient().getTableType())` would match the actual committed action.
- AI-generated; verify before applying.
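A simplified model of the distinction, assuming the TTL path runs as a `DELETE_PARTITION` operation and that `CommitUtils.getCommitActionType(WriteOperationType, HoodieTableType)` maps the overwrite and delete-partition operations to `replacecommit` (worth confirming against the Hudi version in use). The enums and `CommitActionSketch` are hypothetical stand-ins, not Hudi types:

```java
import java.util.EnumSet;
import java.util.Set;

// Hypothetical, simplified model: the enums are local stand-ins for Hudi's
// WriteOperationType and HoodieTableType, and the mapping is an assumption.
final class CommitActionSketch {
  enum TableType { COPY_ON_WRITE, MERGE_ON_READ }

  enum Operation { INSERT, UPSERT, INSERT_OVERWRITE, INSERT_OVERWRITE_TABLE, DELETE_PARTITION }

  // Operations assumed to complete as a replacecommit regardless of table type.
  private static final Set<Operation> REPLACE_OPS = EnumSet.of(
      Operation.INSERT_OVERWRITE, Operation.INSERT_OVERWRITE_TABLE, Operation.DELETE_PARTITION);

  // Operation-agnostic: depends only on the table type, like the meta client lookup in the diff.
  static String tableLevelAction(TableType tableType) {
    return tableType == TableType.COPY_ON_WRITE ? "commit" : "deltacommit";
  }

  // Operation-aware: what the callback would need to advertise to match the timeline.
  static String actionFor(Operation op, TableType tableType) {
    return REPLACE_OPS.contains(op) ? "replacecommit" : tableLevelAction(tableType);
  }

  private CommitActionSketch() {}
}
```

For a `DELETE_PARTITION` on a MOR table, the operation-agnostic lookup yields `deltacommit` while the operation-aware one yields `replacecommit`, which is the mismatch the comment describes.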
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/callback/common/HoodieWriteCommitCallbackMessage.java:
##########
@@ -69,10 +70,54 @@ public class HoodieWriteCommitCallbackMessage implements Serializable {
*/
private final Option<Map<String, String>> extraMetadata;
+ /**
+ * Previous base file paths keyed by fileId. Populated by the write client
+ * using the cached FileSystemView so that callback implementations don't
+ * have to rebuild a view. Empty for inserts and for callers that don't
+ * pre-resolve.
+ */
+ private final Map<String, PrevFilePaths> prevFilePaths;
+
+ /**
+ * Free-form context that producers can attach for downstream callback consumers.
+ * The OSS write client populates this as empty; specialized callsites or wrappers
+ * may populate it with whatever context their callbacks need. Mirrors the
+ * optional shape of {@link #extraMetadata}.
+ */
+ private final Map<String, String> extraContext;
+
public HoodieWriteCommitCallbackMessage(String commitTime,
String tableName,
String basePath,
List<HoodieWriteStat> hoodieWriteStat) {
- this(commitTime, tableName, basePath, hoodieWriteStat, Option.empty(), Option.empty());
+ this(commitTime, tableName, basePath, hoodieWriteStat, Option.empty(), Option.empty(),
+ Collections.emptyMap(), Collections.emptyMap());
+ }
+
+ public HoodieWriteCommitCallbackMessage(String commitTime,
+ String tableName,
+ String basePath,
+ List<HoodieWriteStat> hoodieWriteStat,
+ Option<String> commitActionType,
+ Option<Map<String, String>> extraMetadata) {
+ this(commitTime, tableName, basePath, hoodieWriteStat, commitActionType, extraMetadata,
+ Collections.emptyMap(), Collections.emptyMap());
+ }
+
+ /**
+ * Container for previously-existing file paths associated with a single fileId in a
+ * commit. {@link #prevBaseFilePath} is the base file the new write replaces, and
+ * {@link #bootstrapBaseFilePath} is the bootstrap-source file the previous
+ * base file referenced (null for non-bootstrap tables).
+ */
+ public static class PrevFilePaths implements Serializable {
+ private static final long serialVersionUID = 1L;
+ public final String prevBaseFilePath;
Review Comment:
🤖 nit: the outer class uses Lombok `@Getter` for field access, but
`PrevFilePaths` exposes bare `public final` fields — callers end up using two
different access patterns on the same message object. Would it be worth adding
`@Getter` here (and making the fields `private`) for consistency?
- AI-generated; verify before applying.
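A sketch of the suggested shape, written out by hand so it is self-contained; the two getters are what Lombok's `@Getter` would generate for these fields. `PrevFilePathsSketch` is a hypothetical wrapper:

```java
import java.io.Serializable;

final class PrevFilePathsSketch {
  // Hypothetical stand-in for the PR's PrevFilePaths, with private fields and getters.
  static final class PrevFilePaths implements Serializable {
    private static final long serialVersionUID = 1L;

    // Base file the new write replaces.
    private final String prevBaseFilePath;
    // Bootstrap-source file the previous base file referenced; null for non-bootstrap tables.
    private final String bootstrapBaseFilePath;

    PrevFilePaths(String prevBaseFilePath, String bootstrapBaseFilePath) {
      this.prevBaseFilePath = prevBaseFilePath;
      this.bootstrapBaseFilePath = bootstrapBaseFilePath;
    }

    // Equivalent to what Lombok's @Getter generates for each field.
    public String getPrevBaseFilePath() {
      return prevBaseFilePath;
    }

    public String getBootstrapBaseFilePath() {
      return bootstrapBaseFilePath;
    }
  }

  private PrevFilePathsSketch() {}
}
```

Callers would then use `getX()` accessors on both the outer message and the nested container.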
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java:
##########
@@ -640,6 +642,8 @@ private void completeClustering(HoodieReplaceCommitMetadata replaceCommitMetadat
heartbeatClient.stop(clusteringCommitTime);
}
log.info("Clustering successfully on commit {} for table {}",
clusteringCommitTime, table.getConfig().getBasePath());
+ fireCommitCallbackIfNecessary(clusteringCommitTime, HoodieTimeline.REPLACE_COMMIT_ACTION,
Review Comment:
🤖 For table version 8+, clustering uses `HoodieTimeline.CLUSTERING_ACTION` as
the completed instant action (not `REPLACE_COMMIT_ACTION`); the metrics line
just above already uses `CLUSTERING_ACTION`. Could you read the action from
`clusteringInstant.getAction()` so consumers see the same action that's
actually on the timeline?
- AI-generated; verify before applying.
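A minimal sketch of deriving the action from the instant rather than from a constant. `Instant` and `Callback` are hypothetical stand-ins for the completed clustering instant and the commit callback; which action string a given table version puts on the timeline is left to the instant itself:

```java
final class ClusteringCallbackSketch {
  // Hypothetical stand-in for the completed clustering instant on the timeline.
  static final class Instant {
    private final String timestamp;
    private final String action;

    Instant(String timestamp, String action) {
      this.timestamp = timestamp;
      this.action = action;
    }

    String getTimestamp() {
      return timestamp;
    }

    String getAction() {
      return action;
    }
  }

  // Hypothetical callback sink.
  interface Callback {
    void onCommit(String commitTime, String action);
  }

  static void completeClustering(Instant clusteringInstant, Callback callback) {
    // ... write replace metadata and transition the instant to completed ...
    // Report the action the instant actually carries instead of hardcoding one.
    callback.onCommit(clusteringInstant.getTimestamp(), clusteringInstant.getAction());
  }

  private ClusteringCallbackSketch() {}
}
```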
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java:
##########
@@ -424,6 +424,8 @@ protected void completeCompaction(HoodieCommitMetadata metadata, HoodieTable tab
);
}
log.info("Compacted successfully on commit {}", compactionCommitTime);
+ fireCommitCallbackIfNecessary(compactionCommitTime, HoodieTimeline.COMMIT_ACTION,
Review Comment:
🤖 Worth noting: `HoodieFlinkTableServiceClient` overrides both
`completeCompaction` and `completeClustering` and doesn't delegate to super or
call `fireCommitCallbackIfNecessary`, so Flink users won't see the new
table-service callback. Should the Flink overrides also fire the callback, or
is the PR intentionally scoped to Spark/Java?
- AI-generated; verify before applying.
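A reduced illustration of the pitfall, with hypothetical class names: an override that reimplements `completeCompaction` without calling `super` silently drops the base-class callback unless it fires the shared hook itself:

```java
import java.util.ArrayList;
import java.util.List;

final class OverrideCallbackSketch {
  // Hypothetical base client: completeCompaction commits, then fires the callback.
  static class BaseTableServiceClient {
    final List<String> fired = new ArrayList<>();

    void completeCompaction(String instant) {
      commit(instant);
      fireCommitCallbackIfNecessary(instant, "commit");
    }

    void commit(String instant) {
      // Stand-in for writing metadata and transitioning the instant.
    }

    void fireCommitCallbackIfNecessary(String instant, String action) {
      fired.add(instant + ":" + action);
    }
  }

  // Reimplements the method without super or the hook: no callback is fired.
  static class OverrideWithoutCallback extends BaseTableServiceClient {
    @Override
    void completeCompaction(String instant) {
      commit(instant);
    }
  }

  // Keeps its own commit logic but fires the shared hook explicitly.
  static class OverrideWithCallback extends BaseTableServiceClient {
    @Override
    void completeCompaction(String instant) {
      commit(instant);
      fireCommitCallbackIfNecessary(instant, "commit");
    }
  }

  private OverrideCallbackSketch() {}
}
```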
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieClient.java:
##########
@@ -462,4 +477,79 @@ private static Map<String, String> collectRollingMetadataFromTimeline(
protected Option<Map<String, String>> updateExtraMetadata(Option<Map<String, String>> extraMetadata) {
return CommitMetadataProperties.enrich(extraMetadata, config, context);
}
+
+ /**
+ * Fire {@link HoodieWriteCommitCallback} for a commit, if enabled. Shared by
+ * {@link BaseHoodieWriteClient#postCommit} (regular auto- and explicit-commit paths)
+ * and {@link BaseHoodieTableServiceClient} (compaction and clustering completions).
+ * Lazily constructs the callback instance from {@code hoodie.write.commit.callback.class}.
+ *
+ * <p>Best-effort: catches and logs any exception from the user-supplied callback so a
+ * misbehaving observer cannot fail the commit.
+ */
+ protected void fireCommitCallbackIfNecessary(String commitTime,
+ String commitActionType,
+ List<HoodieWriteStat> stats,
+ Supplier<BaseFileOnlyView> fsViewSupplier,
+ Option<Map<String, String>> extraMetadata) {
+ if (!config.writeCommitCallbackOn()) {
+ return;
+ }
+ try {
+ if (commitCallback == null) {
+ commitCallback = HoodieCommitCallbackFactory.create(config);
+ }
+ commitCallback.call(new HoodieWriteCommitCallbackMessage(
+ commitTime, config.getTableName(), config.getBasePath(),
+ stats, Option.of(commitActionType), extraMetadata,
+ resolvePrevFilePaths(stats, fsViewSupplier.get()),
+ Collections.emptyMap()));
+ } catch (Exception e) {
+ log.warn("HoodieWriteCommitCallback failed for commit {} ({}); ignoring",
+ commitTime, commitActionType, e);
+ }
+ }
+
+ /**
+ * Pre-resolve the previous base file (and bootstrap base file, if any) for every
+ * {@link HoodieWriteStat} that represents an update, using a populated
+ * {@link BaseFileOnlyView}. The lookup is O(1) per stat against the cached view, so
+ * this adds no I/O on top of what the writer already paid.
+ *
+ * <p>Used by {@link #fireCommitCallbackIfNecessary} call sites so the callback message ships
+ * actual file paths rather than forcing each callback impl to rebuild a
+ * {@code FileSystemView}.
+ */
+ protected static Map<String, PrevFilePaths> resolvePrevFilePaths(List<HoodieWriteStat> stats,
+ BaseFileOnlyView fsView) {
+ Map<String, PrevFilePaths> out = new HashMap<>();
+ if (stats == null || fsView == null) {
+ return out;
+ }
+ for (HoodieWriteStat stat : stats) {
+ String prevCommit = stat.getPrevCommit();
+ if (StringUtils.isNullOrEmpty(prevCommit) || HoodieWriteStat.NULL_COMMIT.equals(prevCommit)) {
+ continue;
+ }
+ Option<HoodieBaseFile> prev;
+ try {
+ prev = fsView.getBaseFileOn(stat.getPartitionPath(), prevCommit, stat.getFileId());
+ } catch (Exception e) {
+ // Best-effort: a remote view 4xx/5xx, a stale view, or a replaced file group must not
+ // fail the commit. Drop the prev path for this stat and keep going.
+ log.warn("Could not resolve prev base file for fileId={} prevCommit={}; skipping",
+ stat.getFileId(), prevCommit, e);
+ continue;
+ }
+ if (!prev.isPresent()) {
+ continue;
+ }
+ String prevPath = prev.get().getPath();
+ String bootstrapPath = prev.get().getBootstrapBaseFile().isPresent()
Review Comment:
🤖 nit: `prev.get()` and `getBootstrapBaseFile()` are each called twice —
could you cache them in locals, e.g. `HoodieBaseFile prevFile = prev.get()` and
`Option<BaseFile> bootstrapBase = prevFile.getBootstrapBaseFile()`?
- AI-generated; verify before applying.
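A sketch of the loop body with the suggested locals. `BaseFile`, `HoodieBaseFile`, and `PrevFilePaths` here are minimal stand-ins, and `java.util.Optional` replaces Hudi's `Option`; only the caching pattern is the point:

```java
import java.util.Map;
import java.util.Optional;

final class ResolvePrevSketch {
  // Minimal stand-ins for Hudi's BaseFile / HoodieBaseFile; Optional replaces Hudi's Option.
  static class BaseFile {
    private final String path;

    BaseFile(String path) {
      this.path = path;
    }

    String getPath() {
      return path;
    }
  }

  static final class HoodieBaseFile extends BaseFile {
    private final Optional<BaseFile> bootstrapBaseFile;

    HoodieBaseFile(String path, Optional<BaseFile> bootstrapBaseFile) {
      super(path);
      this.bootstrapBaseFile = bootstrapBaseFile;
    }

    Optional<BaseFile> getBootstrapBaseFile() {
      return bootstrapBaseFile;
    }
  }

  static final class PrevFilePaths {
    final String prevBaseFilePath;
    final String bootstrapBaseFilePath; // null when there is no bootstrap source

    PrevFilePaths(String prevBaseFilePath, String bootstrapBaseFilePath) {
      this.prevBaseFilePath = prevBaseFilePath;
      this.bootstrapBaseFilePath = bootstrapBaseFilePath;
    }
  }

  // Loop body for one stat: each accessor is called once and reused via locals.
  static void putPrevPaths(Map<String, PrevFilePaths> out, String fileId, Optional<HoodieBaseFile> prev) {
    if (!prev.isPresent()) {
      return;
    }
    HoodieBaseFile prevFile = prev.get();
    Optional<BaseFile> bootstrapBase = prevFile.getBootstrapBaseFile();
    String bootstrapPath = bootstrapBase.isPresent() ? bootstrapBase.get().getPath() : null;
    out.put(fileId, new PrevFilePaths(prevFile.getPath(), bootstrapPath));
  }

  private ResolvePrevSketch() {}
}
```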
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java:
##########
@@ -287,7 +283,7 @@ public boolean commitStats(String instantTime, TableWriteStats tableWriteStats,
boolean postCommitStatus = true;
HoodieTimer postCommitTimer = HoodieTimer.start();
try {
- postCommit(table, metadata, instantTime, extraMetadata);
+ postCommit(table, metadata, instantTime, commitActionType, extraMetadata);
Review Comment:
🤖 Is the new ordering intentional? Pre-PR the callback fired after
`mayBeCleanAndArchive` and `runTableServicesInline` (and was skipped if they
threw with `canIgnorePostCommitFailures=false`). Now it fires inside
`postCommit`, before them — so a consumer can receive a successful-commit
callback even when post-commit cleanup throws and `commitStats` ultimately
propagates the exception. The PR description mentions extra callbacks but
doesn't call out this ordering shift.
- AI-generated; verify before applying.
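One way to restore the pre-PR ordering, sketched with hypothetical hooks: run the post-commit steps (standing in for `mayBeCleanAndArchive` and `runTableServicesInline`) first and fire the callback only once they succeed. Treating ignored failures as still firing the callback is an assumption:

```java
import java.util.List;

final class CallbackOrderingSketch {
  // Hypothetical post-commit step, standing in for mayBeCleanAndArchive / runTableServicesInline.
  interface Step {
    void run() throws Exception;
  }

  // Runs post-commit steps first; the callback fires only if none of them fails
  // fatally, so consumers never see a success callback for a call that throws.
  static void postCommit(List<Step> steps, boolean canIgnorePostCommitFailures, Runnable fireCallback)
      throws Exception {
    for (Step step : steps) {
      try {
        step.run();
      } catch (Exception e) {
        if (!canIgnorePostCommitFailures) {
          throw e; // propagate; fireCallback is never reached
        }
        // Failure ignored by configuration: continue with the remaining steps.
      }
    }
    fireCallback.run();
  }

  private CallbackOrderingSketch() {}
}
```

The trade-off is that the callback is delayed by cleaning and inline table services, but its success signal then agrees with what `commitStats` reports to the caller.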
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/callback/common/HoodieWriteCommitCallbackMessage.java:
##########
@@ -69,10 +70,54 @@ public class HoodieWriteCommitCallbackMessage implements Serializable {
*/
private final Option<Map<String, String>> extraMetadata;
+ /**
+ * Previous base file paths keyed by fileId. Populated by the write client
+ * using the cached FileSystemView so that callback implementations don't
+ * have to rebuild a view. Empty for inserts and for callers that don't
+ * pre-resolve.
+ */
+ private final Map<String, PrevFilePaths> prevFilePaths;
+
+ /**
+ * Free-form context that producers can attach for downstream callback consumers.
+ * The OSS write client populates this as empty; specialized callsites or wrappers
+ * may populate it with whatever context their callbacks need. Mirrors the
Review Comment:
🤖 nit: the phrase "Mirrors the optional shape of `extraMetadata`" is
misleading — `extraContext` is `Map<String, String>`, not `Option<Map<String,
String>>`, so the shapes are different. Could you drop or rephrase that
sentence?
- AI-generated; verify before applying.
--
This is an automated message from the Apache Git Service.