hudi-agent commented on code in PR #18837:
URL: https://github.com/apache/hudi/pull/18837#discussion_r3298609449
##########
pom.xml:
##########
@@ -130,6 +129,12 @@
<hive.avro.version>1.11.4</hive.avro.version>
<presto.version>0.273</presto.version>
<trino.version>390</trino.version>
+ <!-- Trino SPI version that hudi-trino-plugin main code compiles against. -->
+ <trino.connector.version>482-SNAPSHOT</trino.connector.version>
+ <!-- Trino version used only for *-tests.jar classifier deps. Trino does not publish
+ test-jars for tagged releases, so this tracks a snapshot. Built locally via the
Review Comment:
🤖 Depending on `480-SNAPSHOT` for test-jars (and `482-SNAPSHOT` for main)
means CI and downstream contributors need these snapshots in their local m2 —
Apache snapshot repos won't have Trino artifacts. Could you describe the plan
for pinning these to released versions before merge, or how CI is expected to
obtain them? Also worth noting: main and test versions diverging (482 vs 480)
is a subtle hazard if test SPI assumptions drift from main SPI.
<sub><i>- AI-generated; verify before applying. React 👍/👎 to flag
quality.</i></sub>
##########
hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/stats/TableMetadataReader.java:
##########
@@ -55,60 +48,29 @@ public class TableMetadataReader
* @return a map from column name to their corresponding {@link HoodieColumnRangeMetadata}
* @throws HoodieMetadataException if an error occurs while fetching the column statistics
*/
- Map<String, HoodieColumnRangeMetadata> getColumnStats(List<Pair<String, String>> partitionNameFileNameList, List<String> columnNames)
+ public Map<String, HoodieColumnRangeMetadata> getColumnsRange(List<Pair<String, String>> partitionNameFileNameList, List<String> columnNames)
Review Comment:
🤖 The method visibility changed from package-private (`getColumnStats`) to
`public` (`getColumnsRange`). Is the wider visibility intentional? Since
`TableMetadataReader` extends `HoodieBackedTableMetadata` (which already
exposes a public `getColumnStats` returning a different shape), having two
public methods with very similar names/semantics on the subclass could be
confusing for future callers — they may accidentally pick the wrong one.
##########
hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/HudiSplitSource.java:
##########
@@ -104,9 +104,24 @@ public HudiSplitSource(
HoodieTableMetaClient metaClient = tableHandle.getMetaClient();
HoodieEngineContext engineContext = new HoodieLocalEngineContext(metaClient.getStorage().getConf());
- HoodieTableMetadata tableMetadata = HoodieTableMetadata.create(
- engineContext,
- tableHandle.getMetaClient().getStorage(), metadataConfig, metaClient.getBasePath().toString(), true);
+ HoodieTableMetadata tableMetadata;
+ if (enableMetadataTable) {
+ HoodieBackedTableMetadata mdt = new HoodieBackedTableMetadata(
+ engineContext, metaClient.getStorage(), metadataConfig, metaClient.getBasePath().toString(), true);
+ if (mdt.isMetadataTableInitialized()) {
+ tableMetadata = mdt;
+ }
+ else {
+ log.warn("Metadata table not initialized on disk for %s; falling back to FileSystemBackedTableMetadata",
+ tableHandle.getSchemaTableName());
Review Comment:
🤖 When `mdt.isMetadataTableInitialized()` returns false, the
`HoodieBackedTableMetadata` instance is discarded without being closed.
`HoodieBackedTableMetadata` opens reader/cache resources in its constructor —
could we `mdt.close()` before falling back to `FileSystemBackedTableMetadata`?
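A minimal sketch of the suggested fallback, using hypothetical stand-in types (`TableMetadata`, `BackedMetadata`, `FileSystemMetadata`, `MetadataFactory`) in place of the real Hudi classes. It assumes `close()` throws no checked exception; if the real `HoodieBackedTableMetadata.close()` does, the call would need a try/catch.

```java
import java.util.function.Supplier;

// Hypothetical stand-in for HoodieTableMetadata.
interface TableMetadata
        extends AutoCloseable
{
    @Override
    void close();
}

// Hypothetical stand-in for HoodieBackedTableMetadata; its constructor may open readers/caches.
final class BackedMetadata
        implements TableMetadata
{
    private final boolean initialized;
    boolean closed;

    BackedMetadata(boolean initialized)
    {
        this.initialized = initialized;
    }

    boolean isMetadataTableInitialized()
    {
        return initialized;
    }

    @Override
    public void close()
    {
        closed = true; // the real class would release its readers and caches here
    }
}

// Hypothetical stand-in for FileSystemBackedTableMetadata.
final class FileSystemMetadata
        implements TableMetadata
{
    @Override
    public void close() {}
}

final class MetadataFactory
{
    private MetadataFactory() {}

    static TableMetadata create(boolean enableMetadataTable, Supplier<BackedMetadata> backedFactory)
    {
        if (enableMetadataTable) {
            BackedMetadata mdt = backedFactory.get();
            if (mdt.isMetadataTableInitialized()) {
                return mdt; // the caller now owns mdt and closes it later
            }
            // Not initialized on disk: release what the constructor opened before discarding it.
            mdt.close();
        }
        return new FileSystemMetadata();
    }
}
```

The probe instance is owned only by the factory on the fallback branch, so that branch is the one place it can be closed deterministically.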
##########
hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/HudiUtil.java:
##########
@@ -35,39 +38,53 @@
import io.trino.spi.predicate.Domain;
import io.trino.spi.predicate.NullableValue;
import io.trino.spi.predicate.TupleDomain;
+import io.trino.spi.type.TypeManager;
import io.trino.spi.type.VarcharType;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
+import org.apache.hudi.common.config.RecordMergeMode;
+import org.apache.hudi.common.data.HoodiePairData;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodieFileGroupId;
import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.schema.HoodieSchema;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
+import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.common.util.HoodieTimer;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.TableNotFoundException;
import org.apache.hudi.metadata.HoodieTableMetadata;
import org.apache.hudi.storage.StoragePath;
+import org.apache.hudi.util.Lazy;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
+import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
-import java.util.stream.IntStream;
import static io.airlift.slice.SizeOf.estimatedSizeOf;
Review Comment:
🤖 This reduces the prepended meta-columns from all five
`HoodieRecord.HOODIE_META_COLUMNS` (commit time, seqno, key, partition path,
file name) to just record-key and partition-path. For MoR splits the file-group
reader / merger may rely on `_hoodie_commit_time` for event-time ordering and
on the full set when projecting the requested schema. Was this reduction
intentional, and have MoR + ordering-field queries been verified end-to-end?
##########
hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/stats/TableMetadataReader.java:
##########
@@ -55,60 +48,29 @@ public class TableMetadataReader
* @return a map from column name to their corresponding {@link HoodieColumnRangeMetadata}
* @throws HoodieMetadataException if an error occurs while fetching the column statistics
*/
- Map<String, HoodieColumnRangeMetadata> getColumnStats(List<Pair<String, String>> partitionNameFileNameList, List<String> columnNames)
+ public Map<String, HoodieColumnRangeMetadata> getColumnsRange(List<Pair<String, String>> partitionNameFileNameList, List<String> columnNames)
throws HoodieMetadataException
{
- return computeFileToColumnStatsMap(computeColumnStatsLookupKeys(partitionNameFileNameList, columnNames));
- }
-
- /**
- * @param partitionNameFileNameList a list of partition and file name pairs for which column stats need to be retrieved
- * @param columnNames list of column names for which stats are needed
- * @return a list of column stats keys to look up in the metadata table col_stats partition.
- */
- private List<String> computeColumnStatsLookupKeys(
- final List<Pair<String, String>> partitionNameFileNameList,
- final List<String> columnNames)
- {
- return columnNames.stream()
- .flatMap(columnName -> partitionNameFileNameList.stream()
- .map(partitionNameFileNamePair -> HoodieMetadataPayload.getColumnStatsIndexKey(
- new PartitionIndexID(HoodieTableMetadataUtil.getColumnStatsIndexPartitionIdentifier(partitionNameFileNamePair.getLeft())),
- new FileIndexID(partitionNameFileNamePair.getRight()),
- new ColumnIndexID(columnName))))
- .toList();
- }
-
- /**
- * @param columnStatsLookupKeys a map from column stats key to partition and file name pair
- * @return a map from column name to merged HoodieMetadataColumnStats
- */
- private Map<String, HoodieColumnRangeMetadata> computeFileToColumnStatsMap(List<String> columnStatsLookupKeys)
- {
- HoodieTimer timer = HoodieTimer.start();
- Map<String, HoodieRecord<HoodieMetadataPayload>> hoodieRecords =
- getRecordsByKeys(columnStatsLookupKeys, MetadataPartitionType.COLUMN_STATS.getPartitionPath());
- metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.LOOKUP_COLUMN_STATS_METADATA_STR, timer.endTimer()));
- return hoodieRecords.values().stream()
- .collect(Collectors.groupingBy(
- r -> r.getData().getColumnStatMetadata().get().getColumnName(),
- Collectors.mapping(r -> r.getData().getColumnStatMetadata().get(), Collectors.toList())))
+ Map<Pair<String, String>, List<HoodieMetadataColumnStats>> columnStatsMap = getColumnStats(partitionNameFileNameList, columnNames);
+ return columnStatsMap.values().stream().flatMap(Collection::stream).collect(Collectors.groupingBy(
+ HoodieMetadataColumnStats::getColumnName,
+ Collectors.mapping(colStats -> colStats, Collectors.toList())))
Review Comment:
🤖 nit: `Collectors.mapping(colStats -> colStats, Collectors.toList())` with
an identity mapper is equivalent to `Collectors.toList()` directly — could you
simplify it?
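A small sketch showing that the identity mapper is redundant, with a hypothetical `ColStats` record standing in for `HoodieMetadataColumnStats`. Both methods build the same map; the one-argument `Collectors.groupingBy(classifier)` is also documented as equivalent to `groupingBy(classifier, toList())`.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical stand-in for HoodieMetadataColumnStats (one entry per file and column).
record ColStats(String columnName, long minValue, long maxValue) {}

final class GroupByColumn
{
    private GroupByColumn() {}

    // Shape used in the PR: an identity function inside Collectors.mapping.
    static Map<String, List<ColStats>> withIdentityMapping(List<List<ColStats>> statsPerFile)
    {
        return statsPerFile.stream()
                .flatMap(List::stream)
                .collect(Collectors.groupingBy(
                        ColStats::columnName,
                        Collectors.mapping(colStats -> colStats, Collectors.toList())));
    }

    // Suggested simplification: pass Collectors.toList() directly as the downstream collector.
    static Map<String, List<ColStats>> simplified(List<List<ColStats>> statsPerFile)
    {
        return statsPerFile.stream()
                .flatMap(List::stream)
                .collect(Collectors.groupingBy(ColStats::columnName, Collectors.toList()));
    }
}
```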
##########
hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/HudiPageSource.java:
##########
@@ -115,8 +126,32 @@ public long getMemoryUsage()
public void close()
throws IOException
{
- fileGroupReader.close();
- pageSource.close();
+ IOException closeException = null;
+
+ recordIterator.close();
Review Comment:
🤖 `recordIterator.close()` is called outside any try/catch — if it throws
(checked or unchecked), `fileGroupReader.close()` and `pageSource.close()` are
skipped and we leak the file handles / scanner buffers. Could you wrap it the
same way as the other two closes and aggregate via `addSuppressed`?
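One way to close all three resources and aggregate failures, sketched as a hypothetical `CloseAll.closeAll` helper over `java.io.Closeable`. It assumes each resource's `close()` throws at most `IOException` plus unchecked exceptions, so that `recordIterator::close`, `fileGroupReader::close` and `pageSource::close` are usable as `Closeable` method references; that should be checked against the actual types.

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.List;

final class CloseAll
{
    private CloseAll() {}

    // Closes every resource in order, even if an earlier close fails. The first failure is
    // rethrown; later failures are attached to it with addSuppressed.
    static void closeAll(List<? extends Closeable> resources)
            throws IOException
    {
        Exception first = null;
        for (Closeable resource : resources) {
            try {
                resource.close();
            }
            catch (IOException | RuntimeException e) {
                if (first == null) {
                    first = e;
                }
                else {
                    first.addSuppressed(e);
                }
            }
        }
        if (first instanceof IOException ioException) {
            throw ioException;
        }
        if (first instanceof RuntimeException runtimeException) {
            throw runtimeException;
        }
    }
}
```

In `HudiPageSource.close()` this could be called roughly as `closeAll(List.<Closeable>of(recordIterator::close, fileGroupReader::close, pageSource::close))`; the explicit type witness helps inference with method references.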
##########
hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/reader/HudiTrinoReaderContext.java:
##########
@@ -130,98 +149,16 @@ public IndexedRecord next()
}
@Override
- public IndexedRecord convertAvroRecord(IndexedRecord record)
+ protected Option<HoodieRecordMerger> getRecordMerger(RecordMergeMode mergeMode, String mergeStrategyId, String mergeImplClasses)
{
- return record;
+ return Option.of(new HoodiePreCombineAvroRecordMerger());
Review Comment:
🤖 `getRecordMerger` ignores `mergeMode`/`mergeStrategyId`/`mergeImplClasses`
and always returns `HoodiePreCombineAvroRecordMerger`. For tables configured
with `COMMIT_TIME_ORDERING` (or a custom merger), this can produce wrong merge
semantics on MoR reads. Should this dispatch on mergeMode like the other engine
readers do?
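A sketch of dispatching on the merge mode, with hypothetical `MergeMode`, `Rec`, `Merger` and `MergerSelector` types standing in for Hudi's `RecordMergeMode`, records and `HoodieRecordMerger`. The event-time rule here (larger ordering value wins, ties go to the later write) is an assumption for illustration, not Hudi's exact contract; a real fix would more likely reuse whatever mergers the other engine readers already select.

```java
import java.util.Optional;

// Hypothetical mirror of Hudi's RecordMergeMode values.
enum MergeMode
{
    COMMIT_TIME_ORDERING,
    EVENT_TIME_ORDERING,
    CUSTOM
}

// Hypothetical simplified record: key, ordering (precombine) value, payload.
record Rec(String key, long orderingValue, String payload) {}

// Hypothetical stand-in for HoodieRecordMerger; `older` comes from an earlier commit than `newer`.
interface Merger
{
    Rec merge(Rec older, Rec newer);
}

final class MergerSelector
{
    private MergerSelector() {}

    static Optional<Merger> select(MergeMode mode, Optional<Merger> customMerger)
    {
        return switch (mode) {
            // Latest write wins, regardless of the ordering field.
            case COMMIT_TIME_ORDERING -> Optional.<Merger>of((older, newer) -> newer);
            // Larger ordering value wins; in this sketch ties go to the later write.
            case EVENT_TIME_ORDERING -> Optional.<Merger>of((older, newer) ->
                    newer.orderingValue() >= older.orderingValue() ? newer : older);
            // Defer to a user-supplied merger (e.g. resolved from mergeImplClasses elsewhere).
            case CUSTOM -> customMerger;
        };
    }
}
```

With a fixed precombine merger, the `COMMIT_TIME_ORDERING` case below would instead pick the record with the larger ordering value, which is the semantic difference the comment is pointing at.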
##########
hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/HudiUtil.java:
##########
@@ -321,37 +341,33 @@ public static Schema.Field getFieldFromSchema(String columnName, Schema schema)
"Failed to get column " + columnName + " from table schema");
}
- public static List<HiveColumnHandle> prependHudiMetaColumns(List<HiveColumnHandle> dataColumns)
+ public static List<HiveColumnHandle> prependHudiMetaAndOrderingColumns(HudiTableHandle tableHandle, List<HiveColumnHandle> dataColumns)
{
- //For efficient lookup
- Set<String> dataColumnNames = dataColumns.stream()
+ Set<String> existingColumns = dataColumns.stream()
.map(HiveColumnHandle::getName)
- .collect(Collectors.toSet());
-
- // If all Hudi meta columns are already present, return the original list
- if (dataColumnNames.containsAll(HOODIE_META_COLUMNS)) {
- return dataColumns;
- }
-
- // Identify only the meta columns that are missing from dataColumns to avoid duplicates
- List<String> missingMetaColumns = HOODIE_META_COLUMNS.stream()
- .filter(metaColumn -> !dataColumnNames.contains(metaColumn))
- .toList();
+ .collect(Collectors.toCollection(HashSet::new));
List<HiveColumnHandle> columns = new ArrayList<>();
- // Create and prepend the new HiveColumnHandles for the missing meta columns
- columns.addAll(IntStream.range(0, missingMetaColumns.size())
- .boxed()
- .map(i -> new HiveColumnHandle(
- missingMetaColumns.get(i),
+ // Add missing Hudi meta columns first
+ for (int i = 0; i < HOODIE_META_COLUMNS.size(); i++) {
+ String metaColumn = HOODIE_META_COLUMNS.get(i);
+ if (existingColumns.add(metaColumn)) { // add() returns false if already present
+ columns.add(new HiveColumnHandle(
+ metaColumn,
i,
Review Comment:
🤖 Inside the meta-column loop, the new `HiveColumnHandle` is constructed
with `i` (the index within `HOODIE_META_COLUMNS`) as the hive column index. If
one of the meta columns is already present in `dataColumns` at a different
index, the duplicate is skipped — but the indexes used for the remaining meta
columns then no longer match their position in the resulting list. Is the
hiveColumnIndex value used downstream for ordering/position lookups, or is it
only a logical identifier here?
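A sketch reproducing the index mismatch the comment describes, with a hypothetical `Col` record in place of `HiveColumnHandle`. It assumes the full five-column `HOODIE_META_COLUMNS` list and that data columns are appended after the prepended meta columns (the tail of the PR's method is not shown in the hunk).

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical stand-in for HiveColumnHandle: only the name and hive column index matter here.
record Col(String name, int hiveIndex) {}

final class PrependMetaColumns
{
    // Assumed to match HoodieRecord.HOODIE_META_COLUMNS (all five, in this order).
    static final List<String> META_COLUMNS = List.of(
            "_hoodie_commit_time",
            "_hoodie_commit_seqno",
            "_hoodie_record_key",
            "_hoodie_partition_path",
            "_hoodie_file_name");

    private PrependMetaColumns() {}

    // Mirrors the PR's loop: the index comes from META_COLUMNS, not from the output position.
    // Appending dataColumns afterwards is an assumption about the rest of the method.
    static List<Col> prepend(List<Col> dataColumns)
    {
        Set<String> existing = new HashSet<>();
        for (Col column : dataColumns) {
            existing.add(column.name());
        }
        List<Col> columns = new ArrayList<>();
        for (int i = 0; i < META_COLUMNS.size(); i++) {
            String metaColumn = META_COLUMNS.get(i);
            if (existing.add(metaColumn)) { // false if already present
                columns.add(new Col(metaColumn, i));
            }
        }
        columns.addAll(dataColumns);
        return columns;
    }
}
```

With `_hoodie_commit_seqno` already in `dataColumns`, `_hoodie_record_key` ends up at list position 1 while carrying index 2, so the answer depends on whether anything downstream treats the index as a position.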
##########
hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/HudiConfig.java:
##########
@@ -55,7 +55,7 @@ public class HudiConfig
private boolean queryPartitionFilterRequired;
private boolean ignoreAbsentPartitions;
private Duration dynamicFilteringWaitTimeout = new Duration(1, SECONDS);
- private boolean resolveColumnNameCasingEnabled = true;
+ private boolean resolveColumnNameCasingEnabled;
Review Comment:
🤖 Default for `resolveColumnNameCasingEnabled` flipped from `true` to
`false`. Existing users with case-insensitive column lookups will silently
start seeing different behavior on upgrade. Is the flip intentional, and if so,
should it be called out in release notes?
##########
hudi-trino-plugin/src/test/java/io/trino/plugin/hudi/util/FileOperationAssertions.java:
##########
@@ -0,0 +1,166 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.hudi.util;
+
+import com.google.common.collect.HashMultiset;
+import com.google.common.collect.Multiset;
+import io.airlift.log.Logger;
+import io.trino.plugin.hudi.util.FileOperationUtils.FileOperation;
+import io.trino.testing.QueryRunner;
+import org.intellij.lang.annotations.Language;
+
+import java.util.Comparator;
+import java.util.function.Supplier;
+
+import static io.trino.filesystem.tracing.CacheFileSystemTraceUtils.getCacheOperationSpans;
+import static io.trino.filesystem.tracing.CacheFileSystemTraceUtils.getFileLocation;
+import static io.trino.filesystem.tracing.CacheFileSystemTraceUtils.isTrinoSchemaOrPermissions;
+import static io.trino.testing.MultisetAssertions.assertMultisetsEqual;
+import static java.util.stream.Collectors.toCollection;
+
+public final class FileOperationAssertions
+{
+ private static final Logger log = Logger.get(FileOperationAssertions.class);
+
+ private FileOperationAssertions() {}
+
+ /**
+ * Asserts that file system accesses match expected operations.
+ * This version uses manual filtering for Input/InputFile operations.
+ * Logs detailed comparison at WARN level for debugging test failures.
+ */
+ public static void assertFileSystemAccesses(
+ QueryRunner queryRunner,
+ @Language("SQL") String query,
+ Multiset<FileOperation> expectedCacheAccesses)
+ throws InterruptedException
+ {
+ queryRunner.executeWithPlan(queryRunner.getDefaultSession(), query);
+ // Async table-stats computation can outlive the synchronous query and emit spans into
+ // the exporter after execute returns. A fixed Thread.sleep races with this: when stats
+ // from query N is still running while query N+1's measurement happens, spans leak
+ // across the boundary and counts get scrambled. Poll until the span set is stable for
+ // two consecutive reads.
+ Multiset<FileOperation> actualCacheAccesses = waitForStableSpans(() -> getFileOperations(queryRunner));
+ printFileAccessDebugInfo(queryRunner, actualCacheAccesses, expectedCacheAccesses);
Review Comment:
🤖 nit: `printFileAccessDebugInfo` fires unconditionally at WARN before the
assertion, so every passing test run will dump the full span listing to CI logs
too. Could you move it so it only runs on failure (e.g. wrap
`assertMultisetsEqual` in a try-catch and log there), or at least drop to
`log.debug`?
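A sketch of the log-only-on-failure variant, as a hypothetical `FailureOnlyDebug.assertWithDebugOnFailure` helper. It assumes the assertion signals failure with `AssertionError` (as AssertJ-style assertions do); in the test, the first lambda would wrap the existing `assertMultisetsEqual(...)` call and the second `printFileAccessDebugInfo(...)`.

```java
final class FailureOnlyDebug
{
    private FailureOnlyDebug() {}

    // Runs the assertion; the (potentially large) debug dump runs only if it fails,
    // after which the original AssertionError is rethrown unchanged.
    static void assertWithDebugOnFailure(Runnable assertion, Runnable printDebugInfo)
    {
        try {
            assertion.run();
        }
        catch (AssertionError e) {
            printDebugInfo.run();
            throw e;
        }
    }
}
```

Passing runs stay quiet in CI, while a failing run still gets the full span listing next to the assertion message.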
##########
hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/HudiUtil.java:
##########
@@ -397,4 +413,62 @@ public static Schema getLatestTableSchema(HoodieTableMetaClient metaClient, Stri
throw new TrinoException(HUDI_FILESYSTEM_ERROR, e);
}
}
+
+ public static List<HiveColumnHandle> getOrderingColumnHandles(Table table, TypeManager typeManager, Lazy<HoodieTableMetaClient> lazyMetaClient, HiveTimestampPrecision timestampPrecision)
+ {
+ RecordMergeMode recordMergeMode = lazyMetaClient.get().getTableConfig().getRecordMergeMode();
Review Comment:
🤖 nit: `Objects.isNull(recordMergeMode)` in a plain `if` reads a bit oddly —
`recordMergeMode == null` is the idiomatic Java form here.
--
This is an automated message from the Apache Git Service.