hudi-agent commented on code in PR #18885:
URL: https://github.com/apache/hudi/pull/18885#discussion_r3336308019
##########
hudi-common/src/main/java/org/apache/hudi/common/util/CommitUtils.java:
##########
@@ -94,12 +96,45 @@ public static HoodieCommitMetadata buildMetadata(List<HoodieWriteStat> writeStat
if (extraMetadata.isPresent()) {
extraMetadata.get().forEach(commitMetadata::addMetadata);
}
- commitMetadata.addMetadata(HoodieCommitMetadata.SCHEMA_KEY, (schemaToStoreInCommit == null || schemaToStoreInCommit.equals(NULL_SCHEMA_STR))
- ? "" : schemaToStoreInCommit);
+ commitMetadata.addMetadata(HoodieCommitMetadata.SCHEMA_KEY,
+ sanitizeSchemaForCommitMetadata(schemaToStoreInCommit));
commitMetadata.setOperationType(operationType);
return commitMetadata;
}
+ /**
+ * Returns the value to persist under {@link HoodieCommitMetadata#SCHEMA_KEY}.
+ * The schema stored in commit extraMetadata must be the user/write schema and
+ * must NOT contain Hudi meta fields ({@code _hoodie_commit_time}, etc.). If
+ * the caller-provided schema has meta fields (e.g. because some upstream code
+ * mutated the in-memory write config schema with reader-schema-with-meta-fields,
+ * or because a previously-polluted SCHEMA_KEY was read back into the config),
+ * this strips them so the persisted schema is always clean. When no meta fields
+ * are present, the input string is returned unchanged.
+ */
+ public static String sanitizeSchemaForCommitMetadata(String schemaToStoreInCommit) {
+ if (StringUtils.isNullOrEmpty(schemaToStoreInCommit) || schemaToStoreInCommit.equals(NULL_SCHEMA_STR)) {
+ return "";
+ }
Review Comment:
🤖 I noticed several other commit paths still write `SCHEMA_KEY` directly
without going through `buildMetadata` (and therefore bypass the new
sanitization): `RunCompactionActionExecutor.execute` (Spark/Java compaction,
lines ~106/109), `CompactHelpers.createCompactionMetadata` (used by Flink
compaction, line ~82), `TransactionUtils.resolveWriteConflictIfAny` (line
~104), and `BaseHoodieWriteClient.saveInternalSchema` (line ~383). The PR
description calls out compaction reader-schema setup and conflict resolution as
upstream pollution sources, so it seems worth either routing those writes
through `sanitizeSchemaForCommitMetadata` too (it's now public) or noting
they're intentionally out of scope. @yihua wdyt — should this PR extend to
those paths, or are they better as a follow-up?
- AI-generated; verify before applying.
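A minimal sketch of the "route everything through the sanitizer" option raised above, assuming every direct `SCHEMA_KEY` write (compaction, conflict resolution, `saveInternalSchema`) is redirected through one helper. To keep it runnable without Hudi on the classpath, the schema is modeled as a comma-separated list of field names instead of Avro JSON; `SchemaKeyRoutingSketch`, `putSchema`, the `"schema"` key value and the null-sentinel string are illustrative assumptions, not Hudi APIs. The control flow follows `sanitizeSchemaForCommitMetadata` in the diff: null, empty or null-sentinel input maps to `""`, a schema without meta fields passes through unchanged, and otherwise the meta fields are stripped.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch only: a schema is modeled as "field1,field2,..." rather than Avro JSON.
final class SchemaKeyRoutingSketch {
  static final String SCHEMA_KEY = "schema";        // assumed value of HoodieCommitMetadata.SCHEMA_KEY
  static final String NULL_SCHEMA_STR = "\"null\""; // assumed shape of the null-schema sentinel
  static final List<String> META_COLUMNS = List.of(
      "_hoodie_commit_time", "_hoodie_commit_seqno", "_hoodie_record_key",
      "_hoodie_partition_path", "_hoodie_file_name", "_hoodie_operation");

  // Same branches as sanitizeSchemaForCommitMetadata, applied to the simplified model.
  static String sanitize(String schema) {
    if (schema == null || schema.isEmpty() || schema.equals(NULL_SCHEMA_STR)) {
      return "";
    }
    if (META_COLUMNS.stream().noneMatch(schema::contains)) {
      return schema; // fast path: no meta field names present, return input unchanged
    }
    List<String> kept = new ArrayList<>();
    for (String field : schema.split(",")) {
      String name = field.trim();
      if (!META_COLUMNS.contains(name)) {
        kept.add(name); // keep only user/write-schema fields
      }
    }
    return String.join(",", kept);
  }

  // Hypothetical single choke point that every commit path would call instead of
  // writing SCHEMA_KEY directly.
  static void putSchema(Map<String, String> extraMetadata, String schema) {
    extraMetadata.put(SCHEMA_KEY, sanitize(schema));
  }

  public static void main(String[] args) {
    Map<String, String> extraMetadata = new HashMap<>();
    putSchema(extraMetadata, "_hoodie_commit_time,_hoodie_record_key,id,name");
    System.out.println(extraMetadata.get(SCHEMA_KEY)); // prints id,name
  }
}
```

With this shape, each bypassing call site would hand its schema string to the helper rather than writing `SCHEMA_KEY` itself, so a new pollution source cannot skip sanitization.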
##########
hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestSparkSortAndSizeClustering.java:
##########
@@ -129,6 +134,99 @@ public void testClusteringWithRow() throws IOException {
writeAndClustering(true);
}
+ /**
+ * Asserts that the schema persisted under HoodieCommitMetadata.SCHEMA_KEY in a completed
+ * replace (clustering) commit does NOT contain Hudi meta fields like _hoodie_commit_time.
+ * The schema stored in commit metadata is meant to be the user/write schema.
+ */
+ @Test
+ public void testReplaceCommitSchemaHasNoMetaFields() throws Exception {
+ setup(102400);
+ config.setValue("hoodie.datasource.write.row.writer.enable", "false");
+ config.setValue("hoodie.metadata.enable", "false");
+ config.setValue("hoodie.clustering.plan.strategy.daybased.lookback.partitions", "1");
+ config.setValue("hoodie.clustering.plan.strategy.target.file.max.bytes", String.valueOf(1024 * 1024));
+ config.setValue("hoodie.clustering.plan.strategy.max.bytes.per.group", String.valueOf(2 * 1024 * 1024));
+
+ writeData(1000, true, System.currentTimeMillis());
+
+ String clusteringTime = (String) writeClient.scheduleClustering(Option.empty()).get();
+ writeClient.cluster(clusteringTime, true);
+
+ metaClient = HoodieTableMetaClient.reload(metaClient);
+ HoodieInstant replaceInstant = metaClient.getActiveTimeline()
+ .getCompletedReplaceTimeline()
+ .filter(i -> i.requestedTime().equals(clusteringTime))
+ .firstInstant()
+ .orElseThrow(() -> new AssertionError("No completed replace commit found for " + clusteringTime));
+
+ HoodieReplaceCommitMetadata replaceCommitMetadata =
+ metaClient.getActiveTimeline().readReplaceCommitMetadata(replaceInstant);
+ assertSchemaHasNoMetaFields(replaceCommitMetadata, "replace (clustering) commit");
+ }
+
+ /**
+ * Even when {@code config.getSchema()} is pre-polluted with Hudi meta fields
+ * (simulating upstream paths like compaction reader-schema setup that may set
+ * a schema-with-meta-fields back onto the write config), both ingestion and
+ * clustering commits must persist a clean schema (without meta fields) under
+ * {@link HoodieCommitMetadata#SCHEMA_KEY}. This guards the sanitization in
+ * {@code CommitUtils#sanitizeSchemaForCommitMetadata(String)}.
+ */
+ @Test
+ public void testCommitSchemaCleanedEvenWhenConfigSchemaHasMetaFields() throws Exception {
+ setup(102400);
+ config.setValue("hoodie.datasource.write.row.writer.enable", "false");
+ config.setValue("hoodie.metadata.enable", "false");
Review Comment:
🤖 nit: these five `config.setValue(...)` lines are identical to lines
146-150 in `testReplaceCommitSchemaHasNoMetaFields` — have you considered
pulling them into a `setupClusteringConfig()` helper so a future change to a
key or value only needs to happen in one place?
- AI-generated; verify before applying.
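One way the suggested `setupClusteringConfig()` helper could look. `FakeWriteConfig` is a hypothetical Map-backed stand-in for the test's `config` field so the sketch compiles on its own; inside `TestSparkSortAndSizeClustering` the helper would more likely be a private no-arg method that writes to `config` directly and is called right after `setup(102400)`. The keys and values are copied from `testReplaceCommitSchemaHasNoMetaFields`.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the test's config object: only setValue/getString are modeled.
final class FakeWriteConfig {
  private final Map<String, String> props = new HashMap<>();

  void setValue(String key, String value) {
    props.put(key, value);
  }

  String getString(String key) {
    return props.get(key);
  }
}

final class ClusteringTestConfigSketch {
  // Hypothetical shared helper: the five settings both clustering schema tests apply,
  // kept in one place so a key or value change only happens once.
  static void setupClusteringConfig(FakeWriteConfig config) {
    config.setValue("hoodie.datasource.write.row.writer.enable", "false");
    config.setValue("hoodie.metadata.enable", "false");
    config.setValue("hoodie.clustering.plan.strategy.daybased.lookback.partitions", "1");
    config.setValue("hoodie.clustering.plan.strategy.target.file.max.bytes",
        String.valueOf(1024 * 1024));
    config.setValue("hoodie.clustering.plan.strategy.max.bytes.per.group",
        String.valueOf(2 * 1024 * 1024));
  }
}
```

Each test body would then start with `setup(102400);` followed by the helper call, leaving only the test-specific steps inline.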
##########
hudi-common/src/main/java/org/apache/hudi/common/util/CommitUtils.java:
##########
@@ -94,12 +96,45 @@ public static HoodieCommitMetadata buildMetadata(List<HoodieWriteStat> writeStat
if (extraMetadata.isPresent()) {
extraMetadata.get().forEach(commitMetadata::addMetadata);
}
- commitMetadata.addMetadata(HoodieCommitMetadata.SCHEMA_KEY, (schemaToStoreInCommit == null || schemaToStoreInCommit.equals(NULL_SCHEMA_STR))
- ? "" : schemaToStoreInCommit);
+ commitMetadata.addMetadata(HoodieCommitMetadata.SCHEMA_KEY,
+ sanitizeSchemaForCommitMetadata(schemaToStoreInCommit));
commitMetadata.setOperationType(operationType);
return commitMetadata;
}
+ /**
+ * Returns the value to persist under {@link HoodieCommitMetadata#SCHEMA_KEY}.
+ * The schema stored in commit extraMetadata must be the user/write schema and
+ * must NOT contain Hudi meta fields ({@code _hoodie_commit_time}, etc.). If
+ * the caller-provided schema has meta fields (e.g. because some upstream code
+ * mutated the in-memory write config schema with reader-schema-with-meta-fields,
+ * or because a previously-polluted SCHEMA_KEY was read back into the config),
+ * this strips them so the persisted schema is always clean. When no meta fields
+ * are present, the input string is returned unchanged.
+ */
+ public static String sanitizeSchemaForCommitMetadata(String schemaToStoreInCommit) {
+ if (StringUtils.isNullOrEmpty(schemaToStoreInCommit) || schemaToStoreInCommit.equals(NULL_SCHEMA_STR)) {
+ return "";
+ }
+ if (!containsHudiMetaField(schemaToStoreInCommit)) {
+ return schemaToStoreInCommit;
+ }
+ HoodieSchema schema = HoodieSchema.parse(schemaToStoreInCommit);
+ if (schema.isSchemaNull()) {
+ return "";
+ }
+ return HoodieSchemaUtils.removeMetadataFields(schema).toString();
+ }
+
+ private static boolean containsHudiMetaField(String schemaStr) {
+ for (String metaField : HoodieRecord.HOODIE_META_COLUMNS_WITH_OPERATION) {
Review Comment:
🤖 nit: could you simplify this to a stream one-liner? `return
HoodieRecord.HOODIE_META_COLUMNS_WITH_OPERATION.stream().anyMatch(schemaStr::contains);`
— matches the pattern used elsewhere in the codebase and avoids the manual
loop.
- AI-generated; verify before applying.
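For comparison, the loop form and the suggested stream one-liner side by side. Only the first line of the loop is visible in the diff, so its body here is a reconstruction; the list copies the six Hudi meta column names and is assumed to match `HoodieRecord.HOODIE_META_COLUMNS_WITH_OPERATION`, and `MetaFieldCheckSketch` is a hypothetical class. Both versions stop at the first match.

```java
import java.util.List;

final class MetaFieldCheckSketch {
  // Assumed to mirror HoodieRecord.HOODIE_META_COLUMNS_WITH_OPERATION.
  static final List<String> META_COLUMNS_WITH_OPERATION = List.of(
      "_hoodie_commit_time", "_hoodie_commit_seqno", "_hoodie_record_key",
      "_hoodie_partition_path", "_hoodie_file_name", "_hoodie_operation");

  // Loop form (reconstructed): returns true on the first meta column name found as a substring.
  static boolean containsHudiMetaFieldLoop(String schemaStr) {
    for (String metaField : META_COLUMNS_WITH_OPERATION) {
      if (schemaStr.contains(metaField)) {
        return true;
      }
    }
    return false;
  }

  // Stream form suggested in the review; anyMatch also short-circuits.
  static boolean containsHudiMetaField(String schemaStr) {
    return META_COLUMNS_WITH_OPERATION.stream().anyMatch(schemaStr::contains);
  }
}
```

Either form is a substring pre-filter rather than a field-name check: a user column such as `_hoodie_commit_time_src` (hypothetical) or a doc string mentioning a meta column name also matches. A false positive should only cost a parse, since the method then calls `HoodieSchemaUtils.removeMetadataFields`, which presumably removes nothing in that case, though the result is re-serialized via `toString()` and may differ textually from the input.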
--
This is an automated message from the Apache Git Service.