Copilot commented on code in PR #17806:
URL: https://github.com/apache/pinot/pull/17806#discussion_r2887813423
##########
pinot-plugins/pinot-file-system/pinot-adls/src/main/java/org/apache/pinot/plugin/filesystem/ADLSGen2PinotFS.java:
##########
@@ -226,6 +254,7 @@ public void init(PinotConfiguration config) {
defaultAzureCredentialBuilder.authorityHost(authorityHost);
}
dataLakeServiceClientBuilder.credential(defaultAzureCredentialBuilder.build());
+
blobServiceClientBuilder.credential(defaultAzureCredentialBuilder.build());
break;
Review Comment:
In the DEFAULT auth branch, `defaultAzureCredentialBuilder.build()` is
called twice (once for the DFS client and once for the Blob client). That
creates two separate DefaultAzureCredential instances (separate caches/HTTP
pipelines) and adds unnecessary overhead. Build it once, store it in a
variable, and pass the same credential to both builders.
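To make the overhead concrete, here is a toy model in plain Java with no Azure dependency. `FakeCredential` is hypothetical and assumes only what the comment states: each `build()` call yields an independent credential with its own token cache. In the PR, the equivalent change is to call `defaultAzureCredentialBuilder.build()` once, keep the result in a local variable, and pass that same instance to both `dataLakeServiceClientBuilder.credential(...)` and `blobServiceClientBuilder.credential(...)`.

```java
public class SharedCredentialSketch {
  /** Hypothetical stand-in for a credential that keeps its own per-instance token cache. */
  static final class FakeCredential {
    private String cachedToken;
    private int tokenFetches;

    String getToken() {
      if (cachedToken == null) {
        tokenFetches++; // stands in for a round trip to the identity endpoint
        cachedToken = "token-" + tokenFetches;
      }
      return cachedToken;
    }

    int tokenFetches() {
      return tokenFetches;
    }
  }

  /**
   * Creates credentials for a DFS-like and a Blob-like client, authenticates each once,
   * and returns the total number of token fetches across all credential instances.
   */
  static int totalTokenFetches(boolean shareCredential) {
    FakeCredential dfsCredential = new FakeCredential(); // first build()
    // A second build() happens only when the credential is not shared.
    FakeCredential blobCredential = shareCredential ? dfsCredential : new FakeCredential();
    dfsCredential.getToken();
    blobCredential.getToken();
    return shareCredential
        ? dfsCredential.tokenFetches()
        : dfsCredential.tokenFetches() + blobCredential.tokenFetches();
  }

  public static void main(String[] args) {
    System.out.println("separate credentials, token fetches: " + totalTokenFetches(false));
    System.out.println("shared credential, token fetches: " + totalTokenFetches(true));
  }
}
```

In this model, the shared instance serves the second client from its cache, while two separately built instances each fetch their own token.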
##########
pinot-plugins/pinot-file-system/pinot-adls/src/main/java/org/apache/pinot/plugin/filesystem/ADLSGen2PinotFS.java:
##########
@@ -637,42 +670,115 @@ private boolean copySrcToDst(URI srcUri, URI dstUri)
PathProperties pathProperties =
_fileSystemClient.getFileClient(AzurePinotFSUtil.convertUriToAzureStylePath(srcUri)).getProperties();
try (InputStream inputStream = open(srcUri)) {
- return copyInputStreamToDst(inputStream, dstUri, pathProperties.getContentMd5());
+ return copyInputStreamToDst(inputStream, dstUri, pathProperties.getContentMd5(),
+ pathProperties.getFileSize());
}
}
/**
* Helper function to copy input stream to destination URI.
*
+ * <p>Uses the Azure Blob API for uploads, which is compatible with storage accounts that have Blob Soft Delete
+ * enabled. The DFS (Data Lake) API does not support Soft Delete and will fail with 409
+ * EndpointUnsupportedAccountFeatures on such accounts.</p>
+ *
* NOTE: the caller has to close the input stream.
*
* @param inputStream input stream that will be written to dstUri
* @param dstUri destination URI
+ * @param contentMd5 optional MD5 hash of the content
+ * @param contentLength length of the content in bytes
* @return true if the copy succeeds
*/
- private boolean copyInputStreamToDst(InputStream inputStream, URI dstUri, byte[] contentMd5)
+ private boolean copyInputStreamToDst(InputStream inputStream, URI dstUri, byte[] contentMd5, long contentLength)
+ throws IOException {
+ String path = AzurePinotFSUtil.convertUriToAzureStylePath(dstUri);
+
+ if (_blobContainerClient != null) {
+ return copyInputStreamToDstViaBlob(inputStream, dstUri, path, contentMd5, contentLength);
+ }
+ return copyInputStreamToDstViaDfs(inputStream, dstUri, path, contentMd5);
+ }
+
+ /**
+ * Upload via Azure Blob API. Compatible with Blob Soft Delete.
+ */
+ private boolean copyInputStreamToDstViaBlob(InputStream inputStream, URI dstUri, String path, byte[] contentMd5,
+ long contentLength)
+ throws IOException {
+ try {
+ BlobClient blobClient = _blobContainerClient.getBlobClient(path);
+ if (_enableChecksum) {
+ uploadWithBlockLevelChecksum(blobClient.getBlockBlobClient(), inputStream);
+ } else {
+ blobClient.upload(inputStream, contentLength, true);
+ }
+
+ if (contentMd5 != null) {
+ blobClient.setHttpHeaders(new BlobHttpHeaders().setContentMd5(contentMd5));
+ }
+ return true;
+ } catch (BlobStorageException e) {
+ LOGGER.error("Exception thrown while uploading to destination via Blob API (dstUri={}, errorStatus={})", dstUri,
+ e.getStatusCode(), e);
+ throw new IOException(e);
+ }
+ }
+
+ /**
+ * Uploads stream using block staging with per-block MD5 for transactional integrity.
+ */
+ private void uploadWithBlockLevelChecksum(BlockBlobClient blockBlobClient, InputStream inputStream)
+ throws IOException, BlobStorageException {
+ int bytesRead;
+ int blockIdCounter = 0;
+ byte[] buffer = new byte[BUFFER_SIZE];
+ List<String> blockIds = new ArrayList<>();
+
+ try {
+ while ((bytesRead = inputStream.read(buffer)) != -1) {
+ MessageDigest md5Block = MessageDigest.getInstance("MD5");
+ md5Block.update(buffer, 0, bytesRead);
+ byte[] md5BlockHash = md5Block.digest();
+
+ String blockId = Base64.getEncoder().encodeToString(String.format("%08d", blockIdCounter).getBytes());
+ blockIdCounter++;
Review Comment:
`String.format(...).getBytes()` uses the platform default charset. Specify
an explicit charset (e.g., UTF-8) when generating block IDs to avoid
platform-dependent behavior.
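A self-contained sketch of the staging loop with the charset fix applied follows. `InMemoryBlockBlob` is a hypothetical in-memory stand-in that only mimics the stage-then-commit flow of a block blob (it is not `BlockBlobClient`), and the block size is a parameter rather than the PR's `BUFFER_SIZE`. Besides `StandardCharsets.UTF_8`, the sketch passes `Locale.ROOT` to `String.format`, because `java.util.Formatter` localizes digits for `%d`, so some default locales could otherwise produce non-ASCII digits. The zero-padded `%08d` format also keeps every block ID the same length, which Azure requires for all block IDs within one blob.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Base64;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;

public class BlockStagingSketch {
  /** Hypothetical in-memory stand-in for a block blob: blocks are staged first, then committed in order. */
  static final class InMemoryBlockBlob {
    private final Map<String, byte[]> staged = new HashMap<>();
    private byte[] committed = new byte[0];

    void stageBlock(String blockId, byte[] data, byte[] expectedMd5) {
      // Reject a block whose MD5 does not match, mimicking a per-block integrity check.
      if (!Arrays.equals(md5(data), expectedMd5)) {
        throw new IllegalStateException("MD5 mismatch for block " + blockId);
      }
      staged.put(blockId, data);
    }

    void commitBlockList(List<String> blockIds) {
      ByteArrayOutputStream out = new ByteArrayOutputStream();
      for (String id : blockIds) {
        byte[] block = staged.get(id);
        if (block == null) {
          throw new IllegalStateException("Unknown block " + id);
        }
        out.write(block, 0, block.length);
      }
      committed = out.toByteArray();
    }

    byte[] content() {
      return committed;
    }
  }

  /** Fixed-width ASCII block ID, independent of the platform charset and the default locale. */
  static String blockId(int index) {
    String padded = String.format(Locale.ROOT, "%08d", index);
    return Base64.getEncoder().encodeToString(padded.getBytes(StandardCharsets.UTF_8));
  }

  static byte[] md5(byte[] data) {
    try {
      return MessageDigest.getInstance("MD5").digest(data);
    } catch (NoSuchAlgorithmException e) {
      throw new IllegalStateException("MD5 is not available", e);
    }
  }

  /** Stages the stream in blockSize chunks, each with its own MD5, then commits the block list. */
  static List<String> upload(InMemoryBlockBlob blob, InputStream in, int blockSize) {
    List<String> blockIds = new ArrayList<>();
    byte[] buffer = new byte[blockSize];
    try {
      int bytesRead;
      // readNBytes (Java 9+) fills the buffer unless EOF is reached, and returns 0 at EOF.
      while ((bytesRead = in.readNBytes(buffer, 0, blockSize)) > 0) {
        byte[] block = Arrays.copyOf(buffer, bytesRead);
        String id = blockId(blockIds.size());
        blob.stageBlock(id, block, md5(block));
        blockIds.add(id);
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    blob.commitBlockList(blockIds);
    return blockIds;
  }

  /** Uploads data to a fresh in-memory blob and returns the committed content. */
  static byte[] roundTrip(byte[] data, int blockSize) {
    InMemoryBlockBlob blob = new InMemoryBlockBlob();
    upload(blob, new ByteArrayInputStream(data), blockSize);
    return blob.content();
  }

  public static void main(String[] args) {
    byte[] data = "hello, block blob".getBytes(StandardCharsets.UTF_8);
    System.out.println(new String(roundTrip(data, 5), StandardCharsets.UTF_8));
  }
}
```

The loop uses `InputStream.readNBytes` rather than `read(buffer)`, so a stream that returns short reads does not produce undersized blocks. That matters because a block blob can hold at most 50,000 committed blocks.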
##########
pinot-plugins/pinot-file-system/pinot-adls/src/main/java/org/apache/pinot/plugin/filesystem/ADLSGen2PinotFS.java:
##########
@@ -637,42 +670,115 @@ private boolean copySrcToDst(URI srcUri, URI dstUri)
+ if (contentMd5 != null) {
+ blobClient.setHttpHeaders(new BlobHttpHeaders().setContentMd5(contentMd5));
Review Comment:
`blobClient.setHttpHeaders(new BlobHttpHeaders().setContentMd5(contentMd5))`
will clear any other blob HTTP headers (e.g., Content-Type, Content-Encoding,
Cache-Control) because the other fields are left null. To avoid unintentionally
wiping headers (especially on overwrite), preserve existing header values when
setting Content-MD5, or set Content-MD5 as part of the upload request so no
follow-up header-reset call is needed.
```suggestion
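// Copy the existing header values so that only Content-MD5 changes.
BlobProperties properties = blobClient.getProperties();
blobClient.setHttpHeaders(new BlobHttpHeaders()
    .setContentType(properties.getContentType())
    .setContentEncoding(properties.getContentEncoding())
    .setContentDisposition(properties.getContentDisposition())
    .setContentLanguage(properties.getContentLanguage())
    .setCacheControl(properties.getCacheControl())
    .setContentMd5(contentMd5));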
```