hudi-agent commented on code in PR #18384:
URL: https://github.com/apache/hudi/pull/18384#discussion_r3252412839
##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordScanner.java:
##########
@@ -201,14 +202,19 @@ protected AbstractHoodieLogRecordScanner(HoodieStorage storage, String basePath,
this.populateMetaFields = false;
this.recordKeyField = keyFieldOverride.get();
this.partitionPathFieldOpt = Option.empty();
- } else if (tableConfig.populateMetaFields()) {
- this.populateMetaFields = true;
- this.recordKeyField = HoodieRecord.RECORD_KEY_METADATA_FIELD;
- this.partitionPathFieldOpt = Option.of(HoodieRecord.PARTITION_PATH_METADATA_FIELD);
} else {
- this.populateMetaFields = false;
- this.recordKeyField = tableConfig.getRecordKeyFieldProp();
- this.partitionPathFieldOpt = Option.of(tableConfig.getPartitionFieldProp());
+ // _hoodie_record_key may be null in log blocks when the column is excluded via
+ // META_FIELDS_EXCLUDE_LIST (even with populate.meta.fields=true); in that case the
+ // reader must read the configured source record-key field instead. Partition path uses
+ // the analogous flag.
+ HoodieMetaFieldFlags flags = HoodieMetaFieldFlags.fromConfig(tableConfig);
+ this.populateMetaFields = flags.isRecordKeyPopulated();
Review Comment:
🤖 Could this NPE when only `_hoodie_partition_path` is excluded (but
`_hoodie_record_key` is not)? With this assignment `populateMetaFields=true`
and `partitionPathFieldOpt` correctly points at the source field, but
`processDataBlock` (~L466) gates on `populateMetaFields` and passes
`Option.empty()` for the field pair. That routes through
`HoodieAvroUtils.createHoodieRecordFromAvro`'s `populateMetaFields=true` branch
into `SpillableMapUtils.convertToHoodieRecordPayload(rec, …,
withOperationField)`, which hardcodes `PARTITION_PATH_METADATA_FIELD` and does
`record.get(...).toString()` — that `.toString()` would NPE on the now-null
column. Should this be `flags.isRecordKeyPopulated() &&
flags.isPartitionPathPopulated()`, or should the field pair always be
constructed from the already-resolved `recordKeyField`/`partitionPathFieldOpt`?
@nsivabalan for visibility.
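
A minimal sketch of the second option, assuming the field pair is always built from the already-resolved names. `FieldPairSketch`, `resolve`, `readKeyAndPartition`, and the source field names `key` / `part` are hypothetical stand-ins, and a plain `Map` stands in for the Avro record; none of this is Hudi's actual API.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for the reader-side field resolution; not Hudi's actual API. */
public final class FieldPairSketch {
  public static final String RECORD_KEY_META = "_hoodie_record_key";
  public static final String PARTITION_PATH_META = "_hoodie_partition_path";

  /** Picks the meta column when it is populated, otherwise the configured source field. */
  public static String resolve(boolean metaPopulated, String metaField, String sourceField) {
    return metaPopulated ? metaField : sourceField;
  }

  /** Reads key and partition path through the resolved names instead of hardcoded meta columns. */
  public static String[] readKeyAndPartition(Map<String, Object> rec, String keyField, String partitionField) {
    Object key = rec.get(keyField);
    Object partition = rec.get(partitionField);
    if (key == null || partition == null) {
      // Fail with a descriptive message rather than an NPE from toString().
      throw new IllegalStateException("Null value in '" + keyField + "' or '" + partitionField + "'");
    }
    return new String[] {key.toString(), partition.toString()};
  }

  /** Builds a record where only _hoodie_partition_path is excluded (null). */
  public static Map<String, Object> sampleRecord() {
    Map<String, Object> rec = new HashMap<>();
    rec.put(RECORD_KEY_META, "k1");
    rec.put(PARTITION_PATH_META, null); // excluded column stays null
    rec.put("key", "k1");               // assumed source record-key field name
    rec.put("part", "p1");              // assumed source partition field name
    return rec;
  }

  public static void main(String[] args) {
    Map<String, Object> rec = sampleRecord();
    // Record key is populated, partition path is excluded: each name is resolved independently.
    String keyField = resolve(true, RECORD_KEY_META, "key");
    String partitionField = resolve(false, PARTITION_PATH_META, "part");
    String[] pair = readKeyAndPartition(rec, keyField, partitionField);
    System.out.println(pair[0] + " / " + pair[1]);
  }
}
```

Resolving each name from its own flag means a single boolean no longer has to stand for both columns, which is the mismatch described above.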
<sub><i>- AI-generated; verify before applying. React 👍/👎 to flag
quality.</i></sub>
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteMergeHandle.java:
##########
@@ -412,9 +412,14 @@ public void write(HoodieRecord<T> oldRecord) {
protected void writeToFile(HoodieKey key, HoodieRecord<T> record, HoodieSchema schema, Properties props, boolean shouldPreserveRecordMetadata) throws IOException {
if (shouldPreserveRecordMetadata) {
// NOTE: `FILENAME_METADATA_FIELD` has to be rewritten to correctly point to the
- // file holding this record even in cases when overall metadata is preserved
- HoodieRecord populatedRecord = record.updateMetaField(schema, HoodieRecord.FILENAME_META_FIELD_ORD, newFilePath.getName());
- fileWriter.write(key.getRecordKey(), populatedRecord, writeSchemaWithMetaFields, props);
+ // file holding this record even in cases when overall metadata is preserved -
+ // unless the column is excluded via META_FIELDS_EXCLUDE_LIST, in which case
+ // it must stay null on disk to honor the persisted exclusion.
+ HoodieRecord toWrite = hoodieTable.getMetaClient().getTableConfig()
Review Comment:
🤖 In the preserve path, `_hoodie_file_name` is the only excluded field that
gets nulled out. The other four meta fields are written via
`fileWriter.write(...)`, which routes to `writeAvro(...)` and bypasses
`prepRecordWithMetadata` entirely, so any pre-existing values from the source
record (e.g. `_hoodie_record_key` from an older base file written before that
field was excluded) flow straight into the new file. Should this also null out
the other excluded fields here (or switch to the writeWithMetadata path) so
compaction actually realizes the exclude-list storage savings?
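
A rough sketch of the first option, assuming the five meta columns occupy the first five positions of the row in the usual order. `PreserveMetaSketch` and `prepareForPreservedWrite` are hypothetical names, and an `Object[]` stands in for the record; this is not the merge handle's real code path.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical helper illustrating the idea; not part of Hudi. */
public final class PreserveMetaSketch {
  /** Meta columns in schema order (assumed to be the first five positions of a row). */
  public static final List<String> META_FIELDS = Collections.unmodifiableList(Arrays.asList(
      "_hoodie_commit_time",
      "_hoodie_commit_seqno",
      "_hoodie_record_key",
      "_hoodie_partition_path",
      "_hoodie_file_name"));
  public static final int FILENAME_ORD = 4;

  /**
   * Returns a copy of the row where every excluded meta column is null and, if the
   * file-name column is not excluded, it points at the new file.
   */
  public static Object[] prepareForPreservedWrite(Object[] row, Set<String> excluded, String newFileName) {
    if (row.length < META_FIELDS.size()) {
      throw new IllegalArgumentException("Row is missing meta columns");
    }
    Object[] out = Arrays.copyOf(row, row.length);
    for (int i = 0; i < META_FIELDS.size(); i++) {
      if (excluded.contains(META_FIELDS.get(i))) {
        out[i] = null; // drop stale values carried over from older files
      }
    }
    if (!excluded.contains(META_FIELDS.get(FILENAME_ORD))) {
      out[FILENAME_ORD] = newFileName;
    }
    return out;
  }

  public static void main(String[] args) {
    Object[] row = {"001", "001_0_1", "k1", "p1", "old.parquet", "v1"};
    Set<String> excluded = new HashSet<>(Arrays.asList("_hoodie_record_key", "_hoodie_partition_path"));
    System.out.println(Arrays.toString(prepareForPreservedWrite(row, excluded, "new.parquet")));
  }
}
```

Copying the row rather than mutating it keeps the source record intact in case it is reused elsewhere in the merge.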
<sub><i>- AI-generated; verify before applying. React 👍/👎 to flag
quality.</i></sub>
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java:
##########
@@ -486,7 +487,10 @@ protected void appendDataAndDeleteBlocks(Map<HeaderMetadataType, String> header,
List<HoodieLogBlock> blocks = new ArrayList<>(2);
HoodieLogBlock dataBlock = null;
if (!recordList.isEmpty()) {
- String keyField = config.populateMetaFields()
+ // Log-block key field names the column readers consult for the record key. If
Review Comment:
🤖 The keyField is being derived from
`HoodieMetaFieldFlags.fromConfig(config)` (the write config) while the very
next line reads from `hoodieTable.getMetaClient().getTableConfig()`. Could you
source the flags from the table config here too (the persisted source of
truth)? Otherwise, if the write config's META_FIELDS_EXCLUDE_LIST is not in
sync with the table config, the log block's key field name and the merge
handle's keyGen check (which uses table config in `HoodieAbstractMergeHandle`)
could disagree.
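
To illustrate the drift, here is a small sketch that resolves the key field from a `Properties` bag. `KeyFieldSourceSketch` and its methods are hypothetical, and the exclude-list property key `hoodie.meta.fields.exclude.list` is an assumption made for this example, not necessarily the key this PR introduces.

```java
import java.util.Arrays;
import java.util.Properties;

/** Hypothetical resolver showing why both call sites should read the same config. */
public final class KeyFieldSourceSketch {
  public static final String POPULATE_META_FIELDS = "hoodie.populate.meta.fields";
  // Assumed key name, for illustration only.
  public static final String EXCLUDE_LIST = "hoodie.meta.fields.exclude.list";
  public static final String RECORD_KEY_FIELDS = "hoodie.table.recordkey.fields";
  public static final String RECORD_KEY_META = "_hoodie_record_key";

  /** True when meta fields are populated and the record-key column is not excluded. */
  public static boolean isRecordKeyPopulated(Properties props) {
    boolean populate = Boolean.parseBoolean(props.getProperty(POPULATE_META_FIELDS, "true"));
    boolean excluded = Arrays.stream(props.getProperty(EXCLUDE_LIST, "").split(","))
        .map(String::trim)
        .anyMatch(RECORD_KEY_META::equals);
    return populate && !excluded;
  }

  /** Key field name a log block would advertise under the given config. */
  public static String resolveKeyField(Properties props) {
    return isRecordKeyPopulated(props) ? RECORD_KEY_META : props.getProperty(RECORD_KEY_FIELDS);
  }

  public static void main(String[] args) {
    Properties tableConfig = new Properties();
    tableConfig.setProperty(RECORD_KEY_FIELDS, "id");
    tableConfig.setProperty(EXCLUDE_LIST, RECORD_KEY_META);

    Properties writeConfig = new Properties(); // exclude list not echoed by the writer
    writeConfig.setProperty(RECORD_KEY_FIELDS, "id");

    // The two sources disagree on which column holds the record key.
    System.out.println("table config -> " + resolveKeyField(tableConfig));
    System.out.println("write config -> " + resolveKeyField(writeConfig));
  }
}
```

Reading from the persisted table config at both call sites would make the resolved key field independent of whatever the writer happened to pass.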
<sub><i>- AI-generated; verify before applying. React 👍/👎 to flag
quality.</i></sub>
##########
hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestSelectiveMetaFieldPopulation.java:
##########
@@ -0,0 +1,330 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.functional;
+
+import org.apache.hudi.DataSourceReadOptions;
+import org.apache.hudi.DataSourceWriteOptions;
+import org.apache.hudi.SparkAdapterSupport$;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieCompactionConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.testutils.SparkClientFunctionalTestHarness;
+
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.RowFactory;
+import org.apache.spark.sql.SaveMode;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.CsvSource;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.stream.Collectors;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Spark-datasource tests for the META_FIELDS_EXCLUDE_LIST table property.
+ *
+ * <p>The supported way to set the exclude list on an existing table is via
+ * {@link HoodieTableConfig#update}, mirroring what the {@code hudi-cli table
update-configs}
+ * command does. Setting it as a writer option would be rejected by
+ * {@code HoodieWriterUtils.validateTableConfig} as a config conflict.
Subsequent writes
+ * must pass the same exclude list as a writer option to satisfy that same
conflict check.
+ *
+ * <p>Covers:
+ * <ul>
+ * <li>Incremental query error path: {@code _hoodie_commit_time} excluded ⇒
query fails.</li>
+ * <li>Incremental query happy path: only {@code _hoodie_commit_time}
populated, CoW + MoR.</li>
+ * <li>MoR upsert + snapshot + compaction + snapshot with only {@code
_hoodie_commit_time}.</li>
+ * </ul>
+ */
+class TestSelectiveMetaFieldPopulation extends
SparkClientFunctionalTestHarness {
+
+ private static final String EXCLUDE_ALL_EXCEPT_COMMIT_TIME =
+ HoodieRecord.COMMIT_SEQNO_METADATA_FIELD + ","
+ + HoodieRecord.RECORD_KEY_METADATA_FIELD + ","
+ + HoodieRecord.PARTITION_PATH_METADATA_FIELD + ","
+ + HoodieRecord.FILENAME_METADATA_FIELD;
+
+ private static final String EXCLUDE_ALL =
+ HoodieRecord.COMMIT_TIME_METADATA_FIELD + ","
+ + HoodieRecord.COMMIT_SEQNO_METADATA_FIELD + ","
+ + HoodieRecord.RECORD_KEY_METADATA_FIELD + ","
+ + HoodieRecord.PARTITION_PATH_METADATA_FIELD + ","
+ + HoodieRecord.FILENAME_METADATA_FIELD;
+
+ /**
+ * Incremental query must fail when _hoodie_commit_time is in the exclusion
list.
+ * Exercises CoW V1 / V2 and MoR V1 / V2 incremental relations via
table-version parameterization
+ * (version 6 -> V1, versions 8 and 9 -> V2).
+ */
+ @ParameterizedTest
+ @CsvSource({
+ "6,COPY_ON_WRITE", "8,COPY_ON_WRITE", "9,COPY_ON_WRITE",
+ "6,MERGE_ON_READ", "8,MERGE_ON_READ", "9,MERGE_ON_READ"})
+ void testIncrementalQueryFailsWhenCommitTimeExcluded(String tableVersion,
String tableType) {
+ String path = basePath();
+ Map<String, String> writeOptions = baseWriteOptions(tableVersion,
tableType);
+ StructType schema = simpleSchema();
+
+ // Create the table with two commits using all-meta-fields-populated.
+ writeRows(Collections.singletonList(RowFactory.create("k1", "p1", "v1")),
schema, writeOptions, path);
+ writeRows(Collections.singletonList(RowFactory.create("k2", "p1", "v2")),
schema, writeOptions, path);
+
+ // Persist the exclude list (covering commit_time) directly via
HoodieTableConfig.update -
+ // the CLI-equivalent path. Writer-option route would be rejected by
validateTableConfig.
+ persistMetaFieldsExcludeList(path, EXCLUDE_ALL);
+
+ HoodieException ex = assertThrows(HoodieException.class, () ->
+ spark().read().format("hudi")
+ .option(DataSourceReadOptions.QUERY_TYPE().key(),
+ DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL())
+ .option(DataSourceReadOptions.START_COMMIT().key(), "000")
+ .load(path)
+ .collect());
+
+ String message = ex.getMessage() == null ? "" : ex.getMessage();
+ assertTrue(
+ message.contains(HoodieRecord.COMMIT_TIME_METADATA_FIELD)
+ || message.toLowerCase().contains("incremental"),
+ "Expected error to mention commit_time exclusion / incremental, was: "
+ message);
+ }
+
+ /**
+ * Incremental query happy path on CoW: exclude every meta field except
_hoodie_commit_time
+ * and verify the query still returns the right rows.
+ */
+ @ParameterizedTest
+ @CsvSource({"6,COPY_ON_WRITE", "8,COPY_ON_WRITE", "9,COPY_ON_WRITE"})
+ void testIncrementalQueryServesDataWithOnlyCommitTimePopulated_CoW(String
tableVersion,
+ String
tableType) {
+ runIncrementalHappyPath(tableVersion, tableType);
+ }
+
+ /**
+ * Incremental query happy path on MoR. Excludes every meta field except
+ * _hoodie_commit_time so the merge of base + log files must still resolve
correctly.
+ */
+ @ParameterizedTest
+ @CsvSource({"6,MERGE_ON_READ", "8,MERGE_ON_READ", "9,MERGE_ON_READ"})
+ void testIncrementalQueryServesDataWithOnlyCommitTimePopulated_MoR(String
tableVersion,
+ String
tableType) {
+ runIncrementalHappyPath(tableVersion, tableType);
+ }
+
+ /**
+ * MoR write -> snapshot -> compaction -> snapshot, with only
_hoodie_commit_time populated.
+ * Ensures (a) snapshot reads merging base + log files do not assume the
four excluded
+ * fields are present and (b) compaction preserves the null-meta-field
invariant.
+ */
+ @ParameterizedTest
+ @CsvSource({"8", "9"})
+ void testMoRSnapshotAndCompactionWithOnlyCommitTimePopulated(String
tableVersion) {
+ String path = basePath();
+ Map<String, String> writeOptions = baseWriteOptions(tableVersion,
"MERGE_ON_READ");
+ writeOptions.put(HoodieCompactionConfig.INLINE_COMPACT.key(), "false");
+ StructType schema = simpleSchema();
+
+ // Bootstrap the table with one delta commit so it exists. Then flip the
exclude list.
+ writeRows(Collections.singletonList(RowFactory.create("k0", "p1", "v0")),
schema, writeOptions, path);
+ persistMetaFieldsExcludeList(path, EXCLUDE_ALL_EXCEPT_COMMIT_TIME);
+
+ // Subsequent writes must echo the same exclude list to satisfy the
writer-side conflict check.
+ Map<String, String> writeOptionsWithExclude =
withExcludeList(writeOptions, EXCLUDE_ALL_EXCEPT_COMMIT_TIME);
+
+ // delta commit 2: insert.
+ writeRows(Arrays.asList(
+ RowFactory.create("k1", "p1", "v1"),
+ RowFactory.create("k2", "p1", "v2")), schema, writeOptionsWithExclude,
path);
+
+ // delta commit 3: update k1, insert k3.
+ writeRows(Arrays.asList(
+ RowFactory.create("k1", "p1", "v1-updated"),
+ RowFactory.create("k3", "p1", "v3")), schema, writeOptionsWithExclude,
path);
+
+ Dataset<Row> preCompactSnapshot = spark().read().format("hudi").load(path);
+ assertEquals(4, preCompactSnapshot.count(),
+ "Snapshot read pre-compaction must surface all 4 keys (k0, k1 updated,
k2, k3)");
+ assertEquals(1, preCompactSnapshot.filter("column1 = 'k1' and column3 =
'v1-updated'").count());
+ // Excluded fields must be null for rows written post-flip (k1, k2, k3).
k0 was written before
+ // the flip so its excluded fields may be populated -- assert at least 3
are null.
+ assertTrue(preCompactSnapshot.filter(HoodieRecord.RECORD_KEY_METADATA_FIELD + " is null").count() >= 3,
+ "At least 3 rows (post-flip writes) should have null _hoodie_record_key");
+ // _hoodie_commit_time must always be populated.
+ assertEquals(4,
preCompactSnapshot.filter(HoodieRecord.COMMIT_TIME_METADATA_FIELD + " is not
null").count());
+
+ // delta commit 4 + inline compaction.
+ Map<String, String> compactOptions = new
HashMap<>(writeOptionsWithExclude);
+ compactOptions.put(HoodieCompactionConfig.INLINE_COMPACT.key(), "true");
+ compactOptions.put(HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key(), "1");
+ writeRows(Collections.singletonList(RowFactory.create("k2", "p1",
"v2-updated")), schema, compactOptions, path);
+
+ Dataset<Row> postCompactSnapshot =
spark().read().format("hudi").load(path);
+ assertEquals(4, postCompactSnapshot.count());
+ assertEquals(1, postCompactSnapshot.filter("column1 = 'k1' and column3 =
'v1-updated'").count());
+ assertEquals(1, postCompactSnapshot.filter("column1 = 'k2' and column3 =
'v2-updated'").count());
+ assertEquals(4,
postCompactSnapshot.filter(HoodieRecord.COMMIT_TIME_METADATA_FIELD + " is not
null").count());
+
+ HoodieTableMetaClient metaClient =
+ HoodieTableMetaClient.builder().setBasePath(path).setConf(storageConf()).build();
+ assertTrue(metaClient.getActiveTimeline().getCommitTimeline().filterCompletedInstants()
+ .getInstantsAsStream().anyMatch(i -> i.getAction().equals("commit")),
+ "Expected a completed compaction (commit) instant on the timeline");
+ }
+
+ private void runIncrementalHappyPath(String tableVersion, String tableType) {
+ String path = basePath();
+ Map<String, String> writeOptions = baseWriteOptions(tableVersion,
tableType);
+ StructType schema = simpleSchema();
+
+ // Bootstrap with one commit (under default populate=all) so the table
exists, then flip.
+ writeRows(Collections.singletonList(RowFactory.create("k0", "p1", "v0")),
schema, writeOptions, path);
+ persistMetaFieldsExcludeList(path, EXCLUDE_ALL_EXCEPT_COMMIT_TIME);
+
+ Map<String, String> writeOptionsWithExclude =
withExcludeList(writeOptions, EXCLUDE_ALL_EXCEPT_COMMIT_TIME);
+
+ // Commits 2, 3, 4 are written under the new exclude-list state.
+ writeRows(Arrays.asList(
+ RowFactory.create("k1", "p1", "v1"),
+ RowFactory.create("k2", "p1", "v2")), schema, writeOptionsWithExclude,
path);
+ writeRows(Collections.singletonList(RowFactory.create("k3", "p1", "v3")),
schema, writeOptionsWithExclude, path);
+ writeRows(Collections.singletonList(RowFactory.create("k4", "p1", "v4")),
schema, writeOptionsWithExclude, path);
+
+ HoodieTableMetaClient metaClient =
+ HoodieTableMetaClient.builder().setBasePath(path).setConf(storageConf()).build();
+ // Use the completed-commit timeline so each commit shows up exactly once.
Active-timeline
+ // getInstants() returns requested/inflight/completed states separately,
which would interleave
+ // and break positional indexing below.
+ List<Pair<HoodieInstant, String>> sortedInstants =
metaClient.getActiveTimeline()
+ .getCommitsTimeline().filterCompletedInstants().getInstants()
+ .stream()
+ .map(instant -> Pair.of(
+ instant, tableVersion.equals("6") ? instant.requestedTime() :
instant.getCompletionTime()))
+ .collect(Collectors.toList());
+
+ // Snapshot sanity: 5 rows total, the four excluded fields are null on the
4 post-flip rows.
+ Dataset<Row> snapshot = spark().read().format("hudi").load(path);
+ assertEquals(5, snapshot.count());
+ assertTrue(snapshot.filter(HoodieRecord.RECORD_KEY_METADATA_FIELD + " is
null").count() >= 4);
+ assertTrue(snapshot.filter(HoodieRecord.PARTITION_PATH_METADATA_FIELD + "
is null").count() >= 4);
+ assertTrue(snapshot.filter(HoodieRecord.FILENAME_METADATA_FIELD + " is
null").count() >= 4);
+ assertTrue(snapshot.filter(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD + " is
null").count() >= 4);
+ assertEquals(5, snapshot.filter(HoodieRecord.COMMIT_TIME_METADATA_FIELD +
" is not null").count());
+
+ // Incremental query covering commits 3 and 4 (post-flip).
+ Dataset<Row> incremental = spark().read().format("hudi")
+ .option(DataSourceReadOptions.QUERY_TYPE().key(),
+ DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL())
+ .option(DataSourceReadOptions.START_COMMIT().key(),
sortedInstants.get(1).getRight())
Review Comment:
🤖 This NOTE describes a real-sounding failure mode where `df.count()` /
`df.filter(...).count()` return 0 instead of the actual row count on
incremental queries when `_hoodie_record_key` is all-null. If that reproduces
with user-facing queries (not just this test), it's a significant correctness
concern for any aggregate against a table with selective exclusion. Could you
confirm whether this is reproducible via the production read path, and if so,
whether something on the incremental relation needs to avoid pushing predicates
onto the excluded (all-null) columns? @yihua any thoughts on read-path
implications here?
<sub><i>- AI-generated; verify before applying. React 👍/👎 to flag
quality.</i></sub>