This is an automated email from the ASF dual-hosted git repository.
anujmodi pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/branch-3.4 by this push:
new c5d43758bcf HADOOP-19658_3.4. ABFS:Create and rename idempotency for
FNS Blob (#7914) (#7929)
c5d43758bcf is described below
commit c5d43758bcf9ce698a2a77823be110f1f8b1e6fa
Author: Anmol Asrani <[email protected]>
AuthorDate: Thu Oct 16 03:56:24 2025 +0000
HADOOP-19658_3.4. ABFS:Create and rename idempotency for FNS Blob (#7914)
(#7929)
Contributed by Anmol Asrani
---
.../hadoop/fs/azurebfs/AbfsConfiguration.java | 14 ++
.../fs/azurebfs/constants/ConfigurationKeys.java | 2 +
.../constants/FileSystemConfigurations.java | 2 +
.../fs/azurebfs/services/AbfsBlobClient.java | 32 ++-
.../hadoop/fs/azurebfs/services/AbfsIoUtils.java | 12 +-
.../fs/azurebfs/ITestAbfsNetworkStatistics.java | 263 +++++++++++----------
.../azurebfs/ITestAzureBlobFileSystemCreate.java | 110 ++++++++-
.../fs/azurebfs/ITestAzureBlobFileSystemLease.java | 2 +-
.../azurebfs/ITestAzureBlobFileSystemRename.java | 82 +++++++
.../fs/azurebfs/services/AbfsClientTestUtil.java | 117 +++++++++
10 files changed, 504 insertions(+), 132 deletions(-)
diff --git
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
index aa0548a009d..b72b7e83d9c 100644
---
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
+++
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
@@ -457,6 +457,10 @@ public class AbfsConfiguration{
DefaultValue = DEFAULT_FS_AZURE_ENABLE_CLIENT_TRANSACTION_ID)
private boolean enableClientTransactionId;
+ @BooleanConfigurationValidatorAnnotation(ConfigurationKey =
FS_AZURE_ENABLE_CREATE_BLOB_IDEMPOTENCY,
+ DefaultValue = DEFAULT_FS_AZURE_ENABLE_CREATE_BLOB_IDEMPOTENCY)
+ private boolean enableCreateIdempotency;
+
private String clientProvidedEncryptionKey;
private String clientProvidedEncryptionKeySHA;
@@ -1005,6 +1009,12 @@ public String getAzureAtomicRenameDirs() {
}
public boolean isConditionalCreateOverwriteEnabled() {
+ // If either the configured FS service type or the ingress service type is
BLOB,
+ // conditional create-overwrite is not used.
+ if (getIsCreateIdempotencyEnabled() && (getFsConfiguredServiceType() ==
AbfsServiceType.BLOB
+ || getIngressServiceType() == AbfsServiceType.BLOB)) {
+ return false;
+ }
return this.enableConditionalCreateOverwrite;
}
@@ -1136,6 +1146,10 @@ public boolean getIsClientTransactionIdEnabled() {
return enableClientTransactionId;
}
+ public boolean getIsCreateIdempotencyEnabled() {
+ return enableCreateIdempotency;
+ }
+
/**
* Enum config to allow user to pick format of x-ms-client-request-id header
* @return tracingContextFormat config if valid, else default ALL_ID_FORMAT
diff --git
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
index 16335c4d4d3..eaad2737e56 100644
---
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
+++
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
@@ -394,6 +394,8 @@ public static String containerProperty(String property,
String fsName, String ac
public static final String FS_AZURE_BLOB_DIR_DELETE_MAX_THREAD =
"fs.azure.blob.dir.delete.max.thread";
/**Flag to enable/disable sending client transactional ID during
create/rename operations: {@value}*/
public static final String FS_AZURE_ENABLE_CLIENT_TRANSACTION_ID =
"fs.azure.enable.client.transaction.id";
+ /**Flag to enable/disable create idempotency during create operation:
{@value}*/
+ public static final String FS_AZURE_ENABLE_CREATE_BLOB_IDEMPOTENCY =
"fs.azure.enable.create.blob.idempotency";
private ConfigurationKeys() {}
}
diff --git
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java
index 827b6da413a..7234ca1a70d 100644
---
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java
+++
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java
@@ -232,5 +232,7 @@ public final class FileSystemConfigurations {
public static final boolean DEFAULT_FS_AZURE_ENABLE_CLIENT_TRANSACTION_ID =
true;
+ public static final boolean DEFAULT_FS_AZURE_ENABLE_CREATE_BLOB_IDEMPOTENCY
= true;
+
private FileSystemConfigurations() {}
}
diff --git
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsBlobClient.java
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsBlobClient.java
index bb46a97835f..77aca70990f 100644
---
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsBlobClient.java
+++
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsBlobClient.java
@@ -509,9 +509,34 @@ public AbfsRestOperation createPath(final String path,
final TracingContext tracingContext) throws AzureBlobFileSystemException
{
AbfsRestOperation op;
if (isFileCreation) {
- // Create a file with the specified parameters
- op = createFile(path, overwrite, permissions, isAppendBlob, eTag,
- contextEncryptionAdapter, tracingContext);
+ if (getAbfsConfiguration().getIsCreateIdempotencyEnabled()) {
+ AbfsRestOperation statusOp = null;
+ try {
+ // Check if the file already exists by calling GetPathStatus
+ statusOp = getPathStatus(path, tracingContext, null, false);
+ } catch (AbfsRestOperationException ex) {
+ // If the path does not exist, continue with file creation
+ // For other errors, rethrow the exception
+ if (ex.getStatusCode() != HTTP_NOT_FOUND) {
+ throw ex;
+ }
+ }
+ // If the file exists and overwrite is not allowed, throw conflict
+ if (statusOp != null && statusOp.hasResult() && !overwrite) {
+ throw new AbfsRestOperationException(
+ HTTP_CONFLICT,
+ AzureServiceErrorCode.PATH_CONFLICT.getErrorCode(),
+ PATH_EXISTS,
+ null);
+ } else {
+ // Proceed with file creation (force overwrite = true)
+ op = createFile(path, true, permissions, isAppendBlob, eTag,
+ contextEncryptionAdapter, tracingContext);
+ }
+ } else {
+ op = createFile(path, overwrite, permissions, isAppendBlob, eTag,
+ contextEncryptionAdapter, tracingContext);
+ }
} else {
// Create a directory with the specified parameters
op = createDirectory(path, permissions, isAppendBlob, eTag,
@@ -584,7 +609,6 @@ public AbfsRestOperation createPathRestOp(final String path,
if (eTag != null && !eTag.isEmpty()) {
requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.IF_MATCH,
eTag));
}
-
final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
final AbfsRestOperation op = getAbfsRestOperation(
AbfsRestOperationType.PutBlob,
diff --git
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsIoUtils.java
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsIoUtils.java
index 44fa2d8d8bd..22fd9e15b6b 100644
---
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsIoUtils.java
+++
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsIoUtils.java
@@ -18,8 +18,10 @@
package org.apache.hadoop.fs.azurebfs.services;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -54,7 +56,15 @@ public static void dumpHeadersToDebugLog(final String origin,
if (key == null) {
key = "HTTP Response";
}
- String values = StringUtils.join(";", entry.getValue());
+ List<String> valuesList = entry.getValue();
+ if (valuesList == null) {
+ valuesList = Collections.emptyList();
+ } else {
+ valuesList = valuesList.stream()
+ .map(v -> v == null ? "" : v) // replace null with empty string
+ .collect(Collectors.toList());
+ }
+ String values = StringUtils.join(";", valuesList);
if (key.contains("Cookie")) {
values = "*cookie info*";
}
diff --git
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsNetworkStatistics.java
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsNetworkStatistics.java
index e66afbcaa74..4b6c9456504 100644
---
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsNetworkStatistics.java
+++
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsNetworkStatistics.java
@@ -25,9 +25,11 @@
import org.slf4j.LoggerFactory;
import org.junit.Test;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileAlreadyExistsException;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.azurebfs.services.AbfsBlobClient;
import org.apache.hadoop.fs.azurebfs.services.AbfsClient;
import org.apache.hadoop.fs.azurebfs.services.AbfsDfsClient;
@@ -41,6 +43,7 @@
import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.GET_RESPONSES;
import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.SEND_REQUESTS;
import static
org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.FORWARD_SLASH;
+import static
org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ENABLE_CREATE_BLOB_IDEMPOTENCY;
public class ITestAbfsNetworkStatistics extends AbstractAbfsIntegrationTest {
@@ -74,134 +77,69 @@ private int countDirectory(String path) {
public void testAbfsHttpSendStatistics() throws IOException {
describe("Test to check correct values of statistics after Abfs http send "
+ "request is done.");
+ Configuration conf = getRawConfiguration();
+ conf.setBoolean(FS_AZURE_ENABLE_CREATE_BLOB_IDEMPOTENCY, false);
+ FileSystem fileSystem = FileSystem.newInstance(conf);
+ try (AzureBlobFileSystem fs = (AzureBlobFileSystem) fileSystem) {
+ Map<String, Long> metricMap;
+ Path sendRequestPath = path(getMethodName());
+ String path = sendRequestPath.toString();
+ int directory = countDirectory(path);
+ String testNetworkStatsString = "http_send";
+
+ metricMap = getInstrumentationMap(fs);
+ long expectedConnectionsMade = metricMap.get(
+ CONNECTIONS_MADE.getStatName());
+ long expectedRequestsSent = metricMap.get(SEND_REQUESTS.getStatName());
+ long expectedBytesSent = 0;
+ AbfsClient client = fs.getAbfsStore()
+ .getClientHandler()
+ .getIngressClient();
- AzureBlobFileSystem fs = getFileSystem();
- Map<String, Long> metricMap;
- Path sendRequestPath = path(getMethodName());
- String path = sendRequestPath.toString();
- int directory = countDirectory(path);
- String testNetworkStatsString = "http_send";
-
- metricMap = getInstrumentationMap(fs);
- long expectedConnectionsMade =
metricMap.get(CONNECTIONS_MADE.getStatName());
- long expectedRequestsSent = metricMap.get(SEND_REQUESTS.getStatName());
- long expectedBytesSent = 0;
- AbfsClient client =
fs.getAbfsStore().getClientHandler().getIngressClient();
-
- // --------------------------------------------------------------------
- // Operation: Creating AbfsOutputStream
- try (AbfsOutputStream out = createAbfsOutputStreamWithFlushEnabled(fs,
- sendRequestPath)) {
- // Network stats calculation: For Creating AbfsOutputStream:
- // 1 create request = 1 connection made and 1 send request
- if (client instanceof AbfsBlobClient && !getIsNamespaceEnabled(fs)) {
- expectedRequestsSent += (directory);
- // Per directory, we have 2 calls :- 1 PutBlob and 1 ListBlobs call.
- expectedConnectionsMade += ((directory * 2));
- } else {
- expectedRequestsSent++;
- expectedConnectionsMade++;
- }
- // --------------------------------------------------------------------
-
- // Operation: Write small data
- // Network stats calculation: No additions.
- // Data written is less than the buffer size and hence will not
- // trigger any append request to store
- out.write(testNetworkStatsString.getBytes());
- // --------------------------------------------------------------------
-
- // Operation: HFlush
- // Flushes all outstanding data (i.e. the current unfinished packet)
- // from the client into the service on all DataNode replicas.
- out.hflush();
- /*
- * Network stats calculation:
- * 3 possibilities here:
- * A. As there is pending data to be written to store, this will result
in:
- * 1 append + 1 flush = 2 connections and 2 send requests
- *
- * B. If config "fs.azure.enable.small.write.optimization" is enabled,
append
- * and flush call will be merged for small data in buffer in this test.
- * In which case it will be:
- * 1 append+flush request = 1 connection and 1 send request
- *
- * C. If the path is configured for append Blob files to be used, hflush
- * is a no-op. So in this case:
- * 1 append = 1 connection and 1 send request
- */
- if
(fs.getAbfsStore().isAppendBlobKey(fs.makeQualified(sendRequestPath).toString())
- ||
(fs.getAbfsStore().getAbfsConfiguration().isSmallWriteOptimizationEnabled())) {
- expectedConnectionsMade++;
- expectedRequestsSent++;
- } else {
- expectedConnectionsMade += 2;
- expectedRequestsSent += 2;
- }
- expectedBytesSent += testNetworkStatsString.getBytes().length;
// --------------------------------------------------------------------
-
- // Assertions
- metricMap = getInstrumentationMap(fs);
- assertAbfsStatistics(CONNECTIONS_MADE,
- expectedConnectionsMade, metricMap);
- assertAbfsStatistics(SEND_REQUESTS, expectedRequestsSent,
- metricMap);
- assertAbfsStatistics(AbfsStatistic.BYTES_SENT,
- expectedBytesSent, metricMap);
- }
-
- // --------------------------------------------------------------------
- // Operation: AbfsOutputStream close.
- // Network Stats calculation: 1 flush (with close) is send.
- // 1 flush request = 1 connection and 1 send request
- // Flush with no data is a no-op for blob endpoint, hence update only for
dfs endpoint.
- if (client instanceof AbfsDfsClient) {
- expectedConnectionsMade++;
- expectedRequestsSent++;
- }
- // --------------------------------------------------------------------
-
- // Operation: Re-create the file / create overwrite scenario
- try (AbfsOutputStream out = createAbfsOutputStreamWithFlushEnabled(fs,
- sendRequestPath)) {
- /*
- * Network Stats calculation: create overwrite
- * There are 2 possibilities here.
- * A. create overwrite results in 1 server call
- * create with overwrite=true = 1 connection and 1 send request
- *
- * B. If config "fs.azure.enable.conditional.create.overwrite" is
enabled,
- * create overwrite=false (will fail in this case as file is indeed
present)
- * + getFileStatus to fetch the file ETag
- * + create overwrite=true
- * = 3 connections and 2 send requests in case of Dfs Client
- * = 1 ListBlob + 2 GPS + 2 PutBlob
- */
- if
(fs.getAbfsStore().getAbfsConfiguration().isConditionalCreateOverwriteEnabled())
{
+ // Operation: Creating AbfsOutputStream
+ try (AbfsOutputStream out = createAbfsOutputStreamWithFlushEnabled(fs,
+ sendRequestPath)) {
+ // Network stats calculation: For Creating AbfsOutputStream:
+ // 1 create request = 1 connection made and 1 send request
if (client instanceof AbfsBlobClient && !getIsNamespaceEnabled(fs)) {
- expectedRequestsSent += 2;
- expectedConnectionsMade += 5;
+ expectedRequestsSent += (directory);
+ // Per directory, we have 2 calls :- 1 PutBlob and 1 ListBlobs call.
+ expectedConnectionsMade += ((directory * 2));
} else {
- expectedConnectionsMade += 3;
- expectedRequestsSent += 2;
+ expectedRequestsSent++;
+ expectedConnectionsMade++;
}
- } else {
- expectedConnectionsMade += 1;
- expectedRequestsSent += 1;
- }
- // --------------------------------------------------------------------
+ // --------------------------------------------------------------------
- // Operation: Multiple small appends + hflush
- for (int i = 0; i < WRITE_OPERATION_LOOP_COUNT; i++) {
+ // Operation: Write small data
+ // Network stats calculation: No additions.
+ // Data written is less than the buffer size and hence will not
+ // trigger any append request to store
out.write(testNetworkStatsString.getBytes());
- // Network stats calculation: no-op. Small write
+ // --------------------------------------------------------------------
+
+ // Operation: HFlush
+ // Flushes all outstanding data (i.e. the current unfinished packet)
+ // from the client into the service on all DataNode replicas.
out.hflush();
- // Network stats calculation: Hflush
- // refer to previous comments for hFlush network stats calcualtion
- // possibilities
+ /*
+ * Network stats calculation:
+ * 3 possibilities here:
+ * A. As there is pending data to be written to store, this will
result in:
+ * 1 append + 1 flush = 2 connections and 2 send requests
+ *
+ * B. If config "fs.azure.enable.small.write.optimization" is enabled,
append
+ * and flush call will be merged for small data in buffer in this
test.
+ * In which case it will be:
+ * 1 append+flush request = 1 connection and 1 send request
+ *
+ * C. If the path is configured for append Blob files to be used,
hflush
+ * is a no-op. So in this case:
+ * 1 append = 1 connection and 1 send request
+ */
if
(fs.getAbfsStore().isAppendBlobKey(fs.makeQualified(sendRequestPath).toString())
- || (this.getConfiguration().isSmallWriteOptimizationEnabled())) {
+ ||
(fs.getAbfsStore().getAbfsConfiguration().isSmallWriteOptimizationEnabled())) {
expectedConnectionsMade++;
expectedRequestsSent++;
} else {
@@ -209,16 +147,91 @@ public void testAbfsHttpSendStatistics() throws
IOException {
expectedRequestsSent += 2;
}
expectedBytesSent += testNetworkStatsString.getBytes().length;
+ // --------------------------------------------------------------------
+
+ // Assertions
+ metricMap = getInstrumentationMap(fs);
+ assertAbfsStatistics(CONNECTIONS_MADE,
+ expectedConnectionsMade, metricMap);
+ assertAbfsStatistics(SEND_REQUESTS, expectedRequestsSent,
+ metricMap);
+ assertAbfsStatistics(AbfsStatistic.BYTES_SENT,
+ expectedBytesSent, metricMap);
+ }
+
+ // --------------------------------------------------------------------
+ // Operation: AbfsOutputStream close.
+ // Network Stats calculation: 1 flush (with close) is sent.
+ // 1 flush request = 1 connection and 1 send request
+ // Flush with no data is a no-op for blob endpoint, hence update only
for dfs endpoint.
+ if (client instanceof AbfsDfsClient) {
+ expectedConnectionsMade++;
+ expectedRequestsSent++;
}
// --------------------------------------------------------------------
- // Assertions
- metricMap = fs.getInstrumentationMap();
- assertAbfsStatistics(CONNECTIONS_MADE, expectedConnectionsMade,
metricMap);
- assertAbfsStatistics(SEND_REQUESTS, expectedRequestsSent, metricMap);
- assertAbfsStatistics(AbfsStatistic.BYTES_SENT, expectedBytesSent,
metricMap);
+ // Operation: Re-create the file / create overwrite scenario
+ try (AbfsOutputStream out = createAbfsOutputStreamWithFlushEnabled(fs,
+ sendRequestPath)) {
+ /*
+ * Network Stats calculation: create overwrite
+ * There are 2 possibilities here.
+ * A. create overwrite results in 1 server call
+ * create with overwrite=true = 1 connection and 1 send request
+ *
+ * B. If config "fs.azure.enable.conditional.create.overwrite" is
enabled,
+ * create overwrite=false (will fail in this case as file is indeed
present)
+ * + getFileStatus to fetch the file ETag
+ * + create overwrite=true
+ * = 3 connections and 2 send requests in case of Dfs Client
+ * = 1 ListBlob + 2 GPS + 2 PutBlob
+ */
+ if (fs.getAbfsStore()
+ .getAbfsConfiguration()
+ .isConditionalCreateOverwriteEnabled()) {
+ if (client instanceof AbfsBlobClient && !getIsNamespaceEnabled(fs)) {
+ expectedRequestsSent += 2;
+ expectedConnectionsMade += 5;
+ } else {
+ expectedConnectionsMade += 3;
+ expectedRequestsSent += 2;
+ }
+ } else {
+ expectedConnectionsMade += 1;
+ expectedRequestsSent += 1;
+ }
+ // --------------------------------------------------------------------
+
+ // Operation: Multiple small appends + hflush
+ for (int i = 0; i < WRITE_OPERATION_LOOP_COUNT; i++) {
+ out.write(testNetworkStatsString.getBytes());
+ // Network stats calculation: no-op. Small write
+ out.hflush();
+ // Network stats calculation: Hflush
+ // refer to previous comments for hFlush network stats calculation
+ // possibilities
+ if (fs.getAbfsStore()
+ .isAppendBlobKey(fs.makeQualified(sendRequestPath).toString())
+ || (this.getConfiguration().isSmallWriteOptimizationEnabled())) {
+ expectedConnectionsMade++;
+ expectedRequestsSent++;
+ } else {
+ expectedConnectionsMade += 2;
+ expectedRequestsSent += 2;
+ }
+ expectedBytesSent += testNetworkStatsString.getBytes().length;
+ }
+ // --------------------------------------------------------------------
+
+ // Assertions
+ metricMap = fs.getInstrumentationMap();
+ assertAbfsStatistics(CONNECTIONS_MADE, expectedConnectionsMade,
+ metricMap);
+ assertAbfsStatistics(SEND_REQUESTS, expectedRequestsSent, metricMap);
+ assertAbfsStatistics(AbfsStatistic.BYTES_SENT, expectedBytesSent,
+ metricMap);
+ }
}
-
}
/**
diff --git
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemCreate.java
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemCreate.java
index b719a3217b2..a4aced63ee3 100644
---
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemCreate.java
+++
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemCreate.java
@@ -33,6 +33,7 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicInteger;
import org.assertj.core.api.Assertions;
import org.junit.Test;
@@ -88,6 +89,7 @@
import static
org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.ROOT_PATH;
import static
org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ENABLE_CLIENT_TRANSACTION_ID;
import static
org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ENABLE_CONDITIONAL_CREATE_OVERWRITE;
+import static
org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ENABLE_CREATE_BLOB_IDEMPOTENCY;
import static
org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ENABLE_MKDIR_OVERWRITE;
import static
org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ONE_MB;
import static
org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_CLIENT_TRANSACTION_ID;
@@ -96,6 +98,7 @@
import static org.apache.hadoop.fs.azurebfs.services.RenameAtomicity.SUFFIX;
import static org.apache.hadoop.fs.contract.ContractTestUtils.assertIsFile;
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
+import static org.assertj.core.api.Assumptions.assumeThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyString;
@@ -460,10 +463,18 @@ public void testDefaultCreateOverwriteFileTest() throws
Throwable {
public void testCreateFileOverwrite(boolean enableConditionalCreateOverwrite)
throws Throwable {
+ if (enableConditionalCreateOverwrite) {
+ assumeHnsEnabled();
+ assumeDfsServiceType();
+ assumeThat(getIngressServiceType())
+ .as("DFS service type is required for this test")
+ .isEqualTo(AbfsServiceType.DFS);
+ }
try (AzureBlobFileSystem currentFs = getFileSystem()) {
Configuration config = new Configuration(this.getRawConfiguration());
config.set("fs.azure.enable.conditional.create.overwrite",
Boolean.toString(enableConditionalCreateOverwrite));
+ config.set("fs.azure.enable.create.blob.idempotency", "false");
AzureBlobFileSystemStore store = currentFs.getAbfsStore();
AbfsClient client = store.getClientHandler().getIngressClient();
@@ -595,7 +606,11 @@ public void testCreateFileOverwrite(boolean
enableConditionalCreateOverwrite)
@Test
public void testNegativeScenariosForCreateOverwriteDisabled()
throws Throwable {
-
+ assumeHnsEnabled();
+ assumeDfsServiceType();
+ assumeThat(getIngressServiceType())
+ .as("DFS service type is required for this test")
+ .isEqualTo(AbfsServiceType.DFS);
try (AzureBlobFileSystem currentFs = getFileSystem()) {
Configuration config = new Configuration(this.getRawConfiguration());
config.set("fs.azure.enable.conditional.create.overwrite",
@@ -1087,6 +1102,7 @@ public void testParallelCreateOverwriteFalse()
throws Exception {
Configuration configuration = getRawConfiguration();
configuration.set(FS_AZURE_ENABLE_CONDITIONAL_CREATE_OVERWRITE, "false");
+ configuration.set(FS_AZURE_ENABLE_CREATE_BLOB_IDEMPOTENCY, "false");
try (AzureBlobFileSystem fs = (AzureBlobFileSystem) FileSystem.newInstance(
configuration)) {
ExecutorService executorService = Executors.newFixedThreadPool(5);
@@ -2236,6 +2252,98 @@ public void
testFailureInGetPathStatusDuringCreateRecovery() throws Exception {
}
}
+ /**
+ * Test to simulate a successful create operation followed by a connection
reset
+ * on the response, triggering a retry.
+ *
+ * This test verifies that the create operation is retried in the event of a
+ * connection reset during the response phase. The test creates a mock
+ * AzureBlobFileSystem and its associated components to simulate the create
+ * operation and the connection reset. It then verifies that the create
+ * operation is retried once before succeeding.
+ *
+ * @throws Exception if an error occurs during the test execution.
+ */
+ @Test
+ public void testCreateIdempotencyForNonHnsBlob() throws Exception {
+ assumeThat(isAppendBlobEnabled()).as("Not valid for APPEND
BLOB").isFalse();
+ assumeHnsDisabled();
+ assumeBlobServiceType();
+ // Create a spy of AzureBlobFileSystem
+ try (AzureBlobFileSystem fs = Mockito.spy(
+ (AzureBlobFileSystem) FileSystem.newInstance(getRawConfiguration()))) {
+ // Create a spy of AzureBlobFileSystemStore
+ AzureBlobFileSystemStore store = Mockito.spy(fs.getAbfsStore());
+
+ // Create spies for the client handler and blob client
+ AbfsClientHandler clientHandler = Mockito.spy(store.getClientHandler());
+ AbfsBlobClient blobClient = Mockito.spy(clientHandler.getBlobClient());
+ fs.getAbfsStore().setClient(blobClient);
+ fs.getAbfsStore().setClientHandler(clientHandler);
+ // Set up the spies to return the mocked objects
+ Mockito.doReturn(clientHandler).when(store).getClientHandler();
+ Mockito.doReturn(blobClient).when(clientHandler).getBlobClient();
+ Mockito.doReturn(blobClient).when(clientHandler).getIngressClient();
+
+ AtomicInteger createCount = new AtomicInteger(0);
+
+ Mockito.doAnswer(answer -> {
+ // Set up the mock for the create operation
+
AbfsClientTestUtil.setMockAbfsRestOperationForCreateOperation(blobClient,
+ (httpOperation) -> {
+ Mockito.doAnswer(invocation -> {
+ // Call the real processResponse method
+ invocation.callRealMethod();
+
+ int currentCount = createCount.incrementAndGet();
+ if (currentCount == 2) {
+ Mockito.when(httpOperation.getStatusCode())
+ .thenReturn(
+ HTTP_INTERNAL_ERROR); // Status code 500 for
Internal Server Error
+ Mockito.when(httpOperation.getStorageErrorMessage())
+ .thenReturn("CONNECTION_RESET"); // Error message
+ throw new IOException("Connection Reset");
+ }
+ return null;
+ }).when(httpOperation).processResponse(
+ Mockito.nullable(byte[].class),
+ Mockito.anyInt(),
+ Mockito.anyInt()
+ );
+
+ return httpOperation;
+ });
+ return answer.callRealMethod();
+ }).when(blobClient).createPath(
+ Mockito.anyString(),
+ Mockito.anyBoolean(),
+ Mockito.anyBoolean(),
+ Mockito.any(AzureBlobFileSystemStore.Permissions.class),
+ Mockito.anyBoolean(), Mockito.nullable(String.class),
Mockito.any(ContextEncryptionAdapter.class),
+ any(TracingContext.class)
+ );
+
+ Path path = new Path("/test/file");
+ fs.create(path, false);
+ Mockito.verify(blobClient, Mockito.times(1)).createPath(
+ Mockito.anyString(),
+ Mockito.anyBoolean(),
+ Mockito.anyBoolean(),
+ Mockito.any(AzureBlobFileSystemStore.Permissions.class),
+ Mockito.anyBoolean(), Mockito.nullable(String.class),
Mockito.any(ContextEncryptionAdapter.class),
+ any(TracingContext.class));
+
+ Mockito.verify(blobClient, Mockito.times(2)).createPathRestOp(
+ Mockito.anyString(),
+ Mockito.anyBoolean(),
+ Mockito.anyBoolean(),
+ Mockito.anyBoolean(),
+ Mockito.nullable(String.class),
Mockito.any(ContextEncryptionAdapter.class),
+ any(TracingContext.class));
+ assertIsFile(fs, path);
+ }
+ }
+
/**
* Mocks and returns an instance of {@link AbfsDfsClient} for the given
AzureBlobFileSystem.
* This method sets up the necessary mock behavior for the client handler
and ingress client.
diff --git
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemLease.java
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemLease.java
index af1e9e8496d..7a29b653874 100644
---
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemLease.java
+++
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemLease.java
@@ -153,7 +153,7 @@ public void testTwoCreate() throws Exception {
try (FSDataOutputStream out = fs.create(testFilePath)) {
LambdaTestUtils.intercept(IOException.class,
- isHNSEnabled ? PARALLEL_ACCESS
+ isHNSEnabled && getIngressServiceType() == AbfsServiceType.DFS ?
PARALLEL_ACCESS
: client instanceof AbfsBlobClient
? ERR_NO_LEASE_ID_SPECIFIED_BLOB
: ERR_NO_LEASE_ID_SPECIFIED, () -> {
diff --git
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRename.java
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRename.java
index e4a2d0eed4d..7abc8ef6d2a 100644
---
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRename.java
+++
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRename.java
@@ -52,6 +52,7 @@
import org.apache.hadoop.fs.azurebfs.security.ContextEncryptionAdapter;
import org.apache.hadoop.fs.azurebfs.services.AbfsBlobClient;
import org.apache.hadoop.fs.azurebfs.services.AbfsClient;
+import org.apache.hadoop.fs.azurebfs.services.AbfsClientHandler;
import org.apache.hadoop.fs.azurebfs.services.AbfsClientTestUtil;
import org.apache.hadoop.fs.azurebfs.services.AbfsDfsClient;
import org.apache.hadoop.fs.azurebfs.services.AbfsHttpHeader;
@@ -74,6 +75,7 @@
import static java.net.HttpURLConnection.HTTP_CLIENT_TIMEOUT;
import static java.net.HttpURLConnection.HTTP_CONFLICT;
import static java.net.HttpURLConnection.HTTP_FORBIDDEN;
+import static java.net.HttpURLConnection.HTTP_INTERNAL_ERROR;
import static java.net.HttpURLConnection.HTTP_NOT_FOUND;
import static java.net.HttpURLConnection.HTTP_OK;
import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.RENAME_PATH_ATTEMPTS;
@@ -108,6 +110,7 @@
import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset;
import static org.apache.hadoop.fs.contract.ContractTestUtils.writeDataset;
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
+import static org.assertj.core.api.Assumptions.assumeThat;
/**
* Test rename operation.
@@ -1705,6 +1708,85 @@ public void testRenamePathRetryIdempotency() throws
Exception {
}
}
+ /**
+ * Test to simulate a successful copy blob operation followed by a
connection reset
+ * on the response, triggering a retry.
+ *
+ * This test verifies that the copy blob operation is retried in the event
of a
+ * connection reset during the response phase. The test creates a mock
+ * AzureBlobFileSystem and its associated components to simulate the copy
blob
+ * operation and the connection reset. It then verifies that the rename
+ * operation is retried once before succeeding.
+ *
+ * @throws Exception if an error occurs during the test execution.
+ */
+ @Test
+ public void testRenameIdempotencyForNonHnsBlob() throws Exception {
+ assumeThat(isAppendBlobEnabled()).as("Not valid for APPEND
BLOB").isFalse();
+ assumeHnsDisabled();
+ assumeBlobServiceType();
+ // Create a spy of AzureBlobFileSystem
+ try (AzureBlobFileSystem fs = Mockito.spy(
+ (AzureBlobFileSystem) FileSystem.newInstance(getRawConfiguration()))) {
+ // Create a spy of AzureBlobFileSystemStore
+ AzureBlobFileSystemStore store = Mockito.spy(fs.getAbfsStore());
+
+ // Create spies for the client handler and blob client
+ AbfsClientHandler clientHandler = Mockito.spy(store.getClientHandler());
+ AbfsBlobClient blobClient = Mockito.spy(clientHandler.getBlobClient());
+ fs.getAbfsStore().setClient(blobClient);
+ fs.getAbfsStore().setClientHandler(clientHandler);
+ // Set up the spies to return the mocked objects
+ Mockito.doReturn(clientHandler).when(store).getClientHandler();
+ Mockito.doReturn(blobClient).when(clientHandler).getBlobClient();
+ Mockito.doReturn(blobClient).when(clientHandler).getIngressClient();
+
+ AtomicInteger copyBlobCount = new AtomicInteger(0);
+ Path sourceDir = path("/testSrc");
+ assertMkdirs(fs, sourceDir);
+ String filename = "file1";
+ Path sourceFilePath = new Path(sourceDir, filename);
+ touch(sourceFilePath);
+ Path destFilePath = new Path(sourceDir, "file2");
+ Mockito.doAnswer(answer -> {
+ // Set up the mock for the create operation
+
AbfsClientTestUtil.setMockAbfsRestOperationForCopyBlobOperation(blobClient,
sourceFilePath, destFilePath,
+ (httpOperation) -> {
+ Mockito.doAnswer(invocation -> {
+ // Call the real processResponse method
+ invocation.callRealMethod();
+
+ int currentCount = copyBlobCount.incrementAndGet();
+ if (currentCount == 1) {
+ Mockito.when(httpOperation.getStatusCode())
+ .thenReturn(
+ HTTP_INTERNAL_ERROR); // Status code 500 for
Internal Server Error
+ Mockito.when(httpOperation.getStorageErrorMessage())
+ .thenReturn("CONNECTION_RESET"); // Error message
+ throw new IOException("Connection Reset");
+ }
+ return null;
+ }).when(httpOperation).processResponse(
+ Mockito.nullable(byte[].class),
+ Mockito.anyInt(),
+ Mockito.anyInt()
+ );
+
+ return httpOperation;
+ });
+ return answer.callRealMethod();
+ }).when(blobClient).copyBlob(
+ Mockito.any(Path.class),
+ Mockito.any(Path.class),
+ Mockito.nullable(String.class),
+ Mockito.any(TracingContext.class)
+ );
+ Assertions.assertThat(fs.rename(sourceFilePath, destFilePath))
+ .describedAs("Rename should succeed.")
+ .isTrue();
+ }
+ }
+
/**
* Test to verify that the client transaction ID is included in the response
header
* after renaming a file in Azure Blob Storage.
diff --git
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientTestUtil.java
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientTestUtil.java
index b9dcefc35e2..cd1a2af7d6c 100644
---
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientTestUtil.java
+++
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientTestUtil.java
@@ -33,6 +33,7 @@
import java.util.UUID;
import java.util.concurrent.locks.ReentrantLock;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
import org.apache.hadoop.fs.azurebfs.AbfsCountersImpl;
import org.assertj.core.api.Assertions;
@@ -41,6 +42,7 @@
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
+import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants;
import
org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
import org.apache.hadoop.util.functional.FunctionRaisingIOE;
@@ -50,16 +52,22 @@
import static
org.apache.hadoop.fs.azurebfs.ITestAzureBlobFileSystemListStatus.TEST_CONTINUATION_TOKEN;
import static
org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.APPLICATION_XML;
import static
org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.BLOCKLIST;
+import static
org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.BLOCK_BLOB_TYPE;
import static
org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.EMPTY_STRING;
import static
org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_GET;
import static
org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_PUT;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.STAR;
import static
org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.CONTENT_LENGTH;
import static
org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.CONTENT_TYPE;
import static
org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.IF_MATCH;
+import static
org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.IF_NONE_MATCH;
import static
org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_BLOB_CONTENT_MD5;
+import static
org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_BLOB_TYPE;
import static
org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_CLIENT_TRANSACTION_ID;
+import static
org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_COPY_SOURCE;
import static
org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams.QUERY_PARAM_CLOSE;
import static
org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams.QUERY_PARAM_COMP;
+import static
org.apache.hadoop.fs.azurebfs.services.AbfsRestOperationType.PutBlob;
import static
org.apache.hadoop.fs.azurebfs.services.AbfsRestOperationType.PutBlockList;
import static org.apache.hadoop.fs.azurebfs.services.AuthType.OAuth;
import static
org.apache.hadoop.fs.azurebfs.services.RetryPolicyConstants.EXPONENTIAL_RETRY_POLICY_ABBREVIATION;
@@ -192,6 +200,115 @@ public static void
setMockAbfsRestOperationForFlushOperation(
addMockBehaviourToRestOpAndHttpOp(abfsRestOperation, functionRaisingIOE);
}
+ /**
+ * Sets up a mocked {@link AbfsRestOperation} for a create (PutBlob)
operation
+ * in the Azure Blob File System (ABFS).
+ * <p>
+ * This method is intended for use in testing scenarios where the behavior of
+ * a create request needs to be simulated. It configures a mock
+ * {@link AbfsRestOperation} with the appropriate request headers and
parameters
+ * for a {@code PutBlob} call, and applies the provided {@code
functionRaisingIOE}
+ * to customize the behavior of the underlying {@link AbfsHttpOperation}.
+ * <p>
+ *
+ * @param spiedClient the spied instance of {@link AbfsClient} used
+ * for making HTTP requests
+ * @param functionRaisingIOE a function that customizes the behavior of the
+ * {@link AbfsRestOperation}'s associated
+ * {@link AbfsHttpOperation}, enabling the
simulation
+ * of error conditions or special responses
+ * @throws Exception if an error occurs while setting up the mocked
+ * operation
+ */
+ public static void setMockAbfsRestOperationForCreateOperation(
+ final AbfsClient spiedClient,
+ FunctionRaisingIOE<AbfsHttpOperation, AbfsHttpOperation>
functionRaisingIOE)
+ throws Exception {
+ List<AbfsHttpHeader> requestHeaders =
ITestAbfsClient.getTestRequestHeaders(
+ spiedClient);
+ requestHeaders.add(new AbfsHttpHeader(X_MS_BLOB_TYPE, BLOCK_BLOB_TYPE));
+ requestHeaders.add(new AbfsHttpHeader(CONTENT_LENGTH,
AbfsHttpConstants.ZERO));
+ requestHeaders.add(new AbfsHttpHeader(CONTENT_TYPE, APPLICATION_XML));
+ final AbfsUriQueryBuilder abfsUriQueryBuilder =
spiedClient.createDefaultUriQueryBuilder();
+ final URL url = spiedClient.createRequestUrl("/test/file",
abfsUriQueryBuilder.toString());
+ AbfsRestOperation abfsRestOperation = Mockito.spy(new AbfsRestOperation(
+ PutBlob, spiedClient, HTTP_METHOD_PUT,
+ url,
+ requestHeaders,
+ spiedClient.getAbfsConfiguration()));
+
+ Mockito.doReturn(abfsRestOperation)
+ .when(spiedClient)
+ .getAbfsRestOperation(eq(AbfsRestOperationType.PutBlob),
+ Mockito.anyString(), Mockito.any(URL.class), Mockito.anyList());
+
+ addMockBehaviourToRestOpAndHttpOp(abfsRestOperation, functionRaisingIOE);
+ }
+
+ /**
+ * Sets up a mocked {@link AbfsRestOperation} for a CopyBlob operation
+ * in the Azure Blob File System (ABFS).
+ * <p>
+ * This method is intended for use in testing scenarios where the behavior of
+ * a copyBlob request needs to be simulated. It configures a mock
+ * {@link AbfsRestOperation} with the appropriate request headers and
parameters
+ * for a {@code CopyBlob} call, and applies the provided {@code
functionRaisingIOE}
+ * to customize the behavior of the underlying {@link AbfsHttpOperation}.
+ * <p>
+ *
+ * @param spiedClient the spied instance of {@link AbfsClient} used
+ * for making HTTP requests
+ * @param srcPath the source blob path
+ * @param dstPath the destination blob path
+ * @param functionRaisingIOE a function that customizes the behavior of the
+ * {@link AbfsRestOperation}'s associated
+ * {@link AbfsHttpOperation}, enabling the
simulation
+ * of error conditions or special responses
+ * @throws Exception if an error occurs while setting up the mocked
+ * operation
+ */
+ public static void setMockAbfsRestOperationForCopyBlobOperation(
+ final AbfsClient spiedClient,
+ final Path srcPath,
+ final Path dstPath,
+ FunctionRaisingIOE<AbfsHttpOperation, AbfsHttpOperation>
functionRaisingIOE)
+ throws Exception {
+
+ // Prepare headers
+ List<AbfsHttpHeader> requestHeaders =
ITestAbfsClient.getTestRequestHeaders(spiedClient);
+
+ // Add CopyBlob specific headers
+ AbfsUriQueryBuilder abfsUriQueryBuilderDst =
spiedClient.createDefaultUriQueryBuilder();
+ AbfsUriQueryBuilder abfsUriQueryBuilderSrc = new AbfsUriQueryBuilder();
+
+ String dstBlobRelativePath = dstPath.toUri().getPath();
+ String srcBlobRelativePath = srcPath.toUri().getPath();
+
+ final URL url = spiedClient.createRequestUrl(
+ dstBlobRelativePath, abfsUriQueryBuilderDst.toString());
+ final String sourcePathUrl = spiedClient.createRequestUrl(
+ srcBlobRelativePath, abfsUriQueryBuilderSrc.toString()).toString();
+
+ requestHeaders.add(new AbfsHttpHeader(X_MS_COPY_SOURCE, sourcePathUrl));
+ requestHeaders.add(new AbfsHttpHeader(IF_NONE_MATCH, STAR));
+
+ // Spy on the real CopyBlob operation
+ AbfsRestOperation abfsRestOperation = Mockito.spy(new AbfsRestOperation(
+ AbfsRestOperationType.CopyBlob,
+ spiedClient,
+ HTTP_METHOD_PUT,
+ url,
+ requestHeaders,
+ spiedClient.getAbfsConfiguration()));
+
+ Mockito.doReturn(abfsRestOperation)
+ .when(spiedClient)
+ .getAbfsRestOperation(eq(AbfsRestOperationType.CopyBlob),
+ Mockito.nullable(String.class), Mockito.any(URL.class),
Mockito.anyList());
+
+ addMockBehaviourToRestOpAndHttpOp(abfsRestOperation, functionRaisingIOE);
+ }
+
/**
* Adding general mock behaviour to AbfsRestOperation and AbfsHttpOperation
* to avoid any NPE occurring. These will avoid any network call made and
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]