jerryshao commented on code in PR #8450:
URL: https://github.com/apache/gravitino/pull/8450#discussion_r2340571995
##########
core/src/main/java/org/apache/gravitino/stats/storage/LancePartitionStatisticStorage.java:
##########
@@ -79,13 +90,21 @@ public class LancePartitionStatisticStorage implements PartitionStatisticStorage
private static final int DEFAULT_MAX_ROWS_PER_GROUP = 1000000; // 1M
private static final String READ_BATCH_SIZE = "readBatchSize";
private static final int DEFAULT_READ_BATCH_SIZE = 10000; // 10K
+ private static final String DATASET_CACHE_SIZE = "datasetCacheSize";
+ private static final int DEFAULT_DATASET_CACHE_SIZE = 0;
+ private static final String METADATA_FILE_CACHE_SIZE = "metadataFileCacheSizeBytes";
+ private static final long DEFAULT_METADATA_FILE_CACHE_SIZE = 100L * 1024 * 1024; // 100MB
+ private static final String INDEX_CACHE_SIZE = "indexCacheSizeBytes";
+ private static final long DEFAULT_INDEX_CACHE_SIZE = 100L * 1024 * 1024; // 100MB
// The schema is `table_id`, `partition_name`, `statistic_name`, `statistic_value`, `audit_info`
private static final String TABLE_ID_COLUMN = "table_id";
private static final String PARTITION_NAME_COLUMN = "partition_name";
private static final String STATISTIC_NAME_COLUMN = "statistic_name";
private static final String STATISTIC_VALUE_COLUMN = "statistic_value";
private static final String AUDIT_INFO_COLUMN = "audit_info";
+ private final Cache<Long, Dataset> cache;
Review Comment:
Better to rename this to `datasetCache`.
##########
core/src/main/java/org/apache/gravitino/stats/storage/LancePartitionStatisticStorage.java:
##########
@@ -354,70 +447,100 @@ private static String getPartitionFilter(PartitionRange range) {
private List<PersistedPartitionStatistics> listStatisticsImpl(
Long tableId, String partitionFilter) {
- String fileName = getFilePath(tableId);
-
- try (Dataset dataset = open(fileName)) {
-
- String filter = "table_id = " + tableId + partitionFilter;
-
- try (LanceScanner scanner =
- dataset.newScan(
- new ScanOptions.Builder()
- .columns(
- Arrays.asList(
- TABLE_ID_COLUMN,
- PARTITION_NAME_COLUMN,
- STATISTIC_NAME_COLUMN,
- STATISTIC_VALUE_COLUMN,
- AUDIT_INFO_COLUMN))
- .withRowId(true)
- .batchSize(readBatchSize)
- .filter(filter)
- .build())) {
- Map<String, List<PersistedStatistic>> partitionStatistics = Maps.newConcurrentMap();
- try (ArrowReader reader = scanner.scanBatches()) {
- while (reader.loadNextBatch()) {
- VectorSchemaRoot root = reader.getVectorSchemaRoot();
- List<FieldVector> fieldVectors = root.getFieldVectors();
- VarCharVector partitionNameVector = (VarCharVector) fieldVectors.get(1);
- VarCharVector statisticNameVector = (VarCharVector) fieldVectors.get(2);
- LargeVarCharVector statisticValueVector = (LargeVarCharVector) fieldVectors.get(3);
- VarCharVector auditInfoNameVector = (VarCharVector) fieldVectors.get(4);
-
- for (int i = 0; i < root.getRowCount(); i++) {
- String partitionName = new String(partitionNameVector.get(i), StandardCharsets.UTF_8);
- String statisticName = new String(statisticNameVector.get(i), StandardCharsets.UTF_8);
- String statisticValueStr =
- new String(statisticValueVector.get(i), StandardCharsets.UTF_8);
- String auditInoStr = new String(auditInfoNameVector.get(i), StandardCharsets.UTF_8);
-
- StatisticValue<?> statisticValue =
- JsonUtils.anyFieldMapper().readValue(statisticValueStr, StatisticValue.class);
- AuditInfo auditInfo =
- JsonUtils.anyFieldMapper().readValue(auditInoStr, AuditInfo.class);
-
- PersistedStatistic persistedStatistic =
- PersistedStatistic.of(statisticName, statisticValue, auditInfo);
-
- partitionStatistics
- .computeIfAbsent(partitionName, k -> Lists.newArrayList())
- .add(persistedStatistic);
- }
- }
- return partitionStatistics.entrySet().stream()
- .map(entry -> PersistedPartitionStatistics.of(entry.getKey(), entry.getValue()))
- .collect(Collectors.toList());
+ Dataset dataset = getDataset(tableId);
+
+ String filter = "table_id = " + tableId + partitionFilter;
+
+ try (LanceScanner scanner =
+ dataset.newScan(
+ new ScanOptions.Builder()
+ .columns(
+ Arrays.asList(
+ TABLE_ID_COLUMN,
+ PARTITION_NAME_COLUMN,
+ STATISTIC_NAME_COLUMN,
+ STATISTIC_VALUE_COLUMN,
+ AUDIT_INFO_COLUMN))
+ .withRowId(true)
+ .batchSize(readBatchSize)
+ .filter(filter)
+ .build())) {
+ Map<String, List<PersistedStatistic>> partitionStatistics = Maps.newConcurrentMap();
+ try (ArrowReader reader = scanner.scanBatches()) {
+ while (reader.loadNextBatch()) {
+ VectorSchemaRoot root = reader.getVectorSchemaRoot();
+ List<FieldVector> fieldVectors = root.getFieldVectors();
+ VarCharVector partitionNameVector = (VarCharVector) fieldVectors.get(1);
+ VarCharVector statisticNameVector = (VarCharVector) fieldVectors.get(2);
+ LargeVarCharVector statisticValueVector = (LargeVarCharVector) fieldVectors.get(3);
+ VarCharVector auditInfoNameVector = (VarCharVector) fieldVectors.get(4);
+
+ for (int i = 0; i < root.getRowCount(); i++) {
+ String partitionName = new String(partitionNameVector.get(i), StandardCharsets.UTF_8);
+ String statisticName = new String(statisticNameVector.get(i), StandardCharsets.UTF_8);
+ String statisticValueStr =
+ new String(statisticValueVector.get(i), StandardCharsets.UTF_8);
+ String auditInoStr = new String(auditInfoNameVector.get(i), StandardCharsets.UTF_8);
+
+ StatisticValue<?> statisticValue =
+ JsonUtils.anyFieldMapper().readValue(statisticValueStr, StatisticValue.class);
+ AuditInfo auditInfo =
+ JsonUtils.anyFieldMapper().readValue(auditInoStr, AuditInfo.class);
+
+ PersistedStatistic persistedStatistic =
+ PersistedStatistic.of(statisticName, statisticValue, auditInfo);
+
+ partitionStatistics
+ .computeIfAbsent(partitionName, k -> Lists.newArrayList())
+ .add(persistedStatistic);
+ }
}
- } catch (Exception e) {
- throw new RuntimeException(e);
+
+ return partitionStatistics.entrySet().stream()
+ .map(entry -> PersistedPartitionStatistics.of(entry.getKey(), entry.getValue()))
+ .collect(Collectors.toList());
+ }
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ } finally {
+ if (cache == null && dataset != null) {
+ dataset.close();
}
}
}
+ private Dataset getDataset(Long tableId) {
+ AtomicBoolean newlyCreated = new AtomicBoolean(false);
+ if (cache != null) {
+ Dataset dataset =
+ cache.get(
+ tableId,
+ id -> {
+ newlyCreated.set(true);
+ return open(getFilePath(id));
+ });
+
+ // ensure that the dataset is the latest version
+ if (!newlyCreated.get()) {
+ dataset.checkoutLatest();
+ }
Review Comment:
This code has a threading issue: only one thread needs to check out the latest snapshot.
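
One possible fix, as a rough sketch reusing the members shown in this diff (`cache`, `open`, `getFilePath`, and `Dataset.checkoutLatest()` are taken from the PR; signatures assumed, not verified): serialize the refresh so only one thread at a time checks out the latest snapshot of a given dataset.

```java
private Dataset getDataset(Long tableId) {
  AtomicBoolean newlyCreated = new AtomicBoolean(false);
  Dataset dataset =
      cache.get(
          tableId,
          id -> {
            newlyCreated.set(true);
            return open(getFilePath(id));
          });
  if (!newlyCreated.get()) {
    // Serialize the snapshot refresh: only one thread runs checkoutLatest()
    // on this dataset at a time, so concurrent readers cannot race on it.
    synchronized (dataset) {
      dataset.checkoutLatest();
    }
  }
  return dataset;
}
```

A striped lock keyed by `tableId` (e.g. Guava's `Striped`) would achieve the same without blocking readers of unrelated tables.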
##########
gradle/libs.versions.toml:
##########
@@ -29,7 +29,7 @@ guava = "32.1.3-jre"
lombok = "1.18.20"
slf4j = "2.0.9"
log4j = "2.24.3"
-lance = "0.31.0"
+lance = "0.34.0"
Review Comment:
If this Lance version introduces different transitive dependencies, we should check them.
##########
core/src/main/java/org/apache/gravitino/stats/storage/LancePartitionStatisticStorage.java:
##########
@@ -133,7 +154,49 @@ public LancePartitionStatisticStorage(Map<String, String> properties) {
properties.getOrDefault(READ_BATCH_SIZE, String.valueOf(DEFAULT_READ_BATCH_SIZE)));
Preconditions.checkArgument(
readBatchSize > 0, "Lance partition statistics storage readBatchSize must be positive");
+ int datasetCacheSize =
+ Integer.parseInt(
+ properties.getOrDefault(
+ DATASET_CACHE_SIZE, String.valueOf(DEFAULT_DATASET_CACHE_SIZE)));
+ Preconditions.checkArgument(
+ datasetCacheSize >= 0,
+ "Lance partition statistics storage datasetCacheSize must be greater than or equal to 0");
+ this.metadataFileCacheSize =
+ Long.parseLong(
+ properties.getOrDefault(
+ METADATA_FILE_CACHE_SIZE, String.valueOf(DEFAULT_METADATA_FILE_CACHE_SIZE)));
+ Preconditions.checkArgument(
+ metadataFileCacheSize > 0,
+ "Lance partition statistics storage metadataFileCacheSizeBytes must be positive");
+ this.indexCacheSize =
+ Long.parseLong(
+ properties.getOrDefault(INDEX_CACHE_SIZE, String.valueOf(DEFAULT_INDEX_CACHE_SIZE)));
+ Preconditions.checkArgument(
+ indexCacheSize > 0,
+ "Lance partition statistics storage indexCacheSizeBytes must be positive");
+
this.properties = properties;
+ if (datasetCacheSize != 0) {
+ this.cache =
+ Caffeine.newBuilder()
+ .maximumSize(datasetCacheSize)
+ .scheduler(
+ Scheduler.forScheduledExecutorService(
+ new ScheduledThreadPoolExecutor(
+ 1,
+ newDaemonThreadFactory(
+ "lance-partition-statistic-storage-cache-cleaner"))))
+ .evictionListener(
+ (RemovalListener<Long, Dataset>)
+ (key, value, cause) -> {
+ if (value != null) {
+ value.close();
+ }
+ })
+ .build();
+ } else {
+ cache = null;
Review Comment:
Better to use `Optional` instead of a `null` check.
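
For example, a sketch only (`datasetCache` and `buildCache` are illustrative names, not from this PR):

```java
private final Optional<Cache<Long, Dataset>> datasetCache;

// In the constructor: wrap the cache instead of assigning null.
// buildCache is a hypothetical helper holding the Caffeine builder above.
this.datasetCache =
    datasetCacheSize != 0 ? Optional.of(buildCache(datasetCacheSize)) : Optional.empty();

// Call sites then branch declaratively rather than on null:
private Dataset getDataset(Long tableId) {
  return datasetCache
      .map(c -> c.get(tableId, id -> open(getFilePath(id))))
      .orElseGet(() -> open(getFilePath(tableId)));
}
```

Note the uncached path still needs the caller to close the dataset, so the `finally` block above would test `!datasetCache.isPresent()` instead of `cache == null`.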
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]