Copilot commented on code in PR #18549:
URL: https://github.com/apache/pinot/pull/18549#discussion_r3278807687
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/SegmentDeletionManager.java:
##########
@@ -341,41 +359,146 @@ private void moveSegmentsToDeletedDir(String segmentId,
Long deletedSegmentsRete
/**
- * Retrieves the URI for segment deletion by checking two possible segment
file variants in deep store.
- * Looks for the segment file in two formats:
+ * Retrieves the URIs for segment deletion by checking the possible segment
file variants in deep store.
+ * Looks for segment files in these formats:
* - Without extension (conventional naming)
* - With .tar.gz extension (used by minions in
BaseMultipleSegmentsConversionExecutor)
+ * - Upload-attempt files generated by parallel push protection
*
* @param rawTableName name of the table containing the segment
* @param segmentId name of the segment
- * @return URI of the existing segment file if found in either format, null
if segment doesn't exist in either format
- * or if there are filesystem access errors
+ * @return URIs of existing segment files, empty if the segment doesn't
exist or if there are filesystem access errors
*/
- @Nullable
- private URI getFileToDeleteURI(String rawTableName, String segmentId) {
+ private List<URI> getFilesToDeleteURIs(String rawTableName, String
segmentId, PinotFS pinotFS,
+ @Nullable List<URI> uploadAttemptFileURIs) {
+ List<URI> filesToDelete = new ArrayList<>();
try {
URI plainFileUri = URIUtils.getUri(_dataDir, rawTableName,
URIUtils.encode(segmentId));
- PinotFS pinotFS = PinotFSFactory.create(plainFileUri.getScheme());
// Check for plain segment file first
if (pinotFS.exists(plainFileUri)) {
- return plainFileUri;
+ filesToDelete.add(plainFileUri);
}
URI tarGzFileUri = URIUtils.getUri(_dataDir, rawTableName,
URIUtils.encode(segmentId +
TarCompressionUtils.TAR_GZ_FILE_EXTENSION));
// Check for .tar.gz segment file
if (pinotFS.exists(tarGzFileUri)) {
- return tarGzFileUri;
+ filesToDelete.add(tarGzFileUri);
+ }
+ if (uploadAttemptFileURIs != null) {
+ filesToDelete.addAll(uploadAttemptFileURIs);
+ }
+ if (filesToDelete.isEmpty()) {
+ LOGGER.error("No file found for segment: {} in deep store", segmentId);
}
- LOGGER.error("No file found for segment: {} in deep store", segmentId);
- return null;
} catch (Exception e) {
LOGGER.error("Caught exception while trying to find file for segment: {}
in deep store", segmentId);
Review Comment:
In getFilesToDeleteURIs(), the catch block logs an error but drops the
caught exception, which makes deep-store failures hard to debug in production.
Pass the exception as the last argument to LOGGER.error(...) so its stack trace
is logged (as getKnownSegmentNames() and getUploadAttemptFileURIs() already do),
and consider narrowing the caught exception type if possible.
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/SegmentDeletionManager.java:
##########
@@ -341,41 +359,146 @@ private void moveSegmentsToDeletedDir(String segmentId,
Long deletedSegmentsRete
/**
- * Retrieves the URI for segment deletion by checking two possible segment
file variants in deep store.
- * Looks for the segment file in two formats:
+ * Retrieves the URIs for segment deletion by checking the possible segment
file variants in deep store.
+ * Looks for segment files in these formats:
* - Without extension (conventional naming)
* - With .tar.gz extension (used by minions in
BaseMultipleSegmentsConversionExecutor)
+ * - Upload-attempt files generated by parallel push protection
*
* @param rawTableName name of the table containing the segment
* @param segmentId name of the segment
- * @return URI of the existing segment file if found in either format, null
if segment doesn't exist in either format
- * or if there are filesystem access errors
+ * @return URIs of existing segment files, empty if the segment doesn't
exist or if there are filesystem access errors
*/
- @Nullable
- private URI getFileToDeleteURI(String rawTableName, String segmentId) {
+ private List<URI> getFilesToDeleteURIs(String rawTableName, String
segmentId, PinotFS pinotFS,
+ @Nullable List<URI> uploadAttemptFileURIs) {
+ List<URI> filesToDelete = new ArrayList<>();
try {
URI plainFileUri = URIUtils.getUri(_dataDir, rawTableName,
URIUtils.encode(segmentId));
- PinotFS pinotFS = PinotFSFactory.create(plainFileUri.getScheme());
// Check for plain segment file first
if (pinotFS.exists(plainFileUri)) {
- return plainFileUri;
+ filesToDelete.add(plainFileUri);
}
URI tarGzFileUri = URIUtils.getUri(_dataDir, rawTableName,
URIUtils.encode(segmentId +
TarCompressionUtils.TAR_GZ_FILE_EXTENSION));
// Check for .tar.gz segment file
if (pinotFS.exists(tarGzFileUri)) {
- return tarGzFileUri;
+ filesToDelete.add(tarGzFileUri);
+ }
+ if (uploadAttemptFileURIs != null) {
+ filesToDelete.addAll(uploadAttemptFileURIs);
+ }
+ if (filesToDelete.isEmpty()) {
+ LOGGER.error("No file found for segment: {} in deep store", segmentId);
}
- LOGGER.error("No file found for segment: {} in deep store", segmentId);
- return null;
} catch (Exception e) {
LOGGER.error("Caught exception while trying to find file for segment: {}
in deep store", segmentId);
+ }
+ return filesToDelete;
+ }
+
+ @Nullable
+ private Set<String> getKnownSegmentNames(String tableNameWithType,
Set<String> segmentsToDelete) {
+ Set<String> knownSegmentNames = new HashSet<>(segmentsToDelete);
+ try {
+ List<String> activeSegmentNames =
ZKMetadataProvider.getSegments(_propertyStore, tableNameWithType);
+ if (activeSegmentNames == null) {
+ LOGGER.warn("Failed to read active segment names for table: {},
skipping upload-attempt cleanup",
+ tableNameWithType);
+ return null;
+ }
+ knownSegmentNames.addAll(activeSegmentNames);
+ } catch (Exception e) {
+ LOGGER.warn("Failed to read active segment names for table: {}, skipping
upload-attempt cleanup",
+ tableNameWithType, e);
return null;
}
+ return knownSegmentNames;
}
+
+ private Map<String, List<URI>> getUploadAttemptFileURIs(String rawTableName,
Set<String> segmentsToDelete,
+ Set<String> knownSegmentNames, PinotFS pinotFS) {
+ Map<String, List<URI>> uploadAttemptFileURIsMap = new HashMap<>();
+ URI tableURI = URIUtils.getUri(_dataDir, rawTableName);
+ try {
+ if (!pinotFS.exists(tableURI) || !pinotFS.isDirectory(tableURI)) {
+ return uploadAttemptFileURIsMap;
+ }
+ String[] filePaths = pinotFS.listFiles(tableURI, false);
+ if (filePaths == null) {
+ return uploadAttemptFileURIsMap;
+ }
+ for (String filePath : filePaths) {
+ String fileName = URIUtils.getLastPart(filePath);
+ String segmentId = getUploadAttemptSegmentId(fileName,
segmentsToDelete);
+ if (segmentId == null || isKnownSegmentFile(fileName, segmentId,
knownSegmentNames, segmentsToDelete)) {
+ continue;
+ }
+ uploadAttemptFileURIsMap.computeIfAbsent(segmentId, key -> new
ArrayList<>())
+ .add(URIUtils.getUri(_dataDir, rawTableName, fileName));
+ }
+ } catch (Exception e) {
+ LOGGER.error("Caught exception while trying to find upload-attempt files
for table: {} in deep store",
+ rawTableName, e);
+ }
+ return uploadAttemptFileURIsMap;
+ }
+
+ @Nullable
+ private String getUploadAttemptSegmentId(String fileName, Set<String>
segmentsToDelete) {
+ String uploadAttemptMarker =
CommonConstants.Segment.SEGMENT_UPLOAD_ATTEMPT_FILE_MARKER;
+ String decodedFileName = safeDecode(fileName);
+ String matchingSegmentName = null;
+ for (String segmentName : segmentsToDelete) {
+ if (isUploadAttemptFileForSegment(fileName, decodedFileName,
segmentName, uploadAttemptMarker)
+ && (matchingSegmentName == null || segmentName.length() >
matchingSegmentName.length())) {
+ matchingSegmentName = segmentName;
+ }
Review Comment:
getUploadAttemptSegmentId() linearly scans all of segmentsToDelete for every
file returned by listFiles(), making upload-attempt cleanup O(numFiles *
numSegmentsToDelete). This can become a bottleneck for large tables. Consider
extracting the candidate segment prefix directly from the filename (split at
the upload-attempt marker, then decode) and doing O(1) set-membership checks, or
precomputing an efficient lookup (e.g. a trie) keyed by the possible encoded and
decoded segment prefixes. Whichever approach is used, preserve the current
longest-match behavior when several segment names match the same file.
##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/SegmentValidationUtils.java:
##########
@@ -57,6 +84,206 @@ public static void validateTimeInterval(SegmentMetadata
segmentMetadata, TableCo
}
}
+ public static void validateUpsertSegmentPartitionMetadata(SegmentMetadata
segmentMetadata, TableConfig tableConfig) {
+ if (!isOfflineUpsertTable(tableConfig)) {
+ return;
+ }
+
+ String tableNameWithType = tableConfig.getTableName();
+ Map<String, ColumnPartitionConfig> columnPartitionMap =
getColumnPartitionMap(tableConfig);
+ String partitionColumn = getPartitionColumn(segmentMetadata, tableConfig,
tableNameWithType, columnPartitionMap);
+ validateNoExtraPartitionMetadataColumns(segmentMetadata,
tableNameWithType, partitionColumn);
+ validateSinglePartition(segmentMetadata, tableNameWithType,
partitionColumn, columnPartitionMap);
+ }
Review Comment:
validateUpsertSegmentPartitionMetadata() does not currently reject
offline-upsert tables whose segmentPartitionConfig contains multiple columns
when a partition column can be resolved via
TableConfigUtils.getPartitionColumn(). As a result, ambiguous table configs
(multiple partition columns) may still pass upload validation, which conflicts
with the stated behavior of rejecting uploads when segmentPartitionConfig has
more than one column. Consider explicitly enforcing
columnPartitionMap.size() == 1 (and that its single column matches the effective
partition column) for offline upsert uploads.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]