ankitsultana commented on code in PR #10815:
URL: https://github.com/apache/pinot/pull/10815#discussion_r1303630415
##########
pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java:
##########
@@ -215,9 +215,14 @@ public static class ControllerPeriodicTasksConf {
"controller.realtime.segment.deepStoreUploadRetryEnabled";
public static final String DEEP_STORE_RETRY_UPLOAD_TIMEOUT_MS =
"controller.realtime.segment.deepStoreUploadRetry.timeoutMs";
+ public static final String ENABLE_TMP_SEGMENT_ASYNC_DELETION =
+ "controller.realtime.split.commit.segment.tmp.cleanup.async.enable";
+ public static final String SPLIT_COMMIT_TMP_SEGMENT_LIFETIME_SECOND =
+ "controller.realtime.split.commit.segment.tmp.lifetime.second";
public static final int MIN_INITIAL_DELAY_IN_SECONDS = 120;
public static final int MAX_INITIAL_DELAY_IN_SECONDS = 300;
+ public static final int DEFAULT_SPLIT_COMMIT_TMP_SEGMENT_LIFETIME_SECOND = 600;
Review Comment:
I think we can keep this a bit higher (1 hour). In cases where the controller is
overwhelmed, deepstore latencies are high, etc., a tmp segment may legitimately
stick around for a good few minutes.
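For reference, the suggested change would just bump the default (hypothetical value, same constant name as in this PR):

```java
public static final int DEFAULT_SPLIT_COMMIT_TMP_SEGMENT_LIFETIME_SECOND = 3600;  // 1 hour
```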
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java:
##########
@@ -1465,6 +1469,64 @@ public void uploadToDeepStoreIfMissing(TableConfig tableConfig, List<SegmentZKMe
}
}
+ public long deleteTmpSegments(String tableNameWithType, List<SegmentZKMetadata> segmentsZKMetadata) {
+ Preconditions.checkState(!_isStopping, "Segment manager is stopping");
+
+ if (!TableNameBuilder.isRealtimeTableResource(tableNameWithType)) {
+ return 0L;
+ }
+
+ TableConfig tableConfig = _helixResourceManager.getTableConfig(tableNameWithType);
+ if (tableConfig == null) {
+ LOGGER.warn("Failed to find table config for table: {}, skipping deletion of tmp segments", tableNameWithType);
+ return 0L;
+ }
+
+ if (!isLowLevelConsumer(tableNameWithType, tableConfig)
+ || !getIsSplitCommitEnabled()
+ || !isTmpSegmentAsyncDeletionEnabled()) {
+ return 0L;
+ }
+
+ // Delete tmp segments for realtime tables when low level consumer, split commit, and async deletion are all enabled.
+ Set<String> deepURIs = segmentsZKMetadata.stream().parallel().filter(meta -> meta.getStatus() == Status.DONE
Review Comment:
nit: avoid parallel
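A possible sequential rewrite, sketched below with minimal stand-ins for `SegmentZKMetadata` and the peer-download constant (the `Meta` record, `Status` enum, and `PEER_DOWNLOAD_URI` value are placeholders for illustration, not the real Pinot types):

```java
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

public class SequentialStreamSketch {
  // Minimal stand-ins for the parts of SegmentZKMetadata the filter touches.
  enum Status { DONE, IN_PROGRESS }
  record Meta(Status status, String downloadUrl) { }

  // Placeholder for CommonConstants.Segment.METADATA_URI_FOR_PEER_DOWNLOAD.
  static final String PEER_DOWNLOAD_URI = "";

  // Same filter as in the PR, minus .parallel(): the per-table ZK metadata
  // list is small, so a sequential stream avoids fork-join overhead.
  static Set<String> collectDeepUris(List<Meta> metas) {
    return metas.stream()
        .filter(m -> m.status() == Status.DONE && !PEER_DOWNLOAD_URI.equals(m.downloadUrl()))
        .map(Meta::downloadUrl)
        .collect(Collectors.toSet());
  }

  public static void main(String[] args) {
    List<Meta> metas = List.of(
        new Meta(Status.DONE, "s3://bucket/seg1"),
        new Meta(Status.IN_PROGRESS, "s3://bucket/seg2"),
        new Meta(Status.DONE, PEER_DOWNLOAD_URI));
    // Only seg1 is DONE with a non-peer download URL.
    System.out.println(collectDeepUris(metas));
  }
}
```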
##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentCompletionUtils.java:
##########
@@ -42,6 +43,19 @@ public static String getSegmentNamePrefix(String segmentName) {
}
public static String generateSegmentFileName(String segmentNameStr) {
- return getSegmentNamePrefix(segmentNameStr) + UUID.randomUUID().toString();
+ return getSegmentNamePrefix(segmentNameStr) + UUID.randomUUID();
+ }
+
+ public static boolean isTmpFile(String uri) {
Review Comment:
Can you add a UT or two for this?
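A rough shape such a UT could take, assuming tmp segment files carry a `.tmp.` marker inserted by `getSegmentNamePrefix` before the UUID (the `isTmpFile` body below is a stand-in mirroring that assumption, so the sketch is self-contained; the real test would call `SegmentCompletionUtils.isTmpFile` directly):

```java
import java.util.UUID;

public class SegmentCompletionUtilsTest {
  // Stand-in for SegmentCompletionUtils.isTmpFile, assuming tmp files are
  // named "<segmentName>.tmp.<uuid>" by generateSegmentFileName.
  static boolean isTmpFile(String uri) {
    return uri.contains(".tmp.");
  }

  public static void main(String[] args) {
    // A name shaped like generateSegmentFileName's output should be detected.
    String tmp = "mytable__0__0__20230815T0000Z.tmp." + UUID.randomUUID();
    // A committed segment name must not be flagged.
    String committed = "mytable__0__0__20230815T0000Z";
    System.out.println(isTmpFile(tmp) + " " + isTmpFile(committed)); // prints "true false"
  }
}
```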
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java:
##########
@@ -1465,6 +1469,64 @@ public void uploadToDeepStoreIfMissing(TableConfig tableConfig, List<SegmentZKMe
}
}
+ public long deleteTmpSegments(String tableNameWithType, List<SegmentZKMetadata> segmentsZKMetadata) {
+ Preconditions.checkState(!_isStopping, "Segment manager is stopping");
+
+ if (!TableNameBuilder.isRealtimeTableResource(tableNameWithType)) {
+ return 0L;
+ }
+
+ TableConfig tableConfig = _helixResourceManager.getTableConfig(tableNameWithType);
+ if (tableConfig == null) {
+ LOGGER.warn("Failed to find table config for table: {}, skipping deletion of tmp segments", tableNameWithType);
+ return 0L;
+ }
+
+ if (!isLowLevelConsumer(tableNameWithType, tableConfig)
+ || !getIsSplitCommitEnabled()
+ || !isTmpSegmentAsyncDeletionEnabled()) {
+ return 0L;
+ }
+
+ // Delete tmp segments for realtime tables when low level consumer, split commit, and async deletion are all enabled.
+ Set<String> deepURIs = segmentsZKMetadata.stream().parallel().filter(meta -> meta.getStatus() == Status.DONE
+     && !CommonConstants.Segment.METADATA_URI_FOR_PEER_DOWNLOAD.equals(meta.getDownloadUrl())).map(
+     SegmentZKMetadata::getDownloadUrl).collect(Collectors.toSet());
+
+ String rawTableName = TableNameBuilder.extractRawTableName(tableNameWithType);
+ URI tableDirURI = URIUtils.getUri(_controllerConf.getDataDir(), rawTableName);
+ PinotFS pinotFS = PinotFSFactory.create(tableDirURI.getScheme());
+ long orphanTmpSegments = 0;
+ try {
+ for (String filePath : pinotFS.listFiles(tableDirURI, false)) {
+ // prepend scheme
+ URI uri = URIUtils.getUri(filePath);
+ if (isTmpAndCanDelete(uri, deepURIs, pinotFS)) {
+ LOGGER.info("Deleting temporary segment file: {}", uri);
+ Preconditions.checkState(pinotFS.delete(uri, true), "Failed to delete file: %s", uri);
Review Comment:
If delete returns false, we can simply log it. Throwing would mean skipping a
bunch of files that should have been deleted in this run.
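Sketching the log-and-continue shape of the loop (the `Predicate` here is a stand-in for `pinotFS.delete(uri, true)` so the example runs on its own; names are illustrative):

```java
import java.util.List;
import java.util.function.Predicate;

public class TmpSegmentCleanupSketch {
  // Returns the number of successful deletes; "delete" stands in for
  // pinotFS.delete(uri, true).
  static long deleteAll(List<String> tmpFiles, Predicate<String> delete) {
    long deleted = 0;
    for (String uri : tmpFiles) {
      if (delete.test(uri)) {
        deleted++;
      } else {
        // Log-and-continue instead of Preconditions.checkState: one failed
        // delete no longer aborts cleanup of the remaining tmp files.
        System.out.println("WARN: failed to delete tmp segment file: " + uri);
      }
    }
    return deleted;
  }

  public static void main(String[] args) {
    // Pretend the delete of the "stuck" file fails.
    Predicate<String> delete = uri -> !uri.contains("stuck");
    long n = deleteAll(List.of("a.tmp.1", "stuck.tmp.2", "c.tmp.3"), delete);
    System.out.println("deleted=" + n); // prints "deleted=2"
  }
}
```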
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]