[
https://issues.apache.org/jira/browse/HADOOP-19522?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17945093#comment-17945093
]
ASF GitHub Bot commented on HADOOP-19522:
-----------------------------------------
anujmodi2021 commented on code in PR #7559:
URL: https://github.com/apache/hadoop/pull/7559#discussion_r2047122530
##########
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRenameRecovery.java:
##########
@@ -0,0 +1,860 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.assertj.core.api.Assertions;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.azurebfs.constants.FSOperationType;
+import
org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException;
+import
org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
+import org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode;
+import org.apache.hadoop.fs.azurebfs.services.AbfsBlobClient;
+import org.apache.hadoop.fs.azurebfs.services.AbfsClient;
+import org.apache.hadoop.fs.azurebfs.services.RenameAtomicity;
+import org.apache.hadoop.fs.azurebfs.services.VersionedFileStatus;
+import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
+import org.apache.hadoop.fs.azurebfs.utils.TracingHeaderFormat;
+import org.apache.hadoop.test.LambdaTestUtils;
+
+import static java.net.HttpURLConnection.HTTP_OK;
+import static
org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.ROOT_PATH;
+import static
org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_BLOB_DIR_RENAME_MAX_THREAD;
+import static
org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_CONSUMER_MAX_LAG;
+import static
org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_PRODUCER_QUEUE_MAX_SIZE;
+import static
org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.BLOB_ALREADY_EXISTS;
+import static
org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.BLOB_PATH_NOT_FOUND;
+import static
org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.PATH_NOT_FOUND;
+import static org.apache.hadoop.fs.azurebfs.services.RenameAtomicity.SUFFIX;
+import static org.apache.hadoop.test.LambdaTestUtils.intercept;
+
+/**
+ * Test rename recovery operation.
+ */
+public class ITestAzureBlobFileSystemRenameRecovery extends
+ AbstractAbfsIntegrationTest {
+
+ private static final int FAILED_CALL = 15;
+
+ private static final int TOTAL_FILES = 25;
+
+ private static final int TOTAL_THREADS_IN_POOL = 5;
+
+ public ITestAzureBlobFileSystemRenameRecovery() throws Exception {
+ super();
+ }
+
+ /**
+ * Triggers rename recovery by calling getPathStatus on the source path.
+ * This simulates a scenario where the rename operation was interrupted,
+ * and the system needs to recover the state of the source path.
+ *
+ * @param fs The AzureBlobFileSystem instance.
+ * @param src The source path to trigger recovery on.
+ * @throws Exception If an error occurs during the recovery process.
+ */
+ private void triggerRenameRecovery(AzureBlobFileSystem fs, Path src) throws
Exception {
+ // Trigger rename recovery
+ TracingContext tracingContext = new TracingContext(
+ getConfiguration().getClientCorrelationId(), fs.getFileSystemId(),
+ FSOperationType.GET_FILESTATUS, TracingHeaderFormat.ALL_ID_FORMAT,
null);
+ AzureServiceErrorCode errorCode = LambdaTestUtils.intercept(
+ AbfsRestOperationException.class, () -> {
+ fs.getAbfsStore().getClient().getPathStatus(src.toUri().getPath(),
true,
+ tracingContext, null);
+ }).getErrorCode();
+ Assertions.assertThat(errorCode)
+ .describedAs("Path had to be recovered from atomic rename operation.")
+ .isEqualTo(PATH_NOT_FOUND);
+ }
+
+ /**
+ * Simulates a failure during the rename operation by throwing an exception
+ * when the copyBlob method is called. This is used to test the behavior of
+ * the rename recovery operation when a blob already exists at the
destination.
+ *
+ * @param client The AbfsBlobClient instance.
+ * @param copyCall The AtomicInteger to track the number of copy calls.
+ * @throws AzureBlobFileSystemException If an error occurs during the
operation.
+ */
+ private void renameCrashInBetween(AbfsBlobClient client, AtomicInteger
copyCall)
+ throws AzureBlobFileSystemException {
+ Mockito.doAnswer(copyRequest -> {
+ if (copyCall.get() == FAILED_CALL) {
+ throw new AbfsRestOperationException(
+ BLOB_ALREADY_EXISTS.getStatusCode(),
+ BLOB_ALREADY_EXISTS.getErrorCode(),
+ BLOB_ALREADY_EXISTS.getErrorMessage(),
+ new Exception());
+ }
+ copyCall.incrementAndGet();
+ return copyRequest.callRealMethod();
+ }).when(client).copyBlob(Mockito.any(Path.class),
+ Mockito.any(Path.class), Mockito.nullable(String.class),
+ Mockito.any(TracingContext.class));
+ }
+
+ /**
+ * Helper method to create the configuration for the AzureBlobFileSystem.
+ *
+ * @return The configuration object.
+ */
+ private Configuration getConfig() {
+ Configuration config = new Configuration(this.getRawConfiguration());
+ config.set(FS_AZURE_PRODUCER_QUEUE_MAX_SIZE, "5");
+ config.set(FS_AZURE_CONSUMER_MAX_LAG, "3");
+ config.set(FS_AZURE_BLOB_DIR_RENAME_MAX_THREAD, "2");
+ return config;
+ }
+
+ /**
+ * Spies on the AzureBlobFileSystem's store and client to enable mocking and
verification
+ * of client interactions in tests. It replaces the actual store and client
with mocked versions.
+ *
+ * @param fs the AzureBlobFileSystem instance
+ * @return the spied AbfsClient for interaction verification
+ */
+ private AbfsClient addSpyHooksOnClient(final AzureBlobFileSystem fs) {
+ AzureBlobFileSystemStore store = Mockito.spy(fs.getAbfsStore());
+ Mockito.doReturn(store).when(fs).getAbfsStore();
+ AbfsClient client = Mockito.spy(store.getClient());
+ Mockito.doReturn(client).when(store).getClient();
+ return client;
+ }
+
+ /**
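+ * Helper method to validate the state left behind by a rename: whether the
+ * pending JSON file, the source and the destination exist.
+ *
+ * @param fs The AzureBlobFileSystem instance to check the existence on.
+ * @param src The source path.
+ * @param dst The destination path.
+ * @param isSrcExist Whether the source is expected to exist.
+ * @param isDstExist Whether the destination is expected to exist.
+ * @param isJsonExist Whether the pending JSON file is expected to exist.
+ * @throws Exception If an error occurs during the validation.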
+ */
+ private void validateRename(AzureBlobFileSystem fs, Path src, Path dst,
+ boolean isSrcExist, boolean isDstExist, boolean isJsonExist) throws
Exception {
+ // Validate pending JSON file status
+ assertPathStatus(fs,
+ new Path(src.getParent(), src.getName() + SUFFIX), isJsonExist,
+ "Pending JSON file");
+
+ // Validate source directory status
+ assertPathStatus(fs, src, isSrcExist, "Source directory");
+
+ // Validate destination directory status
+ assertPathStatus(fs, dst, isDstExist, "Destination directory");
+ }
+
+ /**
+ * Helper method to assert the status of a path in the AzureBlobFileSystem.
+ *
+ * @param fs The AzureBlobFileSystem instance to check the existence on.
+ * @param path The path to check.
+ * @param shouldExist Whether the path should exist or not.
+ * @param description A description for the assertion.
+ * @throws Exception If an error occurs during the assertion.
+ */
+ private void assertPathStatus(AzureBlobFileSystem fs, Path path,
+ boolean shouldExist, String description) throws Exception{
+ TracingContext tracingContext = getTestTracingContext(fs, true);
+ AbfsBlobClient client = ((AbfsBlobClient) fs.getAbfsClient());
+ if (shouldExist) {
+ int actualStatus = client.getPathStatus(
+ path.toUri().getPath(), tracingContext,
+ null, true)
+ .getResult().getStatusCode();
+ Assertions.assertThat(actualStatus)
+ .describedAs("%s should exists", description)
+ .isEqualTo(HTTP_OK);
+ } else {
+ AzureServiceErrorCode errorCode = LambdaTestUtils.intercept(
+ AbfsRestOperationException.class, () -> {
+ client.getPathStatus(path.toUri().getPath(), true,
+ tracingContext, null);
+ }).getErrorCode();
+ Assertions.assertThat(errorCode)
+ .describedAs("%s should not exists", description)
+ .isEqualTo(BLOB_PATH_NOT_FOUND);
+ }
+ }
+
+ /**
+ * Helper method to create files in the given directory.
+ *
+ * @param fs The AzureBlobFileSystem instance to use for file creation.
+ * @param src The source path (directory).
+ * @param numFiles The number of files to create.
+ * @throws ExecutionException If a file creation task fails.
+ * @throws InterruptedException If interrupted while waiting for file creation.
+ */
+ public static void createFiles(AzureBlobFileSystem fs, Path src, int
numFiles)
+ throws ExecutionException, InterruptedException {
+ ExecutorService executorService =
Executors.newFixedThreadPool(TOTAL_THREADS_IN_POOL);
+ List<Future> futures = new ArrayList<>();
+ for (int i = 0; i < numFiles; i++) {
+ final int iter = i;
+ Future future = executorService.submit(() ->
+ fs.create(new Path(src, "file" + iter + ".txt")));
+ futures.add(future);
+ }
+ for (Future future : futures) {
+ future.get();
+ }
+ executorService.shutdown();
+ }
+
+ /**
+ * Helper method to create a json file.
+ * @param path parent path
+ * @param renameJson rename json path
+ * @return file system
+ * @throws IOException in case of failure
+ */
+ private AzureBlobFileSystem createJsonFile(Path path, Path renameJson)
+ throws IOException {
+ final AzureBlobFileSystem fs = Mockito.spy(this.getFileSystem());
+ assumeBlobServiceType();
+ AzureBlobFileSystemStore store = Mockito.spy(fs.getAbfsStore());
+ Mockito.doReturn(store).when(fs).getAbfsStore();
+ AbfsClient client = Mockito.spy(store.getClient());
+ Mockito.doReturn(client).when(store).getClient();
+
+ fs.setWorkingDirectory(new Path(ROOT_PATH));
+ fs.create(new Path(path, "file.txt"));
+
+ VersionedFileStatus fileStatus
+ = (VersionedFileStatus) fs.getFileStatus(path);
+
+ new RenameAtomicity(path, new Path("/hbase/test4"),
+ renameJson, getTestTracingContext(fs, true),
+ fileStatus.getEtag(), client)
+ .preRename();
+
+ Assertions.assertThat(fs.exists(renameJson))
+ .describedAs("Rename Pending Json file should exist.")
+ .isTrue();
+
+ return fs;
+ }
+
+ /**
+ * Tests renaming a directory with a failure during the copy operation.
+ * Simulates a copy failure once FAILED_CALL (15) copies have succeeded.
+ */
+ @Test
+ public void testRenameCopyFailureInBetween() throws Exception {
+ try (AzureBlobFileSystem fs =
Mockito.spy(this.getFileSystem(getConfig()))) {
+ assumeBlobServiceType();
+ AbfsBlobClient client = (AbfsBlobClient) addSpyHooksOnClient(fs);
+ fs.getAbfsStore().setClient(client);
+ Path src = new Path("/hbase/A1/A2");
+ Path dst = new Path("/hbase/A1/A3");
+
+ // Create sample files in the source directory
+ createFiles(fs, src, TOTAL_FILES);
+
+ // Track the number of copy operations
+ AtomicInteger copyCall = new AtomicInteger(0);
+ renameCrashInBetween(client, copyCall);
+ Assertions.assertThat(fs.rename(src, dst))
+ .describedAs("Rename should crash in between.")
+ .isFalse();
+
+ // Validate copy operation count
+ Assertions.assertThat(copyCall.get())
+ .describedAs("Copy operation count should be less than 10.")
+ .isLessThan(TOTAL_FILES);
+
+ // Assertions to validate renamed destination and source
+ validateRename(fs, src, dst, true, true, true);
+
+ // Validate that rename redo operation was triggered
+ copyCall.set(0);
+ triggerRenameRecovery(fs, src);
+
+ Assertions.assertThat(copyCall.get())
+ .describedAs("Copy operation count should be greater than 0.")
+ .isGreaterThan(0);
+
+ // Validate final state of destination and source
+ validateRename(fs, src, dst, false, true, false);
+ }
+ }
+
+ /**
+ * Tests renaming a directory with a failure during the delete operation.
+ * Simulates a delete failure once FAILED_CALL (15) deletes have gone through
+ * and verifies the behavior.
+ */
+ @Test
+ public void testRenameDeleteFailureInBetween() throws Exception {
+ try (AzureBlobFileSystem fs =
Mockito.spy(this.getFileSystem(getConfig()))) {
+ assumeBlobServiceType();
+ AbfsBlobClient client = (AbfsBlobClient) addSpyHooksOnClient(fs);
+ fs.getAbfsStore().setClient(client);
+ Path src = new Path("/hbase/A1/A2");
+ Path dst = new Path("/hbase/A1/A3");
+
+ // Create sample files in the source directory
+ createFiles(fs, src, TOTAL_FILES);
+
+ // Track the number of delete operations
+ AtomicInteger deleteCall = new AtomicInteger(0);
+ Mockito.doAnswer(deleteRequest -> {
+ if (deleteCall.get() == FAILED_CALL) {
+ throw new AbfsRestOperationException(
+ BLOB_PATH_NOT_FOUND.getStatusCode(),
+ BLOB_PATH_NOT_FOUND.getErrorCode(),
+ BLOB_PATH_NOT_FOUND.getErrorMessage(),
+ new Exception());
+ }
+ deleteCall.incrementAndGet();
+ return deleteRequest.callRealMethod();
+ }).when(client).deleteBlobPath(Mockito.any(Path.class),
+ Mockito.anyString(), Mockito.any(TracingContext.class));
+
+ Assertions.assertThat(fs.rename(src, dst))
+ .describedAs("Rename should crash in between.")
+ .isFalse();
+
+ // Validate delete operation count
+ Assertions.assertThat(deleteCall.get())
+ .describedAs("Delete operation count should be less than 10.")
+ .isLessThan(TOTAL_FILES);
+
+ // Assertions to validate renamed destination and source
+ validateRename(fs, src, dst, true, true, true);
+
+ // Validate that rename redo operation was triggered
+ deleteCall.set(0);
+ triggerRenameRecovery(fs, src);
+
+ Assertions.assertThat(deleteCall.get())
+ .describedAs("Delete operation count should be greater than 0.")
+ .isGreaterThan(0);
+
+ // Validate final state of destination and source
+ // Validate that delete redo operation was triggered
+ validateRename(fs, src, dst, false, true, false);
+ }
+ }
+
+ /**
+ * Tests renaming a directory with a failure during the copy operation.
+ * Since the destination path already exists, the destination path will be
+ * adjusted. After the crash, recovery should succeed even in the case when
+ * the destination path already exists.
+ * Simulates a copy failure once FAILED_CALL (15) copies have succeeded.
+ */
+ @Test
+ public void testRenameRecoveryWhenDestAlreadyExist() throws Exception {
+ try (AzureBlobFileSystem fs =
Mockito.spy(this.getFileSystem(getConfig()))) {
+ assumeBlobServiceType();
+ AbfsBlobClient client = (AbfsBlobClient) addSpyHooksOnClient(fs);
+ fs.getAbfsStore().setClient(client);
+ Path src = new Path("/hbase/A1/A2");
+ Path dst = new Path("/hbase/A1/A3");
+
+ // Create sample files in the source directory
+ createFiles(fs, src, TOTAL_FILES);
+ // Create the destination directory
+ fs.mkdirs(dst);
+
+ // Track the number of copy operations
+ AtomicInteger copyCall = new AtomicInteger(0);
+ renameCrashInBetween(client, copyCall);
+ Assertions.assertThat(fs.rename(src, dst))
+ .describedAs("Rename should crash in between.")
+ .isFalse();
+
+ // Validate copy operation count
+ Assertions.assertThat(copyCall.get())
+ .describedAs("Copy operation count should be less than 10.")
+ .isLessThan(TOTAL_FILES);
+
+ // Assertions to validate renamed destination and source
+ validateRename(fs, src, dst, true, true, true);
+
+ copyCall.set(0);
+ // Get path status on src, this will internally do rename recovery
+ triggerRenameRecovery(fs, src);
+
+ Assertions.assertThat(copyCall.get())
+ .describedAs("Copy operation count should be greater than 0.")
+ .isGreaterThan(0);
+
+ // Validate final state of destination and source
+ validateRename(fs, src, dst, false, true, false);
+ }
+ }
+
+ /**
+ * Tests rename recovery when a marker is present at the destination.
+ * A copy failure interrupts the rename once FAILED_CALL (15) copies have
+ * succeeded; the destination is then probed with exists(), which is expected
+ * to create a marker there. Recovery should still succeed in that case.
+ */
+ @Test
+ public void testRenameRecoveryWithMarkerPresentInDest() throws Exception {
+ try (AzureBlobFileSystem fs =
Mockito.spy(this.getFileSystem(getConfig()))) {
+ assumeBlobServiceType();
+ AbfsBlobClient client = (AbfsBlobClient) addSpyHooksOnClient(fs);
+ fs.getAbfsStore().setClient(client);
+ Path src = new Path("/hbase/A1/A2");
+ Path dst = new Path("/hbase/A1/A3");
+
+ // Create sample files in the source directory
+ createFiles(fs, src, TOTAL_FILES);
+
+ // Track the number of copy operations
+ AtomicInteger copyCall = new AtomicInteger(0);
+ renameCrashInBetween(client, copyCall);
+ Assertions.assertThat(fs.rename(src, dst))
+ .describedAs("Rename should crash in between.")
+ .isFalse();
+
+ // Validate copy operation count
+ Assertions.assertThat(copyCall.get())
+ .describedAs("Copy operation count should be less than 10.")
+ .isLessThan(TOTAL_FILES);
+
+ // This will create marker file in the destination
+ fs.exists(dst);
+
+ copyCall.set(0);
+ // Get path status on src, this will internally do rename recovery
+ triggerRenameRecovery(fs, src);
+
+ Assertions.assertThat(copyCall.get())
+ .describedAs("Copy operation count should be greater than 0.")
+ .isGreaterThan(0);
+
+ // Validate final state of destination and source
+ validateRename(fs, src, dst, false, true, false);
+ }
+ }
+
+ /**
+ * Test to check behaviour when rename is called on an atomic rename directory
+ * for which rename pending json file is already present.
+ * @throws Exception in case of failure
+ */
+ @Test
+ public void testRenameWhenAlreadyRenamePendingJsonFilePresent() throws
Exception {
+ try (AzureBlobFileSystem fs =
Mockito.spy(this.getFileSystem(getConfig()))) {
+ assumeBlobServiceType();
+ AbfsBlobClient client = (AbfsBlobClient) addSpyHooksOnClient(fs);
+ fs.getAbfsStore().setClient(client);
+ Path src = new Path("/hbase/A1/A2");
+ Path dst = new Path("/hbase/A1/A3");
+
+ // Create sample files in the source directory
+ createFiles(fs, src, TOTAL_FILES);
+
+ // Track the number of copy operations
+ AtomicInteger copyCall = new AtomicInteger(0);
+ renameCrashInBetween(client, copyCall);
+ Assertions.assertThat(fs.rename(src, dst))
+ .describedAs("Rename should crash in between.")
+ .isFalse();
+
+ // Validate copy operation count
+ Assertions.assertThat(copyCall.get())
+ .describedAs("Copy operation count should be less than 10.")
+ .isLessThan(TOTAL_FILES);
+
+ // Validate final state of destination and source
+ validateRename(fs, src, dst, true, true, true);
+
+ copyCall.set(0);
+ Assertions.assertThat(fs.rename(src, dst))
+ .describedAs("Rename should succeed.")
+ .isTrue();
+
+ Assertions.assertThat(copyCall.get())
+ .describedAs("Copy operation count should be greater than 0.")
+ .isGreaterThan(0);
+
+ // Validate final state of destination and source
+ validateRename(fs, src, dst, false, true, false);
+ }
+ }
+
+ /**
+ * Test case to verify crash recovery with a single child folder.
+ *
+ * This test simulates a scenario where a pending rename JSON file exists
for a single child folder
+ * under the parent directory. It ensures that when listing the files in the
parent directory,
+ * only the child folder (with the pending rename JSON file) is returned,
and no additional files are listed.
+ *
+ * @throws Exception if any error occurs during the test execution
+ */
+ @Test
+ public void testListCrashRecoveryWithSingleChildFolder() throws Exception {
+ AzureBlobFileSystem fs = null;
+ try {
+ Path path = new Path("/hbase/A1/A2");
+ Path renameJson = new Path(path.getParent(), path.getName() + SUFFIX);
+ fs = createJsonFile(path, renameJson);
+
+ FileStatus[] fileStatuses = fs.listStatus(new Path("/hbase/A1"));
+
+ Assertions.assertThat(fileStatuses.length)
+ .describedAs("List should return 1 file")
+ .isEqualTo(1);
+ } finally {
+ if (fs != null) {
+ fs.close();
Review Comment:
Thanks for checking. This might need an update in multiple tests where
fs.close() is called.
> ABFS: [FnsOverBlob] Rename Recovery Should Succeed When Marker File Exists
> with Destination Directory
> -----------------------------------------------------------------------------------------------------
>
> Key: HADOOP-19522
> URL: https://issues.apache.org/jira/browse/HADOOP-19522
> Project: Hadoop Common
> Issue Type: Sub-task
> Components: fs/azure
> Affects Versions: 3.5.0, 3.4.1
> Reporter: Manish Bhatt
> Assignee: Manish Bhatt
> Priority: Blocker
> Labels: pull-request-available
>
> On the blob endpoint, since rename is not a direct operation but a
> combination of two operations—copy and delete—in the case of directory
> rename, we first rename all the blobs that have the source prefix and, at the
> end, rename the source to the destination.
> In the normal rename flow, renaming is not allowed if the destination already
> exists. However, in the case of recovery, there is a possibility that some
> files have already been renamed from the source to the destination. With the
> recent change ([HADOOP-19474] ABFS: [FnsOverBlob] Listing Optimizations to
> avoid multiple iteration over list response. - ASF JIRA), where we create a
> marker if the path is implicit, rename recovery will fail at the end when it
> tries to rename the source to the destination after renaming all the files.
> To fix this, while renaming the source to the destination, if we encounter an
> error indicating that the path already exists, we will suppress the error and
> mark the rename recovery as successful.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]