wirybeaver commented on code in PR #10815:
URL: https://github.com/apache/pinot/pull/10815#discussion_r1299527276


##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java:
##########
@@ -1465,6 +1469,65 @@ public void uploadToDeepStoreIfMissing(TableConfig tableConfig, List<SegmentZKMe
     }
   }
 
+  public long deleteTmpSegments(String tableNameWithType) {
+    Preconditions.checkState(!_isStopping, "Segment manager is stopping");
+
+    if (!TableNameBuilder.isRealtimeTableResource(tableNameWithType)) {
+      return 0L;
+    }
+
+    TableConfig tableConfig = _helixResourceManager.getTableConfig(tableNameWithType);
+    if (tableConfig == null) {
+      LOGGER.warn("Failed to find table config for table: {}, skipping deletion of tmp segments", tableNameWithType);
+      return 0L;
+    }
+
+    if (!isLowLevelConsumer(tableNameWithType, tableConfig)
+        || !getIsSplitCommitEnabled()
+        || !isTmpSegmentAsyncDeletionEnabled()) {
+      return 0L;
+    }
+
+    // Delete tmp segments for realtime table with low level consumer, split commit and async deletion is enabled.
+    List<SegmentZKMetadata> segmentsZKMetadata = _helixResourceManager.getSegmentsZKMetadata(tableNameWithType);
+    Set<String> deepURIs = segmentsZKMetadata.stream().parallel().filter(meta -> meta.getStatus() == Status.DONE
+        && !CommonConstants.Segment.METADATA_URI_FOR_PEER_DOWNLOAD.equals(meta.getDownloadUrl())).map(
+        SegmentZKMetadata::getDownloadUrl).collect(
+        Collectors.toSet());
+
+    String rawTableName = TableNameBuilder.extractRawTableName(tableNameWithType);
+    URI tableDirURI = URIUtils.getUri(_controllerConf.getDataDir(), rawTableName);
+    PinotFS pinotFS = PinotFSFactory.create(tableDirURI.getScheme());
+    long orphanTmpSegments = 0;
+    try {
+      for (String filePath : pinotFS.listFiles(tableDirURI, false)) {
+        // prepend scheme
+        URI uri = URIUtils.getUri(filePath);
+        if (isTmpAndCanDelete(uri, deepURIs, pinotFS)) {
+          LOGGER.info("Deleting temporary segment file: {}", uri);
+          Preconditions.checkState(pinotFS.delete(uri, true), "Failed to delete file: %s", uri);
+          orphanTmpSegments++;
+        }
+      }
+    } catch (Exception e) {
+      LOGGER.warn("Caught exception while deleting temporary files for table: {}", rawTableName, e);
+    }
+    return orphanTmpSegments;
+  }
+
+  private boolean isTmpAndCanDelete(URI uri, Set<String> deepURIs, PinotFS pinotFS) throws Exception {
+    long lastModified = pinotFS.lastModified(uri);
+    String uriString = uri.toString();
+    return SegmentCompletionUtils.isTmpFile(uriString) && !deepURIs.contains(uriString)

Review Comment:
   Actually, Uber's existing realtime tmp file names follow the pattern
   <dataDir>/<tableName>/<segmentName><UUID> rather than
   <dataDir>/<tableName>/<segmentName>.tmp.<UUID>. It's safe to skip the
   download URL check and the ZK metadata read.
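
   To illustrate the difference between the two naming patterns, here is a
   minimal, self-contained sketch. It does not use Pinot's
   SegmentCompletionUtils; the class name, the regex, and the sample paths are
   assumptions made up for this example, and the real isTmpFile check may
   differ in detail.

```java
import java.util.UUID;
import java.util.regex.Pattern;

public class TmpSegmentNameSketch {
  // Hypothetical matcher for "<segmentName>.tmp.<UUID>"; not Pinot's actual implementation.
  private static final Pattern TMP_FILE = Pattern.compile(
      ".+\\.tmp\\.[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}$");

  // Returns true only when the path ends with ".tmp." followed by a UUID.
  static boolean looksLikeTmpFile(String path) {
    return TMP_FILE.matcher(path).matches();
  }

  public static void main(String[] args) {
    String uuid = UUID.randomUUID().toString();
    String segment = "myTable__0__42__20230820T0000Z"; // made-up segment name

    // "<segmentName>.tmp.<UUID>": matches the tmp pattern.
    System.out.println(looksLikeTmpFile("/data/myTable/" + segment + ".tmp." + uuid));
    // "<segmentName><UUID>": no ".tmp." marker, so it does not match.
    System.out.println(looksLikeTmpFile("/data/myTable/" + segment + uuid));
    // Plain committed segment name: does not match either.
    System.out.println(looksLikeTmpFile("/data/myTable/" + segment));
  }
}
```

   Under these assumptions, a file named <segmentName><UUID> is never
   classified as a tmp file by the name check alone.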



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

