Copilot commented on code in PR #17806:
URL: https://github.com/apache/pinot/pull/17806#discussion_r2894594472
##########
pinot-plugins/pinot-file-system/pinot-adls/src/test/java/org/apache/pinot/plugin/filesystem/test/ADLSGen2PinotFSTest.java:
##########
@@ -642,4 +655,182 @@ public void testCopyToLocalFileExistingDirectory() throws Exception {
}
}
}
+
+ @Test
+ public void testCopyFromLocalFileViaBlobApi() throws Exception {
+ // Create a test instance with Blob API support
+ ADLSGen2PinotFS blobEnabledFs = new ADLSGen2PinotFS(_mockFileSystemClient,
_mockBlobContainerClient);
+
+ // Create a temporary file with test data
+ File tempFile = File.createTempFile("pinot_blob_test", ".tmp");
+ byte[] testData = "test segment data".getBytes();
+ Files.write(tempFile.toPath(), testData);
+
+ URI dstUri = new URI("adl2://account/container/test_segment");
+ String expectedPath = AzurePinotFSUtil.convertUriToAzureStylePath(dstUri);
+
+
when(_mockBlobContainerClient.getBlobClient(expectedPath)).thenReturn(_mockBlobClient);
+ doNothing().when(_mockBlobClient).upload(any(InputStream.class), eq((long)
testData.length), eq(true));
+
+ try {
+ blobEnabledFs.copyFromLocalFile(tempFile, dstUri);
+
+ verify(_mockBlobContainerClient).getBlobClient(expectedPath);
+ verify(_mockBlobClient).upload(any(InputStream.class), eq((long)
testData.length), eq(true));
+ // _enableChecksum defaults to false for this test instance, so no content MD5 header is set.
+ verify(_mockBlobClient,
never()).setHttpHeaders(any(BlobHttpHeaders.class));
+ } finally {
+ FileUtils.deleteQuietly(tempFile);
+ }
+ }
+
+ @Test
+ public void testCopyFromLocalFileViaBlobApiWithChecksumEnabled() throws
Exception {
+ // Create a test instance with Blob API support and checksum enabled
+ ADLSGen2PinotFS blobEnabledFs = new ADLSGen2PinotFS(_mockFileSystemClient,
_mockBlobContainerClient, true);
+
+ // Create a temporary file with test data
+ File tempFile = File.createTempFile("pinot_blob_checksum_test", ".tmp");
+ byte[] testData = "test segment data".getBytes(StandardCharsets.UTF_8);
+ Files.write(tempFile.toPath(), testData);
+
+ URI dstUri = new URI("adl2://account/container/test_segment");
+ String expectedPath = AzurePinotFSUtil.convertUriToAzureStylePath(dstUri);
+ String expectedBlockId =
+ Base64.getEncoder().encodeToString(String.format("%08d",
0).getBytes(StandardCharsets.UTF_8));
+
+ BlockBlobClient mockBlockBlobClient = mock(BlockBlobClient.class);
+
when(_mockBlobContainerClient.getBlobClient(expectedPath)).thenReturn(_mockBlobClient);
+ when(_mockBlobClient.getBlockBlobClient()).thenReturn(mockBlockBlobClient);
+
+ ArgumentCaptor<BlobHttpHeaders> headersCaptor =
ArgumentCaptor.forClass(BlobHttpHeaders.class);
+
+ try {
+ blobEnabledFs.copyFromLocalFile(tempFile, dstUri);
+
+ verify(_mockBlobContainerClient).getBlobClient(expectedPath);
+ verify(_mockBlobClient).getBlockBlobClient();
+ verify(_mockBlobClient, never()).getProperties();
+ verify(_mockBlobClient, never()).upload(any(InputStream.class),
anyLong(), anyBoolean());
+ verify(_mockBlobClient,
never()).setHttpHeaders(any(BlobHttpHeaders.class));
+
+ verify(mockBlockBlobClient).stageBlockWithResponse(eq(expectedBlockId),
any(InputStream.class),
+ eq((long) testData.length), any(byte[].class), isNull(), isNull(),
eq(Context.NONE));
+
verify(mockBlockBlobClient).commitBlockListWithResponse(eq(List.of(expectedBlockId)),
headersCaptor.capture(),
+ isNull(), isNull(), isNull(), isNull(), eq(Context.NONE));
+
+ byte[] expectedContentMd5 =
MessageDigest.getInstance("MD5").digest(testData);
+ BlobHttpHeaders uploadedHeaders = headersCaptor.getValue();
+ assertArrayEquals(uploadedHeaders.getContentMd5(), expectedContentMd5);
+ verifyNoMoreInteractions(mockBlockBlobClient);
+ } finally {
+ FileUtils.deleteQuietly(tempFile);
+ }
+ }
+
+ @Test
+ public void testCopyDirViaBlobApiWithChecksumEnabledAndNoContentMd5() throws
Exception {
+ DataLakeFileSystemClient mockFileSystemClient =
mock(DataLakeFileSystemClient.class);
+ BlobContainerClient mockBlobContainerClient =
mock(BlobContainerClient.class);
+ BlobClient mockBlobClient = mock(BlobClient.class);
+ BlockBlobClient mockBlockBlobClient = mock(BlockBlobClient.class);
+ DataLakeDirectoryClient mockSrcDirectoryClient =
mock(DataLakeDirectoryClient.class);
+ DataLakeDirectoryClient mockDstDirectoryClient =
mock(DataLakeDirectoryClient.class);
+ DataLakeStorageException mockNotFoundException =
mock(DataLakeStorageException.class);
+ PathProperties mockSrcDirectoryPathProperties = mock(PathProperties.class);
+ DataLakeFileClient mockSrcFileClient = mock(DataLakeFileClient.class);
+ PathProperties mockSrcFilePathProperties = mock(PathProperties.class);
+ DataLakeFileOpenInputStreamResult mockOpenInputStreamResult =
mock(DataLakeFileOpenInputStreamResult.class);
+
+ ADLSGen2PinotFS blobEnabledFs = new ADLSGen2PinotFS(mockFileSystemClient,
mockBlobContainerClient, true);
+
+ URI srcUri = new URI("adl2://account/container/src");
+ URI dstUri = new URI("adl2://account/container/dst");
+ String srcPath = AzurePinotFSUtil.convertUriToAzureStylePath(srcUri);
+ String dstPath = AzurePinotFSUtil.convertUriToAzureStylePath(dstUri);
+
+ byte[] testData = "test segment data".getBytes(StandardCharsets.UTF_8);
+ String expectedBlockId =
+ Base64.getEncoder().encodeToString(String.format("%08d",
0).getBytes(StandardCharsets.UTF_8));
+
+
when(mockFileSystemClient.getDirectoryClient(dstPath)).thenReturn(mockDstDirectoryClient);
+
when(mockDstDirectoryClient.getProperties()).thenThrow(mockNotFoundException);
+ when(mockNotFoundException.getStatusCode()).thenReturn(404);
+
+
when(mockFileSystemClient.getDirectoryClient(srcPath)).thenReturn(mockSrcDirectoryClient);
+
when(mockSrcDirectoryClient.getProperties()).thenReturn(mockSrcDirectoryPathProperties);
+ HashMap<String, String> srcMetadata = new HashMap<>();
+ srcMetadata.put("hdi_isfolder", "false");
+ when(mockSrcDirectoryPathProperties.getMetadata()).thenReturn(srcMetadata);
+
+
when(mockFileSystemClient.getFileClient(srcPath)).thenReturn(mockSrcFileClient);
+
when(mockSrcFileClient.getProperties()).thenReturn(mockSrcFilePathProperties);
+ when(mockSrcFilePathProperties.getContentMd5()).thenReturn(null);
+ when(mockSrcFilePathProperties.getFileSize()).thenReturn((long)
testData.length);
+
when(mockSrcFileClient.openInputStream()).thenReturn(mockOpenInputStreamResult);
+ when(mockOpenInputStreamResult.getInputStream()).thenReturn(new
ByteArrayInputStream(testData));
+
+
when(mockBlobContainerClient.getBlobClient(dstPath)).thenReturn(mockBlobClient);
+ when(mockBlobClient.getBlockBlobClient()).thenReturn(mockBlockBlobClient);
+
+ assertTrue(blobEnabledFs.copyDir(srcUri, dstUri));
+
+ verify(mockFileSystemClient).getDirectoryClient(dstPath);
+ verify(mockDstDirectoryClient).getProperties();
+ verify(mockNotFoundException).getStatusCode();
+ verify(mockFileSystemClient).getDirectoryClient(srcPath);
+ verify(mockSrcDirectoryClient).getProperties();
+ verify(mockSrcDirectoryPathProperties).getMetadata();
+ verify(mockFileSystemClient, times(2)).getFileClient(srcPath);
+ verify(mockSrcFileClient).getProperties();
+ verify(mockSrcFilePathProperties).getContentMd5();
+ verify(mockSrcFilePathProperties).getFileSize();
+ verify(mockSrcFileClient).openInputStream();
+ verify(mockOpenInputStreamResult).getInputStream();
+
+ verify(mockBlobContainerClient).getBlobClient(dstPath);
+ verify(mockBlobClient).getBlockBlobClient();
+ verify(mockBlobClient, never()).getProperties();
+ verify(mockBlobClient, never()).upload(any(InputStream.class), anyLong(),
anyBoolean());
+ verify(mockBlobClient, never()).setHttpHeaders(any(BlobHttpHeaders.class));
+
+ verify(mockBlockBlobClient).stageBlockWithResponse(eq(expectedBlockId),
any(InputStream.class),
+ eq((long) testData.length), any(byte[].class), isNull(), isNull(),
eq(Context.NONE));
+ verify(mockBlockBlobClient).commitBlockList(eq(List.of(expectedBlockId)));
+ verify(mockBlockBlobClient,
never()).commitBlockListWithResponse(anyList(), any(BlobHttpHeaders.class),
any(),
+ any(), any(), any(), any());
+
+ verifyNoMoreInteractions(mockFileSystemClient, mockBlobContainerClient,
mockBlobClient, mockBlockBlobClient,
+ mockSrcDirectoryClient, mockDstDirectoryClient, mockNotFoundException,
mockSrcDirectoryPathProperties,
+ mockSrcFileClient, mockSrcFilePathProperties,
mockOpenInputStreamResult);
+ }
+
+ @Test
+ public void testCopyFromLocalFileViaBlobApiWithException() throws Exception {
+ // Create a test instance with Blob API support
+ ADLSGen2PinotFS blobEnabledFs = new ADLSGen2PinotFS(_mockFileSystemClient,
_mockBlobContainerClient);
+
+ // Create a temporary file with test data
+ File tempFile = File.createTempFile("pinot_blob_test", ".tmp");
+ byte[] testData = "test segment data".getBytes();
Review Comment:
Avoid relying on the platform default charset in tests. Use an explicit
charset (e.g., `StandardCharsets.UTF_8`) for `getBytes(...)` so the test is
deterministic across environments.
```suggestion
byte[] testData = "test segment data".getBytes(StandardCharsets.UTF_8);
```
##########
pinot-plugins/pinot-file-system/pinot-adls/src/test/java/org/apache/pinot/plugin/filesystem/test/ADLSGen2PinotFSTest.java:
##########
@@ -642,4 +655,182 @@ public void testCopyToLocalFileExistingDirectory() throws Exception {
}
}
}
+
+ @Test
+ public void testCopyFromLocalFileViaBlobApi() throws Exception {
+ // Create a test instance with Blob API support
+ ADLSGen2PinotFS blobEnabledFs = new ADLSGen2PinotFS(_mockFileSystemClient,
_mockBlobContainerClient);
+
+ // Create a temporary file with test data
+ File tempFile = File.createTempFile("pinot_blob_test", ".tmp");
+ byte[] testData = "test segment data".getBytes();
+ Files.write(tempFile.toPath(), testData);
Review Comment:
Avoid relying on the platform default charset in tests. Use an explicit
charset (e.g., `StandardCharsets.UTF_8`) for `getBytes(...)` so the test is
deterministic across environments.
##########
pinot-plugins/pinot-file-system/pinot-adls/src/main/java/org/apache/pinot/plugin/filesystem/ADLSGen2PinotFS.java:
##########
@@ -112,17 +123,36 @@ private enum AuthenticationType {
private DataLakeFileSystemClient _fileSystemClient;
+ // Blob API client used for file uploads to support Azure storage accounts with Blob Soft Delete enabled.
+ // The DFS API (Data Lake) does not support Soft Delete, causing 409 EndpointUnsupportedAccountFeatures errors.
+ private BlobContainerClient _blobContainerClient;
+
Review Comment:
`_blobContainerClient` is intentionally nullable (you branch on `!= null` to
decide whether to use Blob vs DFS upload). In Pinot code we typically annotate
nullable fields/params with `javax.annotation.Nullable` to make the contract
explicit and avoid incorrect non-null assumptions by readers/static analysis
(e.g., `S3PinotFS` uses `@Nullable` for optional fields). Consider marking this
field as `@Nullable` (and importing the annotation).
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]