Copilot commented on code in PR #17806:
URL: https://github.com/apache/pinot/pull/17806#discussion_r2892961289


##########
pinot-plugins/pinot-file-system/pinot-adls/src/main/java/org/apache/pinot/plugin/filesystem/ADLSGen2PinotFS.java:
##########
@@ -637,42 +674,134 @@ private boolean copySrcToDst(URI srcUri, URI dstUri)
     PathProperties pathProperties =
         
_fileSystemClient.getFileClient(AzurePinotFSUtil.convertUriToAzureStylePath(srcUri)).getProperties();
     try (InputStream inputStream = open(srcUri)) {
-      return copyInputStreamToDst(inputStream, dstUri, 
pathProperties.getContentMd5());
+      return copyInputStreamToDst(inputStream, dstUri, 
pathProperties.getContentMd5(),
+          pathProperties.getFileSize());
     }
   }
 
   /**
    * Helper function to copy input stream to destination URI.
    *
+   * <p>Uses the Azure Blob API for uploads, which is compatible with storage 
accounts that have Blob Soft Delete
+   * enabled. The DFS (Data Lake) API does not support Soft Delete and will 
fail with 409
+   * EndpointUnsupportedAccountFeatures on such accounts.</p>
+   *
    * NOTE: the caller has to close the input stream.
    *
    * @param inputStream input stream that will be written to dstUri
    * @param dstUri destination URI
+   * @param contentMd5 optional MD5 hash of the content
+   * @param contentLength length of the content in bytes
    * @return true if the copy succeeds
    */
-  private boolean copyInputStreamToDst(InputStream inputStream, URI dstUri, 
byte[] contentMd5)
+  private boolean copyInputStreamToDst(InputStream inputStream, URI dstUri, 
byte[] contentMd5, long contentLength)
+      throws IOException {
+    String path = AzurePinotFSUtil.convertUriToAzureStylePath(dstUri);
+
+    if (_blobContainerClient != null) {
+      return copyInputStreamToDstViaBlob(inputStream, dstUri, path, 
contentMd5, contentLength);
+    }
+    return copyInputStreamToDstViaDfs(inputStream, dstUri, path, contentMd5);
+  }
+
+  /**
+   * Upload via Azure Blob API. Compatible with Blob Soft Delete.
+   */
+  private boolean copyInputStreamToDstViaBlob(InputStream inputStream, URI 
dstUri, String path, byte[] contentMd5,
+      long contentLength)
+      throws IOException {
+    try {
+      BlobClient blobClient = _blobContainerClient.getBlobClient(path);
+      BlobHttpHeaders blobHttpHeaders = contentMd5 != null ? 
getBlobHttpHeadersWithContentMd5(blobClient, contentMd5)
+          : null;
+      if (_enableChecksum) {
+        uploadWithBlockLevelChecksum(blobClient.getBlockBlobClient(), 
inputStream, blobHttpHeaders);
+      } else if (blobHttpHeaders != null) {
+        blobClient.uploadWithResponse(inputStream, contentLength, null, 
blobHttpHeaders, null, null, null, null,
+            Context.NONE);
+      } else {
+        blobClient.upload(inputStream, contentLength, true);
+      }
+      return true;
+    } catch (BlobStorageException e) {
+      LOGGER.error("Exception thrown while uploading to destination via Blob 
API (dstUri={}, errorStatus={})", dstUri,
+          e.getStatusCode(), e);
+      throw new IOException(e);
+    }
+  }
+
+  /**
+   * Uploads stream using block staging with per-block MD5 for transactional 
integrity.
+   */
+  private void uploadWithBlockLevelChecksum(BlockBlobClient blockBlobClient, 
InputStream inputStream,
+      BlobHttpHeaders blobHttpHeaders)
+      throws IOException, BlobStorageException {
+    int bytesRead;
+    int blockIdCounter = 0;
+    byte[] buffer = new byte[BUFFER_SIZE];
+    List<String> blockIds = new ArrayList<>();
+
+    try {
+      while ((bytesRead = inputStream.read(buffer)) != -1) {
+        MessageDigest md5Block = MessageDigest.getInstance("MD5");

Review Comment:
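   In `uploadWithBlockLevelChecksum`, a new `MessageDigest` is instantiated on
every loop iteration, i.e. once per staged block. This adds avoidable overhead for
large uploads. Consider creating a single `MessageDigest` outside the loop and
reusing it for each block. (`digest()` already resets the instance after
computing the hash, so the explicit `reset()` below is harmless but not strictly
required.)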
   ```suggestion
         MessageDigest md5Block = MessageDigest.getInstance("MD5");
         while ((bytesRead = inputStream.read(buffer)) != -1) {
           md5Block.reset();
   ```



##########
pinot-plugins/pinot-file-system/pinot-adls/src/main/java/org/apache/pinot/plugin/filesystem/ADLSGen2PinotFS.java:
##########
@@ -637,42 +674,134 @@ private boolean copySrcToDst(URI srcUri, URI dstUri)
     PathProperties pathProperties =
         
_fileSystemClient.getFileClient(AzurePinotFSUtil.convertUriToAzureStylePath(srcUri)).getProperties();
     try (InputStream inputStream = open(srcUri)) {
-      return copyInputStreamToDst(inputStream, dstUri, 
pathProperties.getContentMd5());
+      return copyInputStreamToDst(inputStream, dstUri, 
pathProperties.getContentMd5(),
+          pathProperties.getFileSize());
     }
   }
 
   /**
    * Helper function to copy input stream to destination URI.
    *
+   * <p>Uses the Azure Blob API for uploads, which is compatible with storage 
accounts that have Blob Soft Delete
+   * enabled. The DFS (Data Lake) API does not support Soft Delete and will 
fail with 409
+   * EndpointUnsupportedAccountFeatures on such accounts.</p>
+   *
    * NOTE: the caller has to close the input stream.
    *
    * @param inputStream input stream that will be written to dstUri
    * @param dstUri destination URI
+   * @param contentMd5 optional MD5 hash of the content
+   * @param contentLength length of the content in bytes
    * @return true if the copy succeeds
    */
-  private boolean copyInputStreamToDst(InputStream inputStream, URI dstUri, 
byte[] contentMd5)
+  private boolean copyInputStreamToDst(InputStream inputStream, URI dstUri, 
byte[] contentMd5, long contentLength)
+      throws IOException {
+    String path = AzurePinotFSUtil.convertUriToAzureStylePath(dstUri);
+
+    if (_blobContainerClient != null) {
+      return copyInputStreamToDstViaBlob(inputStream, dstUri, path, 
contentMd5, contentLength);
+    }
+    return copyInputStreamToDstViaDfs(inputStream, dstUri, path, contentMd5);
+  }
+
+  /**
+   * Upload via Azure Blob API. Compatible with Blob Soft Delete.
+   */
+  private boolean copyInputStreamToDstViaBlob(InputStream inputStream, URI 
dstUri, String path, byte[] contentMd5,
+      long contentLength)
+      throws IOException {
+    try {
+      BlobClient blobClient = _blobContainerClient.getBlobClient(path);
+      BlobHttpHeaders blobHttpHeaders = contentMd5 != null ? 
getBlobHttpHeadersWithContentMd5(blobClient, contentMd5)
+          : null;
+      if (_enableChecksum) {
+        uploadWithBlockLevelChecksum(blobClient.getBlockBlobClient(), 
inputStream, blobHttpHeaders);
+      } else if (blobHttpHeaders != null) {
+        blobClient.uploadWithResponse(inputStream, contentLength, null, 
blobHttpHeaders, null, null, null, null,
+            Context.NONE);
+      } else {
+        blobClient.upload(inputStream, contentLength, true);
+      }
+      return true;
+    } catch (BlobStorageException e) {
+      LOGGER.error("Exception thrown while uploading to destination via Blob 
API (dstUri={}, errorStatus={})", dstUri,
+          e.getStatusCode(), e);
+      throw new IOException(e);
+    }
+  }
+
+  /**
+   * Uploads stream using block staging with per-block MD5 for transactional 
integrity.
+   */
+  private void uploadWithBlockLevelChecksum(BlockBlobClient blockBlobClient, 
InputStream inputStream,
+      BlobHttpHeaders blobHttpHeaders)
+      throws IOException, BlobStorageException {
+    int bytesRead;
+    int blockIdCounter = 0;
+    byte[] buffer = new byte[BUFFER_SIZE];
+    List<String> blockIds = new ArrayList<>();
+
+    try {
+      while ((bytesRead = inputStream.read(buffer)) != -1) {
+        MessageDigest md5Block = MessageDigest.getInstance("MD5");
+        md5Block.update(buffer, 0, bytesRead);
+        byte[] md5BlockHash = md5Block.digest();
+
+        String blockId = Base64.getEncoder()
+            .encodeToString(String.format("%08d", 
blockIdCounter).getBytes(StandardCharsets.UTF_8));
+        blockIdCounter++;
+        blockIds.add(blockId);
+
+        try (ByteArrayInputStream byteArrayInputStream = new 
ByteArrayInputStream(buffer, 0, bytesRead)) {
+          blockBlobClient.stageBlockWithResponse(blockId, 
byteArrayInputStream, bytesRead, md5BlockHash, null, null,
+              Context.NONE);
+        }
+      }
+      if (blobHttpHeaders != null) {
+        blockBlobClient.commitBlockListWithResponse(blockIds, blobHttpHeaders, 
null, null, null, null, Context.NONE);
+      } else {
+        blockBlobClient.commitBlockList(blockIds);
+      }
+    } catch (NoSuchAlgorithmException e) {
+      throw new IOException(e);
+    }
+  }
+
+  private BlobHttpHeaders getBlobHttpHeadersWithContentMd5(BlobClient 
blobClient, byte[] contentMd5) {
+    BlobHttpHeaders defaultBlobHttpHeaders = new 
BlobHttpHeaders().setContentMd5(contentMd5);
+    try {
+      return 
getBlobHttpHeaders(blobClient.getProperties()).setContentMd5(contentMd5);
+    } catch (BlobStorageException e) {
+      if (e.getStatusCode() == NOT_FOUND_STATUS_CODE) {
+        return defaultBlobHttpHeaders;
+      }
+      throw e;
+    }

Review Comment:
   `getBlobHttpHeadersWithContentMd5()` calls `blobClient.getProperties()`
before every upload when `contentMd5` is present. This adds an extra network
round-trip. It may also require read permission on the destination blob even
when the caller only needs write access. Consider skipping the properties lookup
and constructing `BlobHttpHeaders` directly (e.g., setting only Content-MD5).
Alternatively, perform the lookup only when existing headers on the destination
blob must be preserved. Note that the suggestion below discards whatever headers
the current code copies from the existing destination blob via
`getBlobHttpHeaders(...)`.
   ```suggestion
       // Avoid an extra network call to fetch existing blob properties; only 
set Content-MD5 explicitly.
       return new BlobHttpHeaders().setContentMd5(contentMd5);
   ```



##########
pinot-plugins/pinot-file-system/pinot-adls/src/test/java/org/apache/pinot/plugin/filesystem/test/ADLSGen2PinotFSTest.java:
##########
@@ -103,11 +118,18 @@ public void setup()
     _mockURI = new URI("mock://mock");
   }
 
+  private void setEnableChecksum(ADLSGen2PinotFS adlsGen2PinotFS, boolean 
enableChecksum)
+      throws ReflectiveOperationException {
+    Field enableChecksumField = 
ADLSGen2PinotFS.class.getDeclaredField("_enableChecksum");
+    enableChecksumField.setAccessible(true);
+    enableChecksumField.set(adlsGen2PinotFS, enableChecksum);
+  }

Review Comment:
   The tests use reflection (`setAccessible(true)`) to mutate the private
`_enableChecksum` field. This is brittle: renaming or retyping the field breaks
the tests only at runtime (e.g., with `NoSuchFieldException`), not at compile
time. Since the class already has `@VisibleForTesting` constructors, consider
adding a small `@VisibleForTesting` setter or constructor parameter for
`_enableChecksum`. That keeps the tests resilient without reflective access.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to