zerolbsony commented on code in PR #17279:
URL: https://github.com/apache/iotdb/pull/17279#discussion_r2957386011


##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java:
##########
@@ -3117,4 +3136,350 @@ public TSStatus writeAuditLog(TAuditLogReq req) {
   public void handleClientExit() {
     // Do nothing
   }
+
+  // ====================================================
+  // Data Partition Table Integrity Check Implementation
+  // ====================================================
+
+  private volatile DataPartitionTableGenerator currentGenerator;
+  private volatile CompletableFuture<Void> currentGeneratorFuture;
+  private volatile long currentTaskId = 0;
+
+  @Override
+  public TGetEarliestTimeslotsResp getEarliestTimeslots() {
+    TGetEarliestTimeslotsResp resp = new TGetEarliestTimeslotsResp();
+
+    try {
+      Map<String, Long> earliestTimeslots = new HashMap<>();
+
+      // Get data directories from configuration
+      String[] dataDirs = 
IoTDBDescriptor.getInstance().getConfig().getDataDirs();
+
+      for (String dataDir : dataDirs) {
+        File dir = new File(dataDir);
+        if (dir.exists() && dir.isDirectory()) {
+          processDataDirectoryForEarliestTimeslots(dir, earliestTimeslots);
+        }
+      }
+
+      resp.setStatus(RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS));
+      resp.setDatabaseToEarliestTimeslot(earliestTimeslots);
+
+      LOGGER.info("Retrieved earliest timeslots for {} databases", 
earliestTimeslots.size());
+
+    } catch (Exception e) {
+      LOGGER.error("Failed to get earliest timeslots", e);
+      resp.setStatus(
+          onIoTDBException(
+              e,
+              OperationType.GET_EARLIEST_TIMESLOTS,
+              TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode()));
+    }
+
+    return resp;
+  }
+
+  @Override
+  public TGenerateDataPartitionTableResp generateDataPartitionTable(
+      TGenerateDataPartitionTableReq req) {
+    TGenerateDataPartitionTableResp resp = new 
TGenerateDataPartitionTableResp();
+
+    try {
+      // Check if there's already a task in the progress
+      if (currentGenerator != null
+          && currentGenerator.getStatus() == 
DataPartitionTableGenerator.TaskStatus.IN_PROGRESS) {
+        
resp.setErrorCode(DataPartitionTableGeneratorState.IN_PROGRESS.getCode());
+        resp.setMessage("DataPartitionTable generation is already in the 
progress");
+        resp.setStatus(RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR));
+        return resp;
+      }
+
+      // Create generator for all data directories
+      int seriesSlotNum = 
IoTDBDescriptor.getInstance().getConfig().getSeriesPartitionSlotNum();
+      String seriesPartitionExecutorClass =
+          
IoTDBDescriptor.getInstance().getConfig().getSeriesPartitionExecutorClass();
+
+      final ExecutorService partitionTableRecoverExecutor =
+          new WrappedThreadPoolExecutor(
+              0,
+              
IoTDBDescriptor.getInstance().getConfig().getPartitionTableRecoverWorkerNum(),
+              0L,
+              TimeUnit.SECONDS,
+              new ArrayBlockingQueue<>(
+                  
IoTDBDescriptor.getInstance().getConfig().getPartitionTableRecoverWorkerNum()),
+              new 
IoTThreadFactory(ThreadName.DATA_PARTITION_RECOVER_PARALLEL_POOL.getName()),
+              ThreadName.DATA_PARTITION_RECOVER_PARALLEL_POOL.getName(),
+              new ThreadPoolExecutor.CallerRunsPolicy());
+
+      currentGenerator =
+          new DataPartitionTableGenerator(
+              partitionTableRecoverExecutor,
+              req.getDatabases(),
+              seriesSlotNum,
+              seriesPartitionExecutorClass);
+      currentTaskId = System.currentTimeMillis();
+
+      // Start generation synchronously for now to return the data partition 
table immediately
+      currentGeneratorFuture = currentGenerator.startGeneration();
+      parseGenerationStatus(resp);
+    } catch (Exception e) {
+      LOGGER.error("Failed to generate DataPartitionTable", e);
+      resp.setStatus(
+          onIoTDBException(
+              e,
+              OperationType.GENERATE_DATA_PARTITION_TABLE,
+              TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode()));
+    }
+
+    return resp;
+  }
+
+  @Override
+  public TGenerateDataPartitionTableHeartbeatResp 
generateDataPartitionTableHeartbeat() {
+    TGenerateDataPartitionTableHeartbeatResp resp = new 
TGenerateDataPartitionTableHeartbeatResp();
+    // Set default value
+    resp.setDatabaseScopedDataPartitionTables(Collections.emptyList());
+    try {
+      currentGeneratorFuture.get(timeoutMs, TimeUnit.MILLISECONDS);
+      if (currentGenerator == null) {
+        resp.setErrorCode(DataPartitionTableGeneratorState.UNKNOWN.getCode());
+        resp.setMessage("No DataPartitionTable generation task found");
+        resp.setStatus(RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR));
+        return resp;
+      }
+
+      parseGenerationStatus(resp);
+      if 
(currentGenerator.getStatus().equals(DataPartitionTableGenerator.TaskStatus.COMPLETED))
 {
+        boolean success = false;
+        List<DatabaseScopedDataPartitionTable> 
databaseScopedDataPartitionTableList = new ArrayList<>();
+        Map<String, DataPartitionTable> dataPartitionTableMap = 
currentGenerator.getDatabasePartitionTableMap();
+        if (!dataPartitionTableMap.isEmpty()) {
+          for (Map.Entry<String, DataPartitionTable> entry : 
dataPartitionTableMap.entrySet()) {
+            String database = entry.getKey();
+            DataPartitionTable dataPartitionTable = entry.getValue();
+            if (!StringUtils.isEmpty(database) && dataPartitionTable != null) {
+              DatabaseScopedDataPartitionTable 
databaseScopedDataPartitionTable = new 
DatabaseScopedDataPartitionTable(database, dataPartitionTable);
+              
databaseScopedDataPartitionTableList.add(databaseScopedDataPartitionTable);
+              success = true;
+            }
+          }
+        }
+
+        if (success) {
+          List<ByteBuffer> result = 
serializeDatabaseScopedTableList(databaseScopedDataPartitionTableList);
+          resp.setDatabaseScopedDataPartitionTables(result);
+
+          // Clear current generator
+          currentGenerator = null;
+        }
+      }
+    } catch (Exception e) {
+      LOGGER.error("Failed to check DataPartitionTable generation status", e);
+      resp.setStatus(
+          onIoTDBException(
+              e,
+              OperationType.CHECK_DATA_PARTITION_TABLE_STATUS,
+              TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode()));
+    }
+    return resp;
+  }
+
+  private <T> void parseGenerationStatus(T resp) {
+    if (resp instanceof TGenerateDataPartitionTableResp) {
+      handleResponse((TGenerateDataPartitionTableResp) resp);
+    } else {
+      handleResponse((TGenerateDataPartitionTableHeartbeatResp) resp);
+    }
+  }
+
+  private void handleResponse(TGenerateDataPartitionTableResp resp) {
+    updateResponse(resp);
+  }
+
+  private void handleResponse(TGenerateDataPartitionTableHeartbeatResp resp) {
+    updateResponse(resp);
+  }
+
+  private <T> void updateResponse(T resp) {
+    if (currentGenerator == null) return;

Review Comment:
   Changed yet



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to