[ https://issues.apache.org/jira/browse/HADOOP-18146?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17700742#comment-17700742 ]

ASF GitHub Bot commented on HADOOP-18146:
-----------------------------------------

steveloughran commented on code in PR #4039:
URL: https://github.com/apache/hadoop/pull/4039#discussion_r1137255250


##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java:
##########
@@ -681,35 +685,38 @@ public AbfsRestOperation append(final String path, final byte[] buffer,
         abfsUriQueryBuilder, cachedSasToken);
 
     final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
-    final AbfsRestOperation op = new AbfsRestOperation(
-        AbfsRestOperationType.Append,
-        this,
-        HTTP_METHOD_PUT,
-        url,
-        requestHeaders,
-        buffer,
-        reqParams.getoffset(),
-        reqParams.getLength(),
-        sasTokenForReuse);
+    final AbfsRestOperation op = getAbfsRestOperationForAppend(AbfsRestOperationType.Append,
+        HTTP_METHOD_PUT, url, requestHeaders, buffer, reqParams.getoffset(),
+        reqParams.getLength(), sasTokenForReuse);
     try {
       op.execute(tracingContext);
     } catch (AzureBlobFileSystemException e) {
+      /*
+         If the http response code indicates a user error we retry
+         the same append request with expect header disabled.
+         When "100-continue" header is enabled but a non Http 100 response comes,
+         the response message might not get set correctly by the server.
+         So, this handling is to avoid breaking of backward compatibility
+         if someone has taken dependency on the exception message,
+         which is created using the error string present in the response header.
+      */
+      int responseStatusCode = ((AbfsRestOperationException) e).getStatusCode();
+      if (checkUserError(responseStatusCode) && reqParams.isExpectHeaderEnabled()) {
+        LOG.debug("User error, retrying without 100 continue enabled");

Review Comment:
   is there anything which can be added here to help debug problems, e.g. path, request?
   I'm thinking of the problem "20 GB of log and you want to know why one file is having problems"
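
A standalone sketch of the kind of contextual message being asked for; the helper name, message format, and sample values are all hypothetical, not the PR's code:

```java
// Hypothetical helper: one log line carrying request context (path, offset,
// length) makes a single failing file easy to grep out of a very large log.
public class AppendLogContextSketch {
  static String userErrorMessage(int statusCode, String path, long offset,
      int length) {
    return String.format(
        "User error %d on append to %s (offset=%d, length=%d); "
            + "retrying without 100-continue",
        statusCode, path, offset, length);
  }

  public static void main(String[] args) {
    System.out.println(userErrorMessage(400, "/data/part-0001", 0, 4096));
  }
}
```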



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java:
##########
@@ -681,35 +685,38 @@ public AbfsRestOperation append(final String path, final byte[] buffer,
         abfsUriQueryBuilder, cachedSasToken);
 
     final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
-    final AbfsRestOperation op = new AbfsRestOperation(
-        AbfsRestOperationType.Append,
-        this,
-        HTTP_METHOD_PUT,
-        url,
-        requestHeaders,
-        buffer,
-        reqParams.getoffset(),
-        reqParams.getLength(),
-        sasTokenForReuse);
+    final AbfsRestOperation op = getAbfsRestOperationForAppend(AbfsRestOperationType.Append,
+        HTTP_METHOD_PUT, url, requestHeaders, buffer, reqParams.getoffset(),
+        reqParams.getLength(), sasTokenForReuse);
     try {
       op.execute(tracingContext);
     } catch (AzureBlobFileSystemException e) {
+      /*
+         If the http response code indicates a user error we retry
+         the same append request with expect header disabled.
+         When "100-continue" header is enabled but a non Http 100 response comes,
+         the response message might not get set correctly by the server.
+         So, this handling is to avoid breaking of backward compatibility
+         if someone has taken dependency on the exception message,
+         which is created using the error string present in the response header.
+      */
+      int responseStatusCode = ((AbfsRestOperationException) e).getStatusCode();
+      if (checkUserError(responseStatusCode) && reqParams.isExpectHeaderEnabled()) {
+        LOG.debug("User error, retrying without 100 continue enabled");
+        reqParams.setExpectHeaderEnabled(false);
+        return this.append(path, buffer, reqParams, cachedSasToken,
+            tracingContext);
+      }
       // If we have no HTTP response, throw the original exception.
       if (!op.hasResult()) {
         throw e;
       }
       if (reqParams.isAppendBlob()
           && appendSuccessCheckOp(op, path,
           (reqParams.getPosition() + reqParams.getLength()), tracingContext)) {
-        final AbfsRestOperation successOp = new AbfsRestOperation(
-            AbfsRestOperationType.Append,
-            this,
-            HTTP_METHOD_PUT,
-            url,
-            requestHeaders,
-            buffer,
-            reqParams.getoffset(),
-            reqParams.getLength(),
+        final AbfsRestOperation successOp = getAbfsRestOperationForAppend(
+            AbfsRestOperationType.Append, HTTP_METHOD_PUT, url, requestHeaders,

Review Comment:
   again, spread across lines



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java:
##########
@@ -720,6 +727,48 @@ && appendSuccessCheckOp(op, path,
     return op;
   }
 
+  /**
+   * Returns the rest operation for append

Review Comment:
   needs a . at the end for javadoc to be happy



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientThrottlingIntercept.java:
##########
@@ -114,6 +116,18 @@ static AbfsClientThrottlingIntercept initializeSingleton(AbfsConfiguration abfsC
     return singleton;
   }
 
+  /**
+   * Updates the metrics for the case when getOutputStream() caught an IOException
+   * and response code signifies throttling.
+   * @param isThrottledOperation returns true if status code is HTTP_UNAVAILABLE
+   * @param abfsHttpOperation Used for status code and data transferred.
+   * @return

Review Comment:
   add a return detail.
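
A filled-in javadoc along those lines might read as follows; the wording is a suggestion, not the committed text:

```java
/**
 * Updates the throttling metrics when the response code signifies
 * throttling but no bytes-sent figure was recorded.
 * @param isThrottledOperation true if the status code is HTTP_UNAVAILABLE.
 * @param abfsHttpOperation used for the status code and data transferred.
 * @return true if the metrics were updated, false otherwise.
 */
```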



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientThrottlingIntercept.java:
##########
@@ -60,7 +62,7 @@ public AbfsClientThrottlingIntercept(String accountName, AbfsConfiguration abfsC
 
   // Hide default constructor
   private AbfsClientThrottlingIntercept(AbfsConfiguration abfsConfiguration) {
-    //Account name is kept as empty as same instance is shared across all accounts
+    //Account name is kept as empty as same instance is shared across all accounts.

Review Comment:
   nit, add a space



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientThrottlingIntercept.java:
##########
@@ -114,6 +116,18 @@ static AbfsClientThrottlingIntercept initializeSingleton(AbfsConfiguration abfsC
     return singleton;
   }
 
+  /**
+   * Updates the metrics for the case when getOutputStream() caught an IOException

Review Comment:
   no it doesn't.



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java:
##########
@@ -229,14 +232,29 @@ private void completeExecute(TracingContext tracingContext)
       }
     }
 
-    if (result.getStatusCode() >= HttpURLConnection.HTTP_BAD_REQUEST) {
+    int status = result.getStatusCode();
+    /*
+      If even after exhausting all retries, the http status code has an
+      invalid value it qualifies for InvalidAbfsRestOperationException.
+      All http status code less than 1xx range are considered as invalid
+      status codes.
+     */
+    if (status < HTTP_CONTINUE) {
+      throw new InvalidAbfsRestOperationException(null, retryCount);
+    }
+
+    if (status >= HttpURLConnection.HTTP_BAD_REQUEST) {
       throw new AbfsRestOperationException(result.getStatusCode(), result.getStorageErrorCode(),
           result.getStorageErrorMessage(), null, result);
     }
-
     LOG.trace("{} REST operation complete", operationType);
   }
 
+  AbfsHttpOperation getHttpOperation(final URL url, final String method,

Review Comment:
   nit: javadocs



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientThrottlingIntercept.java:
##########
@@ -134,9 +148,22 @@ public void updateMetrics(AbfsRestOperationType operationType,
     boolean isFailedOperation = (status < HttpURLConnection.HTTP_OK
         || status >= HttpURLConnection.HTTP_INTERNAL_ERROR);
 
+    // If status code is 503, it considered a throttled operation.
+    boolean isThrottledOperation = (status == HTTP_UNAVAILABLE);
+
     switch (operationType) {
       case Append:
         contentLength = abfsHttpOperation.getBytesSent();
+        if (contentLength == 0) {
+          /*
+            Signifies the case where we could not update the bytesSent due to
+            throttling but there were some expectedBytesToBeSent.
+           */
+          if (updateBytesTransferred(isThrottledOperation, abfsHttpOperation)) {
+            LOG.debug("Updating metrics due to throttling");

Review Comment:
   could the URL be included?



##########
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsRestOperation.java:
##########
@@ -0,0 +1,360 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.HttpURLConnection;
+import java.net.ProtocolException;
+import java.net.URL;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Random;
+
+import org.assertj.core.api.Assertions;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.mockito.Mockito;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
+import org.apache.hadoop.fs.azurebfs.AbstractAbfsIntegrationTest;
+import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem;
+import org.apache.hadoop.fs.azurebfs.TestAbfsConfigurationFieldsValidation;
+import org.apache.hadoop.fs.azurebfs.constants.FSOperationType;
+import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
+import org.apache.hadoop.fs.azurebfs.contracts.services.AppendRequestParameters;
+import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
+import org.apache.hadoop.fs.azurebfs.utils.TracingHeaderFormat;
+
+import static java.net.HttpURLConnection.HTTP_NOT_FOUND;
+import static java.net.HttpURLConnection.HTTP_OK;
+import static java.net.HttpURLConnection.HTTP_UNAVAILABLE;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.APPEND_ACTION;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_PATCH;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_PUT;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HUNDRED_CONTINUE;
+import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.EXPECT;
+import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_HTTP_METHOD_OVERRIDE;
+import static org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams.QUERY_PARAM_ACTION;
+import static org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams.QUERY_PARAM_POSITION;
+import static org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys.FS_AZURE_ABFS_ACCOUNT_NAME;
+import static org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys.TEST_CONFIGURATION_FILE_NAME;
+import static org.apache.hadoop.test.LambdaTestUtils.intercept;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+
+@RunWith(Parameterized.class)
+public class TestAbfsRestOperation extends AbstractAbfsIntegrationTest {

Review Comment:
   again, probably needs to be an ITest. it must be if it depends on any credentials/settings in src/test/resources/auth-keys.xml



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java:
##########
@@ -315,12 +333,25 @@ private boolean executeHttpOperation(final int retryCount,
       }
 
       if (!client.getRetryPolicy().shouldRetry(retryCount, -1)) {
-        throw new InvalidAbfsRestOperationException(ex);
+        throw new InvalidAbfsRestOperationException(ex, retryCount);
       }
 
       return false;
     } finally {
-      intercept.updateMetrics(operationType, httpOperation);
+      int status = httpOperation.getStatusCode();
+      /*
+        A status less than 300 (2xx range) or greater than or equal
+        to 500 (5xx range) should contribute to throttling metrics updation.

Review Comment:
   typo



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java:
##########
@@ -681,35 +685,38 @@ public AbfsRestOperation append(final String path, final byte[] buffer,
         abfsUriQueryBuilder, cachedSasToken);
 
     final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
-    final AbfsRestOperation op = new AbfsRestOperation(
-        AbfsRestOperationType.Append,
-        this,
-        HTTP_METHOD_PUT,
-        url,
-        requestHeaders,
-        buffer,
-        reqParams.getoffset(),
-        reqParams.getLength(),
-        sasTokenForReuse);
+    final AbfsRestOperation op = getAbfsRestOperationForAppend(AbfsRestOperationType.Append,
+        HTTP_METHOD_PUT, url, requestHeaders, buffer, reqParams.getoffset(),

Review Comment:
   keep one parameter to a line; it's common when there are many, many arguments
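
For reference, the one-parameter-per-line layout being requested would look like this (the same call from the diff, reformatted only):

```java
final AbfsRestOperation op = getAbfsRestOperationForAppend(
    AbfsRestOperationType.Append,
    HTTP_METHOD_PUT,
    url,
    requestHeaders,
    buffer,
    reqParams.getoffset(),
    reqParams.getLength(),
    sasTokenForReuse);
```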



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java:
##########
@@ -229,14 +232,29 @@ private void completeExecute(TracingContext tracingContext)
       }
     }
 
-    if (result.getStatusCode() >= HttpURLConnection.HTTP_BAD_REQUEST) {
+    int status = result.getStatusCode();
+    /*
+      If even after exhausting all retries, the http status code has an
+      invalid value it qualifies for InvalidAbfsRestOperationException.
+      All http status code less than 1xx range are considered as invalid
+      status codes.
+     */
+    if (status < HTTP_CONTINUE) {
+      throw new InvalidAbfsRestOperationException(null, retryCount);

Review Comment:
   this is clearly something odd. the status code should be included in the exception to help debug



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ExponentialRetryPolicy.java:
##########
@@ -56,6 +56,12 @@ public class ExponentialRetryPolicy {
    */
   private static final double MAX_RANDOM_RATIO = 1.2;
 
+  /**
+   * All status codes less than http 100 signify error
+   * and should qualify for retry.
+   */
+  private static final int HTTP_CONTINUE = 100;

Review Comment:
   you should only have one declaration of this and cross references; makes it easier to find usages in the IDE
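
A sketch of the single-declaration pattern; which class hosts the constant is an assumption here, any one shared constants class works:

```java
// In one shared constants class (hosting class is an assumption):
public static final int HTTP_CONTINUE = 100;

// Everywhere else, reference it instead of redeclaring,
// so "find usages" in the IDE sees every consumer:
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_CONTINUE;
```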



##########
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsClient.java:
##########
@@ -59,14 +82,19 @@
  * Test useragent of abfs client.
  *
  */
-public final class TestAbfsClient {
+public final class TestAbfsClient extends AbstractAbfsIntegrationTest {

Review Comment:
   if this is now an integration test, it must be renamed ITestAbfsClient



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/exceptions/InvalidAbfsRestOperationException.java:
##########
@@ -30,14 +30,33 @@
 @InterfaceAudience.Public
 @InterfaceStability.Evolving
 public class InvalidAbfsRestOperationException extends AbfsRestOperationException {
+
+  private static final String ERROR_MESSAGE = "InvalidAbfsRestOperationException";
+
   public InvalidAbfsRestOperationException(
       final Exception innerException) {
     super(
         AzureServiceErrorCode.UNKNOWN.getStatusCode(),
         AzureServiceErrorCode.UNKNOWN.getErrorCode(),
         innerException != null
             ? innerException.toString()
-            : "InvalidAbfsRestOperationException",
+            : ERROR_MESSAGE,
         innerException);
   }
+
+  /**
+   * Adds the retry count along with the exception.
+   * @param innerException The inner exception which is originally caught.
+   * @param retryCount The retry count when the exception was thrown.
+   */
+  public InvalidAbfsRestOperationException(
+      final Exception innerException, int retryCount) {

Review Comment:
   what about allowing for status code/error code to be passed in?
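
One hypothetical shape for that suggestion, building on the constructor quoted above; the signature and message format are assumptions, not the committed code:

```java
/**
 * Hypothetical variant: also record the observed status code.
 * @param statusCode the raw http status code that was judged invalid.
 * @param innerException the inner exception originally caught, may be null.
 * @param retryCount the retry count when the exception was thrown.
 */
public InvalidAbfsRestOperationException(
    final int statusCode, final Exception innerException, final int retryCount) {
  super(
      statusCode,
      AzureServiceErrorCode.UNKNOWN.getErrorCode(),
      (innerException != null ? innerException.toString() : ERROR_MESSAGE)
          + " StatusCode: " + statusCode + " RetryCount: " + retryCount,
      innerException);
}
```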



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java:
##########
@@ -35,6 +35,7 @@ public final class ConfigurationKeys {
    * path to determine HNS status.
    */
   public static final String FS_AZURE_ACCOUNT_IS_HNS_ENABLED = "fs.azure.account.hns.enabled";
+  public static final String FS_AZURE_ACCOUNT_IS_EXPECT_HEADER_ENABLED = "fs.azure.account.expect.header.enabled";

Review Comment:
   add a javadoc, ideally with an {@value} as the IDE should render it
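
A javadoc with `{@value}` could look like this; the description wording is a suggestion:

```java
/**
 * Enable or disable sending the "Expect: 100-continue" header on append
 * requests for an account: {@value}.
 */
public static final String FS_AZURE_ACCOUNT_IS_EXPECT_HEADER_ENABLED =
    "fs.azure.account.expect.header.enabled";
```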



##########
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsRestOperation.java:
##########
@@ -0,0 +1,360 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.HttpURLConnection;
+import java.net.ProtocolException;
+import java.net.URL;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Random;
+
+import org.assertj.core.api.Assertions;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.mockito.Mockito;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
+import org.apache.hadoop.fs.azurebfs.AbstractAbfsIntegrationTest;
+import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem;
+import org.apache.hadoop.fs.azurebfs.TestAbfsConfigurationFieldsValidation;
+import org.apache.hadoop.fs.azurebfs.constants.FSOperationType;
+import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
+import org.apache.hadoop.fs.azurebfs.contracts.services.AppendRequestParameters;
+import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
+import org.apache.hadoop.fs.azurebfs.utils.TracingHeaderFormat;
+
+import static java.net.HttpURLConnection.HTTP_NOT_FOUND;
+import static java.net.HttpURLConnection.HTTP_OK;
+import static java.net.HttpURLConnection.HTTP_UNAVAILABLE;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.APPEND_ACTION;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_PATCH;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_PUT;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HUNDRED_CONTINUE;
+import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.EXPECT;
+import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_HTTP_METHOD_OVERRIDE;
+import static org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams.QUERY_PARAM_ACTION;
+import static org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams.QUERY_PARAM_POSITION;
+import static org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys.FS_AZURE_ABFS_ACCOUNT_NAME;
+import static org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys.TEST_CONFIGURATION_FILE_NAME;
+import static org.apache.hadoop.test.LambdaTestUtils.intercept;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+
+@RunWith(Parameterized.class)
+public class TestAbfsRestOperation extends AbstractAbfsIntegrationTest {
+
+  // Specifies whether getOutputStream() or write() throws IOException.
+  public enum ErrorType {OUTPUTSTREAM, WRITE};
+
+  private static final int HTTP_EXPECTATION_FAILED = 417;
+  private static final int HTTP_ERROR = 0;
+  private static final int ZERO = 0;
+  private static final int REDUCED_RETRY_COUNT = 2;
+  private static final int REDUCED_BACKOFF_INTERVAL = 100;
+  private static final int BUFFER_LENGTH = 5;
+  private static final int BUFFER_OFFSET = 0;
+  private static final String TEST_PATH = "/testfile";
+
+  // Specifies whether the expect header is enabled or not.
+  @Parameterized.Parameter
+  public boolean expectHeaderEnabled;
+
+  // Gives the http response code.
+  @Parameterized.Parameter(1)
+  public int responseCode;
+
+  // Gives the http response message.
+  @Parameterized.Parameter(2)
+  public String responseMessage;
+
+  // Gives the errorType based on the enum.
+  @Parameterized.Parameter(3)
+  public ErrorType errorType;
+
+  // The intercept.
+  private AbfsThrottlingIntercept intercept;
+
+  /*
+    HTTP_OK = 200,
+    HTTP_UNAVAILABLE = 503,
+    HTTP_NOT_FOUND = 404,
+    HTTP_EXPECTATION_FAILED = 417,
+    HTTP_ERROR = 0.
+   */
+  @Parameterized.Parameters(name = "expect={0}-code={1}-ErrorType={3}")
+  public static Iterable<Object[]> params() {
+    return Arrays.asList(new Object[][]{
+        {true, HTTP_OK, "OK", ErrorType.WRITE},
+        {false, HTTP_OK, "OK", ErrorType.WRITE},
+        {true, HTTP_UNAVAILABLE, "ServerBusy", ErrorType.OUTPUTSTREAM},
+        {true, HTTP_NOT_FOUND, "Resource Not Found", ErrorType.OUTPUTSTREAM},
+        {true, HTTP_EXPECTATION_FAILED, "Expectation Failed", ErrorType.OUTPUTSTREAM},
+        {true, HTTP_ERROR, "Error", ErrorType.OUTPUTSTREAM}
+    });
+  }
+
+  public TestAbfsRestOperation() throws Exception {
+    super();
+  }
+
+  /**
+   * Test helper method to get random bytes array.
+   * @param length The length of byte buffer
+   * @return byte buffer
+   */
+  private byte[] getRandomBytesArray(int length) {
+    final byte[] b = new byte[length];
+    new Random().nextBytes(b);
+    return b;
+  }
+
+  /**
+   * Gives the AbfsRestOperation.
+   * @return abfsRestOperation.
+   */
+  private AbfsRestOperation getRestOperation() throws Exception {
+    // Get the filesystem.
+    final AzureBlobFileSystem fs = getFileSystem();
+
+    final Configuration configuration = new Configuration();
+    configuration.addResource(TEST_CONFIGURATION_FILE_NAME);
+    AbfsClient abfsClient = fs.getAbfsStore().getClient();
+
+    AbfsConfiguration abfsConfiguration = new AbfsConfiguration(configuration,
+        configuration.get(FS_AZURE_ABFS_ACCOUNT_NAME));
+
+    // Update the configuration with reduced retry count and reduced backoff interval.
+    AbfsConfiguration abfsConfig
+        = TestAbfsConfigurationFieldsValidation.updateRetryConfigs(
+        abfsConfiguration,
+        REDUCED_RETRY_COUNT, REDUCED_BACKOFF_INTERVAL);
+
+    intercept = Mockito.mock(AbfsThrottlingIntercept.class);
+    Mockito.doNothing().when(intercept).updateMetrics(Mockito.any(), Mockito.any());
+
+    // Gets the client.
+    AbfsClient testClient = Mockito.spy(TestAbfsClient.createTestClientFromCurrentContext(
+        abfsClient,
+        abfsConfig));
+
+    Mockito.doReturn(intercept).when(testClient).getIntercept();
+
+    // Expect header is enabled or not based on the parameter.
+    AppendRequestParameters appendRequestParameters
+        = new AppendRequestParameters(
+        BUFFER_OFFSET, BUFFER_OFFSET, BUFFER_LENGTH,
+        AppendRequestParameters.Mode.APPEND_MODE, false, null,
+        expectHeaderEnabled);
+
+    byte[] buffer = getRandomBytesArray(5);
+
+    // Create a test container to upload the data.
+    Path testPath = path(TEST_PATH);
+    fs.create(testPath);
+    String finalTestPath = testPath.toString().substring(testPath.toString().lastIndexOf("/"));
+
+    // Creates a list of request headers.
+    final List<AbfsHttpHeader> requestHeaders = TestAbfsClient.getTestRequestHeaders(testClient);
+    requestHeaders.add(new AbfsHttpHeader(X_HTTP_METHOD_OVERRIDE, HTTP_METHOD_PATCH));
+    if (appendRequestParameters.isExpectHeaderEnabled()) {
+      requestHeaders.add(new AbfsHttpHeader(EXPECT, HUNDRED_CONTINUE));
+    }
+
+    // Updates the query parameters.
+    final AbfsUriQueryBuilder abfsUriQueryBuilder = testClient.createDefaultUriQueryBuilder();
+    abfsUriQueryBuilder.addQuery(QUERY_PARAM_ACTION, APPEND_ACTION);
+    abfsUriQueryBuilder.addQuery(QUERY_PARAM_POSITION, Long.toString(appendRequestParameters.getPosition()));
+
+    // Creates the url for the specified path.
+    URL url =  testClient.createRequestUrl(finalTestPath, abfsUriQueryBuilder.toString());
+
+    // Create a mock of the AbfsRestOperation to set the urlConnection in the corresponding httpOperation.
+    AbfsRestOperation op = Mockito.spy(new AbfsRestOperation(
+        AbfsRestOperationType.Append,
+        testClient,
+        HTTP_METHOD_PUT,
+        url,
+        requestHeaders, buffer,
+        appendRequestParameters.getoffset(),
+        appendRequestParameters.getLength(), null));
+
+    AbfsHttpOperation abfsHttpOperation = Mockito.spy(new AbfsHttpOperation(url, HTTP_METHOD_PUT, requestHeaders));
+
+    // Sets the expect request property if expect header is enabled.
+    if (expectHeaderEnabled) {
+      Mockito.doReturn(HUNDRED_CONTINUE)
+          .when(abfsHttpOperation)
+          .getConnProperty(EXPECT);
+    }
+
+    HttpURLConnection urlConnection = mock(HttpURLConnection.class);
+    Mockito.doNothing().when(urlConnection).setRequestProperty(Mockito
+        .any(), Mockito.any());
+    Mockito.doReturn(HTTP_METHOD_PUT).when(urlConnection).getRequestMethod();
+    Mockito.doReturn(url).when(urlConnection).getURL();
+    Mockito.doReturn(urlConnection).when(abfsHttpOperation).getConnection();
+
+    Mockito.doNothing().when(abfsHttpOperation).setRequestProperty(Mockito
+        .any(), Mockito.any());
+    Mockito.doReturn(url).when(abfsHttpOperation).getConnUrl();
+    
Mockito.doReturn(HTTP_METHOD_PUT).when(abfsHttpOperation).getConnRequestMethod();
+
+    switch (errorType) {
+    case OUTPUTSTREAM:
+      // If the getOutputStream() throws IOException and Expect Header is
+      // enabled, it returns back to processResponse and hence we have
+      // mocked the response code and the response message to check different
+      // behaviour based on response code.
+      
Mockito.doReturn(responseCode).when(abfsHttpOperation).getConnResponseCode();
+      Mockito.doReturn(responseMessage)
+          .when(abfsHttpOperation)
+          .getConnResponseMessage();
+      Mockito.doThrow(new ProtocolException("Server rejected Operation"))
+          .when(abfsHttpOperation)
+          .getConnOutputStream();
+      break;
+    case WRITE:
+      // If write() throws IOException and Expect Header is
+      // enabled or not, it should throw back the exception.
+      OutputStream outputStream = Mockito.spy(new OutputStream() {
+        @Override
+        public void write(final int i) throws IOException {
+        }
+      });
+      
Mockito.doReturn(outputStream).when(abfsHttpOperation).getConnOutputStream();
+      Mockito.doThrow(new IOException())
+          .when(outputStream)
+          .write(buffer, appendRequestParameters.getoffset(),
+              appendRequestParameters.getLength());
+      break;
+    default:
+      break;
+    }
+
+    // Sets the httpOperation for the rest operation.
+    Mockito.doReturn(abfsHttpOperation)
+        .when(op)
+        .getHttpOperation(Mockito.any(), Mockito.any(), Mockito.any());
+    return op;
+  }
+
+  /**
+   * Test the functionalities based on whether getOutputStream() or write()
+   * throws exception and what is the corresponding response code.
+   */
+  @Test
+  public void testExpectHundredContinue() throws Exception {
+    // Gets the AbfsRestOperation.
+    AbfsRestOperation op = getRestOperation();
+    AbfsHttpOperation httpOperation = op.getHttpOperation(Mockito.any(), Mockito.any(), Mockito.any());
+
+    TracingContext tracingContext = Mockito.spy(new TracingContext("abcd",
+        "abcde", FSOperationType.APPEND,
+        TracingHeaderFormat.ALL_ID_FORMAT, null));
+
+    switch (errorType) {
+    case WRITE:
+      // If write() throws IOException and Expect Header is
+      // enabled or not, it should throw back the exception
+      // which is caught and exponential retry logic comes into place.
+      intercept(IOException.class,
+          () -> op.execute(tracingContext));
+
+      // Assert that the request is retried based on reduced retry count configured.
+      Assertions.assertThat(tracingContext.getRetryCount())
+          .describedAs("The retry count is incorrect")
+          .isEqualTo(REDUCED_RETRY_COUNT);
+
+      // Assert that metrics will be updated correctly.
+      Assertions.assertThat(httpOperation.getBytesSent())
+          .isEqualTo(BUFFER_LENGTH);
+      break;
+    case OUTPUTSTREAM:

Review Comment:
   case in case is pretty complex; in production I'd be worried. tests, well, it works.



##########
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsRestOperation.java:
##########
@@ -0,0 +1,360 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.HttpURLConnection;
+import java.net.ProtocolException;
+import java.net.URL;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Random;
+
+import org.assertj.core.api.Assertions;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.mockito.Mockito;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
+import org.apache.hadoop.fs.azurebfs.AbstractAbfsIntegrationTest;
+import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem;
+import org.apache.hadoop.fs.azurebfs.TestAbfsConfigurationFieldsValidation;
+import org.apache.hadoop.fs.azurebfs.constants.FSOperationType;
+import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
+import org.apache.hadoop.fs.azurebfs.contracts.services.AppendRequestParameters;
+import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
+import org.apache.hadoop.fs.azurebfs.utils.TracingHeaderFormat;
+
+import static java.net.HttpURLConnection.HTTP_NOT_FOUND;
+import static java.net.HttpURLConnection.HTTP_OK;
+import static java.net.HttpURLConnection.HTTP_UNAVAILABLE;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.APPEND_ACTION;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_PATCH;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_PUT;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HUNDRED_CONTINUE;
+import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.EXPECT;
+import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_HTTP_METHOD_OVERRIDE;
+import static org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams.QUERY_PARAM_ACTION;
+import static org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams.QUERY_PARAM_POSITION;
+import static org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys.FS_AZURE_ABFS_ACCOUNT_NAME;
+import static org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys.TEST_CONFIGURATION_FILE_NAME;
+import static org.apache.hadoop.test.LambdaTestUtils.intercept;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+
+@RunWith(Parameterized.class)
+public class TestAbfsRestOperation extends AbstractAbfsIntegrationTest {
+
+  // Specifies whether getOutputStream() or write() throws IOException.
+  public enum ErrorType {OUTPUTSTREAM, WRITE};
+
+  private static final int HTTP_EXPECTATION_FAILED = 417;
+  private static final int HTTP_ERROR = 0;
+  private static final int ZERO = 0;
+  private static final int REDUCED_RETRY_COUNT = 2;
+  private static final int REDUCED_BACKOFF_INTERVAL = 100;
+  private static final int BUFFER_LENGTH = 5;
+  private static final int BUFFER_OFFSET = 0;
+  private static final String TEST_PATH = "/testfile";
+
+  // Specifies whether the expect header is enabled or not.
+  @Parameterized.Parameter
+  public boolean expectHeaderEnabled;
+
+  // Gives the http response code.
+  @Parameterized.Parameter(1)
+  public int responseCode;
+
+  // Gives the http response message.
+  @Parameterized.Parameter(2)
+  public String responseMessage;
+
+  // Gives the errorType based on the enum.
+  @Parameterized.Parameter(3)
+  public ErrorType errorType;
+
+  // The intercept.
+  private AbfsThrottlingIntercept intercept;
+
+  /*
+    HTTP_OK = 200,
+    HTTP_UNAVAILABLE = 503,
+    HTTP_NOT_FOUND = 404,
+    HTTP_EXPECTATION_FAILED = 417,
+    HTTP_ERROR = 0.
+   */
+  @Parameterized.Parameters(name = "expect={0}-code={1}-ErrorType={3}")
+  public static Iterable<Object[]> params() {
+    return Arrays.asList(new Object[][]{
+        {true, HTTP_OK, "OK", ErrorType.WRITE},
+        {false, HTTP_OK, "OK", ErrorType.WRITE},
+        {true, HTTP_UNAVAILABLE, "ServerBusy", ErrorType.OUTPUTSTREAM},
+        {true, HTTP_NOT_FOUND, "Resource Not Found", ErrorType.OUTPUTSTREAM},
+        {true, HTTP_EXPECTATION_FAILED, "Expectation Failed", ErrorType.OUTPUTSTREAM},
+        {true, HTTP_ERROR, "Error", ErrorType.OUTPUTSTREAM}
+    });
+  }
+
+  public TestAbfsRestOperation() throws Exception {
+    super();
+  }
+
+  /**
+   * Test helper method to get random bytes array.
+   * @param length The length of byte buffer
+   * @return byte buffer
+   */
+  private byte[] getRandomBytesArray(int length) {
+    final byte[] b = new byte[length];
+    new Random().nextBytes(b);
+    return b;
+  }
+
+  /**
+   * Gives the AbfsRestOperation.
+   * @return abfsRestOperation.
+   */
+  private AbfsRestOperation getRestOperation() throws Exception {
+    // Get the filesystem.
+    final AzureBlobFileSystem fs = getFileSystem();
+
+    final Configuration configuration = new Configuration();
+    configuration.addResource(TEST_CONFIGURATION_FILE_NAME);
+    AbfsClient abfsClient = fs.getAbfsStore().getClient();
+
+    AbfsConfiguration abfsConfiguration = new AbfsConfiguration(configuration,
+        configuration.get(FS_AZURE_ABFS_ACCOUNT_NAME));
+
+    // Update the configuration with reduced retry count and reduced backoff interval.
+    AbfsConfiguration abfsConfig
+        = TestAbfsConfigurationFieldsValidation.updateRetryConfigs(
+        abfsConfiguration,
+        REDUCED_RETRY_COUNT, REDUCED_BACKOFF_INTERVAL);
+
+    intercept = Mockito.mock(AbfsThrottlingIntercept.class);
+    Mockito.doNothing().when(intercept).updateMetrics(Mockito.any(), Mockito.any());
+
+    // Gets the client.
+    AbfsClient testClient = Mockito.spy(TestAbfsClient.createTestClientFromCurrentContext(
+        abfsClient,
+        abfsConfig));
+
+    Mockito.doReturn(intercept).when(testClient).getIntercept();
+
+    // Expect header is enabled or not based on the parameter.
+    AppendRequestParameters appendRequestParameters
+        = new AppendRequestParameters(
+        BUFFER_OFFSET, BUFFER_OFFSET, BUFFER_LENGTH,
+        AppendRequestParameters.Mode.APPEND_MODE, false, null,
+        expectHeaderEnabled);
+
+    byte[] buffer = getRandomBytesArray(5);
+
+    // Create a test container to upload the data.
+    Path testPath = path(TEST_PATH);
+    fs.create(testPath);
+    String finalTestPath = testPath.toString().substring(testPath.toString().lastIndexOf("/"));
+
+    // Creates a list of request headers.
+    final List<AbfsHttpHeader> requestHeaders = TestAbfsClient.getTestRequestHeaders(testClient);
+    requestHeaders.add(new AbfsHttpHeader(X_HTTP_METHOD_OVERRIDE, HTTP_METHOD_PATCH));
+    if (appendRequestParameters.isExpectHeaderEnabled()) {
+      requestHeaders.add(new AbfsHttpHeader(EXPECT, HUNDRED_CONTINUE));
+    }
+
+    // Updates the query parameters.
+    final AbfsUriQueryBuilder abfsUriQueryBuilder = testClient.createDefaultUriQueryBuilder();
+    abfsUriQueryBuilder.addQuery(QUERY_PARAM_ACTION, APPEND_ACTION);
+    abfsUriQueryBuilder.addQuery(QUERY_PARAM_POSITION, Long.toString(appendRequestParameters.getPosition()));
+
+    // Creates the url for the specified path.
+    URL url = testClient.createRequestUrl(finalTestPath, abfsUriQueryBuilder.toString());
+
+    // Create a mock of the AbfsRestOperation to set the urlConnection in the corresponding httpOperation.
+    AbfsRestOperation op = Mockito.spy(new AbfsRestOperation(
+        AbfsRestOperationType.Append,
+        testClient,
+        HTTP_METHOD_PUT,
+        url,
+        requestHeaders, buffer,
+        appendRequestParameters.getoffset(),
+        appendRequestParameters.getLength(), null));
+
+    AbfsHttpOperation abfsHttpOperation = Mockito.spy(new AbfsHttpOperation(url, HTTP_METHOD_PUT, requestHeaders));
+
+    // Sets the expect request property if expect header is enabled.
+    if (expectHeaderEnabled) {
+      Mockito.doReturn(HUNDRED_CONTINUE)
+          .when(abfsHttpOperation)
+          .getConnProperty(EXPECT);
+    }
+
+    HttpURLConnection urlConnection = mock(HttpURLConnection.class);
+    Mockito.doNothing().when(urlConnection).setRequestProperty(Mockito
+        .any(), Mockito.any());
+    Mockito.doReturn(HTTP_METHOD_PUT).when(urlConnection).getRequestMethod();
+    Mockito.doReturn(url).when(urlConnection).getURL();
+    Mockito.doReturn(urlConnection).when(abfsHttpOperation).getConnection();
+
+    Mockito.doNothing().when(abfsHttpOperation).setRequestProperty(Mockito
+        .any(), Mockito.any());
+    Mockito.doReturn(url).when(abfsHttpOperation).getConnUrl();
+    Mockito.doReturn(HTTP_METHOD_PUT).when(abfsHttpOperation).getConnRequestMethod();
+
+    switch (errorType) {
+    case OUTPUTSTREAM:
+      // If the getOutputStream() throws IOException and Expect Header is
+      // enabled, it returns back to processResponse and hence we have
+      // mocked the response code and the response message to check different
+      // behaviour based on response code.
+      Mockito.doReturn(responseCode).when(abfsHttpOperation).getConnResponseCode();
+      Mockito.doReturn(responseMessage)
+          .when(abfsHttpOperation)
+          .getConnResponseMessage();
+      Mockito.doThrow(new ProtocolException("Server rejected Operation"))
+          .when(abfsHttpOperation)
+          .getConnOutputStream();
+      break;
+    case WRITE:
+      // If write() throws IOException and Expect Header is
+      // enabled or not, it should throw back the exception.
+      OutputStream outputStream = Mockito.spy(new OutputStream() {
+        @Override
+        public void write(final int i) throws IOException {
+        }
+      });
+      Mockito.doReturn(outputStream).when(abfsHttpOperation).getConnOutputStream();
+      Mockito.doThrow(new IOException())
+          .when(outputStream)
+          .write(buffer, appendRequestParameters.getoffset(),
+              appendRequestParameters.getLength());
+      break;
+    default:
+      break;
+    }
+
+    // Sets the httpOperation for the rest operation.
+    Mockito.doReturn(abfsHttpOperation)
+        .when(op)
+        .getHttpOperation(Mockito.any(), Mockito.any(), Mockito.any());
+    return op;
+  }
+
+  /**
+   * Test the functionalities based on whether getOutputStream() or write()
+   * throws exception and what is the corresponding response code.
+   */
+  @Test
+  public void testExpectHundredContinue() throws Exception {
+    // Gets the AbfsRestOperation.
+    AbfsRestOperation op = getRestOperation();
+    AbfsHttpOperation httpOperation = op.getHttpOperation(Mockito.any(), Mockito.any(), Mockito.any());
+
+    TracingContext tracingContext = Mockito.spy(new TracingContext("abcd",
+        "abcde", FSOperationType.APPEND,
+        TracingHeaderFormat.ALL_ID_FORMAT, null));
+
+    switch (errorType) {
+    case WRITE:
+      // If write() throws IOException and Expect Header is
+      // enabled or not, it should throw back the exception
+      // which is caught and exponential retry logic comes into place.
+      intercept(IOException.class,
+          () -> op.execute(tracingContext));
+
+      // Assert that the request is retried based on reduced retry count configured.
+      Assertions.assertThat(tracingContext.getRetryCount())

Review Comment:
   there are duplicate assertions in all the cases; better to add a new assert:
   
   ```
   assertTraceContextState(TracingContext tracingContext, int bytesSent,
       int expectedBytes, int retryCount)
   ```


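A hedged sketch of what such a shared assertion helper could look like, with plain `int` parameters standing in for the values the real test would read off `TracingContext` and `AbfsHttpOperation` (the name and signature are assumptions, not the PR's API):

```java
// Illustrative only: ints stand in for values read from TracingContext and
// AbfsHttpOperation in the real test; AssertionError mimics a failed assert.
public class TraceAssertSketch {

  static void assertTraceContextState(int actualRetryCount, int expectedRetryCount,
      int actualBytesSent, int expectedBytesSent) {
    if (actualRetryCount != expectedRetryCount) {
      throw new AssertionError("The retry count is incorrect: " + actualRetryCount);
    }
    if (actualBytesSent != expectedBytesSent) {
      throw new AssertionError("Bytes sent mismatch: " + actualBytesSent);
    }
  }

  public static void main(String[] args) {
    // Each switch case would collapse to a single call like this.
    assertTraceContextState(2, 2, 5, 5);
    System.out.println("assertions passed");
  }
}
```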

##########
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsClient.java:
##########
@@ -404,4 +431,156 @@ public static AbfsRestOperation getRestOp(AbfsRestOperationType type,
   public static AccessTokenProvider getAccessTokenProvider(AbfsClient client) {
     return client.getTokenProvider();
   }
+
+  /**
+   * Test helper method to get random bytes array.
+   * @param length The length of byte buffer.
+   * @return byte buffer.
+   */
+  private byte[] getRandomBytesArray(int length) {
+    final byte[] b = new byte[length];
+    new Random().nextBytes(b);
+    return b;
+  }
+
+  /**
+   * Test to verify that client retries append request without
+   * expect header enabled if append with expect header enabled fails
+   * with 4xx kind of error.
+   * @throws Exception if the append request or test setup fails.
+   */
+  @Test
+  public void testExpectHundredContinue() throws Exception {
+    // Get the filesystem.
+    final AzureBlobFileSystem fs = getFileSystem();
+
+    final Configuration configuration = new Configuration();
+    configuration.addResource(TEST_CONFIGURATION_FILE_NAME);
+    AbfsClient abfsClient = fs.getAbfsStore().getClient();
+
+    AbfsConfiguration abfsConfiguration = new AbfsConfiguration(configuration,
+        configuration.get(FS_AZURE_ABFS_ACCOUNT_NAME));
+
+    // Update the configuration with reduced retry count and reduced backoff interval.
+    AbfsConfiguration abfsConfig
+        = TestAbfsConfigurationFieldsValidation.updateRetryConfigs(
+        abfsConfiguration,
+        REDUCED_RETRY_COUNT, REDUCED_BACKOFF_INTERVAL);
+
+    // Gets the client.
+    AbfsClient testClient = Mockito.spy(
+        TestAbfsClient.createTestClientFromCurrentContext(
+            abfsClient,
+            abfsConfig));
+
+    // Create the append request params with expect header enabled initially.
+    AppendRequestParameters appendRequestParameters
+        = new AppendRequestParameters(
+        BUFFER_OFFSET, BUFFER_OFFSET, BUFFER_LENGTH,
+        AppendRequestParameters.Mode.APPEND_MODE, false, null, true);
+
+    byte[] buffer = getRandomBytesArray(BUFFER_LENGTH);
+
+    // Create a test container to upload the data.
+    Path testPath = path(TEST_PATH);
+    fs.create(testPath);
+    String finalTestPath = testPath.toString()
+        .substring(testPath.toString().lastIndexOf("/"));
+
+    // Creates a list of request headers.
+    final List<AbfsHttpHeader> requestHeaders
+        = TestAbfsClient.getTestRequestHeaders(testClient);
+    requestHeaders.add(
+        new AbfsHttpHeader(X_HTTP_METHOD_OVERRIDE, HTTP_METHOD_PATCH));
+    if (appendRequestParameters.isExpectHeaderEnabled()) {
+      requestHeaders.add(new AbfsHttpHeader(EXPECT, HUNDRED_CONTINUE));
+    }
+
+    // Updates the query parameters.
+    final AbfsUriQueryBuilder abfsUriQueryBuilder
+        = testClient.createDefaultUriQueryBuilder();
+    abfsUriQueryBuilder.addQuery(QUERY_PARAM_ACTION, APPEND_ACTION);
+    abfsUriQueryBuilder.addQuery(QUERY_PARAM_POSITION,
+        Long.toString(appendRequestParameters.getPosition()));
+
+    // Creates the url for the specified path.
+    URL url = testClient.createRequestUrl(finalTestPath, abfsUriQueryBuilder.toString());
+
+    // Create a mock of the AbfsRestOperation to set the urlConnection in the corresponding httpOperation.
+    AbfsRestOperation op = Mockito.spy(new AbfsRestOperation(
+        AbfsRestOperationType.Append,
+        testClient,
+        HTTP_METHOD_PUT,
+        url,
+        requestHeaders, buffer,
+        appendRequestParameters.getoffset(),
+        appendRequestParameters.getLength(), null));
+
+    AbfsHttpOperation abfsHttpOperation = Mockito.spy(new AbfsHttpOperation(url,
+        HTTP_METHOD_PUT, requestHeaders));
+
+    // Sets the expect request property if expect header is enabled.
+    if (appendRequestParameters.isExpectHeaderEnabled()) {
+      Mockito.doReturn(HUNDRED_CONTINUE).when(abfsHttpOperation)
+          .getConnProperty(EXPECT);
+    }
+
+    HttpURLConnection urlConnection = mock(HttpURLConnection.class);
+    Mockito.doNothing().when(urlConnection).setRequestProperty(Mockito
+        .any(), Mockito.any());
+    Mockito.doReturn(HTTP_METHOD_PUT).when(urlConnection).getRequestMethod();
+    Mockito.doReturn(url).when(urlConnection).getURL();
+    Mockito.doReturn(urlConnection).when(abfsHttpOperation).getConnection();
+
+    Mockito.doNothing().when(abfsHttpOperation).setRequestProperty(Mockito
+        .any(), Mockito.any());
+    Mockito.doReturn(url).when(abfsHttpOperation).getConnUrl();
+
+    // Give user error code 404 when processResponse is called.
+    Mockito.doReturn(HTTP_METHOD_PUT).when(abfsHttpOperation).getConnRequestMethod();
+    Mockito.doReturn(HTTP_NOT_FOUND).when(abfsHttpOperation).getConnResponseCode();
+    Mockito.doReturn("Resource Not Found")
+        .when(abfsHttpOperation)
+        .getConnResponseMessage();
+
+    // Make getOutputStream() throw IOException to verify that sendRequest returns correctly.
+    Mockito.doThrow(new ProtocolException("Server rejected Operation"))
+        .when(abfsHttpOperation)
+        .getConnOutputStream();
+
+    // Sets the httpOperation for the rest operation.
+    Mockito.doReturn(abfsHttpOperation)
+        .when(op)
+        .getHttpOperation(Mockito.any(), Mockito.any(), Mockito.any());
+
+    // Mock the restOperation for the client.
+    Mockito.doReturn(op)
+        .when(testClient)
+        .getAbfsRestOperationForAppend(Mockito.any(),
+            Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any(),
+            Mockito.nullable(int.class), Mockito.nullable(int.class),
+            Mockito.any());
+
+    TracingContext tracingContext = Mockito.spy(new TracingContext("abcd",
+        "abcde", FSOperationType.APPEND,
+        TracingHeaderFormat.ALL_ID_FORMAT, null));
+
+    // Check that expect header is enabled before the append call.
+    Assertions.assertThat(appendRequestParameters.isExpectHeaderEnabled())
+        .describedAs("The expect header is not true before the append call")
+            .isEqualTo(true);

Review Comment:
   i think you can just use isTrue() and isFalse() for the boolean asserts





> ABFS: Add changes for expect hundred continue header with append requests
> -------------------------------------------------------------------------
>
>                 Key: HADOOP-18146
>                 URL: https://issues.apache.org/jira/browse/HADOOP-18146
>             Project: Hadoop Common
>          Issue Type: Sub-task
>    Affects Versions: 3.3.1
>            Reporter: Anmol Asrani
>            Assignee: Anmol Asrani
>            Priority: Major
>              Labels: pull-request-available
>          Time Spent: 2h 10m
>  Remaining Estimate: 0h
>
>  Heavy load from a Hadoop cluster leads to high resource utilization at FE 
> nodes. Investigations from the server side indicate payload buffering at 
> Http.Sys as the cause. Payloads of requests that eventually fail due to 
> throttling limits are also getting buffered, since buffering is triggered 
> before the FE can start request processing.
> Approach: the client sends the Append HTTP request with an Expect header, but 
> holds back on payload transmission until the server replies with HTTP 100. We 
> add this header to all append requests so as to reduce this buffering.
> We made several workload runs with and without hundred continue enabled, and 
> the overall observations are:
>  # The ratio of TCP SYN packet count with and without expect hundred continue 
> enabled is 0.32 : 3 on average.
>  # The ingress into the machine at the TCP level is almost 3 times lower with 
> hundred continue enabled, which implies significant bandwidth savings.
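
The fallback described in the review snippet above, where a 4xx response received while "Expect: 100-continue" is set triggers one retry with the header disabled, can be sketched roughly as follows. `checkUserError` and `retryExpectSetting` are simplified stand-ins for the ABFS internals, not the production signatures:

```java
// Hedged sketch of the fallback: on a 4xx with the Expect header enabled,
// retry once without the header so the server's error message is readable.
public class ExpectFallbackSketch {

  // A "user error" here means any 4xx status code.
  static boolean checkUserError(int responseStatusCode) {
    return responseStatusCode >= 400 && responseStatusCode < 500;
  }

  /** Returns the Expect setting the retry should use, or null for no retry. */
  static Boolean retryExpectSetting(int statusCode, boolean expectHeaderEnabled) {
    if (checkUserError(statusCode) && expectHeaderEnabled) {
      return Boolean.FALSE;  // retry without "Expect: 100-continue"
    }
    return null;  // surface the failure as-is (normal retry policy applies)
  }

  public static void main(String[] args) {
    System.out.println(retryExpectSetting(404, true));  // 4xx: retry plain
    System.out.println(retryExpectSetting(503, true));  // 5xx: no fallback
  }
}
```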



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
