This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 73ff4ea3aa4f perf(metadata): Parse RLI instant time once per batch
instead of per … (#18965)
73ff4ea3aa4f is described below
commit 73ff4ea3aa4fb0c6b7b3e5be1c0210c72faa532c
Author: voonhous <[email protected]>
AuthorDate: Sat Jun 13 17:37:16 2026 +0800
perf(metadata): Parse RLI instant time once per batch instead of per …
(#18965)
* perf(metadata): Parse RLI instant time once per batch instead of per
record
createRecordIndexUpdate parsed the commit instant time string into epoch
millis for every record although the instant is constant for the whole
commit. Add a millis-based overload plus a public
parseRecordIndexInstantTime helper and hoist the parse out of the
per-record loops in base-file RLI generation, revived-key processing,
record key iterators, RLI initialization and the Flink index write
path. RecordIndexMapper memoizes the parse per write status, since its
delegate locations can carry different instants.
On the read side, getLocationFromRecordIndexInfo formatted the instant
millis back to a string per looked-up record. Cache the formatting in a
per-thread bounded map that is invalidated when the JVM default time
zone changes, since formatDate depends on it.
* review: Print raw epoch millis in the invalid-fileId error message
Reformatting the millis through HoodieInstantTimeGenerator.formatDate
raised a timezone question and is not always byte-identical to the
original instant string (legacy second-granularity instants gain the
compatibility-parse millis suffix), so print the unambiguous raw
millis instead on this error path.
* review: Drop the read-side formatted-instant cache
Restore the direct per-call formatting in getLocationFromRecordIndexInfo;
the cache needed zone-change invalidation to stay correct, which is more
machinery than the formatting cost justifies. The PR is now write-side
only (parse hoisting).
* review: Print instantTime, not raw millis, in the invalid-fileId error
Log the human-readable instant time in the error message instead of raw
epoch millis. Reconstruct it only on the cold catch path via
TimelineUtils.formatDate(new Date(instantTimeMillis)) (the inverse of
parseDateFromInstantTime), so the hot per-record path keeps using the
pre-parsed millis and pays nothing.
* docs(metadata): Document how instantTimeMillis must be parsed and its
timezone
The millis overload of createRecordIndexUpdate now documents that
instantTimeMillis must come from parseRecordIndexInstantTime(String)
(which delegates to TimelineUtils.parseDateFromInstantTime) rather than
being constructed by hand, and that the instant-time string is interpreted
in the JVM default time zone (ZoneId.systemDefault()), yielding epoch
millis since 1970-01-01T00:00:00Z.
---
.../metadata/HoodieBackedTableMetadataWriter.java | 4 +-
.../apache/hudi/metadata/RecordIndexMapper.java | 9 +++-
.../hudi/metadata/BaseFileRecordParsingUtils.java | 3 +-
.../hudi/metadata/HoodieMetadataPayload.java | 42 ++++++++++++++--
.../hudi/metadata/HoodieTableMetadataUtil.java | 10 ++--
.../hudi/metadata/TestHoodieTableMetadataUtil.java | 58 ++++++++++++++++++++++
.../hudi/sink/partitioner/index/IndexRowUtils.java | 4 +-
.../sink/partitioner/index/IndexWriteFunction.java | 4 +-
8 files changed, 120 insertions(+), 14 deletions(-)
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
index 2131080a0ac2..7e75b71b14f4 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
@@ -1011,11 +1011,11 @@ public abstract class
HoodieBackedTableMetadataWriter<I, O> implements HoodieTab
.withShouldUseRecordPosition(false)
.withProps(metaClient.getTableConfig().getProps())
.build();
- String baseFileInstantTime = fileSlice.getBaseInstantTime();
+ long baseFileInstantTimeMillis =
HoodieMetadataPayload.parseRecordIndexInstantTime(fileSlice.getBaseInstantTime());
return new
CloseableMappingIterator<>(fileGroupReader.getClosableIterator(), record -> {
String recordKey =
readerContext.getRecordContext().getRecordKey(record, requestedSchema);
return HoodieMetadataPayload.createRecordIndexUpdate(recordKey,
partition, fileId,
- baseFileInstantTime, 0);
+ baseFileInstantTimeMillis, 0);
});
});
}
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/RecordIndexMapper.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/RecordIndexMapper.java
index 329e74e60676..4b736768ebc2 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/RecordIndexMapper.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/RecordIndexMapper.java
@@ -30,7 +30,9 @@ import org.apache.hudi.exception.HoodieMetadataException;
import lombok.extern.slf4j.Slf4j;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
/**
* Mapper for Record Level Index (RLI).
@@ -45,6 +47,8 @@ public class RecordIndexMapper extends MetadataIndexMapper {
@Override
protected List<HoodieRecord> generateRecords(WriteStatus writeStatus) {
List<HoodieRecord> allRecords = new ArrayList<>();
+ // delegates of one write status share at most a few distinct instants, so
memoize the parse
+ Map<String, Long> instantTimeMillisCache = new HashMap<>();
for (HoodieRecordDelegate recordDelegate :
writeStatus.getIndexStats().getWrittenRecordDelegates()) {
if (!writeStatus.isErrored(recordDelegate.getHoodieKey())) {
if (recordDelegate.isIgnoreIndexUpdate()) {
@@ -68,7 +72,10 @@ public class RecordIndexMapper extends MetadataIndexMapper {
// Insert new record case
hoodieRecord = HoodieMetadataPayload.createRecordIndexUpdate(
recordDelegate.getRecordKey(),
recordDelegate.getPartitionPath(),
- newLocation.get().getFileId(),
newLocation.get().getInstantTime(), dataWriteConfig.getWritesFileIdEncoding());
+ newLocation.get().getFileId(),
+ instantTimeMillisCache.computeIfAbsent(
+ newLocation.get().getInstantTime(),
HoodieMetadataPayload::parseRecordIndexInstantTime),
+ dataWriteConfig.getWritesFileIdEncoding());
allRecords.add(hoodieRecord);
}
} else {
diff --git
a/hudi-common/src/main/java/org/apache/hudi/metadata/BaseFileRecordParsingUtils.java
b/hudi-common/src/main/java/org/apache/hudi/metadata/BaseFileRecordParsingUtils.java
index e4d83111ae88..6b92f9dcf39b 100644
---
a/hudi-common/src/main/java/org/apache/hudi/metadata/BaseFileRecordParsingUtils.java
+++
b/hudi-common/src/main/java/org/apache/hudi/metadata/BaseFileRecordParsingUtils.java
@@ -77,9 +77,10 @@ public class BaseFileRecordParsingUtils {
recordStatuses);
List<HoodieRecord> hoodieRecords = new ArrayList<>();
if (recordStatusListMap.containsKey(RecordStatus.INSERT)) {
+ long instantTimeMillis =
HoodieMetadataPayload.parseRecordIndexInstantTime(instantTime);
hoodieRecords.addAll(recordStatusListMap.get(RecordStatus.INSERT).stream()
.map(recordKey -> (HoodieRecord)
HoodieMetadataPayload.createRecordIndexUpdate(recordKey, partition, fileId,
- instantTime, writesFileIdEncoding)).collect(toList()));
+ instantTimeMillis, writesFileIdEncoding)).collect(toList()));
}
if (recordStatusListMap.containsKey(RecordStatus.DELETE)) {
diff --git
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java
index ac509fce962e..325a7a5af6ba 100644
---
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java
+++
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java
@@ -60,6 +60,7 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collection;
+import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -649,14 +650,46 @@ public class HoodieMetadataPayload implements
HoodieRecordPayload<HoodieMetadata
*/
public static HoodieRecord<HoodieMetadataPayload>
createRecordIndexUpdate(String recordKey, String partition,
String fileId, String instantTime, int fileIdEncoding) {
+ return createRecordIndexUpdate(recordKey, partition, fileId,
parseRecordIndexInstantTime(instantTime), fileIdEncoding);
+ }
- HoodieKey key = new HoodieKey(recordKey,
MetadataPartitionType.RECORD_INDEX.getPartitionPath());
- long instantTimeMillis = -1;
+ /**
+ * Parses an instant time into the epoch millis stored in record index
entries.
+ * <p>
+ * Callers creating record index updates for many records of the same commit
should parse the
+ * instant time once with this method and use the millis-based
+ * {@link #createRecordIndexUpdate(String, String, String, long, int)}
overload per record.
+ */
+ public static long parseRecordIndexInstantTime(String instantTime) {
try {
- instantTimeMillis =
TimelineUtils.parseDateFromInstantTime(instantTime).getTime();
+ return TimelineUtils.parseDateFromInstantTime(instantTime).getTime();
} catch (Exception e) {
throw new HoodieMetadataException("Failed to create metadata payload for
record index. Instant time parsing for " + instantTime + " failed ", e);
}
+ }
+
+ /**
+ * Create and return a {@code HoodieMetadataPayload} to insert or update an
entry for the record index.
+ * <p>
+ * Same as {@link #createRecordIndexUpdate(String, String, String, String,
int)} but takes the
+ * instant time already parsed to epoch millis, so per-commit callers parse
it only once.
+ * <p>
+ * {@code instantTimeMillis} must be obtained from {@link
#parseRecordIndexInstantTime(String)} (which
+ * delegates to {@link TimelineUtils#parseDateFromInstantTime(String)}); it
should not be constructed by
+ * hand, so that the value stored here matches what the String overload
would write. The instant-time
+ * string is interpreted in the JVM default time zone ({@link
java.time.ZoneId#systemDefault()}) and the
+ * result is epoch milliseconds (since 1970-01-01T00:00:00Z).
+ *
+ * @param recordKey Key of the record
+ * @param partition Name of the partition which contains the record
+ * @param fileId fileId which contains the record
+ * @param instantTimeMillis epoch millis of the instant when the record was
added, as returned by
+ * {@link #parseRecordIndexInstantTime(String)}
+ */
+ public static HoodieRecord<HoodieMetadataPayload>
createRecordIndexUpdate(String recordKey, String partition,
+
String fileId, long instantTimeMillis, int fileIdEncoding) {
+
+ HoodieKey key = new HoodieKey(recordKey,
MetadataPartitionType.RECORD_INDEX.getPartitionPath());
if (fileIdEncoding == 0) {
// Data file names have a -D suffix to denote the index (D = integer) of
the file written
// In older HUID versions the file index was missing
@@ -672,8 +705,9 @@ public class HoodieMetadataPayload implements
HoodieRecordPayload<HoodieMetadata
fileIndex = Integer.parseInt(fileId.substring(index + 1));
}
} catch (Exception e) {
+ // reconstruct the instant time only on this cold error path; the hot
per-record path keeps the pre-parsed millis
throw new HoodieMetadataException(String.format("Invalid UUID or
index: fileID=%s, partition=%s, instantTime=%s",
- fileId, partition, instantTime), e);
+ fileId, partition, TimelineUtils.formatDate(new
Date(instantTimeMillis))), e);
}
HoodieMetadataPayload payload = new HoodieMetadataPayload(recordKey,
diff --git
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
index 0ede3ae5015e..23f20fa4efea 100644
---
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
+++
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
@@ -144,6 +144,7 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
+import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
@@ -940,8 +941,9 @@ public class HoodieTableMetadataUtil {
Set<String> revivedKeys = revivedAndDeletedKeys.getLeft();
Set<String> deletedKeys = revivedAndDeletedKeys.getRight();
// Process revived keys to create updates
+ long instantTimeMillis =
HoodieMetadataPayload.parseRecordIndexInstantTime(instantTime);
List<HoodieRecord> revivedRecords = revivedKeys.stream()
- .map(recordKey ->
HoodieMetadataPayload.createRecordIndexUpdate(recordKey, partitionPath, fileId,
instantTime, writesFileIdEncoding))
+ .map(recordKey ->
HoodieMetadataPayload.createRecordIndexUpdate(recordKey, partitionPath, fileId,
instantTimeMillis, writesFileIdEncoding))
.collect(Collectors.toList());
// Process deleted keys to create deletes
List<HoodieRecord> deletedRecords = deletedKeys.stream()
@@ -2499,7 +2501,7 @@ public class HoodieTableMetadataUtil {
fileId = originalFileId;
}
- final java.util.Date instantDate = new java.util.Date(instantTime);
+ final Date instantDate = new Date(instantTime);
return new HoodieRecordGlobalLocation(partition,
HoodieInstantTimeGenerator.formatDate(instantDate), fileId);
}
@@ -2615,6 +2617,8 @@ public class HoodieTableMetadataUtil {
String
instantTime,
boolean isPartitionedRLI
) {
+ // the delete iterator never reads the instant time, so only the update
path pays the parse
+ final long instantTimeMillis = forDelete ? -1L :
HoodieMetadataPayload.parseRecordIndexInstantTime(instantTime);
return new ClosableIterator<HoodieRecord>() {
@Override
public void close() {
@@ -2630,7 +2634,7 @@ public class HoodieTableMetadataUtil {
public HoodieRecord next() {
return forDelete
?
HoodieMetadataPayload.createRecordIndexDelete(recordKeyIterator.next(),
partition, isPartitionedRLI)
- :
HoodieMetadataPayload.createRecordIndexUpdate(recordKeyIterator.next(),
partition, fileId, instantTime, 0);
+ :
HoodieMetadataPayload.createRecordIndexUpdate(recordKeyIterator.next(),
partition, fileId, instantTimeMillis, 0);
}
};
}
diff --git
a/hudi-common/src/test/java/org/apache/hudi/metadata/TestHoodieTableMetadataUtil.java
b/hudi-common/src/test/java/org/apache/hudi/metadata/TestHoodieTableMetadataUtil.java
index 95023eebe695..6cab9b9067c3 100644
---
a/hudi-common/src/test/java/org/apache/hudi/metadata/TestHoodieTableMetadataUtil.java
+++
b/hudi-common/src/test/java/org/apache/hudi/metadata/TestHoodieTableMetadataUtil.java
@@ -21,22 +21,27 @@ package org.apache.hudi.metadata;
import org.apache.hudi.common.function.SerializableBiFunction;
import org.apache.hudi.common.model.HoodieIndexDefinition;
import org.apache.hudi.common.model.HoodieIndexMetadata;
+import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
+import org.apache.hudi.common.model.HoodieRecordGlobalLocation;
import org.apache.hudi.common.schema.HoodieSchema;
import org.apache.hudi.common.schema.HoodieSchemaField;
import org.apache.hudi.common.schema.HoodieSchemaType;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.HoodieTableVersion;
+import org.apache.hudi.common.table.timeline.HoodieInstantTimeGenerator;
import org.apache.hudi.common.util.Option;
import org.junit.jupiter.api.Test;
import java.util.Arrays;
import java.util.Collections;
+import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.TimeZone;
import static
org.apache.hudi.metadata.HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS;
import static
org.apache.hudi.metadata.HoodieTableMetadataUtil.PARTITION_NAME_PARTITION_STATS;
@@ -352,4 +357,57 @@ class TestHoodieTableMetadataUtil {
"STRING should remain supported for record type " + recordType);
}
}
+
+ @Test
+ void testCreateRecordIndexUpdateMillisOverloadMatchesStringOverload() {
+ String instantTime = "20260610153045678";
+ long instantTimeMillis =
HoodieMetadataPayload.parseRecordIndexInstantTime(instantTime);
+
+ // uuid-encoded fileId (encoding 0)
+ HoodieRecord<HoodieMetadataPayload> fromString =
HoodieMetadataPayload.createRecordIndexUpdate(
+ "rk1", "p1", "49b8b3c8-9e5d-4731-9d51-a2d8e9b5c7f3-0", instantTime, 0);
+ HoodieRecord<HoodieMetadataPayload> fromMillis =
HoodieMetadataPayload.createRecordIndexUpdate(
+ "rk1", "p1", "49b8b3c8-9e5d-4731-9d51-a2d8e9b5c7f3-0",
instantTimeMillis, 0);
+ assertEquals(fromString.getKey(), fromMillis.getKey());
+ assertEquals(fromString.getData(), fromMillis.getData());
+
+ // raw fileId (encoding 1)
+ fromString = HoodieMetadataPayload.createRecordIndexUpdate(
+ "rk1", "p1", "some-raw-file-id", instantTime, 1);
+ fromMillis = HoodieMetadataPayload.createRecordIndexUpdate(
+ "rk1", "p1", "some-raw-file-id", instantTimeMillis, 1);
+ assertEquals(fromString.getKey(), fromMillis.getKey());
+ assertEquals(fromString.getData(), fromMillis.getData());
+ }
+
+ @Test
+ void testGetLocationFromRecordIndexInfoFormatsInstantConsistently() {
+ long instantMillis1 =
HoodieMetadataPayload.parseRecordIndexInstantTime("20260610153045678");
+ long instantMillis2 =
HoodieMetadataPayload.parseRecordIndexInstantTime("20260610163045678");
+ String expected1 = HoodieInstantTimeGenerator.formatDate(new
Date(instantMillis1));
+ String expected2 = HoodieInstantTimeGenerator.formatDate(new
Date(instantMillis2));
+ // repeated and alternating instants must format consistently
+ for (long instantMillis : new long[] {instantMillis1, instantMillis1,
instantMillis2, instantMillis1}) {
+ HoodieRecordGlobalLocation location =
HoodieTableMetadataUtil.getLocationFromRecordIndexInfo(
+ "p1", 1, -1L, -1L, -1, "some-raw-file-id", instantMillis);
+ assertEquals(instantMillis == instantMillis1 ? expected1 : expected2,
location.getInstantTime());
+ assertEquals("p1", location.getPartitionPath());
+ assertEquals("some-raw-file-id", location.getFileId());
+ }
+
+ // formatDate follows the JVM default time zone, so the decoded location
must track a zone
+ // change; the two switches differ in offset, so at least one changes the
formatted string
+ TimeZone originalTimeZone = TimeZone.getDefault();
+ try {
+ for (String zoneId : new String[] {"UTC", "Asia/Kolkata"}) {
+ TimeZone.setDefault(TimeZone.getTimeZone(zoneId));
+ String expectedInZone = HoodieInstantTimeGenerator.formatDate(new
Date(instantMillis1));
+ HoodieRecordGlobalLocation location =
HoodieTableMetadataUtil.getLocationFromRecordIndexInfo(
+ "p1", 1, -1L, -1L, -1, "some-raw-file-id", instantMillis1);
+ assertEquals(expectedInZone, location.getInstantTime());
+ }
+ } finally {
+ TimeZone.setDefault(originalTimeZone);
+ }
+ }
}
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/IndexRowUtils.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/IndexRowUtils.java
index 4ea6f06e7551..5e8a04c2e15c 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/IndexRowUtils.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/IndexRowUtils.java
@@ -70,7 +70,7 @@ public class IndexRowUtils {
return indexRow;
}
- public static HoodieRecord convertToHoodieRecord(String instant, RowData
indexRow, HoodieWriteConfig dataWriteConfig) {
+ public static HoodieRecord convertToHoodieRecord(long instantMillis, RowData
indexRow, HoodieWriteConfig dataWriteConfig) {
if (indexRow.getByte(INDEX_TYPE_ORD) == RLI_TYPE) {
switch (indexRow.getRowKind()) {
case INSERT:
@@ -78,7 +78,7 @@ public class IndexRowUtils {
String.valueOf(indexRow.getString(KEY_ORD)),
String.valueOf(indexRow.getString(PARTITION_ORD)),
String.valueOf(indexRow.getString(FILE_ID_ORD)),
- instant,
+ instantMillis,
dataWriteConfig.getWritesFileIdEncoding());
case DELETE:
return HoodieMetadataPayload.createRecordIndexDelete(
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/IndexWriteFunction.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/IndexWriteFunction.java
index 8e64d58a97b3..066f5aa62eb6 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/IndexWriteFunction.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/IndexWriteFunction.java
@@ -26,6 +26,7 @@ import org.apache.hudi.common.util.VisibleForTesting;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.metadata.HoodieMetadataPayload;
import org.apache.hudi.sink.common.AbstractStreamWriteFunction;
import org.apache.hudi.sink.event.WriteMetadataEvent;
import org.apache.hudi.sink.exception.MemoryPagesExhaustedException;
@@ -174,11 +175,12 @@ public class IndexWriteFunction extends
AbstractStreamWriteFunction<RowData> {
HoodieWriteConfig writeConfig = writeClient.getConfig();
// deduplicate the index records using commit time ordering.
Map<String, HoodieRecord> keyAndRecordMap = new LinkedHashMap<>();
+ long currentInstantMillis =
HoodieMetadataPayload.parseRecordIndexInstantTime(this.currentInstant);
while (rowItr.hasNext()) {
RowData indexRow = rowItr.next();
keyAndRecordMap.put(
dedupKeyExtractor.apply(indexRow),
- IndexRowUtils.convertToHoodieRecord(this.currentInstant, indexRow,
writeConfig));
+ IndexRowUtils.convertToHoodieRecord(currentInstantMillis, indexRow,
writeConfig));
dataPartitions.add(IndexRowUtils.getPartition(indexRow));
}
return Pair.of(new ArrayList<>(keyAndRecordMap.values()), dataPartitions);