yuqi1129 commented on code in PR #8450:
URL: https://github.com/apache/gravitino/pull/8450#discussion_r2336580603


##########
core/src/main/java/org/apache/gravitino/stats/storage/LancePartitionStatisticStorage.java:
##########
@@ -133,7 +153,45 @@ public LancePartitionStatisticStorage(Map<String, String> 
properties) {
             properties.getOrDefault(READ_BATCH_SIZE, 
String.valueOf(DEFAULT_READ_BATCH_SIZE)));
     Preconditions.checkArgument(
         readBatchSize > 0, "Lance partition statistics storage readBatchSize 
must be positive");
+    int datasetCacheSize =
+        Integer.parseInt(
+            properties.getOrDefault(
+                DATASET_CACHE_SIZE, 
String.valueOf(DEFAULT_DATASET_CACHE_SIZE)));
+    Preconditions.checkArgument(
+        datasetCacheSize > 0,
+        "Lance partition statistics storage datasetCacheSize must be 
positive");
+    this.metadataFileCacheSize =
+        Long.parseLong(
+            properties.getOrDefault(
+                METADATA_FILE_CACHE_SIZE, 
String.valueOf(DEFAULT_METADATA_FILE_CACHE_SIZE)));
+    Preconditions.checkArgument(
+        metadataFileCacheSize > 0,
+        "Lance partition statistics storage metadataFileCacheSizeBytes must be 
positive");
+    this.indexCacheSize =
+        Long.parseLong(
+            properties.getOrDefault(INDEX_CACHE_SIZE, 
String.valueOf(DEFAULT_INDEX_CACHE_SIZE)));
+    Preconditions.checkArgument(
+        indexCacheSize > 0,
+        "Lance partition statistics storage indexCacheSizeBytes must be 
positive");
+
     this.properties = properties;
+
+    this.cache =
+        Caffeine.newBuilder()
+            .maximumSize(datasetCacheSize)

Review Comment:
   @jerqi cc



##########
core/src/main/java/org/apache/gravitino/stats/storage/LancePartitionStatisticStorage.java:
##########
@@ -197,123 +255,131 @@ public void updateStatistics(
     }
   }
 
-  private void appendStatisticsImpl(Long tableId, 
List<PartitionStatisticsUpdate> updates) {
-    String fileName = getFilePath(tableId);
-    try (Dataset datasetRead = open(fileName)) {
-      List<FragmentMetadata> fragmentMetas;
-      int count = 0;
-      try (VectorSchemaRoot root = VectorSchemaRoot.create(SCHEMA, allocator)) 
{
-        for (PartitionStatisticsUpdate update : updates) {
-          count += update.statistics().size();
-        }
+  private void appendStatisticsImpl(Long tableId, 
List<PartitionStatisticsUpdate> updates)
+      throws JsonProcessingException {
+    Dataset datasetRead = getDataset(tableId);
+    List<FragmentMetadata> fragmentMetas = createFragmentMetadata(tableId, 
updates);
+
+    Transaction appendTxn =
+        datasetRead
+            .newTransactionBuilder()
+            .operation(Append.builder().fragments(fragmentMetas).build())
+            .transactionProperties(Collections.emptyMap())
+            .build();
+    Dataset newDataset = appendTxn.commit();
+    cache.put(tableId, newDataset);
+  }
 
-        for (FieldVector vector : root.getFieldVectors()) {
-          vector.setInitialCapacity(count);
-        }
-        root.allocateNew();
-        int index = 0;
-
-        for (PartitionStatisticsUpdate updatePartitionStatistic : updates) {
-          String partitionName = updatePartitionStatistic.partitionName();
-          for (Map.Entry<String, StatisticValue<?>> statistic :
-              updatePartitionStatistic.statistics().entrySet()) {
-            String statisticName = statistic.getKey();
-            String statisticValue =
-                
JsonUtils.anyFieldMapper().writeValueAsString(statistic.getValue());
-
-            UInt8Vector tableIdVector = (UInt8Vector) 
root.getVector(TABLE_ID_COLUMN);
-            VarCharVector partitionNameVector =
-                (VarCharVector) root.getVector(PARTITION_NAME_COLUMN);
-            VarCharVector statisticNameVector =
-                (VarCharVector) root.getVector(STATISTIC_NAME_COLUMN);
-            LargeVarCharVector statisticValueVector =
-                (LargeVarCharVector) root.getVector(STATISTIC_VALUE_COLUMN);
-            VarCharVector auditInfoVector = (VarCharVector) 
root.getVector(AUDIT_INFO_COLUMN);
-
-            tableIdVector.set(index, tableId);
-            partitionNameVector.setSafe(index, 
partitionName.getBytes(StandardCharsets.UTF_8));
-            statisticNameVector.setSafe(index, 
statisticName.getBytes(StandardCharsets.UTF_8));
-            statisticValueVector.setSafe(index, 
statisticValue.getBytes(StandardCharsets.UTF_8));
-            AuditInfo auditInfo =
-                AuditInfo.builder()
-                    .withCreator(PrincipalUtils.getCurrentUserName())
-                    .withCreateTime(Instant.now())
-                    .withLastModifier(PrincipalUtils.getCurrentUserName())
-                    .withLastModifiedTime(Instant.now())
-                    .build();
-            auditInfoVector.setSafe(
-                index,
-                JsonUtils.anyFieldMapper()
-                    .writeValueAsString(auditInfo)
-                    .getBytes(StandardCharsets.UTF_8));
-
-            index++;
-          }
-        }
+  private void dropStatisticsImpl(Long tableId, List<PartitionStatisticsDrop> 
drops) {
+    Dataset dataset = getDataset(tableId);
+    List<String> partitionSQLs = Lists.newArrayList();
+    for (PartitionStatisticsDrop drop : drops) {
+      List<String> statistics = drop.statisticNames();
+      String partition = drop.partitionName();
+      partitionSQLs.add(
+          "table_id = "
+              + tableId
+              + " AND partition_name = '"
+              + partition
+              + "' AND statistic_name IN ("
+              + statistics.stream().map(str -> "'" + str + 
"'").collect(Collectors.joining(", "))
+              + ")");
+    }
 
-        root.setRowCount(index);
-
-        fragmentMetas =
-            Fragment.create(
-                getFilePath(tableId),
-                allocator,
-                root,
-                new WriteParams.Builder()
-                    .withMaxRowsPerFile(maxRowsPerFile)
-                    .withMaxBytesPerFile(maxBytesPerFile)
-                    .withMaxRowsPerGroup(maxRowsPerGroup)
-                    .withStorageOptions(properties)
-                    .build());
-        FragmentOperation.Append appendOp = new 
FragmentOperation.Append(fragmentMetas);
-        Dataset.commit(
-                allocator,
-                getFilePath(tableId),
-                appendOp,
-                Optional.of(datasetRead.version()),
-                properties)
-            .close();
-      }
+    if (partitionSQLs.size() == 1) {
+      dataset.delete(partitionSQLs.get(0));
+    } else if (partitionSQLs.size() > 1) {
+      String filterSQL =
+          partitionSQLs.stream().map(str -> "(" + str + 
")").collect(Collectors.joining(" OR "));
+      dataset.delete(filterSQL);
+    }
+  }
 
-    } catch (JsonProcessingException e) {
-      throw new RuntimeException("Failed to serialize statistic value", e);
+  @Override
+  public void close() throws IOException {
+    if (allocator != null) {
+      allocator.close();
     }
+
+    if (cache != null) {
+      cache.invalidateAll();
+    }
+  }
+
+  @VisibleForTesting
+  Cache<Long, Dataset> getCache() {
+    return cache;
   }
 
   private String getFilePath(Long tableId) {
     return location + "/" + tableId + ".lance";
   }
 
-  private void dropStatisticsImpl(Long tableId, List<PartitionStatisticsDrop> 
drops) {
-    String fileName = getFilePath(tableId);
-    try (Dataset dataset = open(fileName)) {
-      List<String> partitionSQLs = Lists.newArrayList();
-      for (PartitionStatisticsDrop drop : drops) {
-        List<String> statistics = drop.statisticNames();
-        String partition = drop.partitionName();
-        partitionSQLs.add(
-            "table_id = "
-                + tableId
-                + " AND partition_name = '"
-                + partition
-                + "' AND statistic_name IN ("
-                + statistics.stream().map(str -> "'" + str + 
"'").collect(Collectors.joining(", "))
-                + ")");
+  private List<FragmentMetadata> createFragmentMetadata(
+      Long tableId, List<PartitionStatisticsUpdate> updates) throws 
JsonProcessingException {
+    List<FragmentMetadata> fragmentMetas;
+    int count = 0;
+    try (VectorSchemaRoot root = VectorSchemaRoot.create(SCHEMA, allocator)) {
+      for (PartitionStatisticsUpdate update : updates) {
+        count += update.statistics().size();
       }
 
-      if (partitionSQLs.size() == 1) {
-        dataset.delete(partitionSQLs.get(0));
-      } else if (partitionSQLs.size() > 1) {
-        String filterSQL =
-            partitionSQLs.stream().map(str -> "(" + str + 
")").collect(Collectors.joining(" OR "));
-        dataset.delete(filterSQL);
+      for (FieldVector vector : root.getFieldVectors()) {
+        vector.setInitialCapacity(count);
+      }
+      root.allocateNew();
+      int index = 0;
+
+      for (PartitionStatisticsUpdate updatePartitionStatistic : updates) {
+        String partitionName = updatePartitionStatistic.partitionName();
+        for (Map.Entry<String, StatisticValue<?>> statistic :
+            updatePartitionStatistic.statistics().entrySet()) {
+          String statisticName = statistic.getKey();
+          String statisticValue =
+              
JsonUtils.anyFieldMapper().writeValueAsString(statistic.getValue());
+
+          UInt8Vector tableIdVector = (UInt8Vector) 
root.getVector(TABLE_ID_COLUMN);
+          VarCharVector partitionNameVector = (VarCharVector) 
root.getVector(PARTITION_NAME_COLUMN);
+          VarCharVector statisticNameVector = (VarCharVector) 
root.getVector(STATISTIC_NAME_COLUMN);
+          LargeVarCharVector statisticValueVector =
+              (LargeVarCharVector) root.getVector(STATISTIC_VALUE_COLUMN);
+          VarCharVector auditInfoVector = (VarCharVector) 
root.getVector(AUDIT_INFO_COLUMN);

Review Comment:
   The lines can be moved out of the for loop according to the code below. Is 
it expected?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to