klsince commented on code in PR #13597:
URL: https://github.com/apache/pinot/pull/13597#discussion_r1678288823


##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java:
##########
@@ -555,6 +750,67 @@ public void uploadSegmentAsMultiPart(FormDataMultiPart multiPart,
     }
   }
 
+  @POST
+  @ManagedAsync
+  @Produces(MediaType.APPLICATION_JSON)
+  @Consumes(MediaType.MULTIPART_FORM_DATA)
+  @Path("/segmentList")
+  @Authorize(targetType = TargetType.TABLE, paramName = "tableName", action = Actions.Cluster.UPLOAD_SEGMENT)
+  @Authenticate(AccessType.CREATE)
+  @ApiOperation(value = "Upload a segment", notes = "Upload a segment as binary")

Review Comment:
   update the notes to "Upload a batch of segments"
   
   high level question: is this batch uploading API atomic? I see there is logic to clean up uploaded segments upon any failure in the middle, so it's atomic in terms of the uploading action, but I assume it's not atomic with respect to ongoing queries, as I didn't see SegmentLineage or a similar mechanism used here. Either way, it would be helpful to document on the API how things behave upon failures.
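   One way to surface this to callers is through the Swagger annotation that is already on the endpoint. A sketch of what the updated annotation could look like (the failure-semantics wording here is my suggestion, not text from the PR):
   
   ```java
   @ApiOperation(value = "Upload a batch of segments", notes = "Upload a batch of segments as binary. "
       + "The batch is not atomic with respect to ongoing queries: if a failure occurs part-way through, "
       + "segments uploaded so far are cleaned up, but no SegmentLineage-style atomic cut-over is performed.")
   ```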



##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java:
##########
@@ -403,6 +413,173 @@ private SuccessResponse uploadSegment(@Nullable String tableName, TableType tabl
     }
   }
 
+  // Method used to update a list of segments in batch mode with the METADATA upload type.
+  private SuccessResponse uploadSegments(String tableName, TableType tableType,
+      FormDataMultiPart multiParts, boolean enableParallelPushProtection,
+      boolean allowRefresh, HttpHeaders headers, Request request) {
+    String rawTableName = TableNameBuilder.extractRawTableName(tableName);
+    String tableNameWithType = tableType == TableType.OFFLINE
+        ? TableNameBuilder.OFFLINE.tableNameWithType(rawTableName)
+        : TableNameBuilder.REALTIME.tableNameWithType(rawTableName);
+
+    TableConfig tableConfig = _pinotHelixResourceManager.getTableConfig(tableNameWithType);
+    if (tableConfig == null) {
+      throw new ControllerApplicationException(LOGGER, "Failed to fetch table config for table: " + tableNameWithType,
+          Response.Status.BAD_REQUEST);
+    }
+
+    String clientAddress;
+    try {
+      clientAddress = InetAddress.getByName(request.getRemoteAddr()).getHostName();
+    } catch (UnknownHostException ex) {
+      throw new ControllerApplicationException(LOGGER, "Failed to resolve hostname from input request",
+          Response.Status.BAD_REQUEST, ex);
+    }
+
+    String uploadTypeStr = extractHttpHeader(headers, FileUploadDownloadClient.CustomHeaders.UPLOAD_TYPE);
+    FileUploadType uploadType = getUploadType(uploadTypeStr);
+    if (!FileUploadType.METADATA.equals(uploadType)) {
+      throw new ControllerApplicationException(LOGGER, "Unsupported upload type: " + uploadTypeStr,
+          Response.Status.BAD_REQUEST);
+    }
+
+    String crypterClassNameInHeader = extractHttpHeader(headers, FileUploadDownloadClient.CustomHeaders.CRYPTER);
+    String ingestionDescriptor = extractHttpHeader(headers, CommonConstants.Controller.INGESTION_DESCRIPTOR);
+    ControllerFilePathProvider provider = ControllerFilePathProvider.getInstance();
+    List<SegmentUploadMetadata> segmentUploadMetadataList = new ArrayList<>();
+    List<File> tempEncryptedFiles = new ArrayList<>();
+    List<File> tempDecryptedFiles = new ArrayList<>();
+    List<File> tempSegmentDirs = new ArrayList<>();
+    List<String> segmentNames = new ArrayList<>();
+
+    for (BodyPart bodyPartFromReq: multiParts.getBodyParts()) {
+      FormDataBodyPart bodyPart = (FormDataBodyPart) bodyPartFromReq;
+      String segmentName = bodyPart.getContentDisposition().getFileName();
+      segmentNames.add(segmentName);
+      if (StringUtils.isEmpty(segmentName)) {
+        throw new ControllerApplicationException(LOGGER,
+            "filename is a required field within the multipart object for METADATA batch upload mode.",
+            Response.Status.BAD_REQUEST);
+      }
+      File tempEncryptedFile;
+      File tempDecryptedFile;
+      File tempSegmentDir;
+      try {
+        String sourceDownloadURIStr = extractHttpHeader(headers,
+            CommonConstants.Controller.SEGMENT_URI_HTTP_HEADER_PREFIX + segmentName);
+        if (StringUtils.isEmpty(sourceDownloadURIStr)) {
+          throw new ControllerApplicationException(LOGGER,
+              "'DOWNLOAD_URI' is required as a field within the multipart object for METADATA batch upload mode.",
+              Response.Status.BAD_REQUEST);
+        }
+        // The downloadUri for putting into segment zk metadata
+        String segmentDownloadURIStr = sourceDownloadURIStr;
+
+        String tempFileName = TMP_DIR_PREFIX + UUID.randomUUID();
+        tempEncryptedFile = new File(provider.getFileUploadTempDir(), tempFileName + ENCRYPTED_SUFFIX);
+        tempEncryptedFiles.add(tempEncryptedFile);
+        tempDecryptedFile = new File(provider.getFileUploadTempDir(), tempFileName);
+        tempDecryptedFiles.add(tempDecryptedFile);
+        tempSegmentDir = new File(provider.getUntarredFileTempDir(), tempFileName);
+        tempSegmentDirs.add(tempSegmentDir);
+        boolean encryptSegment = StringUtils.isNotEmpty(crypterClassNameInHeader);
+        File destFile = encryptSegment ? tempEncryptedFile : tempDecryptedFile;
+        // override copySegmentToFinalLocation if override provided in headers:COPY_SEGMENT_TO_DEEP_STORE
+        // else set to false for backward compatibility
+        String copySegmentToDeepStore =
+            extractHttpHeader(headers, FileUploadDownloadClient.CustomHeaders.COPY_SEGMENT_TO_DEEP_STORE);
+        boolean copySegmentToFinalLocation = Boolean.parseBoolean(copySegmentToDeepStore);
+        createSegmentFileFromBodyPart(bodyPart, destFile);
+        // Include the un-tarred segment size when available
+        long segmentSizeInBytes = getSegmentSizeFromHeaders(segmentName, headers);
+        if (segmentSizeInBytes < 0) {
+          //  Use the tarred segment size as an approximation.
+          segmentSizeInBytes = getSegmentSizeFromFile(sourceDownloadURIStr);
+        }
+        if (encryptSegment) {
+          decryptFile(crypterClassNameInHeader, tempEncryptedFile, tempDecryptedFile);
+        }
+
+        String metadataProviderClass = DefaultMetadataExtractor.class.getName();
+        SegmentMetadata segmentMetadata = getSegmentMetadata(tempDecryptedFile, tempSegmentDir, metadataProviderClass);
+        LOGGER.info("Processing upload request for segment: {} of table: {} with upload type: {} from client: {}, "
+            + "ingestion descriptor: {}", segmentName, tableNameWithType, uploadType, clientAddress, ingestionDescriptor
+        );
+
+        // Validate segment
+        if (tableConfig.getIngestionConfig() == null || tableConfig.getIngestionConfig().isSegmentTimeValueCheck()) {
+          SegmentValidationUtils.validateTimeInterval(segmentMetadata, tableConfig);
+        }
+        long untarredSegmentSizeInBytes = 0L;
+        if (segmentSizeInBytes > 0) {
+          untarredSegmentSizeInBytes = segmentSizeInBytes;
+        }
+        SegmentValidationUtils.checkStorageQuota(segmentName, untarredSegmentSizeInBytes, tableConfig,
+            _controllerConf, _storageQuotaChecker);
+
+        // Encrypt segment
+        String crypterNameInTableConfig = tableConfig.getValidationConfig().getCrypterClassName();
+        Pair<String, File> encryptionInfo =
+            encryptSegmentIfNeeded(tempDecryptedFile, tempEncryptedFile, encryptSegment,
+                crypterClassNameInHeader, crypterNameInTableConfig, segmentName, tableNameWithType);
+        File segmentFile = encryptionInfo.getRight();
+
+        // Update download URI if controller is responsible for moving the segment to the deep store
+        URI finalSegmentLocationURI = null;
+        if (copySegmentToFinalLocation) {
+          URI dataDirURI = provider.getDataDirURI();
+          String dataDirPath = dataDirURI.toString();
+          String encodedSegmentName = URIUtils.encode(segmentName);
+          String finalSegmentLocationPath = URIUtils.getPath(dataDirPath, rawTableName, encodedSegmentName);
+          if (dataDirURI.getScheme().equalsIgnoreCase(CommonConstants.Segment.LOCAL_SEGMENT_SCHEME)) {
+            segmentDownloadURIStr = URIUtils.getPath(provider.getVip(), "segments", rawTableName, encodedSegmentName);
+          } else {
+            segmentDownloadURIStr = finalSegmentLocationPath;
+          }
+          finalSegmentLocationURI = URIUtils.getUri(finalSegmentLocationPath);
+        }
+        SegmentUploadMetadata segmentUploadMetadata = new SegmentUploadMetadata(segmentDownloadURIStr,
+            sourceDownloadURIStr, finalSegmentLocationURI, segmentSizeInBytes, segmentMetadata, encryptionInfo);
+        segmentUploadMetadataList.add(segmentUploadMetadata);
+        LOGGER.info("Using segment download URI: {} for segment: {} of table: {} (move segment: {})",
+            segmentDownloadURIStr, segmentFile, tableNameWithType, copySegmentToFinalLocation);
+      } catch (Exception ex) {
+        cleanupTempFiles(tempEncryptedFiles, tempDecryptedFiles, tempSegmentDirs);
+        _controllerMetrics.addMeteredGlobalValue(ControllerMeter.CONTROLLER_SEGMENT_UPLOAD_ERROR,
+            segmentUploadMetadataList.size());
+        throw new ControllerApplicationException(LOGGER, "Exception while processing segments to upload: "
+            + ex.getMessage(), Response.Status.INTERNAL_SERVER_ERROR, ex);
+      }
+    }
+
+    try {
+      ZKOperator zkOperator = new ZKOperator(_pinotHelixResourceManager, _controllerConf, _controllerMetrics);
+      zkOperator.completeSegmentsOperations(tableNameWithType, uploadType, enableParallelPushProtection, allowRefresh,
+          headers, segmentUploadMetadataList);
+      return new SuccessResponse("Successfully uploaded segments: " + segmentNames + " of table: "
+          + tableNameWithType);
+    } catch (Exception ex) {
+      _controllerMetrics.addMeteredGlobalValue(ControllerMeter.CONTROLLER_SEGMENT_UPLOAD_ERROR,
+          segmentUploadMetadataList.size());
+      throw new ControllerApplicationException(LOGGER, "Exception while uploading segments: " + ex.getMessage(),
+          Response.Status.INTERNAL_SERVER_ERROR, ex);
+    } finally {
+      cleanupTempFiles(tempEncryptedFiles, tempDecryptedFiles, tempSegmentDirs);

Review Comment:
   why not combine the two try-catch blocks into one, and share this `finally { cleanupTempFiles(); }`
   
   btw, would it be possible to process all segments inside one temp dir? Then we would only need to clean that root temp dir up at the end, with no need to track all the temp dirs used by individual segments separately.
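   A minimal sketch of the single-root-temp-dir idea using plain `java.nio.file` (class and segment names here are illustrative, not from the PR; the real code would create the root under the provider's temp dirs and do its per-segment work inside each subdirectory):
   
   ```java
   import java.io.IOException;
   import java.nio.file.Files;
   import java.nio.file.Path;
   import java.util.Comparator;
   import java.util.List;
   import java.util.stream.Stream;
   
   public class BatchTempDirSketch {
     public static void main(String[] args) throws IOException {
       // One root temp dir for the whole batch instead of per-segment tracking lists.
       Path batchRoot = Files.createTempDirectory("batch-upload-");
       try {
         List<String> segments = List.of("seg_0", "seg_1", "seg_2");
         for (String segmentName : segments) {
           // Each segment gets its own subdirectory under the shared root.
           Path segmentDir = Files.createDirectory(batchRoot.resolve(segmentName));
           Files.writeString(segmentDir.resolve("metadata"), "stub");
           // ... per-segment processing would happen here ...
         }
         System.out.println("processed " + segments.size() + " segments");
       } finally {
         // Single cleanup: delete the root recursively, deepest paths first.
         try (Stream<Path> paths = Files.walk(batchRoot)) {
           paths.sorted(Comparator.reverseOrder()).forEach(p -> {
             try {
               Files.delete(p);
             } catch (IOException e) {
               // Best-effort cleanup; real code would log the failure.
             }
           });
         }
         System.out.println("root cleaned up: " + Files.notExists(batchRoot));
       }
     }
   }
   ```
   
   Sorting the walked paths in reverse order deletes children before their parent directories, so one `finally` block suffices for both success and failure paths.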



##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java:
##########
@@ -295,13 +302,18 @@ private SuccessResponse uploadSegment(@Nullable String tableName, TableType tabl
               extractHttpHeader(headers, FileUploadDownloadClient.CustomHeaders.COPY_SEGMENT_TO_DEEP_STORE);
           copySegmentToFinalLocation = Boolean.parseBoolean(copySegmentToDeepStore);
           createSegmentFileFromMultipart(multiPart, destFile);
+          PinotFS pinotFS = null;

Review Comment:
   good catch, may consider cutting a small PR to land this fix first.



##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java:
##########
@@ -555,6 +750,67 @@ public void uploadSegmentAsMultiPart(FormDataMultiPart multiPart,
     }
   }
 
+  @POST
+  @ManagedAsync
+  @Produces(MediaType.APPLICATION_JSON)
+  @Consumes(MediaType.MULTIPART_FORM_DATA)
+  @Path("/segmentList")

Review Comment:
   maybe consider `v3/segments` as there is `v2/segments` 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

