hudi-agent commented on code in PR #19006:
URL: https://github.com/apache/hudi/pull/19006#discussion_r3428521763


##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/stats/BaseRecordLevelIndex.java:
##########
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.source.stats;
+
+import org.apache.hudi.client.common.HoodieFlinkEngineContext;
+import org.apache.hudi.common.data.HoodiePairData;
+import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.model.HoodieFileGroupId;
+import org.apache.hudi.common.model.HoodieIndexDefinition;
+import org.apache.hudi.common.model.HoodieRecordGlobalLocation;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.HoodieDataUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.VisibleForTesting;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.configuration.OptionsResolver;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.index.record.HoodieRecordIndex;
+import org.apache.hudi.keygen.KeyGenUtils;
+import org.apache.hudi.keygen.KeyGenerator;
+import org.apache.hudi.metadata.HoodieTableMetadata;
+import org.apache.hudi.metadata.HoodieTableMetadataUtil;
+import org.apache.hudi.sink.bulk.RowDataKeyGen;
+import org.apache.hudi.source.ExpressionEvaluators;
+import org.apache.hudi.util.StreamerUtil;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.math.BigDecimal;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Base index support that leverages Record Level Index to prune file slices.
+ */
+@Slf4j
+public abstract class BaseRecordLevelIndex implements FlinkMetadataIndex {
+  private static final long serialVersionUID = 1L;
+
+  private final String basePath;
+  protected final Configuration conf;
+  protected final List<String> hoodieKeysFromFilter;
+  private final HoodieTableMetaClient metaClient;
+  private HoodieTableMetadata metadataTable;
+
+  @VisibleForTesting
+  BaseRecordLevelIndex(
+      String basePath,
+      Configuration conf,
+      HoodieTableMetaClient metaClient,
+      List<String> hoodieKeysFromFilter) {
+    this.basePath = basePath;
+    this.conf = conf;
+    this.metaClient = metaClient;
+    this.hoodieKeysFromFilter = hoodieKeysFromFilter;
+  }
+
+  @Override
+  public String getIndexPartitionName() {
+    return HoodieTableMetadataUtil.PARTITION_NAME_RECORD_INDEX;
+  }
+
+  @Override
+  public boolean isIndexAvailable() {
+    return metaClient.getTableConfig().isMetadataTableAvailable()
+        && 
metaClient.getTableConfig().getMetadataPartitions().contains(HoodieTableMetadataUtil.PARTITION_NAME_RECORD_INDEX);
+  }
+
+  public HoodieTableMetadata getMetadataTable() {
+    // initialize the metadata table lazily
+    if (this.metadataTable == null) {
+      this.metadataTable = 
metaClient.getTableFormat().getMetadataFactory().create(
+          HoodieFlinkEngineContext.DEFAULT,
+          metaClient.getStorage(),
+          StreamerUtil.metadataConfig(conf),
+          basePath);
+    }
+    return this.metadataTable;
+  }
+
+  public List<FileSlice> computeCandidateFileSlices(List<FileSlice> 
fileSlices) {
+    if (!isIndexAvailable()) {
+      return fileSlices;
+    }
+
+    try {
+      Option<Set<HoodieFileGroupId>> candidateFileGroupIds =
+          lookupCandidateFileGroupIds(fileSlices);
+      return candidateFileGroupIds.map(candidates -> fileSlices.stream()
+          .filter(fileSlice -> candidates.contains(fileSlice.getFileGroupId()))
+          .collect(Collectors.toList()))
+          .orElse(fileSlices);
+    } catch (Throwable e) {
+      log.error("Failed to read metadata index: {} for data skipping", 
getIndexPartitionName(), e);
+      return fileSlices;
+    }
+  }
+
+  protected abstract Option<Set<HoodieFileGroupId>> 
lookupCandidateFileGroupIds(List<FileSlice> fileSlices);
+
+  protected static Set<HoodieFileGroupId> getFileGroupIds(
+      HoodiePairData<String, HoodieRecordGlobalLocation> recordIndexData) {
+    List<Pair<String, HoodieRecordGlobalLocation>> recordIndexLocations =
+        HoodieDataUtils.dedupeAndCollectAsList(recordIndexData);
+    return recordIndexLocations.stream()
+        .map(pair -> new HoodieFileGroupId(pair.getValue().getPartitionPath(), 
pair.getValue().getFileId()))
+        .collect(Collectors.toSet());
+  }
+
+  public static Option<BaseRecordLevelIndex> create(
+      String basePath,
+      Configuration conf,
+      HoodieTableMetaClient metaClient,
+      List<ExpressionEvaluators.Evaluator> evaluators,
+      RowType rowType) {
+    if (evaluators.isEmpty() || 
!FlinkOptions.QUERY_TYPE_SNAPSHOT.equalsIgnoreCase(conf.get(FlinkOptions.QUERY_TYPE)))
 {
+      return Option.empty();
+    }
+    if (metaClient == null) {
+      metaClient = StreamerUtil.createMetaClient(conf);

Review Comment:
   🤖 nit: could you avoid reassigning the `metaClient` parameter here? 
Reassigning method parameters tends to surprise readers, and static-analysis 
tools such as Checkstyle (`ParameterAssignment`) and PMD 
(`AvoidReassigningParameters`) flag it. Something like 
`HoodieTableMetaClient resolvedMetaClient = metaClient != null ? metaClient : StreamerUtil.createMetaClient(conf);` 
and then using `resolvedMetaClient` for the remainder of the method would be 
cleaner.
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag quality.</i></sub>



##########
hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java:
##########
@@ -674,6 +674,17 @@ private static String toStringSafely(Object obj) {
     return obj == null ? "null" : obj.toString();
   }
 
+  /**
+   * Assert that expected row strings and actual collection of rows are equal 
regardless of the order.
+   *
+   * @param rows     Actual result rows
+   * @param expected Expected row strings
+   */
+  public static void assertRowsEqualsUnordered(List<Row> rows, List<String> 
expected) {
+    List<String> actualRows = 
rows.stream().map(Row::toString).collect(Collectors.toList());
+    assertEquals(new HashSet<>(expected), new HashSet<>(actualRows));

Review Comment:
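   🤖 nit: `HashSet` comparison silently drops duplicate rows — if actual is 
`[a, a]` and expected is `[a]`, the assertion still passes. The method name 
promises only that row order is ignored, not that duplicates are, so a 
sorted-list comparison would be stricter: copy both lists, sort the copies, 
then `assertEquals` them. Copying first also avoids mutating the caller's 
`expected` list, which may be immutable (sorting it in place would throw).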
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag quality.</i></sub>



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
users@infra.apache.org

Reply via email to