hudi-agent commented on code in PR #18837:
URL: https://github.com/apache/hudi/pull/18837#discussion_r3313066596


##########
hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/HudiUtil.java:
##########
@@ -397,4 +413,62 @@ public static Schema getLatestTableSchema(HoodieTableMetaClient metaClient, Stri
             throw new TrinoException(HUDI_FILESYSTEM_ERROR, e);
         }
     }
+
+    public static List<HiveColumnHandle> getOrderingColumnHandles(Table table, TypeManager typeManager, Lazy<HoodieTableMetaClient> lazyMetaClient, HiveTimestampPrecision timestampPrecision)
+    {
+        RecordMergeMode recordMergeMode = lazyMetaClient.get().getTableConfig().getRecordMergeMode();
+        if (Objects.isNull(recordMergeMode) || recordMergeMode.equals(RecordMergeMode.COMMIT_TIME_ORDERING)) {
+            // if commit time ordering is enabled, return empty list
+            return Collections.emptyList();
+        }
+
+        ImmutableList.Builder<HiveColumnHandle> columns = ImmutableList.builder();
+        List<String> orderingColumnNames = lazyMetaClient.get().getTableConfig().getOrderingFields();
+
+        int hiveColumnIndex = 0;
+        for (Column field : table.getDataColumns()) {
+            // ignore unsupported types rather than failing
+            if (orderingColumnNames.contains(field.getName())) {
+                HiveType hiveType = field.getType();
+                if (typeSupported(hiveType.getTypeInfo(), table.getStorage().getStorageFormat())) {
+                    columns.add(createBaseColumn(field.getName(), hiveColumnIndex, hiveType, getType(hiveType, typeManager, timestampPrecision), REGULAR, field.getComment()));
+                }
+            }
+            hiveColumnIndex++;
+        }
+
+        return columns.build();
+    }
+
+    /**
+     * Converts the given {@link HoodiePairData} into a {@link Map}.
+     * <p>
+     * Special handling is applied for null keys:
+     * <ul>
+     *   <li>If a key is null, it is stored in the map as a {@code null} entry.</li>
+     *   <li>If multiple entries share the same key (including null), the latest value overwrites the previous one.</li>
+     * </ul>
+     *
+     * @param pairData the HoodiePairData containing key-value pairs
+     * @param <K> the type of keys maintained by the resulting map
+     * @param <V> the type of mapped values
+     * @return a {@link Map} containing all key-value pairs from the input data
+     */
+    public static <K, V> Map<K, V> collectAsMap(HoodiePairData<K, V> pairData)
+    {
+        // Map each pair to (Option<Pair.key>, V) to handle null keys uniformly
+        // If there are multiple entries sharing the same key, use the incoming one
+        return pairData.mapToPair(pair ->
+                        Pair.of(

Review Comment:
   🤖 nit: the `mapToPair` step wraps keys in `Option.ofNullable` only to 
immediately unwrap them with `.orElse(null)` — `HashMap` natively supports null 
keys, so this indirection implies a special contract that isn't actually there. 
Could this be simplified to 
`pairData.collectAsList().stream().collect(HashMap::new, (m, p) -> 
m.put(p.getKey(), p.getValue()), HashMap::putAll)`?
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag 
quality.</i></sub>
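
A minimal sketch of the simplification suggested above, under stated assumptions: `NullKeyCollectSketch` is a hypothetical class, `Map.Entry` stands in for Hudi's `Pair`, and a plain `List` stands in for the result of `pairData.collectAsList()`. It only illustrates that `HashMap` accepts a null key directly and that `put` keeps the last value per key; it is not the plugin's implementation.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class NullKeyCollectSketch
{
    private NullKeyCollectSketch() {}

    // Hypothetical stand-in for collectAsMap: Map.Entry plays the role of Pair.
    static <K, V> Map<K, V> collectAsMap(List<Map.Entry<K, V>> pairs)
    {
        // HashMap accepts a null key, and put() overwrites, so the last value per key wins.
        return pairs.stream().collect(
                HashMap::new,
                (map, pair) -> map.put(pair.getKey(), pair.getValue()),
                HashMap::putAll);
    }

    public static void main(String[] args)
    {
        List<Map.Entry<String, Integer>> pairs = new ArrayList<>();
        pairs.add(new SimpleEntry<String, Integer>("a", 1));
        pairs.add(new SimpleEntry<String, Integer>(null, 2));
        pairs.add(new SimpleEntry<String, Integer>("a", 3));
        // Two entries remain: "a" and the null key, each with its last value.
        System.out.println(collectAsMap(pairs));
    }
}
```

The three-argument `collect` is used here instead of `Collectors.toMap`, which throws on duplicate keys unless a merge function is supplied and rejects null values.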



##########
hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/HudiPageSource.java:
##########
@@ -51,11 +54,29 @@ public HudiPageSource(
     {
         this.pageSource = pageSource;
         this.fileGroupReader = fileGroupReader;
-        this.initFileGroupReader();
         this.readerContext = readerContext;
         this.columnHandles = columnHandles;

Review Comment:
   🤖 This catches only `IOException`, but 
`fileGroupReader.getClosableIterator()` ultimately calls 
`initRecordIterators()`, which can also throw unchecked exceptions (e.g. 
`HoodieIOException`, which is a `RuntimeException`, plus NPE/IAE from 
schema/file validation). If any of those is thrown, `fileGroupReader` and 
`pageSource` are never closed and leak. Could you widen this to 
`catch (Throwable e)` (or at least add `RuntimeException`) so the cleanup 
also runs on those paths?
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag 
quality.</i></sub>
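
A minimal sketch of the cleanup pattern described above, under stated assumptions: `CleanupOnFailureSketch`, `ThrowingInit`, and `closeQuietly` are hypothetical names, and plain `Closeable` instances stand in for `pageSource` and `fileGroupReader`. It shows one way a constructor can release already-acquired resources on any failure, checked or unchecked, and then rethrow the original exception.

```java
import java.io.Closeable;
import java.io.IOException;

final class CleanupOnFailureSketch
        implements Closeable
{
    // Hypothetical stand-in for the initialization step that may throw.
    interface ThrowingInit
    {
        void run() throws IOException;
    }

    private final Closeable pageSource;
    private final Closeable fileGroupReader;

    CleanupOnFailureSketch(Closeable pageSource, Closeable fileGroupReader, ThrowingInit init)
            throws IOException
    {
        this.pageSource = pageSource;
        this.fileGroupReader = fileGroupReader;
        try {
            init.run();
        }
        catch (Throwable t) {
            // Runs for IOException and for unchecked failures alike.
            closeQuietly(fileGroupReader, t);
            closeQuietly(pageSource, t);
            throw t; // precise rethrow: only IOException or unchecked types escape
        }
    }

    private static void closeQuietly(Closeable resource, Throwable primary)
    {
        try {
            resource.close();
        }
        catch (Throwable closeFailure) {
            // Keep the original failure primary; record the close failure alongside it.
            if (closeFailure != primary) {
                primary.addSuppressed(closeFailure);
            }
        }
    }

    @Override
    public void close() throws IOException
    {
        try {
            fileGroupReader.close();
        }
        finally {
            pageSource.close();
        }
    }

    public static void main(String[] args) throws IOException
    {
        try {
            new CleanupOnFailureSketch(
                    () -> System.out.println("pageSource closed"),
                    () -> System.out.println("fileGroupReader closed"),
                    () -> { throw new IllegalStateException("init failed"); });
        }
        catch (IllegalStateException e) {
            System.out.println("constructor failed after cleanup: " + e.getMessage());
        }
    }
}
```

Catching `Throwable` is acceptable in this shape because the handler only cleans up and rethrows; it never swallows the failure.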



##########
hudi-trino-plugin/src/test/java/io/trino/plugin/hudi/util/FileOperationAssertions.java:
##########
@@ -0,0 +1,166 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.hudi.util;
+
+import com.google.common.collect.HashMultiset;
+import com.google.common.collect.Multiset;
+import io.airlift.log.Logger;
+import io.trino.plugin.hudi.util.FileOperationUtils.FileOperation;
+import io.trino.testing.QueryRunner;
+import org.intellij.lang.annotations.Language;
+
+import java.util.Comparator;
+import java.util.function.Supplier;
+
+import static io.trino.filesystem.tracing.CacheFileSystemTraceUtils.getCacheOperationSpans;
+import static io.trino.filesystem.tracing.CacheFileSystemTraceUtils.getFileLocation;
+import static io.trino.filesystem.tracing.CacheFileSystemTraceUtils.isTrinoSchemaOrPermissions;
+import static io.trino.testing.MultisetAssertions.assertMultisetsEqual;
+import static java.util.stream.Collectors.toCollection;
+
+public final class FileOperationAssertions
+{
+    private static final Logger log = Logger.get(FileOperationAssertions.class);
+
+    private FileOperationAssertions() {}
+
+    /**
+     * Asserts that file system accesses match expected operations.
+     * This version uses manual filtering for Input/InputFile operations.
+     * Logs detailed comparison at WARN level for debugging test failures.
+     */
+    public static void assertFileSystemAccesses(
+            QueryRunner queryRunner,
+            @Language("SQL") String query,
+            Multiset<FileOperation> expectedCacheAccesses)
+            throws InterruptedException
+    {
+        queryRunner.executeWithPlan(queryRunner.getDefaultSession(), query);
+        // Async table-stats computation can outlive the synchronous query and emit spans into
+        // the exporter after execute returns. A fixed Thread.sleep races with this: when stats
+        // from query N is still running while query N+1's measurement happens, spans leak
+        // across the boundary and counts get scrambled. Poll until the span set is stable for
+        // two consecutive reads.
+        Multiset<FileOperation> actualCacheAccesses = waitForStableSpans(() -> getFileOperations(queryRunner));
+        printFileAccessDebugInfo(queryRunner, actualCacheAccesses, expectedCacheAccesses);
+        assertMultisetsEqual(actualCacheAccesses, expectedCacheAccesses);
+    }
+
+    /**
+     * Asserts that file system accesses match expected operations for Alluxio cache tests.
+     * This version uses getCacheOperationSpans for filtering.
+     * Logs detailed comparison at WARN level for debugging test failures.
+     */
+    public static void assertAlluxioFileSystemAccesses(
+            QueryRunner queryRunner,
+            @Language("SQL") String query,
+            Multiset<FileOperation> expectedCacheAccesses)
+            throws InterruptedException
+    {
+        queryRunner.executeWithPlan(queryRunner.getDefaultSession(), query);
+        // See assertFileSystemAccesses for the rationale behind polling instead of a fixed sleep.
+        Multiset<FileOperation> actualCacheAccesses = waitForStableSpans(() -> getAlluxioFileOperations(queryRunner));
+        printFileAccessDebugInfo(queryRunner, actualCacheAccesses, expectedCacheAccesses);
+        assertMultisetsEqual(actualCacheAccesses, expectedCacheAccesses);
+    }
+
+    /**
+     * Returns the span set once two consecutive reads (200ms apart) agree. Bounded by a
+     * 30-second ceiling so a runaway test fails loudly instead of hanging.
+     */
+    private static Multiset<FileOperation> waitForStableSpans(Supplier<Multiset<FileOperation>> reader)
+            throws InterruptedException
+    {
+        long deadlineMillis = System.currentTimeMillis() + 30_000L;
+        Multiset<FileOperation> previous = null;
+        while (System.currentTimeMillis() < deadlineMillis) {
+            Thread.sleep(200L);
+            Multiset<FileOperation> current = reader.get();
+            if (previous != null && current.equals(previous)) {
+                return current;
+            }
+            previous = current;
+        }
+        return previous != null ? previous : reader.get();
+    }
+
+    /**
+     * Gets file operations from query runner spans using manual filtering.
+     */
+    public static Multiset<FileOperation> getFileOperations(QueryRunner queryRunner)
+    {
+        return queryRunner.getSpans().stream()
+                .filter(span -> span.getName().startsWith("Input.") || span.getName().startsWith("InputFile.") || span.getName().startsWith("FileSystemCache."))
+                .filter(span -> !span.getName().startsWith("InputFile.newInput"))
+                .filter(span -> !span.getName().startsWith("InputFile.exists"))
+                .filter(span -> !isTrinoSchemaOrPermissions(getFileLocation(span)))
+                .map(FileOperation::create)
+                .collect(toCollection(HashMultiset::create));
+    }
+
+    /**
+     * Gets file operations for Alluxio cache tests using getCacheOperationSpans.
+     */
+    public static Multiset<FileOperation> getAlluxioFileOperations(QueryRunner queryRunner)
+    {
+        return getCacheOperationSpans(queryRunner)
+                .stream()
+                .filter(span -> !span.getName().startsWith("InputFile.exists"))
+                .map(FileOperation::create)
+                .collect(toCollection(HashMultiset::create));
+    }
+
+    private static void printFileAccessDebugInfo(
+            QueryRunner queryRunner,
+            Multiset<FileOperation> actualCacheAccesses,
+            Multiset<FileOperation> expectedCacheAccesses)
+    {
+        // Log all file paths accessed for debugging
+        log.warn("=== All File Paths Accessed ===");
+        queryRunner.getSpans().stream()
+                .filter(span -> span.getName().equals("InputFile.lastModified") || span.getName().equals("InputFile.length"))

Review Comment:
   🤖 nit: `printFileAccessDebugInfo` is called unconditionally before 
`assertMultisetsEqual`, so it logs multiple WARN-level messages on every test 
invocation — including fully-passing ones. Using `log.warn` for diagnostic 
context that's only useful on failure means CI logs will be flooded with WARN 
noise on green runs. Could these be `log.debug` calls instead, or gated on the 
assertion actually failing?
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag 
quality.</i></sub>
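
A minimal sketch of the "gate on failure" option mentioned above, under stated assumptions: `DiagnosticsOnFailureSketch` and `assertWithDiagnostics` are hypothetical names, and `java.util.logging` stands in for the airlift `Logger` used in the PR. The diagnostic text is built and logged only when the assertion throws, so passing runs stay quiet.

```java
import java.util.function.Supplier;
import java.util.logging.Logger;

final class DiagnosticsOnFailureSketch
{
    private static final Logger log = Logger.getLogger(DiagnosticsOnFailureSketch.class.getName());

    private DiagnosticsOnFailureSketch() {}

    // Hypothetical helper: runs the assertion and logs diagnostics only if it fails.
    static void assertWithDiagnostics(Runnable assertion, Supplier<String> diagnostics)
    {
        try {
            assertion.run();
        }
        catch (AssertionError e) {
            // The supplier is evaluated lazily, so green runs pay no logging cost.
            log.warning(diagnostics.get());
            throw e;
        }
    }

    public static void main(String[] args)
    {
        // Passing assertion: nothing is logged and the supplier is never called.
        assertWithDiagnostics(() -> {}, () -> "not built on a passing run");
        try {
            assertWithDiagnostics(
                    () -> { throw new AssertionError("multisets differ"); },
                    () -> "expected vs. actual file operations would be rendered here");
        }
        catch (AssertionError expected) {
            System.out.println("failure propagated: " + expected.getMessage());
        }
    }
}
```

In the test helper, the `Runnable` would wrap the `assertMultisetsEqual` call and the supplier would render the span details that `printFileAccessDebugInfo` currently logs up front.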



##########
hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/stats/TableMetadataReader.java:
##########
@@ -55,60 +48,29 @@ public class TableMetadataReader
      * @return a map from column name to their corresponding {@link HoodieColumnRangeMetadata}
      * @throws HoodieMetadataException if an error occurs while fetching the column statistics
      */
-    Map<String, HoodieColumnRangeMetadata> getColumnStats(List<Pair<String, String>> partitionNameFileNameList, List<String> columnNames)
+    public Map<String, HoodieColumnRangeMetadata> getColumnsRange(List<Pair<String, String>> partitionNameFileNameList, List<String> columnNames)
             throws HoodieMetadataException
     {
-        return computeFileToColumnStatsMap(computeColumnStatsLookupKeys(partitionNameFileNameList, columnNames));
-    }
-
-    /**
-     * @param partitionNameFileNameList a list of partition and file name pairs for which column stats need to be retrieved
-     * @param columnNames list of column names for which stats are needed
-     * @return a list of column stats keys to look up in the metadata table col_stats partition.
-     */
-    private List<String> computeColumnStatsLookupKeys(
-            final List<Pair<String, String>> partitionNameFileNameList,
-            final List<String> columnNames)
-    {
-        return columnNames.stream()
-                .flatMap(columnName -> partitionNameFileNameList.stream()
-                        .map(partitionNameFileNamePair -> HoodieMetadataPayload.getColumnStatsIndexKey(
-                                new PartitionIndexID(HoodieTableMetadataUtil.getColumnStatsIndexPartitionIdentifier(partitionNameFileNamePair.getLeft())),
-                                new FileIndexID(partitionNameFileNamePair.getRight()),
-                                new ColumnIndexID(columnName))))
-                .toList();
-    }
-
-    /**
-     * @param columnStatsLookupKeys a map from column stats key to partition and file name pair
-     * @return a map from column name to merged HoodieMetadataColumnStats
-     */
-    private Map<String, HoodieColumnRangeMetadata> computeFileToColumnStatsMap(List<String> columnStatsLookupKeys)
-    {
-        HoodieTimer timer = HoodieTimer.start();
-        Map<String, HoodieRecord<HoodieMetadataPayload>> hoodieRecords =
-                getRecordsByKeys(columnStatsLookupKeys, MetadataPartitionType.COLUMN_STATS.getPartitionPath());
-        metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.LOOKUP_COLUMN_STATS_METADATA_STR, timer.endTimer()));
-        return hoodieRecords.values().stream()
-                .collect(Collectors.groupingBy(
-                        r -> r.getData().getColumnStatMetadata().get().getColumnName(),
-                        Collectors.mapping(r -> r.getData().getColumnStatMetadata().get(), Collectors.toList())))
+        Map<Pair<String, String>, List<HoodieMetadataColumnStats>> columnStatsMap = getColumnStats(partitionNameFileNameList, columnNames);
+        return columnStatsMap.values().stream().flatMap(Collection::stream).collect(Collectors.groupingBy(
+                HoodieMetadataColumnStats::getColumnName,
+                Collectors.mapping(colStats -> colStats, Collectors.toList())))
Review Comment:
   🤖 nit: `Collectors.mapping(colStats -> colStats, Collectors.toList())` is an 
identity mapping — could it just be `Collectors.toList()` as the downstream 
collector in `groupingBy`?
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag 
quality.</i></sub>
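
A small sketch of the equivalence behind this nit, using hypothetical names (`GroupingBySketch`, with strings grouped by length in place of column stats grouped by column name). The single-argument `Collectors.groupingBy` already collects each group into a `List`, so the identity `mapping` step adds nothing.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

final class GroupingBySketch
{
    private GroupingBySketch() {}

    // Shape used in the diff: identity mapping feeding toList().
    static Map<Integer, List<String>> withIdentityMapping(List<String> words)
    {
        return words.stream().collect(Collectors.groupingBy(
                String::length,
                Collectors.mapping(word -> word, Collectors.toList())));
    }

    // Simplified shape: groupingBy(classifier) uses toList() as its downstream collector.
    static Map<Integer, List<String>> simplified(List<String> words)
    {
        return words.stream().collect(Collectors.groupingBy(String::length));
    }

    public static void main(String[] args)
    {
        List<String> words = Arrays.asList("a", "bb", "cc", "d");
        // Both variants produce equal maps for the same input.
        System.out.println(withIdentityMapping(words).equals(simplified(words)));
    }
}
```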



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

