vinothchandar commented on code in PR #13489:
URL: https://github.com/apache/hudi/pull/13489#discussion_r2195432392
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -1258,11 +1260,20 @@ protected Pair<List<HoodieFileGroupId>,
HoodieData<HoodieRecord>> tagRecordsWith
});
}
+ // For each partition, determine the key format once and create the
appropriate mapping function
+ Map<String, SerializableBiFunction<String, Integer, Integer>>
partitionMappingFunctions = new HashMap<>();
Review Comment:
rename: indexToMappingFunctions
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/util/CommonClientUtils.java:
##########
@@ -72,6 +68,21 @@ public static void validateTableVersion(HoodieTableConfig
tableConfig, HoodieWri
}
}
+ public static boolean isValidTableVersionWriteVersionPair(HoodieTableVersion
tableVersion, HoodieTableVersion writeVersion) {
Review Comment:
rename: areTableVersionsCompatible.
##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java:
##########
@@ -162,8 +177,11 @@ private void initIfNeeded() {
@Override
protected Option<HoodieRecord<HoodieMetadataPayload>> getRecordByKey(String
key, String partitionName) {
- Map<String, HoodieRecord<HoodieMetadataPayload>> recordsByKeys =
getRecordsByKeys(Collections.singletonList(key), partitionName);
- return Option.ofNullable(recordsByKeys.get(key));
+ List<HoodieRecord<HoodieMetadataPayload>> records = getRecordsByKeys(
+ HoodieListData.eager(Collections.singletonList(key)), partitionName,
Option.empty())
+ .values().collectAsList();
+ ValidationUtils.checkArgument(records.size() <= 1, "Found more than 1
records for record key " + key);
Review Comment:
typo: 1 record for record key.
##########
hudi-common/src/main/java/org/apache/hudi/common/util/collection/ClosableSortingIterator.java:
##########
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.util.collection;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * Iterator that provides sorted elements from a source iterator.
+ * Elements are sorted using natural ordering if they implement Comparable.
+ * If sorting fails or elements are not Comparable, the original order is
preserved.
+ * This iterator is closable and will close the underlying iterator if it
implements AutoCloseable.
+ *
+ * @param <T> Type of elements
+ */
+public class ClosableSortingIterator<T> implements Iterator<T>, AutoCloseable {
+ private final Iterator<T> source;
+ private final boolean closable;
+ private Iterator<T> sortedIterator;
+ private boolean initialized = false;
+
+ public ClosableSortingIterator(Iterator<T> source) {
+ this.source = source;
+ this.closable = source instanceof AutoCloseable;
+ }
+
+ @Override
+ public boolean hasNext() {
+ initializeIfNeeded();
+ return sortedIterator.hasNext();
+ }
+
+ @Override
+ public T next() {
+ initializeIfNeeded();
+ return sortedIterator.next();
+ }
+
+ @Override
+ public void close() throws Exception {
+ if (closable) {
+ ((AutoCloseable) source).close();
+ }
+ }
+
+ private void initializeIfNeeded() {
+ if (!initialized) {
+ // Collect all elements from the source iterator
+ List<T> list = new ArrayList<>();
+ source.forEachRemaining(list::add);
+
+ // If the list is empty or has only one element, no sorting needed
+ if (list.size() <= 1) {
+ sortedIterator = list.iterator();
+ } else {
+ // Check if the first element is Comparable
+ T firstElement = list.get(0);
+ if (firstElement instanceof Comparable) {
+ try {
+ // Sort the list using natural ordering
+ Collections.sort((List) list);
+ } catch (Exception e) {
+ // If sorting fails (e.g., ClassCastException), keep original order
Review Comment:
so in this scenario this is not a "sorting iterator" per se? If we name it
"SortingIterator" - either we have it deterministically sort and change the
name?
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/common/HoodieSparkEngineContext.java:
##########
@@ -269,4 +271,58 @@ public Configuration hadoopConfiguration() {
public <T> JavaRDD<T> emptyRDD() {
return javaSparkContext.emptyRDD();
}
+
+ @Override
+ public <S extends Comparable<S>, V extends Comparable<V>, R> HoodieData<R>
processValuesOfTheSameShards(
+ HoodiePairData<S, V> data, SerializableFunction<Iterator<V>,
Iterator<R>> func, List<S> shardIndices, boolean preservesPartitioning) {
+ HoodiePairData<S, V> repartitionedData = rangeBasedRepartitionForEachKey(
+ data, shardIndices, 0.1, 100000, System.nanoTime());
+ return repartitionedData.values().mapPartitions(func,
preservesPartitioning);
+ }
+
+ /**
+ * Performs range-based repartitioning of data based on key distribution to
optimize partition sizes.
+ *
+ * <p>This method achieves efficient data distribution by:</p>
+ * <ol>
+ * <li><strong>Sampling:</strong> Samples a fraction of data for each key
to understand the distribution
+ * without processing the entire dataset</li>
+ * <li><strong>Range Analysis:</strong> Analyzes the sampled data to
determine optimal partition boundaries
+ * that ensure each partition contains a balanced number of keys</li>
+ * <li><strong>Repartitioning:</strong> Redistributes the original data
across partitions based on the
+ * computed range boundaries, ensuring keys within the same range are
co-located</li>
+ * <li><strong>Sorting:</strong> Sorts data within each partition for
efficient processing</li>
+ * </ol>
+ *
+ * <p>The method is particularly useful for: Balancing workload across
partitions for better parallel processing</p>
+ *
+ * @param data The input data as key-value pairs where keys are integers and
values are of type V
+ * @param shardIndices The set must cover all possible keys of the given data
+ * @param sampleFraction The fraction of data to sample for each key
(between 0 and 1).
+ * A higher fraction provides better distribution
analysis but increases sampling overhead.
+ * It typically should be smaller than 0.05 for large
datasets.
+ * @param maxKeyPerBucket The maximum number of keys allowed per partition
to prevent partition skew
+ * @param seed The random seed for reproducible sampling results
+ * @param <V> Type of the value in the input data (must be Comparable)
+ * @return A repartitioned and sorted HoodiePairData with optimized key
distribution across partitions
+ * @throws IllegalArgumentException if sampleFraction is not between 0 and 1
+ */
+ public <S extends Comparable<S>, V extends Comparable<V>> HoodiePairData<S,
V> rangeBasedRepartitionForEachKey(
Review Comment:
review this fully
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -1258,11 +1260,20 @@ protected Pair<List<HoodieFileGroupId>,
HoodieData<HoodieRecord>> tagRecordsWith
});
}
+ // For each partition, determine the key format once and create the
appropriate mapping function
+ Map<String, SerializableBiFunction<String, Integer, Integer>>
partitionMappingFunctions = new HashMap<>();
Review Comment:
done
##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/data/TestHoodieJavaPairRDDDynamicRepartition.java:
##########
@@ -0,0 +1,435 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.data;
+
+import org.apache.hudi.client.common.HoodieSparkEngineContext;
+import org.apache.hudi.common.data.HoodiePairData;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.metadata.HoodieBackedTestDelayedTableMetadata;
+
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+
+import scala.Tuple2;
+import scala.Tuple3;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class TestHoodieJavaPairRDDDynamicRepartition {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(HoodieBackedTestDelayedTableMetadata.class);
+
+ private JavaSparkContext jsc;
+
+ /**
+ * Generates a random RDD with unbalanced data distribution across
partitions.
+ *
+ * @param sc Spark context
+ * @param maxValueByKey Map of key to maximum number of values
+ * @param partitionWeights List of weights for each partition
+ * @param seed seed used for randomization
+ * @return RDD with weighted partition distribution
+ */
+ public static JavaPairRDD<Integer, String>
generateRandomRDDWithWeightedPartitions(
+ JavaSparkContext sc,
+ Map<Integer, Long> maxValueByKey,
+ List<Double> partitionWeights,
+ long seed) {
+
+ // Generate all possible pairs of key and value in a single list.
+ List<Tuple2<Integer, String>> allPairs = new ArrayList<>();
+ for (Map.Entry<Integer, Long> e : maxValueByKey.entrySet()) {
+ for (long v = 1; v <= e.getValue(); v++) {
+ allPairs.add(new Tuple2<>(e.getKey(), Long.toString(v)));
+ }
+ }
+
+ Collections.shuffle(allPairs, new Random(seed));
+
+ int total = allPairs.size();
+ List<JavaPairRDD<Integer, String>> rdds = new ArrayList<>();
+ int start = 0;
+
+ // Split the list into partitions based on the weights.
+ for (int i = 0; i < partitionWeights.size(); i++) {
+ int end = (i == partitionWeights.size() - 1)
+ ? total
+ : Math.min(total, start + (int) Math.round(partitionWeights.get(i) *
total));
+
+ List<Tuple2<Integer, String>> slice = allPairs.subList(start, end);
+ JavaPairRDD<Integer, String> sliceRdd = sc.parallelize(slice,
1).mapToPair(t -> t);
+ rdds.add(sliceRdd);
+ start = end;
+ if (start >= total) {
+ break;
+ }
+ }
+
+ // Combine all the partitions into a single RDD.
+ JavaPairRDD<Integer, String> combined = rdds.get(0);
+ for (int i = 1; i < rdds.size(); i++) {
+ combined = combined.union(rdds.get(i));
+ }
+
+ return combined;
+ }
+
+ /**
+ * Validates various properties of a repartitioned RDD, including:
+ * 1. Each key is in exactly one partition.
+ * 2. The keys are sorted within each partition.
+ * 3. For partitions containing entries of the same key, the value ranges
are not overlapping.
+ * 4. Number of keys per partition is probably at most maxKeyPerBucket.
+ *
+ * @param originalRdd Original RDD
+ * @param repartitionedRdd Repartitioned RDD
+ * @param maxPartitionCountByKey Map of key to maximum number of partitions
+ * @throws AssertionError if any check fails
+ */
+ private static Map<Integer, Map<Integer, List<String>>>
validateRepartitionedRDDProperties(
+ HoodiePairData<Integer, String> originalRdd,
+ HoodiePairData<Integer, String> repartitionedRdd,
+ Option<Map<Integer, Integer>> maxPartitionCountByKey) {
+ JavaPairRDD<Integer, String> javaPairRDD =
HoodieJavaPairRDD.getJavaPairRDD(repartitionedRdd);
+
+ Map<Integer, Map<Integer, List<String>>> actualPartitionContents =
dumpRDDContent(javaPairRDD);
+
+ try {
+ // Values in each partition are sorted.
+ for (Map.Entry<Integer, Map<Integer, List<String>>> p :
actualPartitionContents.entrySet()) {
+ int partitionId = p.getKey();
+ Map<Integer, List<String>> keyToValues = p.getValue();
+
+ if (keyToValues.size() != 1) {
+ assertEquals(1, keyToValues.size(),
+ "Each partition should contain exactly one key, but found keys "
+ keyToValues.keySet()
+ + " in partition " + partitionId);
+ logRDDContent("validation failure, original rdd ", originalRdd);
+ logRDDContent("validation failure, repartitioned rdd ",
repartitionedRdd);
+ }
+
+ for (Map.Entry<Integer, List<String>> kv : keyToValues.entrySet()) {
+ List<String> values = kv.getValue();
+ List<String> sorted = new ArrayList<>(values);
+ Collections.sort(sorted);
+ if (!values.equals(sorted)) {
+ throw new AssertionError(
+ "Partition " + partitionId + ", key " + kv.getKey()
+ + " has unsorted values: " + values);
+ }
+ }
+ }
+
+ // Build key → list<(partitionId, min, max)>
+ Map<Integer, List<Tuple3<Integer, String, String>>> keyToPartitionRanges
= new HashMap<>();
+
+ for (Map.Entry<Integer, Map<Integer, List<String>>> p :
actualPartitionContents.entrySet()) {
+ int partitionId = p.getKey();
+ for (Map.Entry<Integer, List<String>> kv : p.getValue().entrySet()) {
+ List<String> sorted = new ArrayList<>(kv.getValue());
+ Collections.sort(sorted);
+ keyToPartitionRanges
+ .computeIfAbsent(kv.getKey(), k -> new ArrayList<>())
+ .add(new Tuple3<>(partitionId, sorted.get(0),
sorted.get(sorted.size() - 1)));
+ }
+ }
+
+ // Range-overlap check and expected-partition-count check
+ for (Map.Entry<Integer, List<Tuple3<Integer, String, String>>> e :
keyToPartitionRanges.entrySet()) {
+ int key = e.getKey();
+ List<Tuple3<Integer, String, String>> ranges = e.getValue();
+
+ // Confirm expected #partitions
+ if (maxPartitionCountByKey.isPresent()) {
+ Integer maxPartitionCnt = maxPartitionCountByKey.get().get(key);
+ if (maxPartitionCnt == null) {
+ throw new AssertionError("Unexpected key " + key
+ + " appeared in RDD but not in expectedPartitionsPerKey map");
+ }
+ if (ranges.size() > maxPartitionCnt) {
+ throw new AssertionError("Key " + key + " should occupy at most "
+ maxPartitionCnt
+ + " partitions but actually occupies " + ranges.size());
+ }
+ }
+
+ // Check that ranges do not overlap (string order)
+ ranges.sort(Comparator.comparing(t -> t._2())); // sort by min
+ for (int i = 1; i < ranges.size(); i++) {
+ Tuple3<Integer, String, String> prev = ranges.get(i - 1);
+ Tuple3<Integer, String, String> curr = ranges.get(i);
+ if (curr._2().compareTo(prev._3()) <= 0) {
+ throw new AssertionError(
+ String.format(
+ "Key %d has overlapping ranges: partition %d [%s-%s] vs
partition %d [%s-%s]",
+ key,
+ prev._1(), prev._2(), prev._3(),
+ curr._1(), curr._2(), curr._3()));
+ }
+ }
+ }
+
+ // Verify no key is missing from actual data
+ if (maxPartitionCountByKey.isPresent()) {
+ for (Integer expectedKey : maxPartitionCountByKey.get().keySet()) {
+ if (!keyToPartitionRanges.containsKey(expectedKey)) {
+ throw new AssertionError("Expected key " + expectedKey + " never
appeared in the RDD");
+ }
+ }
+ }
+ } catch (AssertionError e) {
+ logRDDContent("Original RDD", originalRdd);
+ logRDDContent("Repartitioned RDD", repartitionedRdd);
+ LOG.error("Validation failed: " + e.getMessage(), e);
+ throw e; // rethrow to fail the test
+ }
+ return actualPartitionContents; // handy for unit-test callers
+ }
+
+ /**
+ * Dumps the content of an RDD to a map of partition id to key to values.
+ *
+ * @param javaPairRDD RDD to dump
+ * @return Map of partition id to key to values
+ */
+ private static Map<Integer, Map<Integer, List<String>>>
dumpRDDContent(JavaPairRDD<Integer, String> javaPairRDD) {
+ Map<Integer, Map<Integer, List<String>>> actualPartitionContents = new
HashMap<>();
+
+ javaPairRDD
+ .mapPartitionsWithIndex((idx, iter) -> {
+ Map<Integer, List<String>> keyToValues = new HashMap<>();
+ while (iter.hasNext()) {
+ Tuple2<Integer, String> row = iter.next();
+ keyToValues
+ .computeIfAbsent(row._1(), k -> new ArrayList<>())
+ .add(row._2());
+ }
+ return Collections.singletonList(new Tuple2<>(idx,
keyToValues)).iterator();
+ }, true)
+ .collect()
+ .forEach(t -> actualPartitionContents.put(t._1(), t._2()));
+ return actualPartitionContents;
+ }
+
+ /**
+ * Logs the content of an RDD to the console.
+ *
+ * @param label Label for the RDD
+ * @param pairData RDD to log
+ */
+ private static void logRDDContent(String label, HoodiePairData<Integer,
String> pairData) {
+ JavaPairRDD<Integer, String> rdd =
HoodieJavaPairRDD.getJavaPairRDD(pairData);
+
+ LOG.info("===== {} =====", label);
+ rdd
+ .mapPartitionsWithIndex((idx, iter) -> {
+ StringBuilder builder = new StringBuilder();
+ builder.append("Partition ").append(idx).append(": [");
+
+ while (iter.hasNext()) {
+ Tuple2<Integer, String> kv = iter.next();
+ builder.append("(").append(kv._1).append(",
").append(kv._2).append(")").append(", ");
+ }
+ builder.append("]");
+ return Collections.singletonList(builder.toString()).iterator();
+ }, true)
+ .collect()
+ .forEach(LOG::info);
+ LOG.info("============================\n");
+ }
+
+ @BeforeEach
+ public void setUp() {
+ jsc = new JavaSparkContext("local[2]", "test");
+ }
+
+ @AfterEach
+ public void tearDown() {
+ if (jsc != null) {
+ jsc.stop();
+ jsc = null;
+ }
+ }
+
+ @Test
+ public void testRangeBasedRepartitionForEachKey() {
Review Comment:
should these tests be in `TestHoodieSparkEngineContext`..
##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java:
##########
@@ -162,8 +177,11 @@ private void initIfNeeded() {
@Override
protected Option<HoodieRecord<HoodieMetadataPayload>> getRecordByKey(String
key, String partitionName) {
- Map<String, HoodieRecord<HoodieMetadataPayload>> recordsByKeys =
getRecordsByKeys(Collections.singletonList(key), partitionName);
- return Option.ofNullable(recordsByKeys.get(key));
+ List<HoodieRecord<HoodieMetadataPayload>> records = getRecordsByKeys(
+ HoodieListData.eager(Collections.singletonList(key)), partitionName,
Option.empty())
Review Comment:
could we not pass an identity function here. more readable, less branching
down below.
##########
hudi-common/src/main/java/org/apache/hudi/common/util/HoodieDataUtils.java:
##########
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.util;
+
+import org.apache.hudi.common.data.HoodieListPairData;
+import org.apache.hudi.common.data.HoodiePairData;
+import org.apache.hudi.common.util.collection.Pair;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * Utility class for HoodieData operations.
+ */
+public class HoodieDataUtils {
+
+ /**
+ * Creates a {@link HoodieListPairData} from a {@link Map} with eager
execution semantic.
+ * Each key-value pair in the map becomes a single pair in the resulting
data structure.
+ *
+ * @param data the input map
+ * @param <K> type of the key
+ * @param <V> type of the value
+ * @return a new {@link HoodieListPairData} instance
+ */
+ public static <K, V> HoodieListPairData<K, V> eagerMapKV(Map<K, V> data) {
+ return HoodieListPairData.eager(
+ data.entrySet()
+ .stream()
+ .collect(Collectors.toMap(
+ Map.Entry::getKey,
+ entry -> Collections.singletonList(entry.getValue())
+ ))
+ );
+ }
+
+ /**
+ * Collects results of the underlying collection into a {@link Map<K, V>}
+ * If there are multiple pairs sharing the same key, the resulting map uses
the incoming value (overwrites existing).
+ *
+ * This is a terminal operation
+ *
+ * @param pairData the HoodiePairData to convert
+ * @param <K> type of the key
+ * @param <V> type of the value
+ * @return a Map containing the key-value pairs with overwrite strategy for
duplicates
+ */
+ public static <K, V> Map<K, V>
collectAsMapWithOverwriteStrategy(HoodiePairData<K, V> pairData) {
Review Comment:
This assumes that `pairData` can be collected as a List *before*
de-duplication.. Can we implement this is as processGroups(), where the
de-duping happens in a distributed fashion based on context..
##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java:
##########
@@ -202,11 +220,17 @@ public List<String>
getPartitionPathWithPathPrefixes(List<String> relativePathPr
}
@Override
- public HoodieData<HoodieRecord<HoodieMetadataPayload>>
getRecordsByKeyPrefixes(List<String> keyPrefixes,
+ public HoodieData<HoodieRecord<HoodieMetadataPayload>>
getRecordsByKeyPrefixes(HoodieData<String> keyPrefixes,
String partitionName,
-
boolean shouldLoadInMemory) {
+
boolean shouldLoadInMemory,
+
Option<SerializableFunctionUnchecked<String, String>> keyEncodingFn) {
+ ValidationUtils.checkState(keyPrefixes instanceof HoodieListData,
"getRecordsByKeyPrefixes only support HoodieListData at the moment");
Review Comment:
I thought we were going to fix this?
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java:
##########
@@ -543,6 +545,10 @@ static HoodieIndexDefinition
getSecondaryOrExpressionIndexDefinition(HoodieTable
String fullIndexName = indexType.equals(PARTITION_NAME_SECONDARY_INDEX)
? PARTITION_NAME_SECONDARY_INDEX_PREFIX + userIndexName
: PARTITION_NAME_EXPRESSION_INDEX_PREFIX + userIndexName;
+ HoodieTableVersion tableVersion =
metaClient.getTableConfig().getTableVersion();
Review Comment:
Tracking for @yihua : expression index should have the same version as the
underlying index type? i.e expressions can be on say bloom or SI. and the index
version should be about the storage format not what the index is built on i.e
expr of bloom or SI.. I think we should treat all indexes as expression index,
where the expression can be identity.
##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java:
##########
@@ -219,42 +243,14 @@ public HoodieData<HoodieRecord<HoodieMetadataPayload>>
getRecordsByKeyPrefixes(L
return (shouldLoadInMemory ? HoodieListData.lazy(partitionFileSlices) :
getEngineContext().parallelize(partitionFileSlices))
.flatMap(
- (SerializableFunction<FileSlice,
Iterator<HoodieRecord<HoodieMetadataPayload>>>) fileSlice -> {
- return getByKeyPrefixes(fileSlice, sortedKeyPrefixes,
partitionName);
- });
+ (SerializableFunction<FileSlice,
Iterator<HoodieRecord<HoodieMetadataPayload>>>) fileSlice ->
+ getByKeyPrefixes(fileSlice, sortedKeyPrefixes, partitionName));
}
private Iterator<HoodieRecord<HoodieMetadataPayload>>
getByKeyPrefixes(FileSlice fileSlice,
-
List<String> sortedKeyPrefixes,
+
List<String> sortedEncodedKeyPrefixes,
String partitionName) throws IOException {
- Option<HoodieInstant> latestMetadataInstant =
-
metadataMetaClient.getActiveTimeline().filterCompletedInstants().lastInstant();
- String latestMetadataInstantTime =
-
latestMetadataInstant.map(HoodieInstant::requestedTime).orElse(SOLO_COMMIT_TIMESTAMP);
- Schema schema =
HoodieAvroUtils.addMetadataFields(HoodieMetadataRecord.getClassSchema());
- // Only those log files which have a corresponding completed instant on
the dataset should be read
- // This is because the metadata table is updated before the dataset
instants are committed.
- Set<String> validInstantTimestamps = getValidInstantTimestamps();
- InstantRange instantRange = InstantRange.builder()
- .rangeType(InstantRange.RangeType.EXACT_MATCH)
- .explicitInstants(validInstantTimestamps).build();
- HoodieReaderContext<IndexedRecord> readerContext = new
HoodieAvroReaderContext(
- storageConf,
- metadataMetaClient.getTableConfig(),
- Option.of(instantRange),
- Option.of(transformKeyPrefixesToPredicate(sortedKeyPrefixes)));
- HoodieFileGroupReader<IndexedRecord> fileGroupReader =
HoodieFileGroupReader.<IndexedRecord>newBuilder()
- .withReaderContext(readerContext)
- .withHoodieTableMetaClient(metadataMetaClient)
- .withLatestCommitTime(latestMetadataInstantTime)
- .withFileSlice(fileSlice)
- .withDataSchema(schema)
- .withRequestedSchema(schema)
- .withProps(buildFileGroupReaderProperties(metadataConfig))
- .withStart(0)
- .withLength(Long.MAX_VALUE)
- .withShouldUseRecordPosition(false)
- .build();
+ HoodieFileGroupReader<IndexedRecord> fileGroupReader =
buildFileGroupReader(sortedEncodedKeyPrefixes, fileSlice, false);
Review Comment:
nice cleanup
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/common/HoodieSparkEngineContext.java:
##########
@@ -269,4 +271,58 @@ public Configuration hadoopConfiguration() {
public <T> JavaRDD<T> emptyRDD() {
return javaSparkContext.emptyRDD();
}
+
+ @Override
+ public <S extends Comparable<S>, V extends Comparable<V>, R> HoodieData<R>
processValuesOfTheSameShards(
Review Comment:
this specific impl here, should have docs around how it also repartitions
the input data as needed.. this will not be applicable for e.g to
HoodieLocalEngineContext
##########
hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/common/HoodieFlinkEngineContext.java:
##########
@@ -209,6 +209,12 @@ public <I, O> O aggregate(HoodieData<I> data, O zeroValue,
Functions.Function2<O
return data.collectAsList().stream().parallel().reduce(zeroValue,
seqOp::apply, combOp::apply);
}
+ @Override
+ public <S extends Comparable<S>, V extends Comparable<V>, R> HoodieData<R>
processValuesOfTheSameShards(HoodiePairData<S, V> data,
SerializableFunction<Iterator<V>, Iterator<R>> func,
Review Comment:
should we throw an unsupported exception
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/data/partitioner/ConditionalRangePartitioner.java:
##########
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.data.partitioner;
+
+import org.apache.hudi.common.util.ValidationUtils;
+
+import org.apache.spark.Partitioner;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import scala.Tuple2;
+
+/**
+ * Partitioner that partitions the data based on the value ranges of the keys.
+ *
+ * It is used to partition the data into a fixed number of partitions, so that:
+ * 1. The keys are sorted within each partition.
+ * 2. There is at most only 1 key per partition.
+ * 3. For partitions containing entries of the same key, the value ranges are
not overlapping.
+ */
+public class ConditionalRangePartitioner<S extends Comparable<S>, V extends
Comparable<V>> extends Partitioner {
Review Comment:
review this fully
##########
hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java:
##########
@@ -291,21 +310,27 @@ public Option<HoodieIndexMetadata> getIndexMetadata() {
if (indexMetadataOpt.isPresent() &&
!indexMetadataOpt.get().getIndexDefinitions().isEmpty()) {
return indexMetadataOpt;
}
+ Option<HoodieIndexMetadata> indexDefOption = Option.empty();
if (tableConfig.getRelativeIndexDefinitionPath().isPresent() &&
StringUtils.nonEmpty(tableConfig.getRelativeIndexDefinitionPath().get())) {
- StoragePath indexDefinitionPath =
- new StoragePath(basePath,
tableConfig.getRelativeIndexDefinitionPath().get());
- try {
- Option<byte[]> bytesOpt = FileIOUtils.readDataFromPath(storage,
indexDefinitionPath, true);
- if (bytesOpt.isPresent()) {
- return Option.of(HoodieIndexMetadata.fromJson(new
String(bytesOpt.get())));
- } else {
- return Option.of(new HoodieIndexMetadata());
- }
- } catch (IOException e) {
- throw new HoodieIOException("Could not load index definition at path:
" + tableConfig.getRelativeIndexDefinitionPath().get(), e);
+ indexDefOption = loadIndexDefFromStorage(basePath,
tableConfig.getRelativeIndexDefinitionPath().get(), storage);
Review Comment:
confirm these usages are actually efficient .. and from driver
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/HoodieSparkIndexClient.java:
##########
@@ -134,6 +140,8 @@ public void
createOrUpdateColumnStatsIndexDefinition(HoodieTableMetaClient metaC
.withIndexType(PARTITION_NAME_COLUMN_STATS)
.withIndexFunction(PARTITION_NAME_COLUMN_STATS)
.withSourceFields(columnsToIndex)
+ // Use the existing version if exists, otherwise fall back to the
default version.
Review Comment:
During upgrade, we are going to add versions to all index defs..
So should n't we just
- create index def with current (default) version
- update index using existing version?
I also would like to audit and ensure we don't excessively read the index
defs from storage to. do these checks
##########
hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieEngineContext.java:
##########
@@ -129,4 +129,21 @@ public abstract <I, K, V> List<V> reduceByKey(
public abstract <I, O> O aggregate(HoodieData<I> data, O zeroValue,
Functions.Function2<O, I, O> seqOp, Functions.Function2<O, O, O> combOp);
public abstract <T> ReaderContextFactory<T>
getReaderContextFactory(HoodieTableMetaClient metaClient);
+
+ /**
+ * Groups values by key and applies a function to each group of values.
+ * [1 iterator maps to 1 key] It only guarantees that items returned by the
same iterator shares to the same key.
+ * [exact once across iterators] The item returned by the same iterator will
not be returned by other iterators.
+ * [1 key maps to >= 1 iterators] Items belong to the same shard can be
load-balanced across multiple iterators. It's up to API implementations to
decide
+ * load balancing pattern and how many
iterators to split into.
+ *
+ * @param data The input pair<ShardIndex, Item> to process.
+ * @param func Function to apply to each group of items with the same shard
+ * @param maxShardIndex The range of ShardIndex in data parameter. If data
contain ShardIndex 1,2,6, any maxShardIndex >=6 is valid.
+ * @param preservesPartitioning whether to preserve partitioning in the
resulting collection.
Review Comment:
its hard to capture these into general criteria. we expect contributors to
apply judgement.
but the spirit of what I meant was, whatever we add here has to be close to
a method you'd find on a RDD or SparkContext. I will see if better naming
addresses some of this.
pushed some changes to make this more general/clearer. still may be we
should name this sth like `mapGroupsAndRepartition` or sth like that to be
consistent.. For now, happy calling it `processKeyGroups`
##########
hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieEngineContext.java:
##########
@@ -129,4 +130,27 @@ public abstract <I, K, V> List<V> reduceByKey(
public abstract <I, O> O aggregate(HoodieData<I> data, O zeroValue,
Functions.Function2<O, I, O> seqOp, Functions.Function2<O, O, O> combOp);
public abstract <T> ReaderContextFactory<T>
getReaderContextFactory(HoodieTableMetaClient metaClient);
+
+ /**
+ * Groups values by key and applies a function to each group of values.
+ * [1 iterator maps to 1 key] It only guarantees that items returned by the
same iterator shares to the same key.
Review Comment:
this comment is kind of dense.. Can we uplevel this. and push down specific
implementation details to SparkEngineContext as needed?
##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/data/TestHoodieJavaPairRDDDynamicRepartition.java:
##########
@@ -0,0 +1,435 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.data;
+
+import org.apache.hudi.client.common.HoodieSparkEngineContext;
+import org.apache.hudi.common.data.HoodiePairData;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.metadata.HoodieBackedTestDelayedTableMetadata;
+
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+
+import scala.Tuple2;
+import scala.Tuple3;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class TestHoodieJavaPairRDDDynamicRepartition {
Review Comment:
there is fair bit of test code here.. Can we of this be simplified and
reused from how we generate data for tests in general..
I am worried about maintainability of the test.
##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/data/TestHoodieJavaPairRDDDynamicRepartition.java:
##########
@@ -0,0 +1,435 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.data;
+
+import org.apache.hudi.client.common.HoodieSparkEngineContext;
+import org.apache.hudi.common.data.HoodiePairData;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.metadata.HoodieBackedTestDelayedTableMetadata;
+
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+
+import scala.Tuple2;
+import scala.Tuple3;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class TestHoodieJavaPairRDDDynamicRepartition {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(HoodieBackedTestDelayedTableMetadata.class);
+
+ private JavaSparkContext jsc;
+
+ /**
+ * Generates a random RDD with unbalanced data distribution across
partitions.
+ *
+ * @param sc Spark context
+ * @param maxValueByKey Map of key to maximum number of values
+ * @param partitionWeights List of weights for each partition
+ * @param seed seed used for randomization
+ * @return RDD with weighted partition distribution
+ */
+ public static JavaPairRDD<Integer, String>
generateRandomRDDWithWeightedPartitions(
+ JavaSparkContext sc,
+ Map<Integer, Long> maxValueByKey,
+ List<Double> partitionWeights,
+ long seed) {
+
+ // Generate all possible pairs of key and value in a single list.
+ List<Tuple2<Integer, String>> allPairs = new ArrayList<>();
+ for (Map.Entry<Integer, Long> e : maxValueByKey.entrySet()) {
+ for (long v = 1; v <= e.getValue(); v++) {
+ allPairs.add(new Tuple2<>(e.getKey(), Long.toString(v)));
+ }
+ }
+
+ Collections.shuffle(allPairs, new Random(seed));
+
+ int total = allPairs.size();
+ List<JavaPairRDD<Integer, String>> rdds = new ArrayList<>();
+ int start = 0;
+
+ // Split the list into partitions based on the weights.
+ for (int i = 0; i < partitionWeights.size(); i++) {
+ int end = (i == partitionWeights.size() - 1)
+ ? total
+ : Math.min(total, start + (int) Math.round(partitionWeights.get(i) *
total));
+
+ List<Tuple2<Integer, String>> slice = allPairs.subList(start, end);
+ JavaPairRDD<Integer, String> sliceRdd = sc.parallelize(slice,
1).mapToPair(t -> t);
+ rdds.add(sliceRdd);
+ start = end;
+ if (start >= total) {
+ break;
+ }
+ }
+
+ // Combine all the partitions into a single RDD.
+ JavaPairRDD<Integer, String> combined = rdds.get(0);
+ for (int i = 1; i < rdds.size(); i++) {
+ combined = combined.union(rdds.get(i));
+ }
+
+ return combined;
+ }
+
+ /**
+ * Validates various properties of a repartitioned RDD, including:
+ * 1. Each key is in exactly one partition.
+ * 2. The keys are sorted within each partition.
+ * 3. For partitions containing entries of the same key, the value ranges
are not overlapping.
+ * 4. Number of keys per partition is probably at most maxKeyPerBucket.
+ *
+ * @param originalRdd Original RDD
+ * @param repartitionedRdd Repartitioned RDD
+ * @param maxPartitionCountByKey Map of key to maximum number of partitions
+ * @throws AssertionError if any check fails
+ */
+ private static Map<Integer, Map<Integer, List<String>>>
validateRepartitionedRDDProperties(
+ HoodiePairData<Integer, String> originalRdd,
+ HoodiePairData<Integer, String> repartitionedRdd,
+ Option<Map<Integer, Integer>> maxPartitionCountByKey) {
+ JavaPairRDD<Integer, String> javaPairRDD =
HoodieJavaPairRDD.getJavaPairRDD(repartitionedRdd);
+
+ Map<Integer, Map<Integer, List<String>>> actualPartitionContents =
dumpRDDContent(javaPairRDD);
+
+ try {
+ // Values in each partition are sorted.
+ for (Map.Entry<Integer, Map<Integer, List<String>>> p :
actualPartitionContents.entrySet()) {
+ int partitionId = p.getKey();
+ Map<Integer, List<String>> keyToValues = p.getValue();
+
+ if (keyToValues.size() != 1) {
+ assertEquals(1, keyToValues.size(),
+ "Each partition should contain exactly one key, but found keys "
+ keyToValues.keySet()
+ + " in partition " + partitionId);
+ logRDDContent("validation failure, original rdd ", originalRdd);
+ logRDDContent("validation failure, repartitioned rdd ",
repartitionedRdd);
+ }
+
+ for (Map.Entry<Integer, List<String>> kv : keyToValues.entrySet()) {
+ List<String> values = kv.getValue();
+ List<String> sorted = new ArrayList<>(values);
+ Collections.sort(sorted);
+ if (!values.equals(sorted)) {
+ throw new AssertionError(
+ "Partition " + partitionId + ", key " + kv.getKey()
+ + " has unsorted values: " + values);
+ }
+ }
+ }
+
+ // Build key → list<(partitionId, min, max)>
+ Map<Integer, List<Tuple3<Integer, String, String>>> keyToPartitionRanges
= new HashMap<>();
+
+ for (Map.Entry<Integer, Map<Integer, List<String>>> p :
actualPartitionContents.entrySet()) {
+ int partitionId = p.getKey();
+ for (Map.Entry<Integer, List<String>> kv : p.getValue().entrySet()) {
+ List<String> sorted = new ArrayList<>(kv.getValue());
+ Collections.sort(sorted);
+ keyToPartitionRanges
+ .computeIfAbsent(kv.getKey(), k -> new ArrayList<>())
+ .add(new Tuple3<>(partitionId, sorted.get(0),
sorted.get(sorted.size() - 1)));
+ }
+ }
+
+ // Range-overlap check and expected-partition-count check
+ for (Map.Entry<Integer, List<Tuple3<Integer, String, String>>> e :
keyToPartitionRanges.entrySet()) {
+ int key = e.getKey();
+ List<Tuple3<Integer, String, String>> ranges = e.getValue();
+
+ // Confirm expected #partitions
+ if (maxPartitionCountByKey.isPresent()) {
+ Integer maxPartitionCnt = maxPartitionCountByKey.get().get(key);
+ if (maxPartitionCnt == null) {
+ throw new AssertionError("Unexpected key " + key
+ + " appeared in RDD but not in expectedPartitionsPerKey map");
+ }
+ if (ranges.size() > maxPartitionCnt) {
+ throw new AssertionError("Key " + key + " should occupy at most "
+ maxPartitionCnt
+ + " partitions but actually occupies " + ranges.size());
+ }
+ }
+
+ // Check that ranges do not overlap (string order)
+ ranges.sort(Comparator.comparing(t -> t._2())); // sort by min
+ for (int i = 1; i < ranges.size(); i++) {
+ Tuple3<Integer, String, String> prev = ranges.get(i - 1);
+ Tuple3<Integer, String, String> curr = ranges.get(i);
+ if (curr._2().compareTo(prev._3()) <= 0) {
+ throw new AssertionError(
+ String.format(
+ "Key %d has overlapping ranges: partition %d [%s-%s] vs
partition %d [%s-%s]",
+ key,
+ prev._1(), prev._2(), prev._3(),
+ curr._1(), curr._2(), curr._3()));
+ }
+ }
+ }
+
+ // Verify no key is missing from actual data
+ if (maxPartitionCountByKey.isPresent()) {
+ for (Integer expectedKey : maxPartitionCountByKey.get().keySet()) {
+ if (!keyToPartitionRanges.containsKey(expectedKey)) {
+ throw new AssertionError("Expected key " + expectedKey + " never
appeared in the RDD");
+ }
+ }
+ }
+ } catch (AssertionError e) {
+ logRDDContent("Original RDD", originalRdd);
+ logRDDContent("Repartitioned RDD", repartitionedRdd);
+ LOG.error("Validation failed: " + e.getMessage(), e);
+ throw e; // rethrow to fail the test
+ }
+ return actualPartitionContents; // handy for unit-test callers
+ }
+
+ /**
+ * Dumps the content of an RDD to a map of partition id to key to values.
+ *
+ * @param javaPairRDD RDD to dump
+ * @return Map of partition id to key to values
+ */
+ private static Map<Integer, Map<Integer, List<String>>>
dumpRDDContent(JavaPairRDD<Integer, String> javaPairRDD) {
+ Map<Integer, Map<Integer, List<String>>> actualPartitionContents = new
HashMap<>();
+
+ javaPairRDD
+ .mapPartitionsWithIndex((idx, iter) -> {
+ Map<Integer, List<String>> keyToValues = new HashMap<>();
+ while (iter.hasNext()) {
+ Tuple2<Integer, String> row = iter.next();
+ keyToValues
+ .computeIfAbsent(row._1(), k -> new ArrayList<>())
+ .add(row._2());
+ }
+ return Collections.singletonList(new Tuple2<>(idx,
keyToValues)).iterator();
+ }, true)
+ .collect()
+ .forEach(t -> actualPartitionContents.put(t._1(), t._2()));
+ return actualPartitionContents;
+ }
+
+ /**
+ * Logs the content of an RDD to the console.
+ *
+ * @param label Label for the RDD
+ * @param pairData RDD to log
+ */
+ private static void logRDDContent(String label, HoodiePairData<Integer,
String> pairData) {
Review Comment:
we could do `df.show` ..instead of all this?
##########
hudi-common/src/main/java/org/apache/hudi/common/util/HoodieDataUtils.java:
##########
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.util;
+
+import org.apache.hudi.common.data.HoodieListPairData;
+import org.apache.hudi.common.data.HoodiePairData;
+import org.apache.hudi.common.util.collection.Pair;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * Utility class for HoodieData operations.
+ */
+public class HoodieDataUtils {
+
+ /**
+ * Creates a {@link HoodieListPairData} from a {@link Map} with eager
execution semantic.
+ * Each key-value pair in the map becomes a single pair in the resulting
data structure.
+ *
+ * @param data the input map
+ * @param <K> type of the key
+ * @param <V> type of the value
+ * @return a new {@link HoodieListPairData} instance
+ */
+ public static <K, V> HoodieListPairData<K, V> eagerMapKV(Map<K, V> data) {
+ return HoodieListPairData.eager(
+ data.entrySet()
+ .stream()
+ .collect(Collectors.toMap(
+ Map.Entry::getKey,
+ entry -> Collections.singletonList(entry.getValue())
+ ))
+ );
+ }
+
+ /**
+ * Collects results of the underlying collection into a {@link Map<K, V>}
+ * If there are multiple pairs sharing the same key, the resulting map uses
the incoming value (overwrites existing).
+ *
+ * This is a terminal operation
+ *
+ * @param pairData the HoodiePairData to convert
+ * @param <K> type of the key
+ * @param <V> type of the value
+ * @return a Map containing the key-value pairs with overwrite strategy for
duplicates
+ */
+ public static <K, V> Map<K, V>
collectAsMapWithOverwriteStrategy(HoodiePairData<K, V> pairData) {
Review Comment:
Can't we do a reduceByKey().. and then collect?
##########
hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieEngineContext.java:
##########
@@ -129,4 +130,27 @@ public abstract <I, K, V> List<V> reduceByKey(
public abstract <I, O> O aggregate(HoodieData<I> data, O zeroValue,
Functions.Function2<O, I, O> seqOp, Functions.Function2<O, O, O> combOp);
public abstract <T> ReaderContextFactory<T>
getReaderContextFactory(HoodieTableMetaClient metaClient);
+
+ /**
+ * Groups values by key and applies a function to each group of values.
+ * [1 iterator maps to 1 key] It only guarantees that items returned by the
same iterator shares to the same key.
+ * [exact once across iterators] The item returned by the same iterator will
not be returned by other iterators.
+ * [1 key maps to >= 1 iterators] Items belong to the same shard can be
load-balanced across multiple iterators. It's up to API implementations to
decide
+ * load balancing pattern and how many
iterators to split into.
+ * [iterator return sorted values] Values returned via iterator is sorted.
+ *
+ * @param data The input pair<ShardIndex, Item> to process.
+ * @param func Function to apply to each group of items with the same shard
+ * @param shardIndices Set of all possible shard indices that may appear in
the data. This is used for efficient partitioning and load balancing.
+ * @param preservesPartitioning whether to preserve partitioning in the
resulting collection.
+ * @param <S> Type of the shard index (must be Comparable)
+ * @param <V> Type of the value in the input data (must be Comparable)
+ * @param <R> Type of the result
+ * @return Result of applying the function to each group
+ */
+ public <S extends Comparable<S>, V extends Comparable<V>, R> HoodieData<R>
processValuesOfTheSameShards(
+ HoodiePairData<S, V> data, SerializableFunction<Iterator<V>,
Iterator<R>> func, List<S> shardIndices, boolean preservesPartitioning) {
Review Comment:
should we enforce that `Iterator<V>` is sorted? in the first arg to the
SerializableFunction parameter?
##########
hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/common/HoodieFlinkEngineContext.java:
##########
@@ -209,6 +209,12 @@ public <I, O> O aggregate(HoodieData<I> data, O zeroValue,
Functions.Function2<O
return data.collectAsList().stream().parallel().reduce(zeroValue,
seqOp::apply, combOp::apply);
}
+ @Override
+ public <S extends Comparable<S>, V extends Comparable<V>, R> HoodieData<R>
processValuesOfTheSameShards(HoodiePairData<S, V> data,
SerializableFunction<Iterator<V>, Iterator<R>> func,
Review Comment:
done
##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java:
##########
@@ -162,8 +177,11 @@ private void initIfNeeded() {
@Override
protected Option<HoodieRecord<HoodieMetadataPayload>> getRecordByKey(String
key, String partitionName) {
- Map<String, HoodieRecord<HoodieMetadataPayload>> recordsByKeys =
getRecordsByKeys(Collections.singletonList(key), partitionName);
- return Option.ofNullable(recordsByKeys.get(key));
+ List<HoodieRecord<HoodieMetadataPayload>> records = getRecordsByKeys(
+ HoodieListData.eager(Collections.singletonList(key)), partitionName,
Option.empty())
+ .values().collectAsList();
+ ValidationUtils.checkArgument(records.size() <= 1, "Found more than 1
records for record key " + key);
Review Comment:
done
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/util/CommonClientUtils.java:
##########
@@ -72,6 +68,21 @@ public static void validateTableVersion(HoodieTableConfig
tableConfig, HoodieWri
}
}
+ public static boolean isValidTableVersionWriteVersionPair(HoodieTableVersion
tableVersion, HoodieTableVersion writeVersion) {
Review Comment:
done
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]