hudi-agent commented on code in PR #18837:
URL: https://github.com/apache/hudi/pull/18837#discussion_r3300471017
##########
hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/HudiTableHandle.java:
##########
@@ -66,12 +67,13 @@ public HudiTableHandle(
@JsonProperty("basePath") String basePath,
@JsonProperty("tableType") HoodieTableType tableType,
@JsonProperty("partitionColumns") List<HiveColumnHandle> partitionColumns,
+ @JsonProperty("orderingColumns") List<HiveColumnHandle> orderingColumns,
@JsonProperty("partitionPredicates") TupleDomain<HiveColumnHandle> partitionPredicates,
@JsonProperty("regularPredicates") TupleDomain<HiveColumnHandle> regularPredicates,
@JsonProperty("tableSchemaStr") String tableSchemaStr,
@JsonProperty("latestCommitTime") String latestCommitTime)
{
- this(Optional.empty(), Optional.empty(), schemaName, tableName, basePath, tableType, partitionColumns, ImmutableSet.of(),
+ this(Optional.empty(), Optional.empty(), schemaName, tableName, basePath, tableType, partitionColumns, Lazy.lazily(() -> orderingColumns), ImmutableSet.of(),
partitionPredicates, regularPredicates, buildTableSchema(tableSchemaStr), () -> latestCommitTime);
Review Comment:
🤖 nit: wrapping an already-materialized `orderingColumns` in `Lazy.lazily(()
-> orderingColumns)` adds no laziness — could you pass it through directly, or
have the other constructor accept a `List<HiveColumnHandle>` and do the `Lazy`
wrap once at the single call site that actually needs it?
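
A minimal sketch of the point, assuming a memoizing `Lazy` similar to the one used here. The `Lazy` class, `computeOrderingColumns`, and the counter below are hypothetical stand-ins written for illustration, not the plugin's code:

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

final class LazyWrapSketch
{
    // Hypothetical stand-in for a memoizing Lazy: runs the supplier at most once.
    static final class Lazy<T>
    {
        private Supplier<T> supplier;
        private T value;
        private boolean initialized;

        private Lazy(Supplier<T> supplier)
        {
            this.supplier = supplier;
        }

        static <T> Lazy<T> lazily(Supplier<T> supplier)
        {
            return new Lazy<>(supplier);
        }

        synchronized T get()
        {
            if (!initialized) {
                value = supplier.get();
                initialized = true;
                supplier = null; // release the supplier once the value is cached
            }
            return value;
        }
    }

    // Counts how often the (hypothetical) expensive lookup actually runs.
    static final AtomicInteger computations = new AtomicInteger();

    static List<String> computeOrderingColumns()
    {
        computations.incrementAndGet();
        return List.of("ts");
    }

    // The list is computed before the wrap, so the Lazy defers nothing.
    static Lazy<List<String>> wrapMaterialized()
    {
        List<String> orderingColumns = computeOrderingColumns();
        return Lazy.lazily(() -> orderingColumns);
    }

    // The computation itself is deferred until the first get().
    static Lazy<List<String>> wrapDeferred()
    {
        return Lazy.lazily(LazyWrapSketch::computeOrderingColumns);
    }
}
```

In `wrapMaterialized` the cost is paid up front regardless of whether `get()` is ever called, which is the situation in the JSON constructor.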
<sub><i>- AI-generated; verify before applying. React 👍/👎 to flag
quality.</i></sub>
##########
hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/HudiSplitSource.java:
##########
@@ -104,9 +104,24 @@ public HudiSplitSource(
HoodieTableMetaClient metaClient = tableHandle.getMetaClient();
HoodieEngineContext engineContext = new HoodieLocalEngineContext(metaClient.getStorage().getConf());
- HoodieTableMetadata tableMetadata = HoodieTableMetadata.create(
- engineContext,
- tableHandle.getMetaClient().getStorage(), metadataConfig, metaClient.getBasePath().toString(), true);
+ HoodieTableMetadata tableMetadata;
Review Comment:
🤖 This inlined branch duplicates what `NativeTableMetadataFactory.create()`
(reached via `HoodieTableMetadata.create(...)`) already does — create
`HoodieBackedTableMetadata`, check `isMetadataTableInitialized()`, fall back to
`FileSystemBackedTableMetadata` if not. Was there a reason to re-implement it
here instead of calling the factory? Calling
`HoodieTableMetadata.create(engineContext, metaClient.getStorage(),
metadataConfig, metaClient.getBasePath().toString(), true)` would also avoid
the orphaned-MDT leak flagged earlier, since any future fix to the factory
would be inherited automatically.
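
To illustrate why keeping the fallback in one factory helps, here is a rough sketch using hypothetical stand-in types (`TableMetadata`, `BackedMetadata`, `FileSystemMetadata` are not Hudi classes). It assumes the leak in question is the rejected metadata-table reader never being closed; that is an assumption about the earlier comment, not something this sketch can confirm:

```java
final class MetadataFactorySketch
{
    // Hypothetical stand-in for a table metadata reader.
    interface TableMetadata
            extends AutoCloseable
    {
        boolean isInitialized();

        String kind();

        @Override
        void close();
    }

    // Stand-in for a reader backed by the metadata table.
    static final class BackedMetadata
            implements TableMetadata
    {
        final boolean initialized;
        boolean closed;

        BackedMetadata(boolean initialized)
        {
            this.initialized = initialized;
        }

        @Override
        public boolean isInitialized()
        {
            return initialized;
        }

        @Override
        public String kind()
        {
            return "metadata-table";
        }

        @Override
        public void close()
        {
            closed = true;
        }
    }

    // Stand-in for the file-listing fallback.
    static final class FileSystemMetadata
            implements TableMetadata
    {
        @Override
        public boolean isInitialized()
        {
            return true;
        }

        @Override
        public String kind()
        {
            return "file-system";
        }

        @Override
        public void close() {}
    }

    // Single place that owns the fallback decision and the cleanup of the rejected reader.
    static TableMetadata create(BackedMetadata candidate)
    {
        if (candidate.isInitialized()) {
            return candidate;
        }
        candidate.close(); // assumed fix: release the reader we are not going to use
        return new FileSystemMetadata();
    }
}
```

Callers that go through `create` pick up a cleanup fix like this for free; an inlined copy of the branch has to be patched separately.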
##########
hudi-trino-plugin/src/test/java/io/trino/plugin/hudi/util/FileOperationAssertions.java:
##########
@@ -0,0 +1,166 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.hudi.util;
+
+import com.google.common.collect.HashMultiset;
+import com.google.common.collect.Multiset;
+import io.airlift.log.Logger;
+import io.trino.plugin.hudi.util.FileOperationUtils.FileOperation;
+import io.trino.testing.QueryRunner;
+import org.intellij.lang.annotations.Language;
+
+import java.util.Comparator;
+import java.util.function.Supplier;
+
+import static
io.trino.filesystem.tracing.CacheFileSystemTraceUtils.getCacheOperationSpans;
+import static
io.trino.filesystem.tracing.CacheFileSystemTraceUtils.getFileLocation;
+import static
io.trino.filesystem.tracing.CacheFileSystemTraceUtils.isTrinoSchemaOrPermissions;
+import static io.trino.testing.MultisetAssertions.assertMultisetsEqual;
+import static java.util.stream.Collectors.toCollection;
+
+public final class FileOperationAssertions
+{
+ private static final Logger log =
Logger.get(FileOperationAssertions.class);
+
+ private FileOperationAssertions() {}
+
+ /**
+ * Asserts that file system accesses match expected operations.
+ * This version uses manual filtering for Input/InputFile operations.
+ * Logs detailed comparison at WARN level for debugging test failures.
+ */
+ public static void assertFileSystemAccesses(
+ QueryRunner queryRunner,
+ @Language("SQL") String query,
+ Multiset<FileOperation> expectedCacheAccesses)
+ throws InterruptedException
+ {
+ queryRunner.executeWithPlan(queryRunner.getDefaultSession(), query);
+ // Async table-stats computation can outlive the synchronous query and
emit spans into
+ // the exporter after execute returns. A fixed Thread.sleep races with
this: when stats
+ // from query N is still running while query N+1's measurement
happens, spans leak
+ // across the boundary and counts get scrambled. Poll until the span
set is stable for
+ // two consecutive reads.
+ Multiset<FileOperation> actualCacheAccesses = waitForStableSpans(() ->
getFileOperations(queryRunner));
+ printFileAccessDebugInfo(queryRunner, actualCacheAccesses,
expectedCacheAccesses);
+ assertMultisetsEqual(actualCacheAccesses, expectedCacheAccesses);
+ }
+
+ /**
+ * Asserts that file system accesses match expected operations for Alluxio
cache tests.
+ * This version uses getCacheOperationSpans for filtering.
+ * Logs detailed comparison at WARN level for debugging test failures.
+ */
+ public static void assertAlluxioFileSystemAccesses(
+ QueryRunner queryRunner,
+ @Language("SQL") String query,
+ Multiset<FileOperation> expectedCacheAccesses)
+ throws InterruptedException
+ {
+ queryRunner.executeWithPlan(queryRunner.getDefaultSession(), query);
+ // See assertFileSystemAccesses for the rationale behind polling
instead of a fixed sleep.
+ Multiset<FileOperation> actualCacheAccesses = waitForStableSpans(() ->
getAlluxioFileOperations(queryRunner));
+ printFileAccessDebugInfo(queryRunner, actualCacheAccesses,
expectedCacheAccesses);
+ assertMultisetsEqual(actualCacheAccesses, expectedCacheAccesses);
+ }
+
+ /**
+ * Returns the span set once two consecutive reads (200ms apart) agree.
Bounded by a
+ * 30-second ceiling so a runaway test fails loudly instead of hanging.
+ */
+ private static Multiset<FileOperation>
waitForStableSpans(Supplier<Multiset<FileOperation>> reader)
+ throws InterruptedException
+ {
+ long deadlineMillis = System.currentTimeMillis() + 30_000L;
+ Multiset<FileOperation> previous = null;
+ while (System.currentTimeMillis() < deadlineMillis) {
+ Thread.sleep(200L);
+ Multiset<FileOperation> current = reader.get();
+ if (previous != null && current.equals(previous)) {
+ return current;
+ }
+ previous = current;
+ }
+ return previous != null ? previous : reader.get();
+ }
+
+ /**
+ * Gets file operations from query runner spans using manual filtering.
+ */
+ public static Multiset<FileOperation> getFileOperations(QueryRunner
queryRunner)
+ {
+ return queryRunner.getSpans().stream()
+ .filter(span -> span.getName().startsWith("Input.") ||
span.getName().startsWith("InputFile.") ||
span.getName().startsWith("FileSystemCache."))
+ .filter(span ->
!span.getName().startsWith("InputFile.newInput"))
+ .filter(span -> !span.getName().startsWith("InputFile.exists"))
+ .filter(span ->
!isTrinoSchemaOrPermissions(getFileLocation(span)))
+ .map(FileOperation::create)
+ .collect(toCollection(HashMultiset::create));
+ }
+
+ /**
+ * Gets file operations for Alluxio cache tests using
getCacheOperationSpans.
+ */
+ public static Multiset<FileOperation> getAlluxioFileOperations(QueryRunner
queryRunner)
+ {
+ return getCacheOperationSpans(queryRunner)
+ .stream()
+ .filter(span -> !span.getName().startsWith("InputFile.exists"))
Review Comment:
🤖 nit: `printFileAccessDebugInfo` runs on every assertion and logs at WARN —
this will be noisy on green runs. Consider dropping to DEBUG, or only emitting
the diff blocks when actual ≠ expected.
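
One possible shape for the second option, sketched with plain `Map<String, Integer>` counts standing in for `Multiset<FileOperation>`. The class and method names are hypothetical:

```java
import java.util.Map;
import java.util.Optional;
import java.util.TreeSet;

final class DiffOnMismatchSketch
{
    // Returns a per-operation diff only when the counts differ; empty when they agree.
    static Optional<String> describeMismatch(Map<String, Integer> actual, Map<String, Integer> expected)
    {
        // Sorted union of keys gives a stable, readable ordering.
        TreeSet<String> operations = new TreeSet<>(actual.keySet());
        operations.addAll(expected.keySet());

        StringBuilder diff = new StringBuilder();
        for (String operation : operations) {
            int actualCount = actual.getOrDefault(operation, 0);
            int expectedCount = expected.getOrDefault(operation, 0);
            if (actualCount != expectedCount) {
                diff.append(operation)
                        .append(": actual=").append(actualCount)
                        .append(", expected=").append(expectedCount)
                        .append('\n');
            }
        }
        return diff.length() == 0 ? Optional.empty() : Optional.of(diff.toString());
    }
}
```

The caller would then log inside something like `describeMismatch(actual, expected).ifPresent(...)`, so green runs stay quiet and failing runs still get the full comparison.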
##########
pom.xml:
##########
@@ -59,7 +59,6 @@
<module>packaging/hudi-utilities-bundle</module>
<module>packaging/hudi-utilities-slim-bundle</module>
<module>packaging/hudi-timeline-server-bundle</module>
- <module>packaging/hudi-trino-bundle</module>
<module>hudi-examples</module>
Review Comment:
🤖 Removing the `hudi-trino-bundle` module leaves orphaned references that
will break: `docker/hoodie/hadoop/trinobase/pom.xml` (lines 61-62) copies
`packaging/hudi-trino-bundle/target/hudi-trino-bundle-${project.version}.jar`
and the Dockerfile sets `HUDI_TRINO_BUNDLE` from it, plus
`scripts/release/validate_staged_bundles.sh` (line 40) still lists
`hudi-trino-bundle` in its expected-bundle array. Could these be cleaned up (or
updated to point at the new `hudi-trino-plugin`) before merge so the trinobase
docker build and release validation don't break?
##########
hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/HudiUtil.java:
##########
@@ -397,4 +413,62 @@ public static Schema getLatestTableSchema(HoodieTableMetaClient metaClient, Stri
throw new TrinoException(HUDI_FILESYSTEM_ERROR, e);
}
}
+
+ public static List<HiveColumnHandle> getOrderingColumnHandles(Table table, TypeManager typeManager, Lazy<HoodieTableMetaClient> lazyMetaClient, HiveTimestampPrecision timestampPrecision)
+ {
Review Comment:
🤖 nit: `Objects.isNull(recordMergeMode) ||
recordMergeMode.equals(RecordMergeMode.COMMIT_TIME_ORDERING)` reads more
naturally as `recordMergeMode == null || recordMergeMode ==
RecordMergeMode.COMMIT_TIME_ORDERING` (enum identity).
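
For reference, a tiny sketch of the suggested form. `MergeMode` is a hypothetical two-value stand-in for `RecordMergeMode`, and the method name is made up for illustration:

```java
final class EnumIdentitySketch
{
    // Hypothetical stand-in enum; the real RecordMergeMode may have other constants.
    enum MergeMode
    {
        COMMIT_TIME_ORDERING,
        EVENT_TIME_ORDERING
    }

    // Enum constants are singletons, so == is equivalent to equals() and also null-safe.
    static boolean usesCommitTimeOrdering(MergeMode mode)
    {
        return mode == null || mode == MergeMode.COMMIT_TIME_ORDERING;
    }
}
```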
##########
hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/HudiPageSource.java:
##########
@@ -115,8 +126,32 @@ public long getMemoryUsage()
public void close()
throws IOException
{
- fileGroupReader.close();
- pageSource.close();
+ IOException closeException = null;
+
+ recordIterator.close();
Review Comment:
🤖 nit: `recordIterator.close()` sits outside any try block, so the manual
addSuppressed ladder below it only runs when the iterator closes cleanly.
Consider using `Closer` (or chained try-with-resources) so all three closes are
attempted and exceptions are aggregated uniformly.
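
A sketch of the chained try-with-resources variant using only the JDK. The `closeAll` and `failing` helpers and the resource names are hypothetical; Guava's `Closer` would be an alternative:

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.List;

final class CloseAllSketch
{
    // Attempts all three closes in the order first, second, third.
    // Resources are closed in reverse declaration order, hence the reversed header.
    // The first failure is thrown; later failures are attached via addSuppressed.
    static void closeAll(Closeable first, Closeable second, Closeable third)
            throws IOException
    {
        try (Closeable c3 = third; Closeable c2 = second; Closeable c1 = first) {
            // Intentionally empty: the try header does all the work.
        }
    }

    // Test helper: a Closeable that records its name and then fails.
    static Closeable failing(String name, List<String> closeLog)
    {
        return () -> {
            closeLog.add(name);
            throw new IOException(name);
        };
    }
}
```

With this shape a failure in the iterator's `close()` no longer prevents the remaining two closes from running, and the hand-written suppression ladder is not needed.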
##########
hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/stats/TableMetadataReader.java:
##########
@@ -55,60 +48,29 @@ public class TableMetadataReader
* @return a map from column name to their corresponding {@link HoodieColumnRangeMetadata}
* @throws HoodieMetadataException if an error occurs while fetching the column statistics
*/
- Map<String, HoodieColumnRangeMetadata> getColumnStats(List<Pair<String, String>> partitionNameFileNameList, List<String> columnNames)
+ public Map<String, HoodieColumnRangeMetadata> getColumnsRange(List<Pair<String, String>> partitionNameFileNameList, List<String> columnNames)
Review Comment:
🤖 nit: `getColumnsRange` reads a bit oddly — since the return is a map of
(column → range), `getColumnRanges` would be more grammatical and match how
callers think about it.