hudi-agent commented on code in PR #18894:
URL: https://github.com/apache/hudi/pull/18894#discussion_r3343938821
##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/GlobalRecordLevelIndexBackend.java:
##########
@@ -162,21 +174,27 @@ public void onCheckpointComplete(Correspondent correspondent, long completedChec
}
private void reloadMetadataTable() {
- this.metadataTable = metaClient.getTableFormat().getMetadataFactory().create(
- HoodieFlinkEngineContext.DEFAULT,
- metaClient.getStorage(),
- StreamerUtil.metadataConfig(conf),
- conf.get(FlinkOptions.PATH));
+ this.tableMetadata =
+ new HoodieBackedTableMetadata(
+ HoodieFlinkEngineContext.DEFAULT,
+ metaClient.getStorage(),
+ StreamerUtil.metadataConfig(conf),
+ conf.get(FlinkOptions.PATH));
+ if (!tableMetadata.enabled()) {
+ if (metaClient.getTableConfig().isMetadataTableAvailable()) {
Review Comment:
🤖 Throwing `RuntimeException` here loses information — could you use
`HoodieException` (or `HoodieMetadataException`) to be consistent with the rest
of the file (e.g. the `close()` method below)? Also, since
`reloadMetadataTable()` overwrites `this.tableMetadata` without closing the
previous instance, if this method is called more than once (e.g. from
`onCheckpointComplete`), the prior MDT reader will be leaked. Worth either
closing the old one before reassigning, or documenting that this is only called
once.
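A minimal sketch of the "close the old one before reassigning" option, assuming the reader is `AutoCloseable`. `MetadataReader`, `MetadataReloadException` and `MetadataHolder` are hypothetical stand-ins, not Hudi types. In the real backend the field would be the table-metadata instance and the wrapper exception would be `HoodieException` (or `HoodieMetadataException`).

```java
import java.util.function.Supplier;

// Hypothetical stand-in for the MDT reader held by the backend; assumed AutoCloseable.
interface MetadataReader extends AutoCloseable {
}

// Hypothetical stand-in for HoodieException: unchecked and keeps the original cause.
class MetadataReloadException extends RuntimeException {
  MetadataReloadException(String message, Throwable cause) {
    super(message, cause);
  }
}

class MetadataHolder {
  private final Supplier<MetadataReader> factory;
  private MetadataReader tableMetadata;

  MetadataHolder(Supplier<MetadataReader> factory) {
    this.factory = factory;
  }

  // Safe to call repeatedly (e.g. after every completed checkpoint): the previous
  // reader is closed before a new one is created, so readers are not leaked.
  void reloadMetadataTable() {
    MetadataReader previous = tableMetadata;
    tableMetadata = null;
    if (previous != null) {
      try {
        previous.close();
      } catch (Exception e) {
        // Wrap in the project exception type, keeping the cause, instead of a bare RuntimeException.
        throw new MetadataReloadException("Failed to close the previous metadata reader", e);
      }
    }
    tableMetadata = factory.get();
  }

  MetadataReader current() {
    return tableMetadata;
  }
}
```

Nulling the field before closing means a failed close leaves no half-closed reader behind; the next reload simply creates a fresh one.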
<sub><i>- AI-generated; verify before applying. React 👍/👎 to flag quality.</i></sub>
##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/GlobalRecordLevelIndexBackend.java:
##########
@@ -162,21 +174,27 @@ public void onCheckpointComplete(Correspondent correspondent, long completedChec
}
private void reloadMetadataTable() {
- this.metadataTable = metaClient.getTableFormat().getMetadataFactory().create(
- HoodieFlinkEngineContext.DEFAULT,
- metaClient.getStorage(),
- StreamerUtil.metadataConfig(conf),
- conf.get(FlinkOptions.PATH));
+ this.tableMetadata =
Review Comment:
🤖 This change drops the use of
`metaClient.getTableFormat().getMetadataFactory().create(...)` and instantiates
`HoodieBackedTableMetadata` directly. Was that intentional? It bypasses the
table-format abstraction, so any table format that swaps in a different
metadata implementation would be broken here. If you only need access to
`enabled()`, would it be cleaner to keep using the factory and cast/check at
the interface level, or expose `enabled()` on `HoodieTableMetadata`?
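A minimal sketch of the second option: keep creating the reader through the factory and expose `enabled()` on the interface with a default. All names here (`TableMetadataView`, `MetadataViewFactory`, `IndexBackendSketch`, and so on) are hypothetical stand-ins. Whether `HoodieTableMetadata` already declares a comparable method is not checked here.

```java
// Hypothetical stand-in for the format-agnostic metadata interface. Exposing the
// capability here lets callers skip instanceof checks and casts.
interface TableMetadataView {
  default boolean enabled() {
    return true;
  }
}

// Hypothetical MDT-backed implementation that may be disabled at runtime.
class BackedTableMetadataView implements TableMetadataView {
  private final boolean enabled;

  BackedTableMetadataView(boolean enabled) {
    this.enabled = enabled;
  }

  @Override
  public boolean enabled() {
    return enabled;
  }
}

// Hypothetical implementation plugged in by a different table format.
class OtherFormatMetadataView implements TableMetadataView {
}

// Hypothetical stand-in for the table format's metadata factory.
interface MetadataViewFactory {
  TableMetadataView create();
}

class IndexBackendSketch {
  private final MetadataViewFactory factory;
  private TableMetadataView tableMetadata;

  IndexBackendSketch(MetadataViewFactory factory) {
    this.factory = factory;
  }

  // Still goes through the factory, then asks the interface whether the reader is usable.
  boolean reloadMetadataTable() {
    tableMetadata = factory.create();
    return tableMetadata.enabled();
  }
}
```

Defaulting to `true` means formats that never disable their metadata need no change. A `false` default would be the more conservative choice.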
##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/MinibatchBucketAssignFunction.java:
##########
@@ -160,11 +158,11 @@ private Processor initRecordProcessor() {
public void process(List<HoodieFlinkInternalRow> records, Collector<HoodieFlinkInternalRow> out) throws Exception {
List<String> recordKeys = records.stream().map(HoodieFlinkInternalRow::getRecordKey).collect(Collectors.toList());
MinibatchIndexBackend minibatchIndexBackend = (MinibatchIndexBackend) delegateFunction.getIndexBackend();
- // get record locations by minibatch
- Map<String, HoodieRecordGlobalLocation> recordLocations = minibatchIndexBackend.get(recordKeys);
+ // warm up the in-memory cache for record level index
+ minibatchIndexBackend.get(recordKeys);
Review Comment:
🤖 nit: `get(recordKeys)` is called here purely for its cache-warming side
effect, but the name `get` signals a pure read — a future reader will wonder
whether the return value was accidentally dropped. It might be worth adding a
dedicated `warmUp` or `prefetch` method on `MinibatchIndexBackend` to make the
intent explicit.
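A minimal sketch of the suggested `warmUp` method as a default method on a simplified backend interface. `PrefetchingIndexBackend` and `CachingIndexBackend` are hypothetical names, and locations are plain strings instead of `HoodieRecordGlobalLocation`.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical simplification of MinibatchIndexBackend.
interface PrefetchingIndexBackend {
  // Batch read: callers that use the returned locations call this.
  Map<String, String> get(List<String> recordKeys);

  // Intent-revealing alias for callers that only want to populate the cache.
  default void warmUp(List<String> recordKeys) {
    get(recordKeys);
  }
}

class CachingIndexBackend implements PrefetchingIndexBackend {
  private final Map<String, String> cache = new HashMap<>();
  private final Function<List<String>, Map<String, String>> lookup;
  private int lookups;

  CachingIndexBackend(Function<List<String>, Map<String, String>> lookup) {
    this.lookup = lookup;
  }

  @Override
  public Map<String, String> get(List<String> recordKeys) {
    // Only keys not already cached go to the (expensive) batch lookup.
    List<String> missing = new ArrayList<>();
    for (String key : recordKeys) {
      if (!cache.containsKey(key)) {
        missing.add(key);
      }
    }
    if (!missing.isEmpty()) {
      cache.putAll(lookup.apply(missing));
      lookups++;
    }
    Map<String, String> result = new HashMap<>();
    for (String key : recordKeys) {
      String location = cache.get(key);
      if (location != null) {
        result.put(key, location);
      }
    }
    return result;
  }

  // Single-key read served from the cache only.
  String cached(String recordKey) {
    return cache.get(recordKey);
  }

  int lookups() {
    return lookups;
  }
}
```

Assuming such a method were added, the call site in `process(...)` would read `minibatchIndexBackend.warmUp(recordKeys);`, which makes the discarded return value clearly intentional.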
##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java:
##########
@@ -291,8 +291,8 @@ private static DataStream<HoodieFlinkInternalRow> streamBootstrap(
boolean bounded) {
DataStream<HoodieFlinkInternalRow> dataStream1 = rowDataToHoodieRecord(conf, rowType, dataStream);
- if (conf.get(FlinkOptions.INDEX_BOOTSTRAP_ENABLED) || bounded) {
- boolean isRliBootstrap = OptionsResolver.isGlobalRecordLevelIndex(conf);
+ boolean isRliBootstrap = OptionsResolver.isGlobalRecordLevelIndex(conf);
Review Comment:
🤖 nit: `isRliBootstrap` reads as "is bootstrap enabled for RLI", but the
value is just a check on the index *type* — could you rename it to `isRliIndex`
(or `isGlobalRecordLevelIndex`) to avoid confusing it with the
`INDEX_BOOTSTRAP_ENABLED` flag on the line directly below?
##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/GlobalRecordIndexPartitioner.java:
##########
@@ -93,6 +95,14 @@ public static int computeNumShardsAssigned(int taskIndex, int numPartitions, int
static int getNumFileGroupsForRecordIndexPartition(Configuration conf) {
String tablePath = conf.get(FlinkOptions.PATH);
HoodieTableMetaClient metaClient = StreamerUtil.createMetaClient(conf);
+ // For flink adaptive batch execution, writer coordinator is not started yet, so metadata table
+ // is not initialized for a new table.
+ if (!metaClient.getTableConfig().isMetadataPartitionAvailable(MetadataPartitionType.RECORD_INDEX)) {
+ // Get the minimum file group count used to initialize global record level index from
+ // the write config, since the config value is customized for the flink writer.
+ HoodieWriteConfig writeConfig = FlinkWriteClients.getHoodieClientConfig(conf);
+ return writeConfig.getGlobalRecordLevelIndexMinFileGroupCount();
Review Comment:
🤖 @danny0405 Could you confirm that the value returned from
`writeConfig.getGlobalRecordLevelIndexMinFileGroupCount()` here will exactly
match the file group count that the MDT writer will use to initialize the RLI
partition? If the writer ends up initializing the RLI partition with a
different number of file groups (e.g. due to auto-tuning or a different
max-count config taking effect), the partitioner's sharding decisions made
before MDT initialization could disagree with the post-initialization layout
and route the same key to the wrong shard.
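To illustrate why the two counts must agree, here is a sketch that assumes a simplified hash-modulo mapping (`String.hashCode()` with `Math.floorMod`). This is a stand-in, not Hudi's actual key-to-file-group function, and `ShardMapping` is a hypothetical name. Whenever the count assumed by the partitioner differs from the count the writer actually uses, some keys resolve to a different file group.

```java
import java.util.List;

// Hypothetical, simplified key-to-file-group mapping used only to illustrate the concern.
final class ShardMapping {
  private ShardMapping() {
  }

  static int fileGroupIndex(String recordKey, int numFileGroups) {
    if (numFileGroups <= 0) {
      throw new IllegalArgumentException("numFileGroups must be positive");
    }
    // floorMod keeps the index non-negative even for negative hash codes.
    return Math.floorMod(recordKey.hashCode(), numFileGroups);
  }

  // Counts keys that would be routed to a different file group if the partitioner
  // assumed one count while the writer initialized the partition with another.
  static int countMismatches(List<String> keys, int assumedCount, int actualCount) {
    int mismatches = 0;
    for (String key : keys) {
      if (fileGroupIndex(key, assumedCount) != fileGroupIndex(key, actualCount)) {
        mismatches++;
      }
    }
    return mismatches;
  }
}
```

With matching counts the mismatch count is zero. With, say, 4 assumed versus 8 actual, roughly half of a uniformly hashed key set moves.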
##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/GlobalRecordLevelIndexBackend.java:
##########
@@ -81,7 +83,9 @@ public void registerMetrics(MetricGroup metricGroup) {
@Override
public HoodieRecordGlobalLocation get(String recordKey) throws IOException {
- throw new UnsupportedOperationException(this.getClass().getSimpleName() + " doesn't support lookup with a single key.");
+ // note: always fetch record location from the cache, since this backend is only used for minibatch mode,
+ // and the cache has been warmed up by calling `get(List<String> recordKeys)` previously.
+ return recordIndexCache.get(recordKey);
Review Comment:
🤖 The contract for single-key `get()` changed from "throw
UnsupportedOperationException" to "return whatever is in the cache". That works
for the minibatch flow where `get(List)` warms up the cache first, but it now
silently returns `null` (treated as INSERT in `processChangingRecord`) if
anyone calls this without warmup. Is there any non-minibatch path that could
hit this and incorrectly classify an UPDATE as an INSERT? If not, would a
defensive assertion-style check (or keeping it as a single-key delegating call
into `get(List.of(key))`) make the contract harder to misuse?
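A minimal sketch of the delegating option. It tracks which keys were already looked up, so a key that was never warmed up can be distinguished from a looked-up key with no existing location (a genuine INSERT). `GlobalLocation` and `CachedIndexBackend` are hypothetical stand-ins, and cache eviction is ignored. A bounded cache would have to evict from both structures together. An assertion-style variant would throw `IllegalStateException` in the same branch instead of delegating.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;

// Hypothetical stand-in for HoodieRecordGlobalLocation.
final class GlobalLocation {
  final String partitionPath;
  final String fileId;

  GlobalLocation(String partitionPath, String fileId) {
    this.partitionPath = partitionPath;
    this.fileId = fileId;
  }
}

// Hypothetical cache-backed index that remembers which keys were already looked up.
class CachedIndexBackend {
  private final Map<String, GlobalLocation> cache = new HashMap<>();
  private final Set<String> lookedUp = new HashSet<>();
  private final Function<List<String>, Map<String, GlobalLocation>> batchLookup;
  private int batchLookups;

  CachedIndexBackend(Function<List<String>, Map<String, GlobalLocation>> batchLookup) {
    this.batchLookup = batchLookup;
  }

  // Batch lookup: fetches only keys not seen before and records them as looked up.
  Map<String, GlobalLocation> get(List<String> recordKeys) {
    List<String> missing = new ArrayList<>();
    for (String key : recordKeys) {
      if (!lookedUp.contains(key)) {
        missing.add(key);
      }
    }
    if (!missing.isEmpty()) {
      cache.putAll(batchLookup.apply(missing));
      lookedUp.addAll(missing);
      batchLookups++;
    }
    Map<String, GlobalLocation> result = new HashMap<>();
    for (String key : recordKeys) {
      GlobalLocation location = cache.get(key);
      if (location != null) {
        result.put(key, location);
      }
    }
    return result;
  }

  // Single-key lookup: falls back to the batch path when the key was not warmed up,
  // so a missing warm-up cannot silently turn an UPDATE into an INSERT.
  GlobalLocation get(String recordKey) {
    if (!lookedUp.contains(recordKey)) {
      get(Collections.singletonList(recordKey));
    }
    return cache.get(recordKey);
  }

  int batchLookups() {
    return batchLookups;
  }
}
```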
##########
hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java:
##########
@@ -3738,6 +3740,18 @@ private static Stream<Arguments> indexAndPartitioningParams() {
return Stream.of(data).map(Arguments::of);
}
+ /**
+ * Return test params => (index type, boolean).
+ */
Review Comment:
🤖 nit: `indexAndBooleanParams` doesn't convey what the boolean represents —
`indexAndBootstrapParams` would tie it back to `bootstrapEnabled` without
needing to open the method body.
--
This is an automated message from the Apache Git Service.