sumitagrawl commented on code in PR #9040:
URL: https://github.com/apache/ozone/pull/9040#discussion_r2409634073


##########
hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/GlobalStatsValueCodec.java:
##########
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.recon.spi.impl;
+
+import com.google.common.base.Preconditions;
+import com.google.common.primitives.Longs;
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.hadoop.hdds.utils.db.Codec;
+import org.apache.hadoop.ozone.recon.tasks.GlobalStatsValue;
+
+/**
+ * Codec to serialize/deserialize {@link GlobalStatsValue}.
+ */
+public final class GlobalStatsValueCodec implements Codec<GlobalStatsValue> {
+
+  private static final Codec<GlobalStatsValue> INSTANCE = new GlobalStatsValueCodec();
+
+  public static Codec<GlobalStatsValue> get() {
+    return INSTANCE;
+  }
+
+  private GlobalStatsValueCodec() {
+    // singleton
+  }
+
+  @Override
+  public Class<GlobalStatsValue> getTypeClass() {
+    return GlobalStatsValue.class;
+  }
+
+  @Override
+  public byte[] toPersistedFormat(GlobalStatsValue value) {
+    Preconditions.checkNotNull(value, "Null object can't be converted to byte array.");
+
+    // Simple 8-byte format: just the value
+    Long val = value.getValue() != null ? value.getValue() : 0L;

Review Comment:
   We can use a String format here to allow for future extension.
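
   For illustration, a minimal sketch of what a string-based persisted format could look like (purely hypothetical: it assumes GlobalStatsValue has a single-argument constructor, and the UTF-8 choice is mine, not part of this PR):

       @Override
       public byte[] toPersistedFormat(GlobalStatsValue value) {
         Preconditions.checkNotNull(value, "Null object can't be converted to byte array.");
         long val = value.getValue() != null ? value.getValue() : 0L;
         // A string form leaves room to append fields later (e.g. "value|timestamp")
         // without breaking a fixed 8-byte layout.
         return Long.toString(val).getBytes(StandardCharsets.UTF_8);
       }

       @Override
       public GlobalStatsValue fromPersistedFormat(byte[] rawData) {
         // Parse back the single field persisted above.
         return new GlobalStatsValue(Long.parseLong(new String(rawData, StandardCharsets.UTF_8)));
       }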



##########
hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/FileCountBySizeKeyCodec.java:
##########
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.recon.spi.impl;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import com.google.common.base.Preconditions;
+import com.google.common.primitives.Longs;
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.hadoop.hdds.utils.db.Codec;
+import org.apache.hadoop.ozone.recon.tasks.FileSizeCountKey;
+
+/**
+ * Codec to serialize/deserialize {@link FileSizeCountKey}.
+ */
+public final class FileCountBySizeKeyCodec implements Codec<FileSizeCountKey> {
+

Review Comment:
   FileCountBySizeKeyCodec can be merged into the FileCountBySizeKey class as a static method.
   Same for GlobalStatsValueCodec
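
   A rough sketch of the merged shape, following the static getCodec() convention used elsewhere in Ozone (the byte layout below is an assumption based on the imports visible in this PR; the real logic would simply move over from the codec class):

       public class FileSizeCountKey {
         private static final Codec<FileSizeCountKey> CODEC = new KeyCodec();

         /** Static accessor replacing the separate FileCountBySizeKeyCodec class. */
         public static Codec<FileSizeCountKey> getCodec() {
           return CODEC;
         }

         private static final class KeyCodec implements Codec<FileSizeCountKey> {
           @Override
           public Class<FileSizeCountKey> getTypeClass() {
             return FileSizeCountKey.class;
           }

           @Override
           public byte[] toPersistedFormat(FileSizeCountKey key) {
             // Hypothetical layout: UTF-8 "volume/bucket/" prefix plus the
             // 8-byte upper bound (UTF_8, Longs, ArrayUtils as in this PR).
             byte[] prefix = (key.getVolume() + "/" + key.getBucket() + "/").getBytes(UTF_8);
             return ArrayUtils.addAll(prefix, Longs.toByteArray(key.getFileSizeUpperBound()));
           }

           @Override
           public FileSizeCountKey fromPersistedFormat(byte[] rawData) {
             int split = rawData.length - Long.BYTES;
             String[] parts = new String(rawData, 0, split, UTF_8).split("/");
             long upperBound = Longs.fromByteArray(ArrayUtils.subarray(rawData, split, rawData.length));
             return new FileSizeCountKey(parts[0], parts[1], upperBound);
           }

           @Override
           public FileSizeCountKey copyObject(FileSizeCountKey key) {
             return key;  // immutable value object, safe to share
           }
         }
       }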



##########
hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/OmTableInsightTask.java:
##########
@@ -266,26 +271,18 @@ private void handleUpdateEvent(OMDBUpdateEvent<String, Object> event,
    * @param dataMap Map containing the updated count and size information.
    */
   private void writeDataToDB(Map<String, Long> dataMap) {
-    List<GlobalStats> insertGlobalStats = new ArrayList<>();
-    List<GlobalStats> updateGlobalStats = new ArrayList<>();
-
-    for (Entry<String, Long> entry : dataMap.entrySet()) {
-      Timestamp now =
-          using(sqlConfiguration).fetchValue(select(currentTimestamp()));
-      GlobalStats record = globalStatsDao.fetchOneByKey(entry.getKey());
-      GlobalStats newRecord
-          = new GlobalStats(entry.getKey(), entry.getValue(), now);
-
-      // Insert a new record for key if it does not exist
-      if (record == null) {
-        insertGlobalStats.add(newRecord);
-      } else {
-        updateGlobalStats.add(newRecord);
+    try (RDBBatchOperation rdbBatchOperation = new RDBBatchOperation()) {
+      long currentTime = System.currentTimeMillis();

Review Comment:
   Remove currentTime; it is not used.



##########
hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/FileSizeCountKey.java:
##########
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.recon.tasks;
+
+/**
+ * Key class used for grouping file size counts in RocksDB storage.
+ * Represents a composite key of (volume, bucket, fileSizeUpperBound) for
+ * FILE_COUNT_BY_SIZE column family.
+ */
+public class FileSizeCountKey {
+  private final String volume;
+  private final String bucket;
+  private final Long fileSizeUpperBound;
+
+  public FileSizeCountKey(String volume, String bucket, Long fileSizeUpperBound) {
+    this.volume = volume;
+    this.bucket = bucket;
+    this.fileSizeUpperBound = fileSizeUpperBound;
+  }
+
+  public String getVolume() {
+    return volume;
+  }
+
+  public String getBucket() {
+    return bucket;
+  }
+
+  public Long getFileSizeUpperBound() {
+    return fileSizeUpperBound;
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (obj instanceof FileSizeCountKey) {
+      FileSizeCountKey other = (FileSizeCountKey) obj;
+      return volume.equals(other.volume) &&
+          bucket.equals(other.bucket) &&
+          fileSizeUpperBound.equals(other.fileSizeUpperBound);
+    }
+    return false;
+  }
+
+  @Override
+  public int hashCode() {

Review Comment:
   hashCode and equals can be removed.
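
   (If they do need to stay, since FileSizeCountKey is used as a HashMap key in FileSizeCountTaskHelper, the java.util.Objects-based forms are more compact, e.g.:)

       @Override
       public boolean equals(Object obj) {
         if (!(obj instanceof FileSizeCountKey)) {
           return false;
         }
         FileSizeCountKey other = (FileSizeCountKey) obj;
         return Objects.equals(volume, other.volume)
             && Objects.equals(bucket, other.bucket)
             && Objects.equals(fileSizeUpperBound, other.fileSizeUpperBound);
       }

       @Override
       public int hashCode() {
         return Objects.hash(volume, bucket, fileSizeUpperBound);
       }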



##########
hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/FileSizeCountTaskHelper.java:
##########
@@ -45,134 +39,154 @@
  */
 public abstract class FileSizeCountTaskHelper {
   protected static final Logger LOG = LoggerFactory.getLogger(FileSizeCountTaskHelper.class);
-
-  // Static lock to guard table truncation.
+  
+  // Static lock object for table truncation synchronization
   private static final Object TRUNCATE_LOCK = new Object();
 
   /**
-   * Truncates the FILE_COUNT_BY_SIZE table if it has not been truncated yet.
-   * This method synchronizes on a static lock to ensure only one task truncates at a time.
-   * If an error occurs, the flag is reset to allow retrying the truncation.
-   *
-   * @param dslContext DSLContext for executing DB commands.
+   * Increments the count for a given key on a PUT event.
+   */
+  public static void handlePutKeyEvent(OmKeyInfo omKeyInfo,
+                                       Map<FileSizeCountKey, Long> fileSizeCountMap) {
+    FileSizeCountKey key = getFileSizeCountKey(omKeyInfo);
+    Long count = fileSizeCountMap.containsKey(key) ? fileSizeCountMap.get(key) + 1L : 1L;
+    fileSizeCountMap.put(key, count);
+  }
+
+  /**
+   * Decrements the count for a given key on a DELETE event.
    */
-  public static void truncateTableIfNeeded(DSLContext dslContext) {
+  public static void handleDeleteKeyEvent(String key, OmKeyInfo omKeyInfo,
+                                          Map<FileSizeCountKey, Long> fileSizeCountMap) {
+    if (omKeyInfo == null) {
+      LOG.warn("Deleting a key not found while handling DELETE key event. Key 
not found in Recon OM DB: {}", key);
+    } else {
+      FileSizeCountKey countKey = getFileSizeCountKey(omKeyInfo);
+      Long count = fileSizeCountMap.containsKey(countKey) ? fileSizeCountMap.get(countKey) - 1L : -1L;
+      fileSizeCountMap.put(countKey, count);
+    }
+  }
+
+  /**
+   * Returns a FileSizeCountKey for the given OmKeyInfo.
+   */
+  public static FileSizeCountKey getFileSizeCountKey(OmKeyInfo omKeyInfo) {
+    return new FileSizeCountKey(omKeyInfo.getVolumeName(),
+        omKeyInfo.getBucketName(),
+        ReconUtils.getFileSizeUpperBound(omKeyInfo.getDataSize()));
+  }
+
+  /**
+   * Truncates the file count table if needed during reprocess.
+   * Uses a flag to ensure the table is truncated only once across all tasks.
+   */
+  public static void truncateFileCountTableIfNeeded(ReconFileMetadataManager reconFileMetadataManager,
+                                                    String taskName) {
     synchronized (TRUNCATE_LOCK) {
       if (ReconConstants.FILE_SIZE_COUNT_TABLE_TRUNCATED.compareAndSet(false, true)) {
         try {
-          int execute = dslContext.delete(FILE_COUNT_BY_SIZE).execute();
-          LOG.info("Deleted {} records from {}", execute, FILE_COUNT_BY_SIZE);
+          reconFileMetadataManager.clearFileCountTable();
+          LOG.info("Successfully truncated file count table for reprocess by 
task: {}", taskName);
         } catch (Exception e) {
-          // Reset the flag so that truncation can be retried
+          LOG.error("Failed to truncate file count table for task: {}", 
taskName, e);
+          // Reset flag on failure so another task can try
           ReconConstants.FILE_SIZE_COUNT_TABLE_TRUNCATED.set(false);
-          LOG.error("Error while truncating FILE_COUNT_BY_SIZE table, 
resetting flag.", e);
-          throw new RuntimeException("Table truncation failed", e);  // 
Propagate upwards
+          throw new RuntimeException("Failed to truncate file count table", e);
         }
       } else {
-        LOG.info("Table already truncated by another task; waiting for 
truncation to complete.");
+        LOG.debug("File count table already truncated by another task, 
skipping for task: {}", taskName);
       }
     }
   }
 
   /**
-   * Executes the reprocess method for the given task.
-   *
-   * @param omMetadataManager  OM metadata manager.
-   * @param dslContext         DSLContext for DB operations.
-   * @param fileCountBySizeDao DAO for file count table.
-   * @param bucketLayout       The bucket layout to process.
-   * @param taskName           The name of the task for logging.
-   * @return A Pair of task name and boolean indicating success.
+   * Executes the reprocess method using RocksDB for the given task.
    */
   public static ReconOmTask.TaskResult reprocess(OMMetadataManager omMetadataManager,
-                                                 DSLContext dslContext,
-                                                 FileCountBySizeDao fileCountBySizeDao,
+                                                 ReconFileMetadataManager reconFileMetadataManager,
                                                  BucketLayout bucketLayout,
                                                  String taskName) {
-    LOG.info("Starting Reprocess for {}", taskName);
+    LOG.info("Starting RocksDB Reprocess for {}", taskName);
     Map<FileSizeCountKey, Long> fileSizeCountMap = new HashMap<>();
     long startTime = Time.monotonicNow();
-    truncateTableIfNeeded(dslContext);
+    
+    // Ensure the file count table is truncated only once during reprocess
+    truncateFileCountTableIfNeeded(reconFileMetadataManager, taskName);
+    
     boolean status = reprocessBucketLayout(
-        bucketLayout, omMetadataManager, fileSizeCountMap, dslContext, fileCountBySizeDao, taskName);
+        bucketLayout, omMetadataManager, fileSizeCountMap, reconFileMetadataManager, taskName);
     if (!status) {
       return buildTaskResult(taskName, false);
     }
-    writeCountsToDB(fileSizeCountMap, dslContext, fileCountBySizeDao);
+    
+    writeCountsToDB(fileSizeCountMap, reconFileMetadataManager);
+    
     long endTime = Time.monotonicNow();
-    LOG.info("{} completed Reprocess in {} ms.", taskName, (endTime - 
startTime));
+    LOG.info("{} completed RocksDB Reprocess in {} ms.", taskName, (endTime - 
startTime));
+    
     return buildTaskResult(taskName, true);
   }
 
   /**
-   * Iterates over the OM DB keys for the given bucket layout and updates the fileSizeCountMap.
-   *
-   * @param bucketLayout       The bucket layout to use.
-   * @param omMetadataManager  OM metadata manager.
-   * @param fileSizeCountMap   Map accumulating file size counts.
-   * @param dslContext         DSLContext for DB operations.
-   * @param fileCountBySizeDao DAO for file count table.
-   * @param taskName           The name of the task for logging.
-   * @return true if processing succeeds, false otherwise.
+   * Iterates over the OM DB keys for the given bucket layout and updates the fileSizeCountMap (RocksDB version).
    */
   public static boolean reprocessBucketLayout(BucketLayout bucketLayout,
                                               OMMetadataManager omMetadataManager,
                                               Map<FileSizeCountKey, Long> fileSizeCountMap,
-                                              DSLContext dslContext,
-                                              FileCountBySizeDao fileCountBySizeDao,
+                                              ReconFileMetadataManager reconFileMetadataManager,
                                               String taskName) {
     Table<String, OmKeyInfo> omKeyInfoTable = omMetadataManager.getKeyTable(bucketLayout);
     int totalKeysProcessed = 0;
+    
     try (TableIterator<String, ? extends Table.KeyValue<String, OmKeyInfo>> keyIter =
              omKeyInfoTable.iterator()) {
       while (keyIter.hasNext()) {
         Table.KeyValue<String, OmKeyInfo> kv = keyIter.next();
         handlePutKeyEvent(kv.getValue(), fileSizeCountMap);
         totalKeysProcessed++;
 
-        // Flush to DB periodically.
+        // Flush to RocksDB periodically.
         if (fileSizeCountMap.size() >= 100000) {
-          writeCountsToDB(fileSizeCountMap, dslContext, fileCountBySizeDao);
+          // For reprocess, we don't need to check existing values since table was truncated
+          LOG.info("Flushing {} accumulated counts to RocksDB for {}", fileSizeCountMap.size(), taskName);

Review Comment:
   This can be a debug log.
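
   i.e., a sketch of the suggested change (with parameterized SLF4J logging no isDebugEnabled() guard is needed):

       LOG.debug("Flushing {} accumulated counts to RocksDB for {}",
           fileSizeCountMap.size(), taskName);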



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

