hudi-agent commented on code in PR #18724:
URL: https://github.com/apache/hudi/pull/18724#discussion_r3235379375
##########
hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/reader/HudiTrinoReaderContext.java:
##########
@@ -130,98 +146,14 @@ public IndexedRecord next()
}
@Override
- public IndexedRecord convertAvroRecord(IndexedRecord record)
+ protected Option<HoodieRecordMerger> getRecordMerger(RecordMergeMode mergeMode, String mergeStrategyId, String mergeImplClasses)
{
- return record;
+ return Option.of(new HoodiePreCombineAvroRecordMerger());
Review Comment:
🤖 Could you confirm whether `HoodiePreCombineAvroRecordMerger` is the
intended merger here? Its javadoc says "It should only be used for
deduplication among incoming records," yet this context drives read-time
merging of base + log records. Other reader contexts
(`HoodieAvroReaderContext`, `HiveHoodieReaderContext`,
`FlinkRowDataReaderContext`) switch on `mergeMode` and use
`HoodieAvroRecordMerger` for EVENT_TIME_ORDERING and
`OverwriteWithLatestMerger` for COMMIT_TIME_ORDERING — without this branching,
tables with COMMIT_TIME_ORDERING (or custom mergers) may not get the expected
merge semantics. @yihua could you weigh in here?
<sub><i>- AI-generated; verify before applying. React 👍/👎 to flag
quality.</i></sub>
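Here is a minimal, self-contained sketch of the mode-based branching the comment describes. `MergeMode` is a hypothetical stand-in for Hudi's `RecordMergeMode`. The method returns merger class names as strings instead of constructing Hudi objects. The `CUSTOM` branch, which returns the configured implementation class names, is an assumption and is not taken from the other reader contexts:

```java
public final class MergerSelectionSketch
{
    // Hypothetical stand-in for Hudi's RecordMergeMode.
    enum MergeMode
    {
        COMMIT_TIME_ORDERING,
        EVENT_TIME_ORDERING,
        CUSTOM
    }

    private MergerSelectionSketch() {}

    // Returns the simple name of the merger to use for read-time merging of
    // base + log records. The CUSTOM branch is an assumption: it just returns
    // the configured implementation class names.
    static String selectMerger(MergeMode mode, String mergeImplClasses)
    {
        switch (mode) {
            case COMMIT_TIME_ORDERING:
                // Latest commit wins, regardless of ordering-field values.
                return "OverwriteWithLatestMerger";
            case EVENT_TIME_ORDERING:
                // Record with the greater ordering (precombine) value wins.
                return "HoodieAvroRecordMerger";
            case CUSTOM:
                return mergeImplClasses;
            default:
                throw new IllegalArgumentException("Unsupported merge mode: " + mode);
        }
    }
}
```

In the real `getRecordMerger` override, each branch would presumably return an `Option<HoodieRecordMerger>` instead of a name.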
##########
hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/HudiPageSource.java:
##########
@@ -115,8 +125,32 @@ public long getMemoryUsage()
public void close()
throws IOException
{
- fileGroupReader.close();
- pageSource.close();
+ IOException closeException = null;
+
+ recordIterator.close();
Review Comment:
🤖 `recordIterator.close()` is called outside the try-catch chain. The inner
`HoodieFileGroupReaderIterator.close()` wraps `IOException` as
`HoodieIOException` (a RuntimeException) — if it throws,
`fileGroupReader.close()` and `pageSource.close()` will be skipped and Trino's
`ConnectorPageSource` will leak. Could you wrap this call in a try-catch (or
use try-with-resources) so the remaining closes still execute?
<sub><i>- AI-generated; verify before applying. React 👍/👎 to flag
quality.</i></sub>
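One way to make the close chain robust, sketched with plain `java.io.Closeable`. The helper name `closeAll` is hypothetical. Every resource is closed even if an earlier `close()` throws, including when it throws a `RuntimeException` such as `HoodieIOException`. The first failure is rethrown, and any later ones are attached as suppressed exceptions:

```java
import java.io.Closeable;
import java.io.IOException;

public final class CloseAllSketch
{
    private CloseAllSketch() {}

    // Closes every resource in order, even if an earlier close() throws.
    // Both IOException and unchecked exceptions (e.g. HoodieIOException) are
    // caught; the first one is rethrown once all resources have been closed,
    // and any later ones are attached to it as suppressed exceptions.
    public static void closeAll(Closeable... resources)
            throws IOException
    {
        Throwable first = null;
        for (Closeable resource : resources) {
            try {
                resource.close();
            }
            catch (IOException | RuntimeException e) {
                if (first == null) {
                    first = e;
                }
                else if (first != e) {
                    first.addSuppressed(e);
                }
            }
        }
        if (first instanceof IOException) {
            throw (IOException) first;
        }
        if (first instanceof RuntimeException) {
            throw (RuntimeException) first;
        }
    }
}
```

This assumes the three fields can be passed as method references. If so, the body of `close()` would reduce to something like `closeAll(recordIterator::close, fileGroupReader::close, pageSource::close);`. Guava's `Closer` offers similar semantics, but it closes resources in reverse registration order.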
##########
hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/HudiSplitSource.java:
##########
@@ -104,9 +106,9 @@ public HudiSplitSource(
HoodieTableMetaClient metaClient = tableHandle.getMetaClient();
HoodieEngineContext engineContext = new HoodieLocalEngineContext(metaClient.getStorage().getConf());
- HoodieTableMetadata tableMetadata = HoodieTableMetadata.create(
- engineContext,
- tableHandle.getMetaClient().getStorage(), metadataConfig, metaClient.getBasePath().toString(), true);
+ HoodieTableMetadata tableMetadata = enableMetadataTable && tableHandle.getMetaClient().getTableConfig().isMetadataTableAvailable() ?
Review Comment:
🤖 nit: `metaClient` is already assigned a few lines above — could you use
that variable here instead of calling `tableHandle.getMetaClient()` three more
times? It would make it immediately obvious these all refer to the same object.
<sub><i>- AI-generated; verify before applying. React 👍/👎 to flag
quality.</i></sub>
##########
hudi-trino-plugin/src/test/java/io/trino/plugin/hudi/util/FileOperationAssertions.java:
##########
@@ -0,0 +1,143 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.hudi.util;
+
+import com.google.common.collect.HashMultiset;
+import com.google.common.collect.Multiset;
+import io.airlift.log.Logger;
+import io.trino.plugin.hudi.util.FileOperationUtils.FileOperation;
+import io.trino.testing.QueryRunner;
+import org.intellij.lang.annotations.Language;
+
+import java.util.Comparator;
+
+import static io.trino.filesystem.tracing.CacheFileSystemTraceUtils.getCacheOperationSpans;
+import static io.trino.filesystem.tracing.CacheFileSystemTraceUtils.getFileLocation;
+import static io.trino.filesystem.tracing.CacheFileSystemTraceUtils.isTrinoSchemaOrPermissions;
+import static io.trino.testing.MultisetAssertions.assertMultisetsEqual;
+import static java.util.stream.Collectors.toCollection;
+
+public final class FileOperationAssertions
+{
+ private static final Logger log = Logger.get(FileOperationAssertions.class);
+
+ private FileOperationAssertions() {}
+
+ /**
+ * Asserts that file system accesses match expected operations.
+ * This version uses manual filtering for Input/InputFile operations.
+ * Logs detailed comparison at WARN level for debugging test failures.
+ */
+ public static void assertFileSystemAccesses(
+ QueryRunner queryRunner,
+ @Language("SQL") String query,
+ Multiset<FileOperation> expectedCacheAccesses)
+ throws InterruptedException
+ {
+ queryRunner.executeWithPlan(queryRunner.getDefaultSession(), query);
+ // Allow time for table stats computation to finish before validation.
+ Thread.sleep(1000L);
+ Multiset<FileOperation> actualCacheAccesses = getFileOperations(queryRunner);
+ printFileAccessDebugInfo(queryRunner, actualCacheAccesses, expectedCacheAccesses);
+ assertMultisetsEqual(actualCacheAccesses, expectedCacheAccesses);
+ }
+
+ /**
+ * Asserts that file system accesses match expected operations for Alluxio cache tests.
+ * This version uses getCacheOperationSpans for filtering.
+ * Logs detailed comparison at WARN level for debugging test failures.
+ */
+ public static void assertAlluxioFileSystemAccesses(
+ QueryRunner queryRunner,
+ @Language("SQL") String query,
+ Multiset<FileOperation> expectedCacheAccesses)
+ throws InterruptedException
+ {
+ queryRunner.executeWithPlan(queryRunner.getDefaultSession(), query);
+ // Allow time for table stats computation to finish before validation.
+ Thread.sleep(1000L);
+ Multiset<FileOperation> actualCacheAccesses = getAlluxioFileOperations(queryRunner);
+ printFileAccessDebugInfo(queryRunner, actualCacheAccesses, expectedCacheAccesses);
+ assertMultisetsEqual(actualCacheAccesses, expectedCacheAccesses);
+ }
+
+ /**
+ * Gets file operations from query runner spans using manual filtering.
+ */
+ public static Multiset<FileOperation> getFileOperations(QueryRunner queryRunner)
+ {
+ return queryRunner.getSpans().stream()
+ .filter(span -> span.getName().startsWith("Input.") || span.getName().startsWith("InputFile.") || span.getName().startsWith("FileSystemCache."))
+ .filter(span -> !span.getName().startsWith("InputFile.newInput"))
+ .filter(span -> !span.getName().startsWith("InputFile.exists"))
+ .filter(span -> !isTrinoSchemaOrPermissions(getFileLocation(span)))
+ .map(FileOperation::create)
+ .collect(toCollection(HashMultiset::create));
+ }
+
+ /**
+ * Gets file operations for Alluxio cache tests using getCacheOperationSpans.
+ */
+ public static Multiset<FileOperation> getAlluxioFileOperations(QueryRunner queryRunner)
+ {
+ return getCacheOperationSpans(queryRunner)
+ .stream()
+ .filter(span -> !span.getName().startsWith("InputFile.exists"))
+ .map(FileOperation::create)
+ .collect(toCollection(HashMultiset::create));
+ }
+
+ private static void printFileAccessDebugInfo(
+ QueryRunner queryRunner,
+ Multiset<FileOperation> actualCacheAccesses,
+ Multiset<FileOperation> expectedCacheAccesses)
+ {
+ // Log all file paths accessed for debugging
+ log.warn("=== All File Paths Accessed ===");
Review Comment:
🤖 nit: `printFileAccessDebugInfo` fires on every test run (including passing
ones), logging several WARN lines each time. Have you considered only calling
it on failure — e.g. inside a `catch` block after the assert throws — or
dropping the level to DEBUG so it doesn't pollute CI output?
<sub><i>- AI-generated; verify before applying. React 👍/👎 to flag
quality.</i></sub>
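Here is a sketch of the "only on failure" option. It assumes the debug dump can be wrapped in a `Runnable`, and `runWithDebugOnFailure` is a hypothetical helper, not an existing Trino utility. The assertion runs first. The dump runs only if the assertion throws `AssertionError`, and then the original error is rethrown:

```java
public final class DebugOnFailureSketch
{
    private DebugOnFailureSketch() {}

    // Runs the assertion; the debug dump runs only when the assertion fails.
    // The original AssertionError is always rethrown, and if the dump itself
    // throws, that exception is attached as suppressed instead of hiding it.
    public static void runWithDebugOnFailure(Runnable assertion, Runnable debugDump)
    {
        try {
            assertion.run();
        }
        catch (AssertionError e) {
            try {
                debugDump.run();
            }
            catch (RuntimeException dumpFailure) {
                e.addSuppressed(dumpFailure);
            }
            throw e;
        }
    }
}
```

Assuming `assertMultisetsEqual` reports a mismatch with an `AssertionError`, the call site could become `runWithDebugOnFailure(() -> assertMultisetsEqual(actualCacheAccesses, expectedCacheAccesses), () -> printFileAccessDebugInfo(queryRunner, actualCacheAccesses, expectedCacheAccesses));`.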
##########
hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/stats/TableMetadataReader.java:
##########
@@ -55,60 +48,29 @@ public class TableMetadataReader
* @return a map from column name to their corresponding {@link HoodieColumnRangeMetadata}
* @throws HoodieMetadataException if an error occurs while fetching the column statistics
*/
- Map<String, HoodieColumnRangeMetadata> getColumnStats(List<Pair<String, String>> partitionNameFileNameList, List<String> columnNames)
+ public Map<String, HoodieColumnRangeMetadata> getColumnsRange(List<Pair<String, String>> partitionNameFileNameList, List<String> columnNames)
throws HoodieMetadataException
{
- return computeFileToColumnStatsMap(computeColumnStatsLookupKeys(partitionNameFileNameList, columnNames));
- }
-
- /**
- * @param partitionNameFileNameList a list of partition and file name pairs for which column stats need to be retrieved
- * @param columnNames list of column names for which stats are needed
- * @return a list of column stats keys to look up in the metadata table col_stats partition.
- */
- private List<String> computeColumnStatsLookupKeys(
- final List<Pair<String, String>> partitionNameFileNameList,
- final List<String> columnNames)
- {
- return columnNames.stream()
- .flatMap(columnName -> partitionNameFileNameList.stream()
- .map(partitionNameFileNamePair -> HoodieMetadataPayload.getColumnStatsIndexKey(
- new PartitionIndexID(HoodieTableMetadataUtil.getColumnStatsIndexPartitionIdentifier(partitionNameFileNamePair.getLeft())),
- new FileIndexID(partitionNameFileNamePair.getRight()),
- new ColumnIndexID(columnName))))
- .toList();
- }
-
- /**
- * @param columnStatsLookupKeys a map from column stats key to partition and file name pair
- * @return a map from column name to merged HoodieMetadataColumnStats
- */
- private Map<String, HoodieColumnRangeMetadata> computeFileToColumnStatsMap(List<String> columnStatsLookupKeys)
- {
- HoodieTimer timer = HoodieTimer.start();
- Map<String, HoodieRecord<HoodieMetadataPayload>> hoodieRecords =
- getRecordsByKeys(columnStatsLookupKeys, MetadataPartitionType.COLUMN_STATS.getPartitionPath());
- metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.LOOKUP_COLUMN_STATS_METADATA_STR, timer.endTimer()));
- return hoodieRecords.values().stream()
- .collect(Collectors.groupingBy(
- r -> r.getData().getColumnStatMetadata().get().getColumnName(),
- Collectors.mapping(r -> r.getData().getColumnStatMetadata().get(), Collectors.toList())))
+ Map<Pair<String, String>, List<HoodieMetadataColumnStats>> columnStatsMap = getColumnStats(partitionNameFileNameList, columnNames);
+ return columnStatsMap.values().stream().flatMap(Collection::stream).collect(Collectors.groupingBy(
+ HoodieMetadataColumnStats::getColumnName,
+ Collectors.mapping(colStats -> colStats, Collectors.toList())))
.entrySet().stream()
- .collect(Collectors.toMap(
- Map.Entry::getKey,
- e -> {
- long valueCount = 0L;
- long nullCount = 0L;
- long totalSize = 0L;
- long totalUncompressedSize = 0L;
- for (HoodieMetadataColumnStats stats : e.getValue()) {
- valueCount += stats.getValueCount();
- nullCount += stats.getNullCount();
- totalSize += stats.getTotalSize();
- totalUncompressedSize += stats.getTotalUncompressedSize();
- }
- return HoodieColumnRangeMetadata.create(
- "", e.getKey(), null, null, nullCount, valueCount, totalSize, totalUncompressedSize);
- }));
+ .collect(Collectors.toMap(
+ Map.Entry::getKey,
Review Comment:
🤖 nit: `Collectors.mapping(colStats -> colStats, Collectors.toList())` is
the identity mapping, so it could just be `Collectors.toList()` directly.
<sub><i>- AI-generated; verify before applying. React 👍/👎 to flag
quality.</i></sub>
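Here is a quick, self-contained check of the nit. It uses a hypothetical `ColStats` record in place of `HoodieMetadataColumnStats`. Both the identity `Collectors.mapping(colStats -> colStats, Collectors.toList())` and plain `Collectors.toList()` produce the same grouped map:

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public final class IdentityMappingSketch
{
    // Hypothetical stand-in for HoodieMetadataColumnStats.
    record ColStats(String columnName, long valueCount, long nullCount) {}

    private IdentityMappingSketch() {}

    // Form used in the PR: an identity function passed to Collectors.mapping.
    static Map<String, List<ColStats>> withIdentityMapping(List<ColStats> stats)
    {
        return stats.stream().collect(Collectors.groupingBy(
                ColStats::columnName,
                Collectors.mapping(colStats -> colStats, Collectors.toList())));
    }

    // Suggested simplification: use Collectors.toList() as the downstream collector.
    static Map<String, List<ColStats>> withToList(List<ColStats> stats)
    {
        return stats.stream().collect(Collectors.groupingBy(
                ColStats::columnName,
                Collectors.toList()));
    }
}
```

The single-argument `Collectors.groupingBy(classifier)` already defaults to a `toList()` downstream, so the second argument could be dropped entirely.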
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.