[
https://issues.apache.org/jira/browse/HADOOP-19232?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17911814#comment-17911814
]
ASF GitHub Bot commented on HADOOP-19232:
-----------------------------------------
anmolanmol1234 commented on code in PR #7272:
URL: https://github.com/apache/hadoop/pull/7272#discussion_r1909970881
##########
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemAppend.java:
##########
@@ -149,4 +197,954 @@ public void testCloseOfDataBlockOnAppendComplete() throws
Exception {
}
}
}
-}
+
+ /**
+ * Creates a file over DFS and attempts to append over Blob.
+ * It should fallback to DFS when appending to the file fails.
+ *
+ * @throws IOException if an I/O error occurs.
+ */
+ @Test
+ public void testCreateOverDfsAppendOverBlob() throws IOException {
+ Assume.assumeFalse(
+ getConfiguration().getBoolean(FS_AZURE_TEST_APPENDBLOB_ENABLED,
false));
+ final AzureBlobFileSystem fs = getFileSystem();
+ Path testPath = path(TEST_FILE_PATH);
+ AzureBlobFileSystemStore.Permissions permissions
+ = new AzureBlobFileSystemStore.Permissions(false,
+ FsPermission.getDefault(), FsPermission.getUMask(fs.getConf()));
+ fs.getAbfsStore().getClientHandler().getDfsClient().
+ createPath(makeQualified(testPath).toUri().getPath(), true, false,
+ permissions, false, null,
+ null, getTestTracingContext(fs, true));
+ fs.getAbfsStore()
+ .getAbfsConfiguration()
+ .set(FS_AZURE_INGRESS_SERVICE_TYPE, AbfsServiceType.BLOB.name());
+ FSDataOutputStream outputStream = fs.append(testPath);
+ AzureIngressHandler ingressHandler
+ = ((AbfsOutputStream)
outputStream.getWrappedStream()).getIngressHandler();
+ AbfsClient client = ingressHandler.getClient();
+ Assert.assertTrue("Blob client was not used before fallback",
+ client instanceof AbfsBlobClient);
+ outputStream.write(TEN);
+ outputStream.hsync();
+ outputStream.write(TWENTY);
+ outputStream.hsync();
+ outputStream.write(THIRTY);
+ outputStream.hsync();
+ AzureIngressHandler ingressHandlerFallback
+ = ((AbfsOutputStream)
outputStream.getWrappedStream()).getIngressHandler();
+ AbfsClient clientFallback = ingressHandlerFallback.getClient();
+ Assert.assertTrue("DFS client was not used after fallback",
+ clientFallback instanceof AbfsDfsClient);
+ }
+
+ /**
+ * Creates a file over Blob and attempts to append over DFS.
+ * It should fallback to Blob when appending to the file fails.
+ *
+ * @throws IOException if an I/O error occurs.
+ */
+ @Test
+ public void testCreateOverBlobAppendOverDfs() throws IOException {
+ Assume.assumeFalse(
+ getConfiguration().getBoolean(FS_AZURE_TEST_APPENDBLOB_ENABLED,
+ false));
+ Configuration conf = getRawConfiguration();
+ conf.setBoolean(FS_AZURE_ENABLE_DFSTOBLOB_FALLBACK, true);
+ conf.set(FS_AZURE_INGRESS_SERVICE_TYPE,
+ String.valueOf(AbfsServiceType.DFS));
+ final AzureBlobFileSystem fs = (AzureBlobFileSystem)
FileSystem.newInstance(
+ conf);
+ Path testPath = path(TEST_FILE_PATH);
+ AzureBlobFileSystemStore.Permissions permissions
+ = new AzureBlobFileSystemStore.Permissions(false,
+ FsPermission.getDefault(), FsPermission.getUMask(fs.getConf()));
+ fs.getAbfsStore()
+ .getAbfsConfiguration()
+ .setBoolean(FS_AZURE_ENABLE_DFSTOBLOB_FALLBACK, true);
+ fs.getAbfsStore().getAbfsConfiguration().set(FS_AZURE_INGRESS_SERVICE_TYPE,
+ String.valueOf(AbfsServiceType.DFS));
+ fs.getAbfsStore().getClientHandler().getBlobClient().
+ createPath(makeQualified(testPath).toUri().getPath(), true, false,
+ permissions, false, null,
+ null, getTestTracingContext(fs, true));
+ FSDataOutputStream outputStream = fs.append(testPath);
+ outputStream.write(TEN);
+ outputStream.hsync();
+ outputStream.write(TWENTY);
+ outputStream.hsync();
+ outputStream.write(THIRTY);
+ outputStream.hsync();
+ }
+
+ /**
+ * Creates an Append Blob over Blob and attempts to append over DFS.
+ * It should fallback to Blob when appending to the file fails.
+ *
+ * @throws IOException if an I/O error occurs.
+ */
+ @Test
+ public void testCreateAppendBlobOverBlobEndpointAppendOverDfs()
+ throws IOException, NoSuchFieldException, IllegalAccessException {
+ Configuration conf = getRawConfiguration();
+ conf.setBoolean(FS_AZURE_ENABLE_DFSTOBLOB_FALLBACK, true);
+ conf.set(FS_AZURE_INGRESS_SERVICE_TYPE,
+ String.valueOf(AbfsServiceType.DFS));
+ final AzureBlobFileSystem fs = Mockito.spy(
+ (AzureBlobFileSystem) FileSystem.newInstance(conf));
+ AzureBlobFileSystemStore store = Mockito.spy(fs.getAbfsStore());
+ Mockito.doReturn(true).when(store).isAppendBlobKey(anyString());
+
+ // Set abfsStore as our mocked value.
+ Field privateField = AzureBlobFileSystem.class.getDeclaredField(
+ "abfsStore");
+ privateField.setAccessible(true);
+ privateField.set(fs, store);
+ Path testPath = path(TEST_FILE_PATH);
+ AzureBlobFileSystemStore.Permissions permissions
+ = new AzureBlobFileSystemStore.Permissions(false,
+ FsPermission.getDefault(), FsPermission.getUMask(fs.getConf()));
+ fs.getAbfsStore()
+ .getAbfsConfiguration()
+ .setBoolean(FS_AZURE_ENABLE_DFSTOBLOB_FALLBACK, true);
+ fs.getAbfsStore().getAbfsConfiguration().set(FS_AZURE_INGRESS_SERVICE_TYPE,
+ String.valueOf(AbfsServiceType.DFS));
+ fs.getAbfsStore().getClientHandler().getBlobClient().
+ createPath(makeQualified(testPath).toUri().getPath(), true, false,
+ permissions, true, null,
+ null, getTestTracingContext(fs, true));
+ FSDataOutputStream outputStream = fs.append(testPath);
+ outputStream.write(TEN);
+ outputStream.hsync();
+ outputStream.write(TWENTY);
+ outputStream.hsync();
+ outputStream.write(THIRTY);
+ outputStream.hsync();
+ }
+
+ /**
+ * Creates an append Blob over DFS and attempts to append over Blob.
+ * It should fallback to DFS when appending to the file fails.
+ *
+ * @throws IOException if an I/O error occurs.
+ */
+ @Test
+ public void testCreateAppendBlobOverDfsEndpointAppendOverBlob()
+ throws IOException, NoSuchFieldException, IllegalAccessException {
+ Assume.assumeTrue(
+ "FNS does not support append blob creation for DFS endpoint",
+ getIsNamespaceEnabled(getFileSystem()));
+ final AzureBlobFileSystem fs = Mockito.spy(getFileSystem());
+ AzureBlobFileSystemStore store = Mockito.spy(fs.getAbfsStore());
+ Mockito.doReturn(true).when(store).isAppendBlobKey(anyString());
+
+ // Set abfsStore as our mocked value.
+ Field privateField = AzureBlobFileSystem.class.getDeclaredField(
+ "abfsStore");
+ privateField.setAccessible(true);
+ privateField.set(fs, store);
+ Path testPath = path(TEST_FILE_PATH);
+ AzureBlobFileSystemStore.Permissions permissions
+ = new AzureBlobFileSystemStore.Permissions(false,
+ FsPermission.getDefault(), FsPermission.getUMask(fs.getConf()));
+ fs.getAbfsStore().getClientHandler().getDfsClient().
+ createPath(makeQualified(testPath).toUri().getPath(), true, false,
+ permissions, true, null,
+ null, getTestTracingContext(fs, true));
+ fs.getAbfsStore()
+ .getAbfsConfiguration()
+ .set(FS_AZURE_INGRESS_SERVICE_TYPE, AbfsServiceType.BLOB.name());
+ FSDataOutputStream outputStream = fs.append(testPath);
+ AzureIngressHandler ingressHandler
+ = ((AbfsOutputStream)
outputStream.getWrappedStream()).getIngressHandler();
+ AbfsClient client = ingressHandler.getClient();
+ Assert.assertTrue("Blob client was not used before fallback",
+ client instanceof AbfsBlobClient);
+ outputStream.write(TEN);
+ outputStream.hsync();
+ outputStream.write(TWENTY);
+ outputStream.hsync();
+ outputStream.write(THIRTY);
+ outputStream.flush();
+ AzureIngressHandler ingressHandlerFallback
+ = ((AbfsOutputStream)
outputStream.getWrappedStream()).getIngressHandler();
+ AbfsClient clientFallback = ingressHandlerFallback.getClient();
+ Assert.assertTrue("DFS client was not used after fallback",
+ clientFallback instanceof AbfsDfsClient);
+ }
+
+
+ /**
+ * Tests the correct retrieval of the AzureIngressHandler based on the
configured ingress service type.
+ *
+ * @throws IOException if an I/O error occurs
+ */
+ @Test
+ public void testValidateIngressHandler() throws IOException {
+ Configuration configuration = getRawConfiguration();
+ configuration.set(FS_AZURE_INGRESS_SERVICE_TYPE,
+ AbfsServiceType.BLOB.name());
+ AzureBlobFileSystem fs = (AzureBlobFileSystem) FileSystem.newInstance(
+ configuration);
+ Path testPath = path(TEST_FILE_PATH);
+ AzureBlobFileSystemStore.Permissions permissions
+ = new AzureBlobFileSystemStore.Permissions(false,
+ FsPermission.getDefault(), FsPermission.getUMask(fs.getConf()));
+ fs.getAbfsStore().getClientHandler().getBlobClient().
+ createPath(makeQualified(testPath).toUri().getPath(), true,
+ false,
+ permissions, false, null,
+ null, getTestTracingContext(fs, true), getIsNamespaceEnabled(fs));
+ FSDataOutputStream outputStream = fs.append(testPath);
+ AzureIngressHandler ingressHandler
+ = ((AbfsOutputStream)
outputStream.getWrappedStream()).getIngressHandler();
+ Assert.assertTrue("Ingress handler instance is not correct",
+ ingressHandler instanceof AzureBlobIngressHandler);
+ AbfsClient client = ingressHandler.getClient();
+ Assert.assertTrue("Blob client was not used correctly",
+ client instanceof AbfsBlobClient);
+
+ Path testPath1 = new Path("testFile1");
+ fs.getAbfsStore().getClientHandler().getBlobClient().
+ createPath(makeQualified(testPath1).toUri().getPath(), true,
+ false,
+ permissions, false, null,
+ null, getTestTracingContext(fs, true), getIsNamespaceEnabled(fs));
+ fs.getAbfsStore()
+ .getAbfsConfiguration()
+ .set(FS_AZURE_INGRESS_SERVICE_TYPE, AbfsServiceType.DFS.name());
+ FSDataOutputStream outputStream1 = fs.append(testPath1);
+ AzureIngressHandler ingressHandler1
+ = ((AbfsOutputStream)
outputStream1.getWrappedStream()).getIngressHandler();
+ Assert.assertTrue("Ingress handler instance is not correct",
+ ingressHandler1 instanceof AzureDFSIngressHandler);
+ AbfsClient client1 = ingressHandler1.getClient();
+ Assert.assertTrue("DFS client was not used correctly",
+ client1 instanceof AbfsDfsClient);
+ }
+
+ @Test(expected = FileNotFoundException.class)
+ public void testAppendImplicitDirectory() throws Exception {
+ final AzureBlobFileSystem fs = getFileSystem();
+ final Path folderPath = new Path(TEST_FOLDER_PATH);
+ fs.mkdirs(folderPath);
+ fs.append(folderPath.getParent());
+ }
+
+ @Test(expected = FileNotFoundException.class)
+ public void testAppendFileNotExists() throws Exception {
+ final AzureBlobFileSystem fs = getFileSystem();
+ final Path folderPath = new Path(TEST_FOLDER_PATH);
+ fs.append(folderPath);
+ }
+
+ /**
+ * Create a directory over the dfs endpoint and append over the blob endpoint.
+ * Should throw an error, as append is not supported for a directory.
+ **/
+ @Test(expected = IOException.class)
+ public void testCreateExplicitDirectoryOverDfsAppendOverBlob()
+ throws IOException {
+ final AzureBlobFileSystem fs = getFileSystem();
+ final Path folderPath = path(TEST_FOLDER_PATH);
+ AzureBlobFileSystemStore.Permissions permissions
+ = new AzureBlobFileSystemStore.Permissions(false,
+ FsPermission.getDefault(), FsPermission.getUMask(fs.getConf()));
+ fs.getAbfsStore().getClientHandler().getDfsClient().
+ createPath(makeQualified(folderPath).toUri().getPath(), false, false,
+ permissions, false, null,
+ null, getTestTracingContext(fs, true));
+ FSDataOutputStream outputStream = fs.append(folderPath);
+ outputStream.write(10);
+ outputStream.hsync();
+ }
+
+ /**
+ * Recreate file between append and flush. Etag mismatch happens.
+ **/
+ @Test(expected = IOException.class)
+ public void testRecreateAppendAndFlush() throws IOException {
+ final AzureBlobFileSystem fs = getFileSystem();
+ final Path filePath = path(TEST_FILE_PATH);
+ Assume.assumeFalse("Not valid for APPEND BLOB", isAppendBlobEnabled());
+ fs.create(filePath);
+ AbfsClient abfsClient = fs.getAbfsStore()
+ .getClientHandler()
+ .getIngressClient();
+ Assume.assumeTrue("Skipping for DFS client",
+ abfsClient instanceof AbfsBlobClient);
+ FSDataOutputStream outputStream = fs.append(filePath);
+ outputStream.write(10);
+ final AzureBlobFileSystem fs1
+ = (AzureBlobFileSystem) FileSystem.newInstance(getRawConfiguration());
+ FSDataOutputStream outputStream1 = fs1.create(filePath);
+ outputStream.hsync();
+ }
+
+ /**
+ * Recreate directory between append and flush. Etag mismatch happens.
+ **/
+ @Test(expected = IOException.class)
+ public void testRecreateDirectoryAppendAndFlush() throws IOException {
+ final AzureBlobFileSystem fs = getFileSystem();
+ final Path filePath = path(TEST_FILE_PATH);
+ fs.create(filePath);
+ FSDataOutputStream outputStream = fs.append(filePath);
+ outputStream.write(10);
+ final AzureBlobFileSystem fs1
+ = (AzureBlobFileSystem) FileSystem.newInstance(getRawConfiguration());
+ fs1.mkdirs(filePath);
+ outputStream.hsync();
+ }
+
+ /**
+ * Verify that parallel write with same offset from different output streams
will not throw exception.
+ **/
+ @Test
+ public void testParallelWriteSameOffsetDifferentOutputStreams()
+ throws Exception {
+ Configuration configuration = getRawConfiguration();
+ configuration.set(FS_AZURE_ENABLE_CONDITIONAL_CREATE_OVERWRITE, "false");
+ AzureBlobFileSystem fs = (AzureBlobFileSystem) FileSystem.newInstance(
+ configuration);
+ ExecutorService executorService = Executors.newFixedThreadPool(5);
+ List<Future<?>> futures = new ArrayList<>();
+
+ final byte[] b = new byte[8 * ONE_MB];
+ new Random().nextBytes(b);
+ final Path filePath = path(TEST_FILE_PATH);
+ // Create three output streams
+ FSDataOutputStream out1 = fs.create(filePath);
+ FSDataOutputStream out2 = fs.append(filePath);
+ FSDataOutputStream out3 = fs.append(filePath);
+
+ // Submit tasks to write to each output stream with the same offset
+ futures.add(executorService.submit(() -> {
+ try {
+ out1.write(b, 10, 200);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }));
+
+ futures.add(executorService.submit(() -> {
+ try {
+ out2.write(b, 10, 200);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }));
+
+ futures.add(executorService.submit(() -> {
+ try {
+ out3.write(b, 10, 200);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }));
+
+ int exceptionCaught = 0;
+ for (Future<?> future : futures) {
+ try {
+ future.get(); // wait for the task to complete and handle any
exceptions thrown by the lambda expression
+ } catch (ExecutionException e) {
+ Throwable cause = e.getCause();
+ if (cause instanceof RuntimeException) {
+ exceptionCaught++;
+ } else {
+ System.err.println("Unexpected exception caught: " + cause);
+ }
+ } catch (InterruptedException e) {
+ // handle interruption
+ }
+ }
+ assertEquals(exceptionCaught, 0);
+ }
+
+ /**
+ * Verify that parallel write for different content length will not throw
exception.
+ **/
+ @Test
+ public void testParallelWriteDifferentContentLength() throws Exception {
+ Configuration configuration = getRawConfiguration();
+ configuration.set(FS_AZURE_ENABLE_CONDITIONAL_CREATE_OVERWRITE, "false");
+ FileSystem fs = FileSystem.newInstance(configuration);
+ ExecutorService executorService = Executors.newFixedThreadPool(5);
+ List<Future<?>> futures = new ArrayList<>();
+
+ final Path filePath = path(TEST_FILE_PATH);
+ // Create three output streams with different content length
+ FSDataOutputStream out1 = fs.create(filePath);
+ final byte[] b1 = new byte[8 * ONE_MB];
+ new Random().nextBytes(b1);
+
+ FSDataOutputStream out2 = fs.append(filePath);
+ FSDataOutputStream out3 = fs.append(filePath);
+
+ // Submit tasks to write to each output stream
+ futures.add(executorService.submit(() -> {
+ try {
+ out1.write(b1, 10, 200);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }));
+
+ futures.add(executorService.submit(() -> {
+ try {
+ out2.write(b1, 20, 300);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }));
+
+ futures.add(executorService.submit(() -> {
+ try {
+ out3.write(b1, 30, 400);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }));
+
+ int exceptionCaught = 0;
+ for (Future<?> future : futures) {
+ try {
+ future.get(); // wait for the task to complete and handle any
exceptions thrown by the lambda expression
+ } catch (ExecutionException e) {
+ Throwable cause = e.getCause();
+ if (cause instanceof RuntimeException) {
+ exceptionCaught++;
+ } else {
+ System.err.println("Unexpected exception caught: " + cause);
+ }
+ } catch (InterruptedException e) {
+ // handle interruption
+ }
+ }
+ assertEquals(exceptionCaught, 0);
+ }
+
+ /**
+ * Verify that when two output streams write to the same file and close in
+ * parallel, exactly one of them fails, and the data flushed by the
+ * successful stream is readable back from the file.
+ **/
+ @Test
+ public void testParallelWriteOutputStreamClose() throws Exception {
+ AzureBlobFileSystem fs = getFileSystem();
+ Assume.assumeFalse("Not valid for APPEND BLOB", isAppendBlobEnabled());
+ final Path secondarytestfile = new Path("secondarytestfile");
+ ExecutorService executorService = Executors.newFixedThreadPool(2);
+ List<Future<?>> futures = new ArrayList<>();
+
+ FSDataOutputStream out1 = fs.create(secondarytestfile);
+ AbfsClient abfsClient = fs.getAbfsStore()
+ .getClientHandler()
+ .getIngressClient();
+ Assume.assumeTrue("Skipping for DFS client",
+ abfsClient instanceof AbfsBlobClient);
+ AbfsOutputStream outputStream1 = (AbfsOutputStream)
out1.getWrappedStream();
+ String fileETag = outputStream1.getIngressHandler().getETag();
+ final byte[] b1 = new byte[8 * ONE_MB];
+ new Random().nextBytes(b1);
+ final byte[] b2 = new byte[8 * ONE_MB];
+ new Random().nextBytes(b2);
+
+ FSDataOutputStream out2 = fs.append(secondarytestfile);
+
+ // Submit tasks to write to each output stream
+ futures.add(executorService.submit(() -> {
+ try {
+ out1.write(b1, 0, 200);
+ out1.close();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }));
+
+ futures.add(executorService.submit(() -> {
+ try {
+ out2.write(b2, 0, 400);
+ out2.close();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }));
+
+ int exceptionCaught = 0;
+
+ for (Future<?> future : futures) {
+ try {
+ future.get(); // wait for the task to complete and handle any
exceptions thrown by the lambda expression
+ } catch (ExecutionException e) {
+ Throwable cause = e.getCause();
+ if (cause instanceof RuntimeException) {
+ exceptionCaught++;
+ } else {
+ System.err.println("Unexpected exception caught: " + cause);
+ }
+ } catch (InterruptedException e) {
+ // handle interruption
+ }
+ }
+
+ assertEquals(exceptionCaught, 1);
+ // Validate that the data written in the buffer is the same as what was
read
+ final byte[] readBuffer = new byte[8 * ONE_MB];
+ int result;
+ FSDataInputStream inputStream = fs.open(secondarytestfile);
+ inputStream.seek(0);
+
+ AbfsOutputStream outputStream2 = (AbfsOutputStream)
out1.getWrappedStream();
+ String out1Etag = outputStream2.getIngressHandler().getETag();
+
+ AbfsOutputStream outputStream3 = (AbfsOutputStream)
out2.getWrappedStream();
+ String out2Etag = outputStream3.getIngressHandler().getETag();
+
+ if (!fileETag.equals(out1Etag)) {
+ result = inputStream.read(readBuffer, 0, 4 * ONE_MB);
+ assertEquals(result,
+ 200); // Verify that the number of bytes read matches the number of
bytes written
+ assertArrayEquals(
+ Arrays.copyOfRange(readBuffer, 0, result), Arrays.copyOfRange(b1, 0,
+ result)); // Verify that the data read matches the original data
written
+ } else if (!fileETag.equals(out2Etag)) {
+ result = inputStream.read(readBuffer, 0, 4 * ONE_MB);
+ assertEquals(result,
+ 400); // Verify that the number of bytes read matches the number of
bytes written
+ assertArrayEquals(Arrays.copyOfRange(readBuffer, 0, result),
+ Arrays.copyOfRange(b2, 0,
+ result)); // Verify that the data read matches the original data
written
+ } else {
+ fail("Neither out1 nor out2 was flushed successfully.");
+ }
+ }
+
+ /**
+ * Verify that once flushed etag changes.
+ **/
+ @Test
+ public void testEtagMismatch() throws Exception {
+ AzureBlobFileSystem fs = getFileSystem();
+ final Path filePath = path(TEST_FILE_PATH);
+ Assume.assumeFalse("Not valid for APPEND BLOB", isAppendBlobEnabled());
+ FSDataOutputStream out1 = fs.create(filePath);
+ FSDataOutputStream out2 = fs.create(filePath);
+ AbfsClient abfsClient = fs.getAbfsStore()
+ .getClientHandler()
+ .getIngressClient();
+ Assume.assumeTrue("Skipping for DFS client",
+ abfsClient instanceof AbfsBlobClient);
+ out2.write(10);
+ out2.hsync();
+ out1.write(10);
+ intercept(IOException.class, () -> out1.hsync());
+ }
+
+ @Test
+ public void testAppendWithLease() throws Exception {
+ final Path testFilePath = new Path(path(methodName.getMethodName()),
+ TEST_FILE_PATH);
+ final AzureBlobFileSystem fs = Mockito.spy(
+ getCustomFileSystem(testFilePath.getParent(), 1));
+ FsPermission permission = new FsPermission(FsAction.ALL, FsAction.ALL,
+ FsAction.ALL);
+ FsPermission umask = new FsPermission(FsAction.NONE, FsAction.NONE,
+ FsAction.NONE);
+ AbfsOutputStream outputStream = (AbfsOutputStream) fs.getAbfsStore()
+ .createFile(testFilePath, null, true,
+ permission, umask, getTestTracingContext(fs, true));
+ outputStream.write(10);
+ outputStream.close();
+ assertNotNull(outputStream.getLeaseId());
+ }
+
+ private AzureBlobFileSystem getCustomFileSystem(Path infiniteLeaseDirs,
+ int numLeaseThreads) throws Exception {
+ Configuration conf = getRawConfiguration();
+ conf.setBoolean(String.format("fs.%s.impl.disable.cache", getAbfsScheme()),
+ true);
+ conf.set(FS_AZURE_INFINITE_LEASE_KEY, infiniteLeaseDirs.toUri().getPath());
+ conf.setInt(FS_AZURE_LEASE_THREADS, numLeaseThreads);
+ FileSystem fileSystem = FileSystem.newInstance(conf);
+ return (AzureBlobFileSystem) fileSystem;
+ }
+
+ @Test
+ public void testAppendImplicitDirectoryAzcopy() throws Exception {
+ AzureBlobFileSystem fs = getFileSystem();
+ createAzCopyFolder(new Path("/src"));
+ createAzCopyFile(new Path("/src/file"));
+ intercept(FileNotFoundException.class, () -> fs.append(new Path("/src")));
+ }
+
+ /**
+ * If a write operation fails asynchronously, when the next write comes once
failure is
+ * registered, that operation would fail with the exception caught on
previous
+ * write operation.
+ * The next close, hsync, hflush would also fail for the last caught
exception.
+ */
+ @Test
+ public void testIntermittentAppendFailureToBeReported() throws Exception {
+ AzureBlobFileSystem fs = Mockito.spy(
+ (AzureBlobFileSystem) FileSystem.newInstance(getRawConfiguration()));
+ Assume.assumeTrue(!getIsNamespaceEnabled(fs));
+ AzureBlobFileSystemStore store = Mockito.spy(fs.getAbfsStore());
+ Assume.assumeTrue(store.getClient() instanceof AbfsBlobClient);
+ Assume.assumeFalse("Not valid for APPEND BLOB", isAppendBlobEnabled());
+
+ AbfsClientHandler clientHandler = Mockito.spy(store.getClientHandler());
+ AbfsBlobClient blobClient = Mockito.spy(clientHandler.getBlobClient());
+
+ Mockito.doReturn(clientHandler).when(store).getClientHandler();
+ Mockito.doReturn(blobClient).when(clientHandler).getBlobClient();
+ Mockito.doReturn(blobClient).when(clientHandler).getIngressClient();
+
+ Mockito.doThrow(
+ new AbfsRestOperationException(503, "", "", new Exception()))
+ .when(blobClient)
+ .append(Mockito.anyString(), Mockito.any(byte[].class), Mockito.any(
+ AppendRequestParameters.class), Mockito.any(), Mockito.any(),
+ Mockito.any(TracingContext.class));
+
+ byte[] bytes = new byte[1024 * 1024 * 8];
+ new Random().nextBytes(bytes);
+
+ LambdaTestUtils.intercept(IOException.class, () -> {
+ try (FSDataOutputStream os = createMockedOutputStream(fs, new
Path("/test/file"), blobClient)) {
+ os.write(bytes);
+ }
+ });
+
+ LambdaTestUtils.intercept(IOException.class, () -> {
+ FSDataOutputStream os = createMockedOutputStream(fs, new
Path("/test/file/file1"), blobClient);
+ os.write(bytes);
+ os.close();
+ });
+
+ LambdaTestUtils.intercept(IOException.class, () -> {
+ FSDataOutputStream os = createMockedOutputStream(fs, new
Path("/test/file/file2"), blobClient);
+ os.write(bytes);
+ os.hsync();
+ });
+
+ LambdaTestUtils.intercept(IOException.class, () -> {
+ FSDataOutputStream os = createMockedOutputStream(fs, new
Path("/test/file/file3"), blobClient);
+ os.write(bytes);
+ os.hflush();
+ });
+
+ LambdaTestUtils.intercept(IOException.class, () -> {
+ AbfsOutputStream os = (AbfsOutputStream) createMockedOutputStream(fs,
new Path("/test/file/file4"), blobClient).getWrappedStream();
+ os.write(bytes);
+ while (!os.areWriteOperationsTasksDone());
+ os.write(bytes);
+ });
+ }
+
+ private FSDataOutputStream createMockedOutputStream(AzureBlobFileSystem fs,
Path path, AbfsClient client) throws IOException {
+ AbfsOutputStream abfsOutputStream = Mockito.spy((AbfsOutputStream)
fs.create(path).getWrappedStream());
+ AzureIngressHandler ingressHandler =
Mockito.spy(abfsOutputStream.getIngressHandler());
+
Mockito.doReturn(ingressHandler).when(abfsOutputStream).getIngressHandler();
+ Mockito.doReturn(client).when(ingressHandler).getClient();
+
+ FSDataOutputStream fsDataOutputStream = Mockito.spy(new
FSDataOutputStream(abfsOutputStream, null));
+ return fsDataOutputStream;
+ }
+
+ /**
+ * Test to check when async write takes time, the close, hsync, hflush method
+ * wait to get async ops completed and then flush. If async ops fail, the
methods
+ * will throw exception.
+ */
+ @Test
+ public void testWriteAsyncOpFailedAfterCloseCalled() throws Exception {
+ AzureBlobFileSystem fs = Mockito.spy(
+ (AzureBlobFileSystem) FileSystem.newInstance(getRawConfiguration()));
+ Assume.assumeFalse("Not valid for APPEND BLOB", isAppendBlobEnabled());
+ AzureBlobFileSystemStore store = Mockito.spy(fs.getAbfsStore());
+ AbfsClientHandler clientHandler = Mockito.spy(store.getClientHandler());
+ AbfsBlobClient blobClient = Mockito.spy(clientHandler.getBlobClient());
+ AbfsDfsClient dfsClient = Mockito.spy(clientHandler.getDfsClient());
+
+ AbfsClient client = clientHandler.getIngressClient();
+ if (clientHandler.getIngressClient() instanceof AbfsBlobClient) {
+ Mockito.doReturn(blobClient).when(clientHandler).getBlobClient();
+ Mockito.doReturn(blobClient).when(clientHandler).getIngressClient();
+ } else {
+ Mockito.doReturn(dfsClient).when(clientHandler).getDfsClient();
+ Mockito.doReturn(dfsClient).when(clientHandler).getIngressClient();
+ }
+ Mockito.doReturn(clientHandler).when(store).getClientHandler();
+
+ byte[] bytes = new byte[1024 * 1024 * 8];
+ new Random().nextBytes(bytes);
+
+ AtomicInteger count = new AtomicInteger(0);
+
+ Mockito.doAnswer(answer -> {
+ count.incrementAndGet();
+ while (count.get() < 2);
+ Thread.sleep(1000);
+ throw new AbfsRestOperationException(503, "", "", new Exception());
+ })
+ .when(client instanceof AbfsBlobClient ? blobClient : dfsClient)
+ .append(Mockito.anyString(), Mockito.any(byte[].class), Mockito.any(
+ AppendRequestParameters.class), Mockito.any(), Mockito.any(),
+ Mockito.any(TracingContext.class));
+
+ Mockito.doAnswer(answer -> {
+ count.incrementAndGet();
+ while (count.get() < 2);
+ Thread.sleep(1000);
+ throw new AbfsRestOperationException(503, "", "", new Exception());
+ })
+ .when(client instanceof AbfsBlobClient ? blobClient : dfsClient)
+ .append(Mockito.anyString(), Mockito.any(byte[].class), Mockito.any(
+ AppendRequestParameters.class), Mockito.any(), Mockito.any(),
+ Mockito.any(TracingContext.class));
+
+ FSDataOutputStream os = createMockedOutputStream(fs, new
Path("/test/file"),
+ client instanceof AbfsBlobClient ? blobClient : dfsClient);
+ os.write(bytes);
+ os.write(bytes);
+ LambdaTestUtils.intercept(IOException.class, os::close);
+
+ count.set(0);
+ FSDataOutputStream os1 = createMockedOutputStream(fs,
+ new Path("/test/file1"),
+ client instanceof AbfsBlobClient ? blobClient : dfsClient);
+ os1.write(bytes);
+ os1.write(bytes);
+ LambdaTestUtils.intercept(IOException.class, os1::hsync);
+
+ count.set(0);
+ FSDataOutputStream os2 = createMockedOutputStream(fs,
+ new Path("/test/file2"),
+ client instanceof AbfsBlobClient ? blobClient : dfsClient);
+ os2.write(bytes);
+ os2.write(bytes);
+ LambdaTestUtils.intercept(IOException.class, os2::hflush);
+ }
+
+ /**
+ * Helper method that generates blockId.
+ * @param os The output stream whose stream ID is hashed into the block ID.
+ * @param position The offset needed to generate blockId.
+ * @return String representing the block ID generated.
+ */
+ private String generateBlockId(AbfsOutputStream os, long position) {
+ String streamId = os.getStreamID();
+ String streamIdHash = Integer.toString(streamId.hashCode());
+ String blockId = String.format("%d_%s", position, streamIdHash);
+ byte[] blockIdByteArray = new byte[BLOCK_ID_LENGTH];
+ System.arraycopy(blockId.getBytes(), 0, blockIdByteArray, 0,
Math.min(BLOCK_ID_LENGTH, blockId.length()));
+ return new String(Base64.encodeBase64(blockIdByteArray),
StandardCharsets.UTF_8);
+ }
+
+ /**
+ * Test to simulate a successful flush operation followed by a connection
reset
+ * on the response, triggering a retry.
+ *
+ * This test verifies that the flush operation is retried in the event of a
+ * connection reset during the response phase. The test creates a mock
+ * AzureBlobFileSystem and its associated components to simulate the flush
+ * operation and the connection reset. It then verifies that the flush
+ * operation is retried once before succeeding if the md5hash matches.
+ *
+ * @throws Exception if an error occurs during the test execution.
+ */
+ @Test
+ public void testFlushSuccessWithConnectionResetOnResponseValidMd5() throws
Exception {
+ // Create a spy of AzureBlobFileSystem
+ AzureBlobFileSystem fs = Mockito.spy(
+ (AzureBlobFileSystem) FileSystem.newInstance(getRawConfiguration()));
+ Assume.assumeTrue(!getIsNamespaceEnabled(fs));
+
+ // Create a spy of AzureBlobFileSystemStore
+ AzureBlobFileSystemStore store = Mockito.spy(fs.getAbfsStore());
+ Assume.assumeTrue(store.getClient() instanceof AbfsBlobClient);
+ Assume.assumeFalse("Not valid for APPEND BLOB", isAppendBlobEnabled());
+
+ // Create spies for the client handler and blob client
+ AbfsClientHandler clientHandler = Mockito.spy(store.getClientHandler());
+ AbfsBlobClient blobClient = Mockito.spy(clientHandler.getBlobClient());
+
+ // Set up the spies to return the mocked objects
+ Mockito.doReturn(clientHandler).when(store).getClientHandler();
+ Mockito.doReturn(blobClient).when(clientHandler).getBlobClient();
+ Mockito.doReturn(blobClient).when(clientHandler).getIngressClient();
+ AtomicInteger flushCount = new AtomicInteger(0);
+ FSDataOutputStream os = createMockedOutputStream(fs, new
Path("/test/file"), blobClient);
+ AbfsOutputStream out = (AbfsOutputStream) os.getWrappedStream();
+ String eTag = out.getIngressHandler().getETag();
+ byte[] bytes = new byte[1024 * 1024 * 8];
+ new Random().nextBytes(bytes);
+ // Write some bytes and attempt to flush, which should retry
+ out.write(bytes);
+ List<String> list = new ArrayList<>();
+ list.add(generateBlockId(out, 0));
+ String blockListXml = generateBlockListXml(list);
+
+ Mockito.doAnswer(answer -> {
+ // Set up the mock for the flush operation
+ AbfsClientTestUtil.setMockAbfsRestOperationForFlushOperation(blobClient,
eTag, blockListXml,
+ (httpOperation) -> {
+ Mockito.doAnswer(invocation -> {
+ // Call the real processResponse method
+ invocation.callRealMethod();
+
+ int currentCount = flushCount.incrementAndGet();
+ if (currentCount == 1) {
+ Mockito.when(httpOperation.getStatusCode())
+ .thenReturn(
+ 500); // Status code 500 for Internal Server Error
+ Mockito.when(httpOperation.getStorageErrorMessage())
+ .thenReturn("CONNECTION_RESET"); // Error message
+ throw new IOException("Connection Reset");
+ }
+ return null;
+ }).when(httpOperation).processResponse(
+ Mockito.nullable(byte[].class),
+ Mockito.anyInt(),
+ Mockito.anyInt()
+ );
+
+ return httpOperation;
+ });
+ return answer.callRealMethod();
+ }).when(blobClient).flush(
+ Mockito.any(byte[].class),
+ Mockito.anyString(),
+ Mockito.anyBoolean(),
+ Mockito.nullable(String.class),
+ Mockito.nullable(String.class),
+ Mockito.anyString(),
+ Mockito.nullable(ContextEncryptionAdapter.class),
+ Mockito.any(TracingContext.class)
+ );
+
+ out.hsync();
+ out.close();
+ Mockito.verify(blobClient, Mockito.times(1)).flush(
+ Mockito.any(byte[].class),
+ Mockito.anyString(),
+ Mockito.anyBoolean(),
+ Mockito.nullable(String.class),
+ Mockito.nullable(String.class),
+ Mockito.anyString(),
+ Mockito.nullable(ContextEncryptionAdapter.class),
+ Mockito.any(TracingContext.class));
+ }
+
+ /**
+ * Test to simulate a successful flush operation followed by a connection
reset
+ * on the response, triggering a retry.
+ *
+ * This test verifies that the flush operation is retried in the event of a
+ * connection reset during the response phase. The test creates a mock
+ * AzureBlobFileSystem and its associated components to simulate the flush
+ * operation and the connection reset. It then verifies that the flush
+ * operation is retried once before succeeding if the md5hash matches.
+ *
+ * @throws Exception if an error occurs during the test execution.
+ */
+ @Test
+ public void testFlushSuccessWithConnectionResetOnResponseInvalidMd5() throws
Exception {
+ // Create a spy of AzureBlobFileSystem
+ AzureBlobFileSystem fs = Mockito.spy(
+ (AzureBlobFileSystem) FileSystem.newInstance(getRawConfiguration()));
+ Assume.assumeTrue(!getIsNamespaceEnabled(fs));
+
+ // Create a spy of AzureBlobFileSystemStore
+ AzureBlobFileSystemStore store = Mockito.spy(fs.getAbfsStore());
+ Assume.assumeTrue(store.getClient() instanceof AbfsBlobClient);
+ Assume.assumeFalse("Not valid for APPEND BLOB", isAppendBlobEnabled());
+
+ // Create spies for the client handler and blob client
+ AbfsClientHandler clientHandler = Mockito.spy(store.getClientHandler());
+ AbfsBlobClient blobClient = Mockito.spy(clientHandler.getBlobClient());
+
+ // Set up the spies to return the mocked objects
+ Mockito.doReturn(clientHandler).when(store).getClientHandler();
+ Mockito.doReturn(blobClient).when(clientHandler).getBlobClient();
+ Mockito.doReturn(blobClient).when(clientHandler).getIngressClient();
+ AtomicInteger flushCount = new AtomicInteger(0);
+ FSDataOutputStream os = createMockedOutputStream(fs,
+ new Path("/test/file"), blobClient);
+ AbfsOutputStream out = (AbfsOutputStream) os.getWrappedStream();
+ String eTag = out.getIngressHandler().getETag();
+ byte[] bytes = new byte[1024 * 1024 * 8];
+ new Random().nextBytes(bytes);
+ // Write some bytes and attempt to flush, which should retry
+ out.write(bytes);
+ List<String> list = new ArrayList<>();
+ list.add(generateBlockId(out, 0));
+ String blockListXml = generateBlockListXml(list);
+
+ Mockito.doAnswer(answer -> {
+ // Set up the mock for the flush operation
+ AbfsClientTestUtil.setMockAbfsRestOperationForFlushOperation(blobClient,
+ eTag, blockListXml,
+ (httpOperation) -> {
+ Mockito.doAnswer(invocation -> {
+ // Call the real processResponse method
+ invocation.callRealMethod();
+
+ int currentCount = flushCount.incrementAndGet();
+ if (currentCount == 1) {
+ Mockito.when(httpOperation.getStatusCode())
+ .thenReturn(
+ 500); // Status code 500 for Internal Server Error
+ Mockito.when(httpOperation.getStorageErrorMessage())
+ .thenReturn("CONNECTION_RESET"); // Error message
+ throw new IOException("Connection Reset");
+ } else if (currentCount == 2) {
+ Mockito.when(httpOperation.getStatusCode())
+ .thenReturn(200);
+ Mockito.when(httpOperation.getStorageErrorMessage())
+ .thenReturn("HTTP_OK");
+ }
+ return null;
+ }).when(httpOperation).processResponse(
+ Mockito.nullable(byte[].class),
+ Mockito.anyInt(),
+ Mockito.anyInt()
+ );
+
+ return httpOperation;
+ });
+ return answer.callRealMethod();
+ }).when(blobClient).flush(
+ Mockito.any(byte[].class),
+ Mockito.anyString(),
+ Mockito.anyBoolean(),
+ Mockito.nullable(String.class),
+ Mockito.nullable(String.class),
+ Mockito.anyString(),
+ Mockito.nullable(ContextEncryptionAdapter.class),
+ Mockito.any(TracingContext.class)
+ );
+
+ FSDataOutputStream os1 = createMockedOutputStream(fs,
+ new Path("/test/file"), blobClient);
+ AbfsOutputStream out1 = (AbfsOutputStream) os1.getWrappedStream();
+ byte[] bytes1 = new byte[1024 * 1024 * 8];
+ new Random().nextBytes(bytes1);
+ out1.write(bytes1);
+
+ //parallel flush call should lead to the first call failing because of md5
mismatch.
+ Thread parallelFlushThread = new Thread(() -> {
+ try {
+ out1.hsync();
+ } catch (IOException e) {
+ }
+ });
+
+ parallelFlushThread.start(); // Start the parallel flush operation
+ parallelFlushThread.join();
+ // Perform the first flush operation
+ intercept(IOException.class,
+ "The condition specified using HTTP conditional header(s) is not met.",
+ out::hsync
+ );
+ }
+}
Review Comment:
taken
> ABFS: [FnsOverBlob] Implementing Ingress Support with various Fallback
> Handling
> -------------------------------------------------------------------------------
>
> Key: HADOOP-19232
> URL: https://issues.apache.org/jira/browse/HADOOP-19232
> Project: Hadoop Common
> Issue Type: Sub-task
> Components: fs/azure
> Affects Versions: 3.4.0
> Reporter: Descifrado
> Assignee: Anmol Asrani
> Priority: Major
> Labels: pull-request-available
>
> Scope of this task is to refactor the AbfsOutputStream class to handle the
> ingress for DFS and Blob endpoint effectively.
> More details will be added soon.
> Prerequisites for this Patch:
> 1. [HADOOP-19187] ABFS: [FnsOverBlob]Making AbfsClient Abstract for
> supporting both DFS and Blob Endpoint - ASF JIRA (apache.org)
> 2. [HADOOP-19226] ABFS: [FnsOverBlob]Implementing Azure Rest APIs on Blob
> Endpoint for AbfsBlobClient - ASF JIRA (apache.org)
> 3. [HADOOP-19207] ABFS: [FnsOverBlob]Response Handling of Blob Endpoint APIs
> and Metadata APIs - ASF JIRA (apache.org)
--
This message was sent by Atlassian Jira
(v8.20.10#820010)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]