Copilot commented on code in PR #17806:
URL: https://github.com/apache/pinot/pull/17806#discussion_r2897403359
##########
pinot-plugins/pinot-file-system/pinot-adls/src/test/java/org/apache/pinot/plugin/filesystem/test/ADLSGen2PinotFSTest.java:
##########
@@ -642,4 +655,182 @@ public void testCopyToLocalFileExistingDirectory() throws Exception {
}
}
}
+
+ @Test
+ public void testCopyFromLocalFileViaBlobApi() throws Exception {
+ // Create a test instance with Blob API support
+ ADLSGen2PinotFS blobEnabledFs = new ADLSGen2PinotFS(_mockFileSystemClient, _mockBlobContainerClient);
+
+ // Create a temporary file with test data
+ File tempFile = File.createTempFile("pinot_blob_test", ".tmp");
+ byte[] testData = "test segment data".getBytes(StandardCharsets.UTF_8);
+ Files.write(tempFile.toPath(), testData);
+
+ URI dstUri = new URI("adl2://account/container/test_segment");
Review Comment:
These new tests build destination/source URIs like
`adl2://account/container/...`, which makes the container name look like part
of the path. Elsewhere in this module, URI-to-path extraction tests use
`abfss://*.dfs.core.windows.net/<path>` and treat the path as the in-filesystem
path (container comes from `fileSystemName` config). To avoid confusing the
expected URI format (and accidentally testing an extra path component),
consider switching these URIs to the established style and removing the
`container/` segment from the path.
```suggestion
URI dstUri = new URI("abfss://account.dfs.core.windows.net/test_segment");
```
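The difference between the two URI styles can be seen with plain `java.net.URI`: in the established `abfss://<account>.dfs.core.windows.net/<path>` form the account lives in the authority and the path is exactly the in-filesystem path (the container comes from the `fileSystemName` config), whereas the `adl2://account/container/...` form leaks the container name into the first path component. A minimal, self-contained sketch (class name is illustrative, not part of the module):

```java
import java.net.URI;

public class UriStyleCheck {
  public static void main(String[] args) {
    // Established style: authority carries the account host,
    // path carries only the in-filesystem path.
    URI abfss = URI.create("abfss://account.dfs.core.windows.net/test_segment");
    System.out.println(abfss.getHost()); // account.dfs.core.windows.net
    System.out.println(abfss.getPath()); // /test_segment

    // Style used in the new tests: the container name becomes the first
    // path component, so URI-to-path extraction sees an extra segment.
    URI adl2 = URI.create("adl2://account/container/test_segment");
    System.out.println(adl2.getPath()); // /container/test_segment
  }
}
```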
##########
pinot-plugins/pinot-file-system/pinot-adls/src/main/java/org/apache/pinot/plugin/filesystem/ADLSGen2PinotFS.java:
##########
@@ -637,42 +682,127 @@ private boolean copySrcToDst(URI srcUri, URI dstUri)
PathProperties pathProperties =
_fileSystemClient.getFileClient(AzurePinotFSUtil.convertUriToAzureStylePath(srcUri)).getProperties();
try (InputStream inputStream = open(srcUri)) {
- return copyInputStreamToDst(inputStream, dstUri, pathProperties.getContentMd5());
+ return copyInputStreamToDst(inputStream, dstUri, pathProperties.getContentMd5(),
+     pathProperties.getFileSize());
}
}
/**
* Helper function to copy input stream to destination URI.
*
+ * <p>Uses the Azure Blob API for uploads, which is compatible with storage accounts that have Blob Soft Delete
+ * enabled. The DFS (Data Lake) API does not support Soft Delete and will fail with 409
+ * EndpointUnsupportedAccountFeatures on such accounts.</p>
+ *
* NOTE: the caller has to close the input stream.
*
* @param inputStream input stream that will be written to dstUri
* @param dstUri destination URI
+ * @param contentMd5 optional MD5 hash of the content
+ * @param contentLength length of the content in bytes
* @return true if the copy succeeds
*/
- private boolean copyInputStreamToDst(InputStream inputStream, URI dstUri, byte[] contentMd5)
+ private boolean copyInputStreamToDst(InputStream inputStream, URI dstUri, byte[] contentMd5, long contentLength)
+ throws IOException {
+ String path = AzurePinotFSUtil.convertUriToAzureStylePath(dstUri);
+
+ if (_blobContainerClient != null) {
+ return copyInputStreamToDstViaBlob(inputStream, dstUri, path, contentMd5, contentLength);
+ }
+ return copyInputStreamToDstViaDfs(inputStream, dstUri, path, contentMd5);
+ }
+
+ /**
+ * Upload via Azure Blob API. Compatible with Blob Soft Delete.
+ */
+ private boolean copyInputStreamToDstViaBlob(InputStream inputStream, URI dstUri, String path, byte[] contentMd5,
+     long contentLength)
+ throws IOException {
+ try {
+ BlobClient blobClient = _blobContainerClient.getBlobClient(path);
+ BlobHttpHeaders blobHttpHeaders = contentMd5 != null ? getBlobHttpHeadersWithContentMd5(contentMd5)
+     : null;
+ if (_enableChecksum) {
+ uploadWithBlockLevelChecksum(blobClient.getBlockBlobClient(), inputStream, blobHttpHeaders);
+ } else if (blobHttpHeaders != null) {
+ blobClient.uploadWithResponse(inputStream, contentLength, null, blobHttpHeaders, null, null, null, null,
+     Context.NONE);
+ } else {
+ blobClient.upload(inputStream, contentLength, true);
Review Comment:
The new Blob upload code has a distinct branch for `contentMd5 != null`
while `_enableChecksum` is false (`uploadWithResponse(..., blobHttpHeaders,
...)`). There isn't a unit test covering this path, so regressions in header
propagation (Content-MD5) or the upload API choice could slip through. Consider
adding a test that exercises `copySrcToDst` (or `copyFromLocalFile` with an
injected `contentMd5`) with `_enableChecksum=false` and verifies
`uploadWithResponse` is called with `BlobHttpHeaders.getContentMd5()` set and
`upload(...)` is not used.
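The branch the comment is about can be modeled in isolation. The sketch below uses stand-in types rather than the Azure SDK or Mockito (`RecordingBlobClient` and `dispatch` are hypothetical names, not part of the PR), to show the dispatch a test would need to pin down: with `_enableChecksum=false` and a non-null `contentMd5`, the call must go to `uploadWithResponse` with the MD5 header populated, never to plain `upload`:

```java
import java.util.ArrayList;
import java.util.List;

// Simplified stand-ins modeling only the if/else-if/else ladder in
// copyInputStreamToDstViaBlob; the real test would mock the SDK clients.
public class BlobUploadDispatch {
  static final class RecordingBlobClient {
    final List<String> calls = new ArrayList<>();
    void uploadWithResponse(byte[] md5HeaderValue) {
      calls.add("uploadWithResponse(md5=" + (md5HeaderValue != null) + ")");
    }
    void upload() {
      calls.add("upload");
    }
    void uploadWithBlockLevelChecksum() {
      calls.add("uploadWithBlockLevelChecksum");
    }
  }

  // Mirrors the branch selection under review and reports which API was hit.
  static String dispatch(RecordingBlobClient client, boolean enableChecksum, byte[] contentMd5) {
    if (enableChecksum) {
      client.uploadWithBlockLevelChecksum();
    } else if (contentMd5 != null) {
      client.uploadWithResponse(contentMd5);
    } else {
      client.upload();
    }
    return client.calls.get(0);
  }

  public static void main(String[] args) {
    // The currently untested branch: checksum disabled, MD5 present.
    String call = dispatch(new RecordingBlobClient(), false, new byte[]{1, 2});
    if (!call.equals("uploadWithResponse(md5=true)")) {
      throw new AssertionError("expected uploadWithResponse with MD5 set, got " + call);
    }
    System.out.println(call);
  }
}
```

A real test in `ADLSGen2PinotFSTest` would instead mock `BlobClient`/`BlockBlobClient` and verify `uploadWithResponse` receives a `BlobHttpHeaders` whose `getContentMd5()` matches the injected hash, while `upload(...)` is never invoked.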
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]