voonhous commented on code in PR #18680:
URL: https://github.com/apache/hudi/pull/18680#discussion_r3182190292
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java:
##########
@@ -267,7 +267,7 @@ public HoodieCleanMetadata execute() {
if (pendingCleanInstants.size() > 0) {
// try to clean old history schema.
try {
- FileBasedInternalSchemaStorageManager fss = new
FileBasedInternalSchemaStorageManager(table.getMetaClient());
+ HoodieSchemaHistoryStorageManager fss = new
HoodieSchemaHistoryStorageManager(table.getMetaClient());
Review Comment:
Addressed.
##########
hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkTable.java:
##########
@@ -125,9 +125,9 @@ protected Option<HoodieTableMetadataWriter>
getMetadataWriter(
}
private static void setLatestInternalSchema(HoodieWriteConfig config,
HoodieTableMetaClient metaClient) {
- Option<InternalSchema> internalSchema = new
TableSchemaResolver(metaClient).getTableInternalSchemaFromCommitMetadata();
- if (internalSchema.isPresent()) {
- config.setInternalSchemaString(SerDeHelper.toJson(internalSchema.get()));
+ Option<HoodieSchema> evolutionSchema = new
TableSchemaResolver(metaClient).getTableEvolutionSchemaFromCommitMetadata();
Review Comment:
Addressed.
##########
hudi-common/src/main/java/org/apache/hudi/common/schema/evolution/HoodieSchemaSerDe.java:
##########
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.schema.evolution;
+
+import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.internal.schema.InternalSchema;
+import org.apache.hudi.internal.schema.utils.SerDeHelper;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.TreeMap;
+
+/**
+ * HoodieSchema-shaped façade for evolution-schema JSON serialization.
+ *
+ * <p>The on-disk format is fixed: the {@code latest_schema} blob in commit
metadata
+ * and the {@code .hoodie/.schema/} history files use the same JSON layout that
+ * {@link SerDeHelper} has always produced ({@code schemas} array containing
+ * objects with {@code version_id}, {@code max_column_id}, {@code type},
{@code fields}, etc.).
+ * Old tables must remain readable, so this façade delegates to {@link
SerDeHelper}
+ * verbatim and converts at the HoodieSchema/InternalSchema boundary via
+ * {@link HoodieSchemaInternalSchemaBridge}, preserving field ids on the way
out.</p>
+ *
+ * <p>Phase 5 of the InternalSchema removal will rewrite the JSON serializer in
+ * pure HoodieSchema terms behind this stable interface — but the byte-for-byte
+ * compatibility constraint stays in force.</p>
+ */
+public final class HoodieSchemaSerDe {
+
+ /**
+ * Commit-metadata key under which the latest schema's JSON is stored.
Carried
+ * over verbatim from {@link SerDeHelper#LATEST_SCHEMA} so callers reading
old
+ * commit metadata pick up the same blob.
+ */
+ public static final String LATEST_SCHEMA = SerDeHelper.LATEST_SCHEMA;
+
+ /**
+ * JSON object key that wraps the array of historical schemas. Same as
+ * {@link SerDeHelper#SCHEMAS}.
+ */
+ public static final String SCHEMAS = SerDeHelper.SCHEMAS;
+
+ private HoodieSchemaSerDe() {
+ }
+
+ /**
+ * Serializes a single schema to JSON. Output format matches the legacy
+ * {@link SerDeHelper#toJson(InternalSchema)} byte for byte.
+ */
+ public static String toJson(HoodieSchema schema) {
+ return
SerDeHelper.toJson(HoodieSchemaInternalSchemaBridge.toInternalSchema(schema));
+ }
+
+ /**
+ * Serializes a history of schemas to JSON. Output format matches the legacy
+ * {@link SerDeHelper#toJson(List)} byte for byte.
+ */
+ public static String toJsonHistory(List<HoodieSchema> schemas) {
+ List<InternalSchema> converted = new ArrayList<>(schemas.size());
+ for (HoodieSchema s : schemas) {
+ converted.add(HoodieSchemaInternalSchemaBridge.toInternalSchema(s));
+ }
+ return SerDeHelper.toJson(converted);
+ }
+
+ /**
+ * Parses a single-schema JSON blob (typically the {@code latest_schema}
value
+ * from commit metadata). Returns empty if the input is null/empty so callers
+ * can pass through optional commit metadata fields.
+ */
+ public static Option<HoodieSchema> fromJson(String json) {
+ Option<InternalSchema> internal = SerDeHelper.fromJson(json);
+ if (!internal.isPresent()) {
+ return Option.empty();
+ }
+ return
Option.of(HoodieSchemaInternalSchemaBridge.toHoodieSchema(internal.get(),
defaultRecordName(internal.get())));
+ }
+
+ /**
+ * Parses the history-schemas JSON layout (array of versioned schemas) and
+ * returns them keyed by {@code version_id}. Field ids are preserved on each
+ * returned HoodieSchema so the resulting map is directly usable by the
+ * read/merge path.
+ */
+ public static TreeMap<Long, HoodieSchema> parseHistorySchemas(String json) {
Review Comment:
Addressed.
##########
hudi-common/src/main/java/org/apache/hudi/common/schema/evolution/HoodieSchemaHistoryStorageManager.java:
##########
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.schema.evolution;
+
+import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.internal.schema.InternalSchema;
+import
org.apache.hudi.internal.schema.io.FileBasedInternalSchemaStorageManager;
+import org.apache.hudi.storage.HoodieStorage;
+import org.apache.hudi.storage.StoragePath;
+
+import java.util.List;
+
+/**
+ * HoodieSchema-shaped façade around the {@code .hoodie/.schema/}
schema-history
+ * storage layer. Delegates to {@link FileBasedInternalSchemaStorageManager}
so the
+ * on-disk paths, file naming, and JSON layout are all preserved verbatim — a
hard
+ * requirement for backward compatibility with tables created by prior Hudi
versions.
+ *
+ * <p>String-based methods ({@link #persistHistorySchemaStr},
+ * {@link #getHistorySchemaStr}, {@link #cleanOldFiles}) pass through
unchanged.
+ * Only {@link #getSchemaByKey(String)} crosses the schema-type boundary and
uses
+ * {@link HoodieSchemaInternalSchemaBridge} to return a HoodieSchema with
field ids
+ * preserved.</p>
+ */
+public class HoodieSchemaHistoryStorageManager {
+
+ /**
+ * Subdirectory name under {@code .hoodie/} that holds the schema-history
files.
+ * Matches {@link FileBasedInternalSchemaStorageManager#SCHEMA_NAME}
verbatim.
+ */
+ public static final String SCHEMA_NAME =
FileBasedInternalSchemaStorageManager.SCHEMA_NAME;
+
+ private final FileBasedInternalSchemaStorageManager delegate;
+
+ public HoodieSchemaHistoryStorageManager(HoodieStorage storage, StoragePath
baseTablePath) {
+ this.delegate = new FileBasedInternalSchemaStorageManager(storage,
baseTablePath);
+ }
+
+ public HoodieSchemaHistoryStorageManager(HoodieTableMetaClient metaClient) {
+ this.delegate = new FileBasedInternalSchemaStorageManager(metaClient);
+ }
+
+ /**
+ * Persists a history-schema JSON blob keyed by the commit instant time. The
+ * input JSON is expected to be in the layout produced by
+ * {@link HoodieSchemaSerDe#toJsonHistory} or
+ * {@link HoodieSchemaSerDe#inheritHistory}.
+ */
+ public void persistHistorySchemaStr(String instantTime, String
historySchemaStr) {
+ delegate.persistHistorySchemaStr(instantTime, historySchemaStr);
+ }
+
+ /**
+ * Removes schema-history files whose instant times are not in the
valid-commits
+ * set. Safety mechanism for archival cleanup.
+ */
+ public void cleanOldFiles(List<String> validateCommits) {
Review Comment:
Addressed.
##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordScanner.java:
##########
@@ -675,15 +673,14 @@ private Pair<ClosableIterator<HoodieRecord>,
HoodieSchema> getRecordsIterator(
*/
private Option<Pair<Function<HoodieRecord, HoodieRecord>, HoodieSchema>>
composeEvolvedSchemaTransformer(
HoodieDataBlock dataBlock) {
- if (internalSchema.isEmptySchema()) {
+ if (evolutionSchema.isEmptySchema()) {
return Option.empty();
}
long currentInstantTime =
Long.parseLong(dataBlock.getLogBlockHeader().get(INSTANT_TIME));
- InternalSchema fileSchema =
InternalSchemaCache.searchSchemaAndCache(currentInstantTime,
hoodieTableMetaClient);
- InternalSchema mergedInternalSchema = new InternalSchemaMerger(fileSchema,
internalSchema,
+ HoodieSchema fileSchema =
HoodieSchemaHistoryCache.searchSchemaAndCache(currentInstantTime,
hoodieTableMetaClient);
Review Comment:
Fixed by adding a mergeSchema(recordName) overload on HoodieSchemaMerger and
passing readerSchema.getFullName() at this call site, restoring the legacy
InternalSchemaConverter.convert(merged, readerSchema.getFullName()) behavior.
##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/BaseHoodieLogRecordReader.java:
##########
@@ -173,7 +173,9 @@ protected BaseHoodieLogRecordReader(HoodieReaderContext<T>
readerContext, Hoodie
this.instantRange = instantRange;
this.withOperationField = withOperationField;
this.forceFullScan = forceFullScan;
- this.internalSchema = readerContext.getSchemaHandler() != null ?
readerContext.getSchemaHandler().getInternalSchema() : null;
+ this.evolutionSchema = readerContext.getSchemaHandler() != null &&
readerContext.getSchemaHandler().getInternalSchema() != null
+ ?
HoodieSchemaInternalSchemaBridge.toHoodieSchema(readerContext.getSchemaHandler().getInternalSchema(),
readerSchema != null ? readerSchema.getFullName() : null)
Review Comment:
The `readerSchema != null ? readerSchema.getFullName() : null` pattern no
longer exists in the current file. The only defensive null check remaining is
the one on getSchemaHandler() in the constructor, which assigns readerSchema
directly. Treating this review comment as stale; no further change needed.
##########
hudi-common/src/main/java/org/apache/hudi/common/schema/evolution/HoodieSchemaSerDe.java:
##########
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.schema.evolution;
+
+import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.internal.schema.InternalSchema;
+import org.apache.hudi.internal.schema.utils.SerDeHelper;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.TreeMap;
+
+/**
+ * HoodieSchema-shaped façade for evolution-schema JSON serialization.
+ *
+ * <p>The on-disk format is fixed: the {@code latest_schema} blob in commit
metadata
+ * and the {@code .hoodie/.schema/} history files use the same JSON layout that
+ * {@link SerDeHelper} has always produced ({@code schemas} array containing
+ * objects with {@code version_id}, {@code max_column_id}, {@code type},
{@code fields}, etc.).
+ * Old tables must remain readable, so this façade delegates to {@link
SerDeHelper}
+ * verbatim and converts at the HoodieSchema/InternalSchema boundary via
+ * {@link HoodieSchemaInternalSchemaBridge}, preserving field ids on the way
out.</p>
+ *
+ * <p>Phase 5 of the InternalSchema removal will rewrite the JSON serializer in
+ * pure HoodieSchema terms behind this stable interface — but the byte-for-byte
+ * compatibility constraint stays in force.</p>
+ */
+public final class HoodieSchemaSerDe {
+
+ /**
+ * Commit-metadata key under which the latest schema's JSON is stored.
Carried
+ * over verbatim from {@link SerDeHelper#LATEST_SCHEMA} so callers reading
old
+ * commit metadata pick up the same blob.
+ */
+ public static final String LATEST_SCHEMA = SerDeHelper.LATEST_SCHEMA;
+
+ /**
+ * JSON object key that wraps the array of historical schemas. Same as
+ * {@link SerDeHelper#SCHEMAS}.
+ */
+ public static final String SCHEMAS = SerDeHelper.SCHEMAS;
+
+ private HoodieSchemaSerDe() {
+ }
+
+ /**
+ * Serializes a single schema to JSON. Output format matches the legacy
+ * {@link SerDeHelper#toJson(InternalSchema)} byte for byte.
+ */
+ public static String toJson(HoodieSchema schema) {
+ return
SerDeHelper.toJson(HoodieSchemaInternalSchemaBridge.toInternalSchema(schema));
+ }
+
+ /**
+ * Serializes a history of schemas to JSON. Output format matches the legacy
+ * {@link SerDeHelper#toJson(List)} byte for byte.
+ */
+ public static String toJsonHistory(List<HoodieSchema> schemas) {
+ List<InternalSchema> converted = new ArrayList<>(schemas.size());
+ for (HoodieSchema s : schemas) {
+ converted.add(HoodieSchemaInternalSchemaBridge.toInternalSchema(s));
+ }
+ return SerDeHelper.toJson(converted);
+ }
+
+ /**
+ * Parses a single-schema JSON blob (typically the {@code latest_schema}
value
+ * from commit metadata). Returns empty if the input is null/empty so callers
+ * can pass through optional commit metadata fields.
+ */
+ public static Option<HoodieSchema> fromJson(String json) {
+ Option<InternalSchema> internal = SerDeHelper.fromJson(json);
+ if (!internal.isPresent()) {
+ return Option.empty();
+ }
+ return
Option.of(HoodieSchemaInternalSchemaBridge.toHoodieSchema(internal.get(),
defaultRecordName(internal.get())));
+ }
+
+ /**
+ * Variant of {@link #fromJson(String)} that lets the caller fix the record
name
+ * on the resulting HoodieSchema. Equivalent to the legacy
+ * {@code SerDeHelper.fromJson(...).map(is ->
InternalSchemaConverter.convert(is, recordName))}
+ * pattern, collapsed into a single call.
+ */
+ public static Option<HoodieSchema> fromJson(String json, String recordName) {
+ Option<InternalSchema> internal = SerDeHelper.fromJson(json);
+ if (!internal.isPresent()) {
+ return Option.empty();
+ }
+ return
Option.of(HoodieSchemaInternalSchemaBridge.toHoodieSchema(internal.get(),
recordName));
+ }
+
+ /**
+ * Parses the history-schemas JSON layout (array of versioned schemas) and
+ * returns them keyed by {@code version_id}. Field ids are preserved on each
+ * returned HoodieSchema so the resulting map is directly usable by the
+ * read/merge path.
+ */
+ public static TreeMap<Long, HoodieSchema> parseHistorySchemas(String json) {
+ TreeMap<Long, InternalSchema> internals = SerDeHelper.parseSchemas(json);
+ TreeMap<Long, HoodieSchema> out = new TreeMap<>();
+ for (Long versionId : internals.keySet()) {
+ InternalSchema is = internals.get(versionId);
+ HoodieSchema hs = HoodieSchemaInternalSchemaBridge.toHoodieSchema(is,
defaultRecordName(is));
+ out.put(versionId, hs);
+ }
+ return out;
+ }
+
+ /**
+ * Appends a freshly-evolved schema to an existing serialized history blob
and
+ * returns the new blob. Mirrors {@link
SerDeHelper#inheritSchemas(InternalSchema, String)}
+ * — the {@code oldHistoryJson} is the prior {@code .hoodie/.schema/}
contents
+ * (or empty for the first commit).
+ */
+ public static String inheritHistory(HoodieSchema newSchema, String
oldHistoryJson) {
+ return SerDeHelper.inheritSchemas(
+ HoodieSchemaInternalSchemaBridge.toInternalSchema(newSchema),
oldHistoryJson);
+ }
+
+ /**
+ * Resolves the schema-history entry that applies to a given version id —
exact
+ * match if present, else the largest entry strictly less than {@code
versionId},
+ * else {@code null}. HoodieSchema-shaped replacement for
+ * {@link
org.apache.hudi.internal.schema.utils.InternalSchemaUtils#searchSchema}.
+ *
+ * <p>Note: legacy returned {@code InternalSchema.getEmptyInternalSchema()}
on
+ * miss; this returns {@code null} so callers can choose their own empty
+ * sentinel via {@link HoodieSchema#empty()}. Most callers null-check + fall
+ * back, so the change is benign.</p>
+ */
+ public static HoodieSchema searchSchema(long versionId,
java.util.TreeMap<Long, HoodieSchema> history) {
+ if (history.containsKey(versionId)) {
+ return history.get(versionId);
+ }
+ java.util.SortedMap<Long, HoodieSchema> headMap =
history.headMap(versionId);
+ return headMap.isEmpty() ? null : headMap.get(headMap.lastKey());
Review Comment:
Audited all callers. The only caller (BaseHoodieWriteClient, line 372) already
null-checks the returned schema and falls back to HoodieSchema.empty(), so no
.isEmptySchema() call can NPE on the null return value.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]