This is an automated email from the ASF dual-hosted git repository. xiangfu pushed a commit to branch fix/adls-blob-soft-delete-support in repository https://gitbox.apache.org/repos/asf/pinot.git
commit b07eda45d553cef20bd2010a1032a073f3475eb1 Author: Xiang Fu <[email protected]> AuthorDate: Tue Mar 3 10:48:42 2026 -0800 Fix ADLS Gen2 plugin to support Azure Blob Soft Delete The DFS (Data Lake) API endpoint does not support Azure Blob Soft Delete, causing 409 EndpointUnsupportedAccountFeatures errors when uploading segments to storage accounts with org-mandated Soft Delete policies. This change adds a BlobContainerClient alongside the existing DataLakeFileSystemClient and uses the Blob API (*.blob.core.windows.net) for file uploads; unlike the DFS API, the Blob API is fully compatible with Soft Delete. The DFS API is retained for read/metadata operations (list, exists, open, etc.) and as a write fallback when the BlobContainerClient is not initialized. Co-Authored-By: Claude Opus 4.6 <[email protected]> --- pinot-plugins/pinot-file-system/pinot-adls/pom.xml | 4 + .../pinot/plugin/filesystem/ADLSGen2PinotFS.java | 94 ++++++++++++++++++---- .../filesystem/test/ADLSGen2PinotFSTest.java | 67 ++++++++++++++- 3 files changed, 149 insertions(+), 16 deletions(-) diff --git a/pinot-plugins/pinot-file-system/pinot-adls/pom.xml b/pinot-plugins/pinot-file-system/pinot-adls/pom.xml index a95142cb367..da0b7e40f56 100644 --- a/pinot-plugins/pinot-file-system/pinot-adls/pom.xml +++ b/pinot-plugins/pinot-file-system/pinot-adls/pom.xml @@ -38,6 +38,10 @@ <groupId>com.azure</groupId> <artifactId>azure-storage-file-datalake</artifactId> </dependency> + <dependency> + <groupId>com.azure</groupId> + <artifactId>azure-storage-blob</artifactId> + </dependency> <dependency> <groupId>com.azure</groupId> <artifactId>azure-identity</artifactId> diff --git a/pinot-plugins/pinot-file-system/pinot-adls/src/main/java/org/apache/pinot/plugin/filesystem/ADLSGen2PinotFS.java b/pinot-plugins/pinot-file-system/pinot-adls/src/main/java/org/apache/pinot/plugin/filesystem/ADLSGen2PinotFS.java index e3b3cc01baa..01bcbdba273 100644 --- 
a/pinot-plugins/pinot-file-system/pinot-adls/src/main/java/org/apache/pinot/plugin/filesystem/ADLSGen2PinotFS.java +++ b/pinot-plugins/pinot-file-system/pinot-adls/src/main/java/org/apache/pinot/plugin/filesystem/ADLSGen2PinotFS.java @@ -26,6 +26,12 @@ import com.azure.core.util.Context; import com.azure.identity.ClientSecretCredential; import com.azure.identity.ClientSecretCredentialBuilder; import com.azure.identity.DefaultAzureCredentialBuilder; +import com.azure.storage.blob.BlobClient; +import com.azure.storage.blob.BlobContainerClient; +import com.azure.storage.blob.BlobServiceClient; +import com.azure.storage.blob.BlobServiceClientBuilder; +import com.azure.storage.blob.models.BlobHttpHeaders; +import com.azure.storage.blob.models.BlobStorageException; import com.azure.storage.common.StorageSharedKeyCredential; import com.azure.storage.common.Utility; import com.azure.storage.file.datalake.DataLakeDirectoryClient; @@ -112,6 +118,10 @@ public class ADLSGen2PinotFS extends BasePinotFS { private DataLakeFileSystemClient _fileSystemClient; + // Blob API client used for file uploads to support Azure storage accounts with Blob Soft Delete enabled. + // The DFS API (Data Lake) does not support Soft Delete, causing 409 EndpointUnsupportedAccountFeatures errors. + private BlobContainerClient _blobContainerClient; + // If enabled, pinotFS implementation will guarantee that the bits you've read are the same as the ones you wrote. // However, there's some overhead in computing hash. 
(Adds roughly 3 seconds for 1GB file) private boolean _enableChecksum; @@ -119,10 +129,17 @@ public class ADLSGen2PinotFS extends BasePinotFS { public ADLSGen2PinotFS() { } + @VisibleForTesting public ADLSGen2PinotFS(DataLakeFileSystemClient fileSystemClient) { _fileSystemClient = fileSystemClient; } + @VisibleForTesting + public ADLSGen2PinotFS(DataLakeFileSystemClient fileSystemClient, BlobContainerClient blobContainerClient) { + _fileSystemClient = fileSystemClient; + _blobContainerClient = blobContainerClient; + } + @Override public void init(PinotConfiguration config) { boolean checksumEnabled = config.getProperty(ENABLE_CHECKSUM, false); @@ -154,9 +171,12 @@ public class ADLSGen2PinotFS extends BasePinotFS { String sasToken = config.getProperty(SAS_TOKEN); String dfsServiceEndpointUrl = HTTPS_URL_PREFIX + accountName + AZURE_STORAGE_DNS_SUFFIX; + String blobServiceEndpointUrl = HTTPS_URL_PREFIX + accountName + AZURE_BLOB_DNS_SUFFIX; DataLakeServiceClientBuilder dataLakeServiceClientBuilder = new DataLakeServiceClientBuilder().endpoint(dfsServiceEndpointUrl); + BlobServiceClientBuilder blobServiceClientBuilder = + new BlobServiceClientBuilder().endpoint(blobServiceEndpointUrl); switch (authType) { case ACCESS_KEY: { @@ -166,6 +186,7 @@ public class ADLSGen2PinotFS extends BasePinotFS { StorageSharedKeyCredential sharedKeyCredential = new StorageSharedKeyCredential(accountName, accessKey); dataLakeServiceClientBuilder.credential(sharedKeyCredential); + blobServiceClientBuilder.credential(sharedKeyCredential); break; } case SAS_TOKEN: { @@ -175,6 +196,7 @@ public class ADLSGen2PinotFS extends BasePinotFS { AzureSasCredential azureSasCredential = new AzureSasCredential(sasToken); dataLakeServiceClientBuilder.credential(azureSasCredential); + blobServiceClientBuilder.credential(azureSasCredential); break; } case AZURE_AD: { @@ -187,6 +209,7 @@ public class ADLSGen2PinotFS extends BasePinotFS { new 
ClientSecretCredentialBuilder().clientId(clientId).clientSecret(clientSecret).tenantId(tenantId) .build(); dataLakeServiceClientBuilder.credential(clientSecretCredential); + blobServiceClientBuilder.credential(clientSecretCredential); break; } case AZURE_AD_WITH_PROXY: { @@ -207,7 +230,9 @@ public class ADLSGen2PinotFS extends BasePinotFS { new ClientSecretCredentialBuilder().clientId(clientId).clientSecret(clientSecret).tenantId(tenantId); clientSecretCredentialBuilder.httpClient(builder.build()); - dataLakeServiceClientBuilder.credential(clientSecretCredentialBuilder.build()); + ClientSecretCredential credential = clientSecretCredentialBuilder.build(); + dataLakeServiceClientBuilder.credential(credential); + blobServiceClientBuilder.credential(credential); break; } case DEFAULT: { @@ -226,6 +251,7 @@ public class ADLSGen2PinotFS extends BasePinotFS { defaultAzureCredentialBuilder.authorityHost(authorityHost); } dataLakeServiceClientBuilder.credential(defaultAzureCredentialBuilder.build()); + blobServiceClientBuilder.credential(defaultAzureCredentialBuilder.build()); break; } case ANONYMOUS_ACCESS: { @@ -241,8 +267,12 @@ public class ADLSGen2PinotFS extends BasePinotFS { DataLakeServiceClient serviceClient = dataLakeServiceClientBuilder.buildClient(); _fileSystemClient = getOrCreateClientWithFileSystem(serviceClient, fileSystemName); + BlobServiceClient blobServiceClient = blobServiceClientBuilder.buildClient(); + _blobContainerClient = blobServiceClient.getBlobContainerClient(fileSystemName); + LOGGER.info("ADLSGen2PinotFS is initialized (accountName={}, fileSystemName={}, dfsServiceEndpointUrl={}, " - + "enableChecksum={})", accountName, fileSystemName, dfsServiceEndpointUrl, _enableChecksum); + + "blobServiceEndpointUrl={}, enableChecksum={})", accountName, fileSystemName, dfsServiceEndpointUrl, + blobServiceEndpointUrl, _enableChecksum); } /** @@ -550,7 +580,7 @@ public class ADLSGen2PinotFS extends BasePinotFS { LOGGER.debug("copyFromLocalFile is called with 
srcFile='{}', dstUri='{}'", srcFile, dstUri); byte[] contentMd5 = _enableChecksum ? computeContentMd5(srcFile) : null; try (InputStream fileInputStream = new FileInputStream(srcFile)) { - copyInputStreamToDst(fileInputStream, dstUri, contentMd5); + copyInputStreamToDst(fileInputStream, dstUri, contentMd5, srcFile.length()); } } @@ -637,34 +667,73 @@ public class ADLSGen2PinotFS extends BasePinotFS { PathProperties pathProperties = _fileSystemClient.getFileClient(AzurePinotFSUtil.convertUriToAzureStylePath(srcUri)).getProperties(); try (InputStream inputStream = open(srcUri)) { - return copyInputStreamToDst(inputStream, dstUri, pathProperties.getContentMd5()); + return copyInputStreamToDst(inputStream, dstUri, pathProperties.getContentMd5(), + pathProperties.getFileSize()); } } /** * Helper function to copy input stream to destination URI. * + * <p>Uses the Azure Blob API for uploads, which is compatible with storage accounts that have Blob Soft Delete + * enabled. The DFS (Data Lake) API does not support Soft Delete and will fail with 409 + * EndpointUnsupportedAccountFeatures on such accounts.</p> + * * NOTE: the caller has to close the input stream. * * @param inputStream input stream that will be written to dstUri * @param dstUri destination URI + * @param contentMd5 optional MD5 hash of the content + * @param contentLength length of the content in bytes * @return true if the copy succeeds */ - private boolean copyInputStreamToDst(InputStream inputStream, URI dstUri, byte[] contentMd5) + private boolean copyInputStreamToDst(InputStream inputStream, URI dstUri, byte[] contentMd5, long contentLength) + throws IOException { + String path = AzurePinotFSUtil.convertUriToAzureStylePath(dstUri); + + if (_blobContainerClient != null) { + return copyInputStreamToDstViaBlob(inputStream, dstUri, path, contentMd5, contentLength); + } + return copyInputStreamToDstViaDfs(inputStream, dstUri, path, contentMd5); + } + + /** + * Upload via Azure Blob API. 
Compatible with Blob Soft Delete. + */ + private boolean copyInputStreamToDstViaBlob(InputStream inputStream, URI dstUri, String path, byte[] contentMd5, + long contentLength) + throws IOException { + try { + BlobClient blobClient = _blobContainerClient.getBlobClient(path); + blobClient.upload(inputStream, contentLength, true); + + if (contentMd5 != null) { + blobClient.setHttpHeaders(new BlobHttpHeaders().setContentMd5(contentMd5)); + } + return true; + } catch (BlobStorageException e) { + LOGGER.error("Exception thrown while uploading to destination via Blob API (dstUri={}, errorStatus={})", dstUri, + e.getStatusCode(), e); + throw new IOException(e); + } + } + + /** + * Upload via DFS (Data Lake) API. Does NOT support Blob Soft Delete. + * Kept as fallback for backward compatibility when BlobContainerClient is not initialized. + */ + private boolean copyInputStreamToDstViaDfs(InputStream inputStream, URI dstUri, String path, byte[] contentMd5) throws IOException { int bytesRead; long totalBytesRead = 0; byte[] buffer = new byte[BUFFER_SIZE]; - // TODO: the newer client now has the API 'uploadFromFile' that directly takes the file as an input. 
We can replace - // this upload logic with the 'uploadFromFile'/ DataLakeFileClient fileClient; try { - fileClient = _fileSystemClient.createFile(AzurePinotFSUtil.convertUriToAzureStylePath(dstUri)); + fileClient = _fileSystemClient.createFile(path); } catch (DataLakeStorageException e) { - // If the path already exists, doing nothing and return true if (e.getStatusCode() == ALREADY_EXISTS_STATUS_CODE && e.getErrorCode().equals(PATH_ALREADY_EXISTS_ERROR_CODE)) { LOGGER.info("The destination path already exists and we are overwriting the file (dstUri={})", dstUri); - fileClient = _fileSystemClient.createFile(AzurePinotFSUtil.convertUriToAzureStylePath(dstUri), true); + fileClient = _fileSystemClient.createFile(path, true); } else { LOGGER.error("Exception thrown while calling copy stream to destination (dstUri={}, errorStatus ={})", dstUri, e.getStatusCode(), e); @@ -672,7 +741,6 @@ public class ADLSGen2PinotFS extends BasePinotFS { } } - // Update MD5 metadata if (contentMd5 != null) { PathHttpHeaders pathHttpHeaders = getPathHttpHeaders(fileClient.getProperties()); pathHttpHeaders.setContentMd5(contentMd5); @@ -683,21 +751,17 @@ public class ADLSGen2PinotFS extends BasePinotFS { while ((bytesRead = inputStream.read(buffer)) != -1) { byte[] md5BlockHash = null; if (_enableChecksum) { - // Compute md5 for the current block MessageDigest md5Block = MessageDigest.getInstance("MD5"); md5Block.update(buffer, 0, bytesRead); md5BlockHash = md5Block.digest(); } - // Upload 4MB at a time since Azure's limit for each append call is 4MB. 
ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(buffer, 0, bytesRead); fileClient.appendWithResponse(byteArrayInputStream, totalBytesRead, bytesRead, md5BlockHash, null, null, Context.NONE); byteArrayInputStream.close(); totalBytesRead += bytesRead; } - // Call flush on ADLS Gen 2 fileClient.flush(totalBytesRead, true); - return true; } catch (DataLakeStorageException | NoSuchAlgorithmException e) { throw new IOException(e); diff --git a/pinot-plugins/pinot-file-system/pinot-adls/src/test/java/org/apache/pinot/plugin/filesystem/test/ADLSGen2PinotFSTest.java b/pinot-plugins/pinot-file-system/pinot-adls/src/test/java/org/apache/pinot/plugin/filesystem/test/ADLSGen2PinotFSTest.java index c7823006520..fa178a8f22e 100644 --- a/pinot-plugins/pinot-file-system/pinot-adls/src/test/java/org/apache/pinot/plugin/filesystem/test/ADLSGen2PinotFSTest.java +++ b/pinot-plugins/pinot-file-system/pinot-adls/src/test/java/org/apache/pinot/plugin/filesystem/test/ADLSGen2PinotFSTest.java @@ -21,6 +21,10 @@ package org.apache.pinot.plugin.filesystem.test; import com.azure.core.http.rest.PagedIterable; import com.azure.core.http.rest.SimpleResponse; import com.azure.core.util.Context; +import com.azure.storage.blob.BlobClient; +import com.azure.storage.blob.BlobContainerClient; +import com.azure.storage.blob.models.BlobHttpHeaders; +import com.azure.storage.blob.models.BlobStorageException; import com.azure.storage.file.datalake.DataLakeDirectoryClient; import com.azure.storage.file.datalake.DataLakeFileClient; import com.azure.storage.file.datalake.DataLakeFileSystemClient; @@ -89,6 +93,10 @@ public class ADLSGen2PinotFSTest { private PagedIterable _mockPagedIterable; @Mock private PathItem _mockPathItem; + @Mock + private BlobContainerClient _mockBlobContainerClient; + @Mock + private BlobClient _mockBlobClient; private URI _mockURI; private ADLSGen2PinotFS _adlsGen2PinotFsUnderTest; @@ -107,7 +115,7 @@ public class ADLSGen2PinotFSTest { public void tearDown() { 
verifyNoMoreInteractions(_mockDataLakeStorageException, _mockServiceClient, _mockFileSystemClient, _mockSimpleResponse, _mockDirectoryClient, _mockPathItem, _mockPagedIterable, _mockPathProperties, - _mockFileClient, _mockFileOpenInputStreamResult, _mockInputStream); + _mockFileClient, _mockFileOpenInputStreamResult, _mockInputStream, _mockBlobContainerClient, _mockBlobClient); } @Test(expectedExceptions = NullPointerException.class) @@ -642,4 +650,61 @@ public class ADLSGen2PinotFSTest { } } } + + @Test + public void testCopyFromLocalFileViaBlobApi() throws Exception { + // Create a test instance with Blob API support + ADLSGen2PinotFS blobEnabledFs = new ADLSGen2PinotFS(_mockFileSystemClient, _mockBlobContainerClient); + + // Create a temporary file with test data + File tempFile = File.createTempFile("pinot_blob_test", ".tmp"); + byte[] testData = "test segment data".getBytes(); + Files.write(tempFile.toPath(), testData); + + URI dstUri = new URI("adl2://account/container/test_segment"); + String expectedPath = AzurePinotFSUtil.convertUriToAzureStylePath(dstUri); + + when(_mockBlobContainerClient.getBlobClient(expectedPath)).thenReturn(_mockBlobClient); + doNothing().when(_mockBlobClient).upload(any(InputStream.class), eq((long) testData.length), eq(true)); + + try { + blobEnabledFs.copyFromLocalFile(tempFile, dstUri); + + verify(_mockBlobContainerClient).getBlobClient(expectedPath); + verify(_mockBlobClient).upload(any(InputStream.class), eq((long) testData.length), eq(true)); + // copyFromLocalFile always computes MD5, so setHttpHeaders is always called + verify(_mockBlobClient).setHttpHeaders(any(BlobHttpHeaders.class)); + } finally { + FileUtils.deleteQuietly(tempFile); + } + } + + @Test + public void testCopyFromLocalFileViaBlobApiWithException() throws Exception { + // Create a test instance with Blob API support + ADLSGen2PinotFS blobEnabledFs = new ADLSGen2PinotFS(_mockFileSystemClient, _mockBlobContainerClient); + + // Create a temporary file with test 
data + File tempFile = File.createTempFile("pinot_blob_test", ".tmp"); + byte[] testData = "test segment data".getBytes(); + Files.write(tempFile.toPath(), testData); + + URI dstUri = new URI("adl2://account/container/test_segment"); + String expectedPath = AzurePinotFSUtil.convertUriToAzureStylePath(dstUri); + + BlobStorageException blobException = mock(BlobStorageException.class); + when(blobException.getStatusCode()).thenReturn(500); + when(_mockBlobContainerClient.getBlobClient(expectedPath)).thenReturn(_mockBlobClient); + doThrow(blobException).when(_mockBlobClient).upload(any(InputStream.class), eq((long) testData.length), eq(true)); + + try { + expectThrows(IOException.class, () -> blobEnabledFs.copyFromLocalFile(tempFile, dstUri)); + + verify(_mockBlobContainerClient).getBlobClient(expectedPath); + verify(_mockBlobClient).upload(any(InputStream.class), eq((long) testData.length), eq(true)); + verify(blobException).getStatusCode(); + } finally { + FileUtils.deleteQuietly(tempFile); + } + } } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
