hudi-agent commented on code in PR #18837:
URL: https://github.com/apache/hudi/pull/18837#discussion_r3466040693


##########
hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/HudiPageSourceProvider.java:
##########
@@ -430,15 +438,8 @@ private static TupleDomain<HiveColumnHandle> getCombinedPredicate(HudiSplit hudi
     private static List<HiveColumnHandle> getHiveColumns(List<ColumnHandle> columns,

Review Comment:
   🤖 nit: `getHiveColumns` no longer references `isBaseFileOnly` after this 
simplification — could you drop the now-unused parameter (and its argument at 
the call site) so the signature doesn't imply behavior that isn't there?
   
   <sub><i>⚠️ AI-generated; verify before applying. React 👍/👎 to flag 
quality.</i></sub>
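
The cleanup requested above can be illustrated with a minimal, self-contained sketch. The class and method names here are hypothetical stand-ins, not the real `getHiveColumns` signature (the diff context shows only part of it), and `List<String>` replaces the Trino column-handle types so the example compiles on its own:

```java
import java.util.List;

final class UnusedParameterSketch {
    // Before: the flag is accepted but never read, so the signature suggests
    // a base-file-only code path that does not exist.
    static List<String> getColumnsBefore(List<String> columns, boolean isBaseFileOnly) {
        return List.copyOf(columns);
    }

    // After: the signature lists only what the method actually depends on.
    static List<String> getColumns(List<String> columns) {
        return List.copyOf(columns);
    }

    public static void main(String[] args) {
        List<String> columns = List.of("key", "ts");
        // Both flag values give the same result, which is why the parameter can go.
        System.out.println(getColumnsBefore(columns, true).equals(getColumnsBefore(columns, false)));
        System.out.println(getColumns(columns));
    }
}
```

Removing the parameter also means the call site no longer has to compute or pass a value that has no effect.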



##########
hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/HudiUtil.java:
##########
@@ -397,4 +420,104 @@ public static Schema getLatestTableSchema(HoodieTableMetaClient metaClient, Stri
             throw new TrinoException(HUDI_FILESYSTEM_ERROR, e);
         }
     }
+
+    /**
+     * Returns the column handles that must be present in the read schema for the file group reader to merge
+     * correctly: the ordering columns, plus the mandatory merge columns declared by a configured custom record
+     * merger (via {@link HoodieRecordMerger#getMandatoryFieldsForMerging}).
+     * <p>
+     * For COMMIT_TIME / EVENT_TIME tables this is exactly the ordering columns (so behavior is unchanged). For a
+     * CUSTOM merge mode with a registered merger, it additionally includes any data columns the merger reads at
+     * merge time (e.g. an arbitrary decision column) so that those columns are read from the base file even when
+     * the query does not project them -- without this the merger would see null for an un-projected column.
+     */
+    public static List<HiveColumnHandle> getMergeRequiredColumnHandles(
+            Table table,
+            TypeManager typeManager,
+            Lazy<HoodieTableMetaClient> lazyMetaClient,
+            List<String> recordMergerImpls,
+            HiveTimestampPrecision timestampPrecision)
+    {
+        HoodieTableMetaClient metaClient = lazyMetaClient.get();
+        HoodieTableConfig tableConfig = metaClient.getTableConfig();
+        RecordMergeMode recordMergeMode = tableConfig.getRecordMergeMode();
+
+        LinkedHashSet<String> requiredColumnNames = new LinkedHashSet<>();
+        if (recordMergeMode != null && recordMergeMode != RecordMergeMode.COMMIT_TIME_ORDERING) {
+            requiredColumnNames.addAll(tableConfig.getOrderingFields());
+        }
+
+        // For a CUSTOM merge mode, ask the configured merger which fields it needs at merge time and include them
+        // so they are read even when not projected. Only the merger's declared columns are added (not all columns),
+        // so non-custom tables and mergers that only use the key/ordering fields incur no extra reads.
+        if (recordMergeMode == RecordMergeMode.CUSTOM && recordMergerImpls != null && !recordMergerImpls.isEmpty()) {
+            Option<HoodieRecordMerger> merger = HoodieRecordUtils.createValidRecordMerger(
+                    EngineType.JAVA, String.join(",", recordMergerImpls), tableConfig.getRecordMergeStrategyId());
+            if (merger.isPresent()) {
+                TypedProperties props = new TypedProperties();
+                props.putAll(tableConfig.getProps());
+                props.setProperty(RECORD_MERGE_IMPL_CLASSES_WRITE_CONFIG_KEY, String.join(",", recordMergerImpls));
+                HoodieSchema tableSchema;
+                try {
+                    tableSchema = new TableSchemaResolver(metaClient).getTableSchema();
+                }
+                catch (Exception e) {
+                    throw new TrinoException(HUDI_SCHEMA_ERROR, "Failed to resolve table schema for merge column resolution", e);
+                }
+                String[] mandatoryFields = merger.get().getMandatoryFieldsForMerging(tableSchema, tableConfig, props);
+                if (mandatoryFields != null) {
+                    Collections.addAll(requiredColumnNames, mandatoryFields);
+                }
+            }
+        }
+
+        if (requiredColumnNames.isEmpty()) {
+            return Collections.emptyList();
+        }
+        return buildColumnHandles(table, typeManager, requiredColumnNames, timestampPrecision);
+    }
+
+    /**
+     * Builds {@link HiveColumnHandle}s, preserving physical (data-column) index, for the data columns whose names
+     * appear in {@code columnNames}. Names that are not data columns (e.g. Hudi meta fields) or whose types are not
+     * supported by the storage format are skipped.
+     */
+    private static List<HiveColumnHandle> buildColumnHandles(Table table, TypeManager typeManager, Set<String> columnNames, HiveTimestampPrecision timestampPrecision)
+    {
+        ImmutableList.Builder<HiveColumnHandle> columns = ImmutableList.builder();
+        int hiveColumnIndex = 0;
+        for (Column field : table.getDataColumns()) {
+            // ignore unsupported types rather than failing
+            if (columnNames.contains(field.getName())) {
+                HiveType hiveType = field.getType();
+                if (typeSupported(hiveType.getTypeInfo(), table.getStorage().getStorageFormat())) {
+                    columns.add(createBaseColumn(field.getName(), hiveColumnIndex, hiveType, getType(hiveType, typeManager, timestampPrecision), REGULAR, field.getComment()));
+                }
+            }
+            hiveColumnIndex++;
+        }
+        return columns.build();
+    }

Review Comment:
   🤖 nit: the `// ignore unsupported types rather than failing` comment sits 
above the `columnNames.contains(...)` check but actually explains the inner 
`typeSupported(...)` guard — could you move it next to that line so it reads 
correctly?
   
   <sub><i>⚠️ AI-generated; verify before applying. React 👍/👎 to flag 
quality.</i></sub>
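
A minimal sketch of the suggested comment placement follows. It assumes simplified stand-in types: `Column`, `Handle`, and the one-argument `typeSupported` are hypothetical and replace the Trino metastore classes so the example is self-contained. The loop shape mirrors `buildColumnHandles` from the diff, with the comment moved onto the guard it describes:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

final class ColumnHandleSketch {
    // Hypothetical stand-ins for the metastore column and the resulting handle.
    record Column(String name, String type) {}

    record Handle(String name, int physicalIndex) {}

    // Stand-in for typeSupported(...); the real check depends on the storage format.
    static boolean typeSupported(String type) {
        return !type.equals("unsupported");
    }

    static List<Handle> buildColumnHandles(List<Column> dataColumns, Set<String> columnNames) {
        List<Handle> handles = new ArrayList<>();
        int index = 0;
        for (Column field : dataColumns) {
            if (columnNames.contains(field.name())) {
                // ignore unsupported types rather than failing
                if (typeSupported(field.type())) {
                    handles.add(new Handle(field.name(), index));
                }
            }
            // Advance for every data column so each handle keeps its physical position.
            index++;
        }
        return handles;
    }

    public static void main(String[] args) {
        List<Column> columns = List.of(
                new Column("a", "string"),
                new Column("b", "unsupported"),
                new Column("c", "bigint"));
        System.out.println(buildColumnHandles(columns, Set.of("b", "c")));
    }
}
```

With the comment on the inner guard, the outer `contains` check reads as plain name filtering and the skip-on-unsupported behavior is documented where it happens.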



##########
hudi-trino-plugin/src/test/java/io/trino/plugin/hudi/testing/IncrementalCustomMergerHudiTablesInitializer.java:
##########
@@ -0,0 +1,483 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.hudi.testing;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import io.trino.filesystem.Location;
+import io.trino.filesystem.TrinoFileSystem;
+import io.trino.filesystem.TrinoFileSystemFactory;
+import io.trino.metastore.Column;
+import io.trino.metastore.HiveMetastore;
+import io.trino.metastore.HiveMetastoreFactory;
+import io.trino.metastore.HiveType;
+import io.trino.metastore.PrincipalPrivileges;
+import io.trino.metastore.StorageFormat;
+import io.trino.metastore.Table;
+import io.trino.plugin.hudi.HudiConnector;
+import io.trino.spi.security.ConnectorIdentity;
+import io.trino.testing.QueryRunner;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.client.HoodieJavaWriteClient;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.client.common.HoodieJavaEngineContext;
+import org.apache.hudi.common.bootstrap.index.NoOpBootstrapIndex;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.config.RecordMergeMode;
+import org.apache.hudi.common.model.HoodieAvroPayload;
+import org.apache.hudi.common.model.HoodieAvroRecord;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.marker.MarkerType;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieCompactionConfig;
+import org.apache.hudi.config.HoodieIndexConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.index.HoodieIndex;
+import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import static com.google.common.collect.ImmutableList.toImmutableList;
+import static com.google.common.io.MoreFiles.deleteRecursively;
+import static com.google.common.io.RecursiveDeleteOption.ALLOW_INSECURE;
+import static io.trino.hive.formats.HiveClassNames.HUDI_PARQUET_INPUT_FORMAT;
+import static io.trino.hive.formats.HiveClassNames.HUDI_PARQUET_REALTIME_INPUT_FORMAT;
+import static io.trino.hive.formats.HiveClassNames.MAPRED_PARQUET_OUTPUT_FORMAT_CLASS;
+import static io.trino.hive.formats.HiveClassNames.PARQUET_HIVE_SERDE_CLASS;
+import static io.trino.metastore.HiveType.HIVE_BOOLEAN;
+import static io.trino.metastore.HiveType.HIVE_DOUBLE;
+import static io.trino.metastore.HiveType.HIVE_INT;
+import static io.trino.metastore.HiveType.HIVE_LONG;
+import static io.trino.metastore.HiveType.HIVE_STRING;
+import static io.trino.plugin.hive.TableType.EXTERNAL_TABLE;
+import static java.lang.String.format;
+import static java.nio.file.Files.createTempDirectory;
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Creates a non-partitioned Merge-On-Read table configured with the {@link MaxRankRecordMerger} custom merger
+ * ({@link RecordMergeMode#CUSTOM}) and drives a controlled sequence of commits so an end-to-end test can read the
+ * table through Trino after every commit and validate the merged result.
+ * <p>
+ * The schema has 30 data columns (record key, the {@code merge_rank} decision column, an ordering field, and 27
+ * additional columns spanning string/long/int/double/boolean types). {@link #NUM_RECORDS} record keys are
+ * bulk-inserted in the first commit; each subsequent commit upserts roughly two thirds of the keys with freshly
+ * derived values. Inline compaction is disabled so every delta commit survives as a log file and must be merged at
+ * read time.
+ * <p>
+ * Every record column is a pure deterministic function of {@code (recordIndex, commitNumber)} (see
+ * {@link #valueFor}). Because the merge keeps the record with the maximum {@code merge_rank} (ties to the newer
+ * commit), the winning commit for each key is known in closed form, so the full expected row for the real-time
+ * table is reproduced exactly by {@link #expectedRows()} without reading anything back.
+ * <p>
+ * Two metastore tables are registered: a read-optimized table (base files only) and a real-time table (suffix
+ * {@code _rt}) that merges base + log files through the file group reader. The Hudi write client writes to a local
+ * staging directory; after each commit the staging directory is mirrored into the Trino filesystem location the
+ * connector reads from (see {@link #syncToTrino()}).
+ */
+public class IncrementalCustomMergerHudiTablesInitializer
+        implements HudiTablesInitializer
+{
+    public static final String TABLE_NAME = "custom_merger_e2e";
+    public static final String RT_TABLE_NAME = TABLE_NAME + "_rt";
+
+    public static final int TOTAL_COMMITS = 20;
+    public static final int NUM_RECORDS = 10_000;
+
+    private static final String RECORD_KEY_FIELD = "key";
+    private static final String RANK_FIELD = MaxRankRecordMerger.RANK_COLUMN;
+    private static final String ORDERING_FIELD = "ts";
+    private static final String PARTITION_PATH = "";
+
+    private enum Kind
+    {
+        STRING, LONG, INT, DOUBLE, BOOLEAN
+    }
+
+    private record ColumnSpec(String name, Kind kind) {}
+
+    /** The 30 data columns, in the order they appear in the schema and in query projections. */
+    private static final List<ColumnSpec> COLUMN_SPECS = buildColumnSpecs();
+
+    private static final List<Column> DATA_COLUMNS = buildDataColumns();
+    private static final Schema AVRO_SCHEMA = buildAvroSchema();
+
+    // Mutable state driven across commits.
+    private TrinoFileSystem fileSystem;
+    private Location tableLocation;
+    private java.nio.file.Path stagingDir;
+    private Path stagingTablePath;
+    private HoodieJavaWriteClient<HoodieAvroPayload> writeClient;
+
+    /** Winning commit per record index under the max-rank merge policy (-1 = not yet inserted). */
+    private final int[] winningCommit = new int[NUM_RECORDS];
+    /** Most recent commit that touched each record index (used to measure divergence from newest-wins). */
+    private final int[] latestCommit = new int[NUM_RECORDS];
+    private int currentCommit;
+
+    @Override
+    public void initializeTables(QueryRunner queryRunner, Location externalLocation, String schemaName)
+            throws Exception
+    {
+        fileSystem = ((HudiConnector) queryRunner.getCoordinator().getConnector("hudi")).getInjector()
+                .getInstance(TrinoFileSystemFactory.class)
+                .create(ConnectorIdentity.ofUser("test"));
+        HiveMetastore metastore = ((HudiConnector) queryRunner.getCoordinator().getConnector("hudi")).getInjector()
+                .getInstance(HiveMetastoreFactory.class)
+                .createMetastore(Optional.empty());
+
+        tableLocation = externalLocation.appendPath(TABLE_NAME);
+        stagingDir = createTempDirectory("custom-merger-e2e");
+        stagingTablePath = new Path(stagingDir.resolve(TABLE_NAME).toUri());
+
+        java.util.Arrays.fill(winningCommit, -1);
+        java.util.Arrays.fill(latestCommit, -1);
+
+        initTable();
+        writeClient = createWriteClient();
+
+        // First commit: bulk insert all keys (produces the base parquet files the read-optimized table reads).
+        currentCommit = 1;
+        String firstCommit = writeClient.startCommit();
+        List<WriteStatus> statuses = writeClient.bulkInsert(buildRecords(currentCommit), firstCommit);
+        writeClient.commit(firstCommit, statuses);
+        recordCommit(currentCommit);
+        syncToTrino();
+
+        metastore.createTable(createTableDefinition(schemaName, TABLE_NAME, tableLocation, false), PrincipalPrivileges.NO_PRIVILEGES);
+        metastore.createTable(createTableDefinition(schemaName, RT_TABLE_NAME, tableLocation, true), PrincipalPrivileges.NO_PRIVILEGES);
+    }
+
+    /**
+     * Writes the next delta commit (upsert of ~2/3 of the keys) and mirrors it into the Trino filesystem so the
+     * connector observes the new commit on the next query. Must be called after {@link #initializeTables}.
+     */
+    public void writeAndSyncNextCommit()
+    {
+        currentCommit++;
+        String commitTime = writeClient.startCommit();
+        List<WriteStatus> statuses = writeClient.upsert(buildRecords(currentCommit), commitTime);
+        writeClient.commit(commitTime, statuses);
+        recordCommit(currentCommit);
+        syncToTrino();
+    }
+
+    /** Ordered data column names (record key first), matching {@link #expectedRows()} value positions. */
+    public List<String> dataColumnNames()
+    {
+        return COLUMN_SPECS.stream().map(ColumnSpec::name).collect(toImmutableList());
+    }
+
+    /** Expected real-time (merged) rows after the commits written so far: key -> values in column order. */
+    public Map<String, Object[]> expectedRows()
+    {
+        Map<String, Object[]> rows = new LinkedHashMap<>();
+        for (int ki = 0; ki < NUM_RECORDS; ki++) {
+            if (winningCommit[ki] < 0) {
+                continue;
+            }
+            Object[] row = rowFor(ki, winningCommit[ki]);
+            rows.put((String) row[0], row);
+        }
+        return rows;
+    }
+
+    /** Expected read-optimized (base-file-only) rows: always the first-commit values for every key. */
+    public Map<String, Object[]> baseRows()
+    {
+        Map<String, Object[]> rows = new LinkedHashMap<>();
+        for (int ki = 0; ki < NUM_RECORDS; ki++) {
+            Object[] row = rowFor(ki, 1);
+            rows.put((String) row[0], row);
+        }
+        return rows;
+    }
+
+    /** Number of keys whose merged winner is not their most recently committed version (custom != newest-wins). */
+    public int divergentKeyCount()
+    {
+        int count = 0;
+        for (int ki = 0; ki < NUM_RECORDS; ki++) {
+            if (winningCommit[ki] >= 0 && winningCommit[ki] != latestCommit[ki]) {
+                count++;
+            }
+        }
+        return count;
+    }
+
+    public void close()
+            throws IOException
+    {
+        if (writeClient != null) {
+            writeClient.close();
+            writeClient = null;
+        }
+        if (stagingDir != null) {
+            deleteRecursively(stagingDir, ALLOW_INSECURE);
+            stagingDir = null;
+        }
+    }
+
+    private void recordCommit(int commit)
+    {
+        for (int ki = 0; ki < NUM_RECORDS; ki++) {
+            if (!isUpdated(ki, commit)) {
+                continue;
+            }
+            latestCommit[ki] = commit;
+            int prev = winningCommit[ki];
+            // Mirror MaxRankRecordMerger: the incoming (newer) record wins on a tie or a strictly larger rank.
+            if (prev < 0 || mergeRank(ki, commit) >= mergeRank(ki, prev)) {
+                winningCommit[ki] = commit;
+            }
+        }
+    }
+
+    private List<HoodieRecord<HoodieAvroPayload>> buildRecords(int commit)
+    {
+        List<HoodieRecord<HoodieAvroPayload>> records = new ArrayList<>();
+        for (int ki = 0; ki < NUM_RECORDS; ki++) {
+            if (isUpdated(ki, commit)) {
+                records.add(record(ki, commit));
+            }
+        }
+        return records;
+    }
+
+    private static boolean isUpdated(int recordIndex, int commit)
+    {
+        // The first commit inserts every key; later commits upsert ~2/3 of the keys, rotating which third is skipped.
+        return commit == 1 || Math.floorMod(recordIndex + commit, 3) != 0;
+    }
+
+    private static HoodieRecord<HoodieAvroPayload> record(int recordIndex, int commit)
+    {
+        GenericRecord record = new GenericData.Record(AVRO_SCHEMA);
+        for (int i = 0; i < COLUMN_SPECS.size(); i++) {
+            record.put(COLUMN_SPECS.get(i).name(), valueFor(i, recordIndex, commit));
+        }
+        HoodieKey hoodieKey = new HoodieKey(key(recordIndex), PARTITION_PATH);
+        return new HoodieAvroRecord<>(hoodieKey, new HoodieAvroPayload(Option.of(record)), null);
+    }
+
+    private static Object[] rowFor(int recordIndex, int commit)
+    {
+        Object[] row = new Object[COLUMN_SPECS.size()];
+        for (int i = 0; i < row.length; i++) {
+            row[i] = valueFor(i, recordIndex, commit);
+        }
+        return row;
+    }
+
+    /**
+     * The single source of truth for every column value, used both when writing records and when reproducing the
+     * expected rows. Returns boxed types matching how Trino surfaces the column (String/Long/Integer/Double/Boolean).
+     */
+    private static Object valueFor(int columnIndex, int recordIndex, int commit)
+    {
+        ColumnSpec spec = COLUMN_SPECS.get(columnIndex);
+        if (spec.name().equals(RECORD_KEY_FIELD)) {
+            return key(recordIndex);
+        }
+        if (spec.name().equals(RANK_FIELD)) {
+            return mergeRank(recordIndex, commit);
+        }
+        if (spec.name().equals(ORDERING_FIELD)) {
+            return (long) commit;
+        }
+        return switch (spec.kind()) {
+            case STRING -> "v_" + columnIndex + "_" + recordIndex + "_" + commit;
+            case LONG -> recordIndex * 1_000_003L + (long) commit * columnIndex;
+            case INT -> (int) Math.floorMod(recordIndex * 31L + (long) commit * columnIndex, 1_000_000L);
+            case DOUBLE -> recordIndex + commit * 0.5 + columnIndex * 0.125;
+            case BOOLEAN -> (recordIndex + commit + columnIndex) % 2 == 0;
+        };
+    }
+
+    private static String key(int recordIndex)
+    {
+        return format("key%05d", recordIndex);
+    }
+
+    /** Deterministic, non-monotonic rank in [0, 100000) so the winning commit is rarely the latest one. */
+    private static long mergeRank(int recordIndex, int commit)
+    {
+        return Math.floorMod(recordIndex * 2_654_435_761L + commit * 40_503L, 100_000L);
+    }
+
+    private void syncToTrino()
+    {
+        try {
+            if (fileSystem.directoryExists(tableLocation).orElse(false)) {
+                fileSystem.deleteDirectory(tableLocation);
+            }
+            ResourceHudiTablesInitializer.copyDir(stagingDir.resolve(TABLE_NAME), fileSystem, tableLocation);
+        }
+        catch (IOException e) {
+            throw new RuntimeException("Failed to sync staged Hudi table to Trino filesystem", e);
+        }
+    }
+
+    private void initTable()
+    {
+        Configuration conf = new Configuration();
+        try {
+            HoodieTableMetaClient.newTableBuilder()
+                    .setTableType(HoodieTableType.MERGE_ON_READ)
+                    .setTableName(TABLE_NAME)
+                    .setTimelineLayoutVersion(1)
+                    .setBootstrapIndexClass(NoOpBootstrapIndex.class.getName())
+                    .setPayloadClassName(HoodieAvroPayload.class.getName())
+                    .setRecordKeyFields(RECORD_KEY_FIELD)
+                    .setOrderingFields(ORDERING_FIELD)
+                    .setRecordMergeMode(RecordMergeMode.CUSTOM)
+                    .setRecordMergeStrategyId(MaxRankRecordMerger.MERGE_STRATEGY_ID)
+                    .initTable(new HadoopStorageConfiguration(conf), stagingTablePath.toString());
+        }
+        catch (IOException e) {
+            throw new RuntimeException("Could not init table " + TABLE_NAME, e);
+        }
+    }
+
+    private HoodieJavaWriteClient<HoodieAvroPayload> createWriteClient()
+    {
+        Configuration conf = new Configuration();
+        HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder()
+                .withPath(stagingTablePath.toString())
+                .withSchema(AVRO_SCHEMA.toString())
+                .withParallelism(2, 2)
+                .withDeleteParallelism(2)
+                .forTable(TABLE_NAME)
+                .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.INMEMORY).build())
+                .withRecordMergeMode(RecordMergeMode.CUSTOM)
+                .withRecordMergeStrategyId(MaxRankRecordMerger.MERGE_STRATEGY_ID)
+                .withRecordMergeImplClasses(MaxRankRecordMerger.class.getName())
+                // Keep log files around so the custom merger runs at read time for every delta commit.
+                .withCompactionConfig(HoodieCompactionConfig.newBuilder()
+                        .withInlineCompaction(false)
+                        .withMaxNumDeltaCommitsBeforeCompaction(TOTAL_COMMITS + 100)
+                        .build())
+                .withEmbeddedTimelineServerEnabled(false)
+                .withMarkersType(MarkerType.DIRECT.name())
+                // MDT writes require hbase deps not present in the Trino runtime; disable as other initializers do.
+                .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(false).build())
+                .build();
+        return new HoodieJavaWriteClient<>(new HoodieJavaEngineContext(new HadoopStorageConfiguration(conf)), cfg);
+    }
+
+    private static Table createTableDefinition(String schemaName, String tableName, Location location, boolean isRtTable)
+    {
+        StorageFormat storageFormat = StorageFormat.create(
+                PARQUET_HIVE_SERDE_CLASS,
+                isRtTable ? HUDI_PARQUET_REALTIME_INPUT_FORMAT : HUDI_PARQUET_INPUT_FORMAT,
+                MAPRED_PARQUET_OUTPUT_FORMAT_CLASS);
+
+        return Table.builder()
+                .setDatabaseName(schemaName)
+                .setTableName(tableName)
+                .setTableType(EXTERNAL_TABLE.name())
+                .setOwner(Optional.of("public"))
+                .setDataColumns(DATA_COLUMNS)
+                .setParameters(ImmutableMap.of("serialization.format", "1", "EXTERNAL", "TRUE"))
+                .withStorage(storageBuilder -> storageBuilder
+                        .setStorageFormat(storageFormat)
+                        .setLocation(location.toString()))
+                .build();
+    }
+
+    private static List<ColumnSpec> buildColumnSpecs()
+    {
+        ImmutableList.Builder<ColumnSpec> specs = ImmutableList.builder();
+        specs.add(new ColumnSpec(RECORD_KEY_FIELD, Kind.STRING));
+        specs.add(new ColumnSpec(RANK_FIELD, Kind.LONG));
+        specs.add(new ColumnSpec(ORDERING_FIELD, Kind.LONG));
+        for (int i = 0; i < 7; i++) {
+            specs.add(new ColumnSpec("s" + i, Kind.STRING));
+        }
+        for (int i = 0; i < 7; i++) {
+            specs.add(new ColumnSpec("l" + i, Kind.LONG));
+        }
+        for (int i = 0; i < 6; i++) {
+            specs.add(new ColumnSpec("i" + i, Kind.INT));
+        }
+        for (int i = 0; i < 5; i++) {
+            specs.add(new ColumnSpec("d" + i, Kind.DOUBLE));
+        }
+        for (int i = 0; i < 2; i++) {
+            specs.add(new ColumnSpec("b" + i, Kind.BOOLEAN));
+        }
+        List<ColumnSpec> built = specs.build();
+        requireNonNull(built);

Review Comment:
   🤖 nit: `requireNonNull(built)` is a no-op here since 
`ImmutableList.Builder.build()` never returns null — could you drop it and keep 
just the size check?
   
   <sub><i>⚠️ AI-generated; verify before applying. React 👍/👎 to flag 
quality.</i></sub>
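
As a rough illustration of the suggestion above, here is a self-contained sketch of `buildColumnSpecs` that keeps only a size check. Several details are assumptions: the quoted hunk ends before the size check, so its form and the expected count of 30 (taken from the class Javadoc) are guesses; `"merge_rank"` is assumed to be the value of `MaxRankRecordMerger.RANK_COLUMN`; `java.util` lists stand in for Guava's `ImmutableList` to avoid the dependency; and `addRun` is a hypothetical helper introduced for brevity.

```java
import java.util.ArrayList;
import java.util.List;

final class ColumnSpecSketch {
    enum Kind { STRING, LONG, INT, DOUBLE, BOOLEAN }

    record ColumnSpec(String name, Kind kind) {}

    // Assumption: the expected count comes from the class Javadoc ("30 data columns").
    static final int EXPECTED_COLUMNS = 30;

    static List<ColumnSpec> buildColumnSpecs() {
        List<ColumnSpec> specs = new ArrayList<>();
        specs.add(new ColumnSpec("key", Kind.STRING));
        specs.add(new ColumnSpec("merge_rank", Kind.LONG)); // assumed value of RANK_COLUMN
        specs.add(new ColumnSpec("ts", Kind.LONG));
        addRun(specs, "s", 7, Kind.STRING);
        addRun(specs, "l", 7, Kind.LONG);
        addRun(specs, "i", 6, Kind.INT);
        addRun(specs, "d", 5, Kind.DOUBLE);
        addRun(specs, "b", 2, Kind.BOOLEAN);
        // List.copyOf, like ImmutableList.Builder.build(), never returns null,
        // so only the size needs checking.
        List<ColumnSpec> built = List.copyOf(specs);
        if (built.size() != EXPECTED_COLUMNS) {
            throw new IllegalStateException("Expected " + EXPECTED_COLUMNS + " columns but built " + built.size());
        }
        return built;
    }

    // Hypothetical helper: appends count columns named prefix0, prefix1, ...
    private static void addRun(List<ColumnSpec> specs, String prefix, int count, Kind kind) {
        for (int i = 0; i < count; i++) {
            specs.add(new ColumnSpec(prefix + i, kind));
        }
    }

    public static void main(String[] args) {
        System.out.println(buildColumnSpecs().size());
    }
}
```

The size check still catches a miscounted loop bound, which is the failure mode a null check could never detect here.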



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
