voonhous commented on code in PR #18837:
URL: https://github.com/apache/hudi/pull/18837#discussion_r3467022363


##########
hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/HudiUtil.java:
##########
@@ -397,4 +420,104 @@ public static Schema 
getLatestTableSchema(HoodieTableMetaClient metaClient, Stri
             throw new TrinoException(HUDI_FILESYSTEM_ERROR, e);
         }
     }
+
+    /**
+     * Returns the column handles that must be present in the read schema for 
the file group reader to merge
+     * correctly: the ordering columns, plus the mandatory merge columns 
declared by a configured custom record
+     * merger (via {@link HoodieRecordMerger#getMandatoryFieldsForMerging}).
+     * <p>
+     * For COMMIT_TIME / EVENT_TIME tables this is exactly the ordering 
columns (so behavior is unchanged). For a
+     * CUSTOM merge mode with a registered merger, it additionally includes 
any data columns the merger reads at
+     * merge time (e.g. an arbitrary decision column) so that those columns 
are read from the base file even when
+     * the query does not project them -- without this the merger would see 
null for an un-projected column.
+     */
+    public static List<HiveColumnHandle> getMergeRequiredColumnHandles(
+            Table table,
+            TypeManager typeManager,
+            Lazy<HoodieTableMetaClient> lazyMetaClient,
+            List<String> recordMergerImpls,
+            HiveTimestampPrecision timestampPrecision)
+    {
+        HoodieTableMetaClient metaClient = lazyMetaClient.get();
+        HoodieTableConfig tableConfig = metaClient.getTableConfig();
+        RecordMergeMode recordMergeMode = tableConfig.getRecordMergeMode();
+
+        LinkedHashSet<String> requiredColumnNames = new LinkedHashSet<>();
+        if (recordMergeMode != null && recordMergeMode != 
RecordMergeMode.COMMIT_TIME_ORDERING) {
+            requiredColumnNames.addAll(tableConfig.getOrderingFields());
+        }
+
+        // For a CUSTOM merge mode, ask the configured merger which fields it 
needs at merge time and include them
+        // so they are read even when not projected. Only the merger's 
declared columns are added (not all columns),
+        // so non-custom tables and mergers that only use the key/ordering 
fields incur no extra reads.
+        if (recordMergeMode == RecordMergeMode.CUSTOM && recordMergerImpls != 
null && !recordMergerImpls.isEmpty()) {
+            Option<HoodieRecordMerger> merger = 
HoodieRecordUtils.createValidRecordMerger(
+                    EngineType.JAVA, String.join(",", recordMergerImpls), 
tableConfig.getRecordMergeStrategyId());
+            if (merger.isPresent()) {
+                TypedProperties props = new TypedProperties();
+                props.putAll(tableConfig.getProps());
+                props.setProperty(RECORD_MERGE_IMPL_CLASSES_WRITE_CONFIG_KEY, 
String.join(",", recordMergerImpls));
+                HoodieSchema tableSchema;
+                try {
+                    tableSchema = new 
TableSchemaResolver(metaClient).getTableSchema();
+                }
+                catch (Exception e) {
+                    throw new TrinoException(HUDI_SCHEMA_ERROR, "Failed to 
resolve table schema for merge column resolution", e);
+                }
+                String[] mandatoryFields = 
merger.get().getMandatoryFieldsForMerging(tableSchema, tableConfig, props);
+                if (mandatoryFields != null) {
+                    Collections.addAll(requiredColumnNames, mandatoryFields);
+                }
+            }
+        }
+
+        if (requiredColumnNames.isEmpty()) {
+            return Collections.emptyList();
+        }
+        return buildColumnHandles(table, typeManager, requiredColumnNames, 
timestampPrecision);
+    }
+
+    /**
+     * Builds {@link HiveColumnHandle}s, preserving physical (data-column) 
index, for the data columns whose names
+     * appear in {@code columnNames}. Names that are not data columns (e.g. 
Hudi meta fields) or whose types are not
+     * supported by the storage format are skipped.
+     */
+    private static List<HiveColumnHandle> buildColumnHandles(Table table, 
TypeManager typeManager, Set<String> columnNames, HiveTimestampPrecision 
timestampPrecision)
+    {
+        ImmutableList.Builder<HiveColumnHandle> columns = 
ImmutableList.builder();
+        int hiveColumnIndex = 0;
+        for (Column field : table.getDataColumns()) {
+            // ignore unsupported types rather than failing
+            if (columnNames.contains(field.getName())) {
+                HiveType hiveType = field.getType();
+                if (typeSupported(hiveType.getTypeInfo(), 
table.getStorage().getStorageFormat())) {
+                    columns.add(createBaseColumn(field.getName(), 
hiveColumnIndex, hiveType, getType(hiveType, typeManager, timestampPrecision), 
REGULAR, field.getComment()));
+                }
+            }
+            hiveColumnIndex++;
+        }
+        return columns.build();
+    }

Review Comment:
   Done, moved the comment next to the `typeSupported(...)` guard it actually 
describes.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to