steveloughran commented on code in PR #6633:
URL: https://github.com/apache/hadoop/pull/6633#discussion_r1621368582


##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java:
##########
@@ -167,5 +167,14 @@ public final class FileSystemConfigurations {
   public static final int HUNDRED = 100;
   public static final long THOUSAND = 1000L;
 
+  public static final HttpOperationType DEFAULT_NETWORKING_LIBRARY
+      = HttpOperationType.APACHE_HTTP_CLIENT;
+
+  public static final int DEFAULT_APACHE_HTTP_CLIENT_MAX_IO_EXCEPTION_RETRIES = 3;
+
+  public static final long DEFAULT_HTTP_CLIENT_CONN_MAX_IDLE_TIME = 5_000L;
+
+  public static final int DEFAULT_HTTP_CLIENT_CONN_MAX_IDLE_CONNECTIONS = 5;

Review Comment:
   that's quite a low number, isn't it?



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/exceptions/AbfsApacheHttpExpect100Exception.java:
##########
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.contracts.exceptions;
+
+import java.io.IOException;
+
+import org.apache.http.HttpResponse;
+
+public class AbfsApacheHttpExpect100Exception extends IOException {
+  private final HttpResponse httpResponse;
+
+  public AbfsApacheHttpExpect100Exception(final String s, final HttpResponse httpResponse) {
+    super(s);
+    this.httpResponse = httpResponse;

Review Comment:
   pull this into the proposed superclass, add a requireNonNull()
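
   e.g. (untested sketch; `HttpResponseException` follows the naming suggested later in this review):

   ```java
   import java.util.Objects;

   public HttpResponseException(final String s, final HttpResponse httpResponse) {
     super(s);
     this.httpResponse = Objects.requireNonNull(httpResponse, "httpResponse");
   }
   ```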



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsConnectionManager.java:
##########
@@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.io.IOException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.http.HttpClientConnection;
+import org.apache.http.config.Registry;
+import org.apache.http.config.SocketConfig;
+import org.apache.http.conn.ConnectionPoolTimeoutException;
+import org.apache.http.conn.ConnectionRequest;
+import org.apache.http.conn.HttpClientConnectionManager;
+import org.apache.http.conn.HttpClientConnectionOperator;
+import org.apache.http.conn.routing.HttpRoute;
+import org.apache.http.conn.socket.ConnectionSocketFactory;
+import org.apache.http.impl.conn.DefaultHttpClientConnectionOperator;
+import org.apache.http.impl.conn.ManagedHttpClientConnectionFactory;
+import org.apache.http.protocol.HttpContext;
+
+/**
+ * AbfsConnectionManager is a custom implementation of {@link HttpClientConnectionManager}.
+ * This implementation manages connection-pooling heuristics and custom implementation
+ * of {@link ManagedHttpClientConnectionFactory}.
+ */
+class AbfsConnectionManager implements HttpClientConnectionManager {
+
+  private final KeepAliveCache kac = KeepAliveCache.getInstance();
+
+  private final AbfsConnFactory httpConnectionFactory;
+
+  private final HttpClientConnectionOperator connectionOperator;
+
+  AbfsConnectionManager(Registry<ConnectionSocketFactory> socketFactoryRegistry,
+      AbfsConnFactory connectionFactory) {
+    this.httpConnectionFactory = connectionFactory;
+    connectionOperator = new DefaultHttpClientConnectionOperator(
+        socketFactoryRegistry, null, null);
+  }
+
+  @Override
+  public ConnectionRequest requestConnection(final HttpRoute route,
+      final Object state) {
+    return new ConnectionRequest() {

Review Comment:
   log at debug that a connection was requested
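
   e.g. at the top of the method (sketch; assumes a LOG field is added to this class):

   ```java
   LOG.debug("Connection requested for route {}", route);
   return new ConnectionRequest() {
   ```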



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java:
##########
@@ -316,5 +316,10 @@ public static String accountProperty(String property, String account) {
    * @see FileSystem#openFile(org.apache.hadoop.fs.Path)
    */
   public static final String FS_AZURE_BUFFERED_PREAD_DISABLE = "fs.azure.buffered.pread.disable";
+  /**Defines what network library to use for server IO calls {@value }*/
+  public static final String FS_AZURE_NETWORKING_LIBRARY = "fs.azure.networking.library";
+  public static final String FS_AZURE_APACHE_HTTP_CLIENT_MAX_IO_EXCEPTION_RETRIES = "fs.azure.apache.http.client.max.io.exception.retries";

Review Comment:
   these all need javadocs
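
   e.g. (sketch):

   ```java
   /**
    * Maximum number of times a request is retried after the Apache HTTP
    * client raises an IOException: {@value}.
    */
   public static final String FS_AZURE_APACHE_HTTP_CLIENT_MAX_IO_EXCEPTION_RETRIES =
       "fs.azure.apache.http.client.max.io.exception.retries";
   ```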



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java:
##########
@@ -95,11 +98,15 @@ public class AbfsRestOperation {
   private String failureReason;
   private AbfsRetryPolicy retryPolicy;
 
+  private final AbfsConfiguration abfsConfiguration;
+
   /**
    * This variable stores the tracing context used for last Rest Operation.
    */
   private TracingContext lastUsedTracingContext;
 
+  private int apacheHttpClientIoExceptions = 0;

Review Comment:
   javadocs
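
   e.g. (sketch):

   ```java
   /**
    * Count of IOExceptions raised by the Apache HTTP client for this
    * operation; compared against the configured maximum retry count.
    */
   private int apacheHttpClientIoExceptions = 0;
   ```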



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsManagedHttpClientContext.java:
##########
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import org.apache.http.HttpClientConnection;
+import org.apache.http.client.protocol.HttpClientContext;
+
+public class AbfsManagedHttpClientContext extends HttpClientContext {

Review Comment:
   javadocs
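
   e.g. (sketch):

   ```java
   /**
    * {@link HttpClientContext} subclass handed to the Apache HTTP client so
    * per-request timings (connect, send, read) can be collected.
    */
   public class AbfsManagedHttpClientContext extends HttpClientContext {
   ```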



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/KeepAliveCache.java:
##########
@@ -0,0 +1,362 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.io.IOException;
+import java.io.NotSerializableException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.classification.VisibleForTesting;
+import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
+import org.apache.http.HttpClientConnection;
+import org.apache.http.conn.routing.HttpRoute;
+
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.DEFAULT_MAX_CONN_SYS_PROP;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_MAX_CONN_SYS_PROP;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.KAC_DEFAULT_CONN_TTL;
+
+/**
+ * Connection-pooling heuristics adapted from JDK's connection pooling `KeepAliveCache`
+ * <p>
+ * Why this implementation is required in comparison to {@link org.apache.http.impl.conn.PoolingHttpClientConnectionManager}
+ * connection-pooling:
+ * <ol>
+ * <li>PoolingHttpClientConnectionManager heuristic caches all the reusable connections it has created.
+ * JDK's implementation only caches limited number of connections. The limit is given by JVM system
+ * property "http.maxConnections". If there is no system-property, it defaults to 5.</li>
+ * <li>In PoolingHttpClientConnectionManager, it expects the application to provide `setMaxPerRoute` and `setMaxTotal`,
+ * which the implementation uses as the total number of connections it can create. For application using ABFS, it is not
+ * feasible to provide a value in the initialisation of the connectionManager. JDK's implementation has no cap on the
+ * number of connections it can create.</li>
+ * </ol>
+ */
+public final class KeepAliveCache
+    extends HashMap<KeepAliveCache.KeepAliveKey, KeepAliveCache.ClientVector>
+    implements Runnable {
+
+  private int maxConn;
+
+  private long connectionIdleTTL = KAC_DEFAULT_CONN_TTL;
+
+  private Thread keepAliveTimer = null;
+
+  private boolean isPaused = false;
+
+  private KeepAliveCache() {
+    setMaxConn();
+  }
+
+  synchronized void pauseThread() {
+    isPaused = true;
+  }
+
+  synchronized void resumeThread() {
+    isPaused = false;
+    notify();
+  }
+
+  private void setMaxConn() {
+    String sysPropMaxConn = System.getProperty(HTTP_MAX_CONN_SYS_PROP);
+    if (sysPropMaxConn == null) {
+      maxConn = DEFAULT_MAX_CONN_SYS_PROP;
+    } else {
+      maxConn = Integer.parseInt(sysPropMaxConn);
+    }
+  }
+
+  public void setAbfsConfig(AbfsConfiguration abfsConfiguration) {
+    this.maxConn = abfsConfiguration.getMaxApacheHttpClientCacheConnections();
+    this.connectionIdleTTL = abfsConfiguration.getMaxApacheHttpClientConnectionIdleTime();
+  }
+
+  public long getConnectionIdleTTL() {
+    return connectionIdleTTL;
+  }
+
+  private static final KeepAliveCache INSTANCE = new KeepAliveCache();
+
+  public static KeepAliveCache getInstance() {
+    return INSTANCE;
+  }
+
+  @VisibleForTesting
+  void clearThread() {
+    clear();
+    setMaxConn();
+  }
+
+  private int getKacSize() {
+    return INSTANCE.maxConn;
+  }
+
+  @Override
+  public void run() {
+    do {
+      synchronized (this) {
+        while (isPaused) {
+          try {
+            wait();
+          } catch (InterruptedException ignored) {
+          }
+        }
+      }
+      kacCleanup();
+    } while (size() > 0);
+  }
+
+  private void kacCleanup() {
+    try {
+      Thread.sleep(connectionIdleTTL);
+    } catch (InterruptedException ex) {
+      return;
+    }
+    synchronized (this) {
+      long currentTime = System.currentTimeMillis();
+
+      ArrayList<KeepAliveKey> keysToRemove
+          = new ArrayList<KeepAliveKey>();
+
+      for (Map.Entry<KeepAliveKey, ClientVector> entry : entrySet()) {
+        KeepAliveKey key = entry.getKey();
+        ClientVector v = entry.getValue();
+        synchronized (v) {
+          int i;
+
+          for (i = 0; i < v.size(); i++) {
+            KeepAliveEntry e = v.elementAt(i);
+            if ((currentTime - e.idleStartTime) > v.nap
+                || e.httpClientConnection.isStale()) {
+              HttpClientConnection hc = e.httpClientConnection;
+              closeHtpClientConnection(hc);
+            } else {
+              break;
+            }
+          }
+          v.subList(0, i).clear();
+
+          if (v.size() == 0) {
+            keysToRemove.add(key);
+          }
+        }
+      }
+
+      for (KeepAliveKey key : keysToRemove) {
+        removeVector(key);
+      }
+    }
+  }
+
+  synchronized void removeVector(KeepAliveKey k) {
+    super.remove(k);
+  }
+
+  public synchronized void put(final HttpRoute httpRoute,
+      final HttpClientConnection httpClientConnection) {
+    boolean startThread = (keepAliveTimer == null);
+    if (!startThread) {
+      if (!keepAliveTimer.isAlive()) {
+        startThread = true;
+      }
+    }
+    if (startThread) {
+      clear();
+      final KeepAliveCache cache = this;
+      ThreadGroup grp = Thread.currentThread().getThreadGroup();
+      ThreadGroup parent = null;
+      while ((parent = grp.getParent()) != null) {
+        grp = parent;
+      }
+
+      keepAliveTimer = new Thread(grp, cache, "Keep-Alive-Timer");

Review Comment:
   OK, this is line-for-line the same as the JDK. It doesn't matter what 
license microsoft has with oracle, nobody is allowed to paste bits from the JDK 
in.



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsAHCHttpOperation.java:
##########
@@ -0,0 +1,374 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.classification.VisibleForTesting;
+import org.apache.hadoop.fs.PathIOException;
+import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants;
+import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations;
+import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsApacheHttpExpect100Exception;
+import org.apache.http.Header;
+import org.apache.http.HttpEntity;
+import org.apache.http.HttpResponse;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpDelete;
+import org.apache.http.client.methods.HttpEntityEnclosingRequestBase;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.methods.HttpHead;
+import org.apache.http.client.methods.HttpPatch;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.client.methods.HttpPut;
+import org.apache.http.client.methods.HttpRequestBase;
+import org.apache.http.entity.ByteArrayEntity;
+import org.apache.http.util.EntityUtils;
+
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.APACHE_IMPL;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.EMPTY_STRING;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_DELETE;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_GET;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_HEAD;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_PATCH;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_POST;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_PUT;
+import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_CLIENT_REQUEST_ID;
+import static org.apache.http.entity.ContentType.TEXT_PLAIN;
+
+/**
+ * Implementation of {@link AbfsHttpOperation} for orchestrating server calls using
+ * Apache Http Client.
+ */
+public class AbfsAHCHttpOperation extends AbfsHttpOperation {
+
+  private static final Logger LOG = LoggerFactory.getLogger(
+      AbfsAHCHttpOperation.class);
+
+  private HttpRequestBase httpRequestBase;
+
+  private HttpResponse httpResponse;
+
+  private boolean connectionDisconnectedOnError = false;
+
+  private final boolean isPayloadRequest;
+
+  public AbfsAHCHttpOperation(final URL url,
+      final String method,
+      final List<AbfsHttpHeader> requestHeaders,
+      final int connectionTimeout,
+      final int readTimeout) {
+    super(LOG, url, method, requestHeaders, connectionTimeout, readTimeout);
+    this.isPayloadRequest = isPayloadRequest(method);
+  }
+
+  @VisibleForTesting
+  AbfsManagedHttpClientContext setFinalAbfsClientContext() {
+    return new AbfsManagedHttpClientContext();
+  }
+
+  private boolean isPayloadRequest(final String method) {
+    return HTTP_METHOD_PUT.equals(method) || HTTP_METHOD_PATCH.equals(method)
+        || HTTP_METHOD_POST.equals(method);
+  }
+
+  @Override
+  protected InputStream getErrorStream() throws IOException {
+    HttpEntity entity = httpResponse.getEntity();
+    if (entity == null) {
+      return null;
+    }
+    return entity.getContent();
+  }
+
+  @Override
+  String getConnProperty(final String key) {
+    for (AbfsHttpHeader header : getRequestHeaders()) {
+      if (header.getName().equals(key)) {
+        return header.getValue();
+      }
+    }
+    return null;
+  }
+
+  @Override
+  URL getConnUrl() {
+    return getUrl();
+  }
+
+  @Override
+  String getConnRequestMethod() {
+    return getMethod();
+  }
+
+  @Override
+  Integer getConnResponseCode() throws IOException {
+    return getStatusCode();
+  }
+
+  @Override
+  String getConnResponseMessage() throws IOException {
+    return getStatusDescription();
+  }
+
+  public void processResponse(final byte[] buffer,
+      final int offset,
+      final int length) throws IOException {
+    try {
+      if (!isPayloadRequest) {
+        prepareRequest();
+        httpResponse = executeRequest();
+      }
+      parseResponseHeaderAndBody(buffer, offset, length);
+    } finally {
+      if (httpResponse != null) {
+        try {
+          EntityUtils.consume(httpResponse.getEntity());
+        } finally {
+          if (httpResponse instanceof CloseableHttpResponse) {
+            ((CloseableHttpResponse) httpResponse).close();
+          }
+        }
+      }
+    }
+  }
+
+  @VisibleForTesting
+  void parseResponseHeaderAndBody(final byte[] buffer,
+      final int offset,
+      final int length) throws IOException {
+    setStatusCode(parseStatusCode(httpResponse));
+
+    setStatusDescription(httpResponse.getStatusLine().getReasonPhrase());
+
+    String requestId = getResponseHeader(
+        HttpHeaderConfigurations.X_MS_REQUEST_ID);
+    if (requestId == null) {
+      requestId = AbfsHttpConstants.EMPTY_STRING;
+    }
+    setRequestId(requestId);
+
+    // dump the headers
+    AbfsIoUtils.dumpHeadersToDebugLog("Response Headers",

Review Comment:
   only do this if the log is at debug, otherwise getResponseHeaders is doing 
needless work
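
   e.g. (sketch):

   ```java
   if (LOG.isDebugEnabled()) {
     AbfsIoUtils.dumpHeadersToDebugLog("Response Headers",
         getResponseHeaders(httpResponse));
   }
   ```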



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/AbfsHttpConstants.java:
##########
@@ -165,5 +165,12 @@ public static ApiVersion getCurrentVersion() {
    */
   public static final Integer HTTP_STATUS_CATEGORY_QUOTIENT = 100;
 
+  public static final String HTTP_MAX_CONN_SYS_PROP = "http.maxConnections";
+  public static final Integer DEFAULT_MAX_CONN_SYS_PROP = 5;

Review Comment:
   i know it's the jdk default, but it's pretty awful. Why not increase?



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsAHCHttpOperation.java:
##########
@@ -0,0 +1,374 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.classification.VisibleForTesting;
+import org.apache.hadoop.fs.PathIOException;
+import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants;
+import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations;
+import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsApacheHttpExpect100Exception;
+import org.apache.http.Header;
+import org.apache.http.HttpEntity;
+import org.apache.http.HttpResponse;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpDelete;
+import org.apache.http.client.methods.HttpEntityEnclosingRequestBase;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.methods.HttpHead;
+import org.apache.http.client.methods.HttpPatch;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.client.methods.HttpPut;
+import org.apache.http.client.methods.HttpRequestBase;
+import org.apache.http.entity.ByteArrayEntity;
+import org.apache.http.util.EntityUtils;
+
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.APACHE_IMPL;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.EMPTY_STRING;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_DELETE;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_GET;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_HEAD;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_PATCH;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_POST;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_PUT;
+import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_CLIENT_REQUEST_ID;
+import static org.apache.http.entity.ContentType.TEXT_PLAIN;
+
+/**
+ * Implementation of {@link AbfsHttpOperation} for orchestrating server calls using

Review Comment:
   needs javadocs for each non-inherited method and field.
   just assume that I *always* expect these and save review time by adding them



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsAHCHttpOperation.java:
##########
@@ -0,0 +1,374 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.classification.VisibleForTesting;
+import org.apache.hadoop.fs.PathIOException;
+import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants;
+import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations;
+import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsApacheHttpExpect100Exception;
+import org.apache.http.Header;
+import org.apache.http.HttpEntity;
+import org.apache.http.HttpResponse;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpDelete;
+import org.apache.http.client.methods.HttpEntityEnclosingRequestBase;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.methods.HttpHead;
+import org.apache.http.client.methods.HttpPatch;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.client.methods.HttpPut;
+import org.apache.http.client.methods.HttpRequestBase;
+import org.apache.http.entity.ByteArrayEntity;
+import org.apache.http.util.EntityUtils;
+
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.APACHE_IMPL;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.EMPTY_STRING;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_DELETE;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_GET;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_HEAD;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_PATCH;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_POST;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_PUT;
+import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_CLIENT_REQUEST_ID;
+import static org.apache.http.entity.ContentType.TEXT_PLAIN;
+
+/**
+ * Implementation of {@link AbfsHttpOperation} for orchestrating server calls using
+ * Apache Http Client.
+ */
+public class AbfsAHCHttpOperation extends AbfsHttpOperation {
+
+  private static final Logger LOG = LoggerFactory.getLogger(
+      AbfsAHCHttpOperation.class);
+
+  private HttpRequestBase httpRequestBase;

Review Comment:
   add a little javadoc for these fields
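
   e.g. (sketch):

   ```java
   /** Underlying Apache HTTP request; created once the HTTP method is known. */
   private HttpRequestBase httpRequestBase;

   /** Response of the most recently executed request, if any. */
   private HttpResponse httpResponse;
   ```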



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/AbfsHttpConstants.java:
##########
@@ -165,5 +165,12 @@ public static ApiVersion getCurrentVersion() {
    */
   public static final Integer HTTP_STATUS_CATEGORY_QUOTIENT = 100;
 
+  public static final String HTTP_MAX_CONN_SYS_PROP = "http.maxConnections";
+  public static final Integer DEFAULT_MAX_CONN_SYS_PROP = 5;
+  public static final int KAC_DEFAULT_CONN_TTL = 5_000;

Review Comment:
   going to prefer using a Duration.ofSeconds(5) and extracting the millis 
value later...removes ambiguity of unit
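
   e.g. (sketch):

   ```java
   import java.time.Duration;

   public static final Duration KAC_DEFAULT_CONN_TTL = Duration.ofSeconds(5);

   // wherever the millisecond value is actually needed:
   long ttlMillis = KAC_DEFAULT_CONN_TTL.toMillis();
   ```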



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsAHCHttpOperation.java:
##########
@@ -0,0 +1,374 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.classification.VisibleForTesting;
+import org.apache.hadoop.fs.PathIOException;
+import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants;
+import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations;
+import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsApacheHttpExpect100Exception;
+import org.apache.http.Header;
+import org.apache.http.HttpEntity;
+import org.apache.http.HttpResponse;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpDelete;
+import org.apache.http.client.methods.HttpEntityEnclosingRequestBase;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.methods.HttpHead;
+import org.apache.http.client.methods.HttpPatch;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.client.methods.HttpPut;
+import org.apache.http.client.methods.HttpRequestBase;
+import org.apache.http.entity.ByteArrayEntity;
+import org.apache.http.util.EntityUtils;
+
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.APACHE_IMPL;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.EMPTY_STRING;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_DELETE;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_GET;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_HEAD;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_PATCH;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_POST;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_PUT;
+import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_CLIENT_REQUEST_ID;
+import static org.apache.http.entity.ContentType.TEXT_PLAIN;
+
+/**
+ * Implementation of {@link AbfsHttpOperation} for orchestrating server calls using
+ * Apache Http Client.
+ */
+public class AbfsAHCHttpOperation extends AbfsHttpOperation {
+
+  private static final Logger LOG = LoggerFactory.getLogger(
+      AbfsAHCHttpOperation.class);
+
+  private HttpRequestBase httpRequestBase;
+
+  private HttpResponse httpResponse;
+
+  private boolean connectionDisconnectedOnError = false;
+
+  private final boolean isPayloadRequest;
+
+  public AbfsAHCHttpOperation(final URL url,
+      final String method,
+      final List<AbfsHttpHeader> requestHeaders,
+      final int connectionTimeout,
+      final int readTimeout) {
+    super(LOG, url, method, requestHeaders, connectionTimeout, readTimeout);
+    this.isPayloadRequest = isPayloadRequest(method);
+  }
+
+  @VisibleForTesting
+  AbfsManagedHttpClientContext setFinalAbfsClientContext() {
+    return new AbfsManagedHttpClientContext();
+  }
+
+  private boolean isPayloadRequest(final String method) {
+    return HTTP_METHOD_PUT.equals(method) || HTTP_METHOD_PATCH.equals(method)
+        || HTTP_METHOD_POST.equals(method);
+  }
+
+  @Override
+  protected InputStream getErrorStream() throws IOException {
+    HttpEntity entity = httpResponse.getEntity();
+    if (entity == null) {
+      return null;
+    }
+    return entity.getContent();
+  }
+
+  @Override
+  String getConnProperty(final String key) {
+    for (AbfsHttpHeader header : getRequestHeaders()) {
+      if (header.getName().equals(key)) {
+        return header.getValue();
+      }
+    }
+    return null;
+  }
+
+  @Override
+  URL getConnUrl() {
+    return getUrl();
+  }
+
+  @Override
+  String getConnRequestMethod() {
+    return getMethod();
+  }
+
+  @Override
+  Integer getConnResponseCode() throws IOException {
+    return getStatusCode();
+  }
+
+  @Override
+  String getConnResponseMessage() throws IOException {
+    return getStatusDescription();
+  }
+
+  public void processResponse(final byte[] buffer,
+      final int offset,
+      final int length) throws IOException {
+    try {
+      if (!isPayloadRequest) {
+        prepareRequest();
+        httpResponse = executeRequest();
+      }
+      parseResponseHeaderAndBody(buffer, offset, length);
+    } finally {
+      if (httpResponse != null) {
+        try {
+          EntityUtils.consume(httpResponse.getEntity());
+        } finally {
+          if (httpResponse instanceof CloseableHttpResponse) {
+            ((CloseableHttpResponse) httpResponse).close();
+          }
+        }
+      }
+    }
+  }
+
+  @VisibleForTesting
+  void parseResponseHeaderAndBody(final byte[] buffer,
+      final int offset,
+      final int length) throws IOException {
+    setStatusCode(parseStatusCode(httpResponse));
+
+    setStatusDescription(httpResponse.getStatusLine().getReasonPhrase());
+
+    String requestId = getResponseHeader(
+        HttpHeaderConfigurations.X_MS_REQUEST_ID);
+    if (requestId == null) {
+      requestId = AbfsHttpConstants.EMPTY_STRING;
+    }
+    setRequestId(requestId);
+
+    // dump the headers
+    AbfsIoUtils.dumpHeadersToDebugLog("Response Headers",
+        getResponseHeaders(httpResponse));
+    parseResponse(buffer, offset, length);
+  }
+
+  @VisibleForTesting
+  int parseStatusCode(HttpResponse httpResponse) {
+    return httpResponse.getStatusLine().getStatusCode();
+  }
+
+  @VisibleForTesting
+  HttpResponse executeRequest() throws IOException {
+    AbfsManagedHttpClientContext abfsHttpClientContext
+        = setFinalAbfsClientContext();
+    HttpResponse response
+        = AbfsApacheHttpClient.getClient().execute(httpRequestBase,
+        abfsHttpClientContext, getConnectionTimeout(), getReadTimeout());
+    setConnectionTimeMs(abfsHttpClientContext.getConnectTime());
+    setSendRequestTimeMs(abfsHttpClientContext.getSendTime());
+    setRecvResponseTimeMs(abfsHttpClientContext.getReadTime());
+    return response;
+  }
+
+  private Map<String, List<String>> getResponseHeaders(final HttpResponse httpResponse) {
+    if (httpResponse == null || httpResponse.getAllHeaders() == null) {
+      return new HashMap<>();
+    }
+    Map<String, List<String>> map = new HashMap<>();
+    for (Header header : httpResponse.getAllHeaders()) {
+      map.put(header.getName(), new ArrayList<String>(
+          Collections.singleton(header.getValue())));
+    }
+    return map;
+  }
+
+  @Override
+  public void setRequestProperty(final String key, final String value) {
+    setHeader(key, value);
+  }
+
+  @Override
+  Map<String, List<String>> getRequestProperties() {
+    Map<String, List<String>> map = new HashMap<>();
+    for (AbfsHttpHeader header : getRequestHeaders()) {
+      map.put(header.getName(),
+          new ArrayList<String>() {{
+            add(header.getValue());
+          }});
+    }
+    return map;
+  }
+
+  @Override
+  public String getResponseHeader(final String headerName) {
+    if (httpResponse == null) {
+      return null;
+    }
+    Header header = httpResponse.getFirstHeader(headerName);
+    if (header != null) {
+      return header.getValue();
+    }
+    return null;
+  }
+
+  @Override
+  InputStream getContentInputStream()
+      throws IOException {
+    if (httpResponse == null || httpResponse.getEntity() == null) {
+      return null;
+    }
+    return httpResponse.getEntity().getContent();
+  }
+
+  public void sendPayload(final byte[] buffer,
+      final int offset,
+      final int length)
+      throws IOException {
+    if (!isPayloadRequest) {
+      return;
+    }
+
+    switch (getMethod()) {
+    case HTTP_METHOD_PUT:
+      httpRequestBase = new HttpPut(getUri());
+      break;
+    case HTTP_METHOD_PATCH:
+      httpRequestBase = new HttpPatch(getUri());
+      break;
+    case HTTP_METHOD_POST:
+      httpRequestBase = new HttpPost(getUri());
+      break;
+    default:
+      /*
+       * This should never happen as the method is already validated in
+       * isPayloadRequest.
+       */
+      return;
+    }
+
+    setExpectedBytesToBeSent(length);
+    if (buffer != null) {
+      HttpEntity httpEntity = new ByteArrayEntity(buffer, offset, length,
+          TEXT_PLAIN);
+      ((HttpEntityEnclosingRequestBase) httpRequestBase).setEntity(
+          httpEntity);
+    }
+
+    translateHeaders(httpRequestBase, getRequestHeaders());
+    try {
+      httpResponse = executeRequest();
+    } catch (AbfsApacheHttpExpect100Exception ex) {
+      LOG.debug(
+          "Getting output stream failed with expect header enabled, returning 
back."
+              + "Expect 100 assertion failed for uri {} with status code: {}",
+          getMaskedUrl(), parseStatusCode(ex.getHttpResponse()),
+          ex);
+      connectionDisconnectedOnError = true;
+      httpResponse = ex.getHttpResponse();
+    } finally {
+      if (!connectionDisconnectedOnError
+          && httpRequestBase instanceof HttpEntityEnclosingRequestBase) {
+        setBytesSent(length);
+      }
+    }
+  }
+
+  private void prepareRequest() throws IOException {
+    switch (getMethod()) {
+    case HTTP_METHOD_GET:
+      httpRequestBase = new HttpGet(getUri());
+      break;
+    case HTTP_METHOD_DELETE:
+      httpRequestBase = new HttpDelete(getUri());
+      break;
+    case HTTP_METHOD_HEAD:
+      httpRequestBase = new HttpHead(getUri());
+      break;
+    default:
+      /*
+       * This would not happen as the AbfsClient would always be sending valid
+       * method.
+       */
+      throw new PathIOException(getUrl().toString(),
+          "Unsupported HTTP method: " + getMethod());
+    }
+    translateHeaders(httpRequestBase, getRequestHeaders());
+  }
+
+  private URI getUri() throws IOException {
+    try {
+      return getUrl().toURI();
+    } catch (URISyntaxException e) {
+      throw new IOException(e);
+    }
+  }
+
+  private void translateHeaders(final HttpRequestBase httpRequestBase,
+      final List<AbfsHttpHeader> requestHeaders) {
+    for (AbfsHttpHeader header : requestHeaders) {
+      httpRequestBase.setHeader(header.getName(), header.getValue());
+    }
+  }
+
+  private void setHeader(String name, String val) {
+    addHeaderToRequestHeaderList(new AbfsHttpHeader(name, val));
+  }
+
+  @Override
+  public String getRequestProperty(String name) {
+    for (AbfsHttpHeader header : getRequestHeaders()) {
+      if (header.getName().equals(name)) {
+        return header.getValue();
+      }
+    }
+    return EMPTY_STRING;
+  }
+
+  @Override
+  boolean getConnectionDisconnectedOnError() {
+    return connectionDisconnectedOnError;
+  }
+
+  @Override
+  public String getTracingContextSuffix() {
+    return APACHE_IMPL;
+  }
+
+  public String getClientRequestId() {
+    for (AbfsHttpHeader header : getRequestHeaders()) {

Review Comment:
   replace with `getRequestProperty(X_MS_CLIENT_REQUEST_ID)`
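
   i.e. (sketch):

   ```java
   public String getClientRequestId() {
     return getRequestProperty(X_MS_CLIENT_REQUEST_ID);
   }
   ```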



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/exceptions/AbfsApacheHttpExpect100Exception.java:
##########
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.contracts.exceptions;
+
+import java.io.IOException;
+
+import org.apache.http.HttpResponse;
+
+public class AbfsApacheHttpExpect100Exception extends IOException {

Review Comment:
   How about a generic HttpResponseException which this then subclasses?
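
   Something like (untested sketch; name and placement are just a suggestion):

   ```java
   import java.io.IOException;
   import java.util.Objects;

   import org.apache.http.HttpResponse;

   /** IOException raised for a failing HTTP response; keeps the response for callers. */
   public class HttpResponseException extends IOException {

     private final HttpResponse httpResponse;

     public HttpResponseException(final String s, final HttpResponse httpResponse) {
       super(s);
       this.httpResponse = Objects.requireNonNull(httpResponse, "httpResponse");
     }

     public HttpResponse getHttpResponse() {
       return httpResponse;
     }
   }
   ```

   with AbfsApacheHttpExpect100Exception then subclassing it.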



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsApacheHttpClient.java:
##########
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.io.IOException;
+
+import org.apache.hadoop.security.ssl.DelegatingSSLSocketFactory;
+import org.apache.http.HttpResponse;
+import org.apache.http.client.config.RequestConfig;
+import org.apache.http.client.methods.HttpRequestBase;
+import org.apache.http.config.Registry;
+import org.apache.http.config.RegistryBuilder;
+import org.apache.http.conn.socket.ConnectionSocketFactory;
+import org.apache.http.conn.socket.PlainConnectionSocketFactory;
+import org.apache.http.conn.ssl.SSLConnectionSocketFactory;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClientBuilder;
+import org.apache.http.impl.client.HttpClients;
+
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.EMPTY_STRING;
+import static org.apache.hadoop.fs.azurebfs.constants.FileSystemUriSchemes.HTTPS_SCHEME;
+import static org.apache.hadoop.fs.azurebfs.constants.FileSystemUriSchemes.HTTP_SCHEME;
+import static org.apache.http.conn.ssl.SSLConnectionSocketFactory.getDefaultHostnameVerifier;
+
+final class AbfsApacheHttpClient {

Review Comment:
   guess what I'm going to say about javadocs here?
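
   e.g. (sketch; wording is a guess at the class's role from how it's used):

   ```java
   /**
    * Wrapper over an Apache {@link CloseableHttpClient}, used for all ABFS
    * server calls when the Apache client is the configured networking library.
    */
   final class AbfsApacheHttpClient {
   ```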



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/AbfsHttpConstants.java:
##########
@@ -165,5 +165,12 @@ public static ApiVersion getCurrentVersion() {
    */
   public static final Integer HTTP_STATUS_CATEGORY_QUOTIENT = 100;
 
+  public static final String HTTP_MAX_CONN_SYS_PROP = "http.maxConnections";

Review Comment:
   add javadoc to explain why this value is a sysprop (I know, but others may 
not)
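
   e.g. (sketch):

   ```java
   /**
    * JVM system property for the maximum number of cached HTTP connections;
    * read as a system property rather than an abfs config key so the cache
    * matches the JDK's own keep-alive behaviour: {@value}.
    */
   public static final String HTTP_MAX_CONN_SYS_PROP = "http.maxConnections";
   ```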



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsAHCHttpOperation.java:
##########
@@ -0,0 +1,374 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.classification.VisibleForTesting;
+import org.apache.hadoop.fs.PathIOException;
+import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants;
+import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations;
+import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsApacheHttpExpect100Exception;
+import org.apache.http.Header;
+import org.apache.http.HttpEntity;
+import org.apache.http.HttpResponse;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpDelete;
+import org.apache.http.client.methods.HttpEntityEnclosingRequestBase;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.methods.HttpHead;
+import org.apache.http.client.methods.HttpPatch;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.client.methods.HttpPut;
+import org.apache.http.client.methods.HttpRequestBase;
+import org.apache.http.entity.ByteArrayEntity;
+import org.apache.http.util.EntityUtils;
+
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.APACHE_IMPL;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.EMPTY_STRING;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_DELETE;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_GET;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_HEAD;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_PATCH;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_POST;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_PUT;
+import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_CLIENT_REQUEST_ID;
+import static org.apache.http.entity.ContentType.TEXT_PLAIN;
+
+/**
+ * Implementation of {@link AbfsHttpOperation} for orchestrating server calls using
+ * Apache Http Client.
+ */
+public class AbfsAHCHttpOperation extends AbfsHttpOperation {
+
+  private static final Logger LOG = LoggerFactory.getLogger(
+      AbfsAHCHttpOperation.class);
+
+  private HttpRequestBase httpRequestBase;
+
+  private HttpResponse httpResponse;
+
+  private boolean connectionDisconnectedOnError = false;
+
+  private final boolean isPayloadRequest;
+
+  public AbfsAHCHttpOperation(final URL url,
+      final String method,
+      final List<AbfsHttpHeader> requestHeaders,
+      final int connectionTimeout,
+      final int readTimeout) {
+    super(LOG, url, method, requestHeaders, connectionTimeout, readTimeout);
+    this.isPayloadRequest = isPayloadRequest(method);
+  }
+
+  @VisibleForTesting
+  AbfsManagedHttpClientContext setFinalAbfsClientContext() {
+    return new AbfsManagedHttpClientContext();
+  }
+
+  private boolean isPayloadRequest(final String method) {
+    return HTTP_METHOD_PUT.equals(method) || HTTP_METHOD_PATCH.equals(method)
+        || HTTP_METHOD_POST.equals(method);
+  }
+
+  @Override
+  protected InputStream getErrorStream() throws IOException {
+    HttpEntity entity = httpResponse.getEntity();
+    if (entity == null) {
+      return null;
+    }
+    return entity.getContent();
+  }
+
+  @Override
+  String getConnProperty(final String key) {
+    for (AbfsHttpHeader header : getRequestHeaders()) {
+      if (header.getName().equals(key)) {
+        return header.getValue();
+      }
+    }
+    return null;
+  }
+
+  @Override
+  URL getConnUrl() {
+    return getUrl();
+  }
+
+  @Override
+  String getConnRequestMethod() {
+    return getMethod();
+  }
+
+  @Override
+  Integer getConnResponseCode() throws IOException {
+    return getStatusCode();
+  }
+
+  @Override
+  String getConnResponseMessage() throws IOException {
+    return getStatusDescription();
+  }
+
+  public void processResponse(final byte[] buffer,
+      final int offset,
+      final int length) throws IOException {
+    try {
+      if (!isPayloadRequest) {
+        prepareRequest();
+        httpResponse = executeRequest();
+      }
+      parseResponseHeaderAndBody(buffer, offset, length);
+    } finally {
+      if (httpResponse != null) {
+        try {
+          EntityUtils.consume(httpResponse.getEntity());
+        } finally {
+          if (httpResponse instanceof CloseableHttpResponse) {
+            ((CloseableHttpResponse) httpResponse).close();
+          }
+        }
+      }
+    }
+  }
+
+  @VisibleForTesting
+  void parseResponseHeaderAndBody(final byte[] buffer,
+      final int offset,
+      final int length) throws IOException {
+    setStatusCode(parseStatusCode(httpResponse));
+
+    setStatusDescription(httpResponse.getStatusLine().getReasonPhrase());
+
+    String requestId = getResponseHeader(
+        HttpHeaderConfigurations.X_MS_REQUEST_ID);
+    if (requestId == null) {
+      requestId = AbfsHttpConstants.EMPTY_STRING;
+    }
+    setRequestId(requestId);
+
+    // dump the headers
+    AbfsIoUtils.dumpHeadersToDebugLog("Response Headers",
+        getResponseHeaders(httpResponse));
+    parseResponse(buffer, offset, length);
+  }
+
+  @VisibleForTesting
+  int parseStatusCode(HttpResponse httpResponse) {
+    return httpResponse.getStatusLine().getStatusCode();
+  }
+
+  @VisibleForTesting
+  HttpResponse executeRequest() throws IOException {
+    AbfsManagedHttpClientContext abfsHttpClientContext
+        = setFinalAbfsClientContext();
+    HttpResponse response

Review Comment:
   recommend logging at debug that this is about to be called
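
   e.g. (sketch; getMethod() and getMaskedUrl() are already available here):

   ```java
   LOG.debug("Executing {} request on {}", getMethod(), getMaskedUrl());
   HttpResponse response
       = AbfsApacheHttpClient.getClient().execute(httpRequestBase,
       abfsHttpClientContext, getConnectionTimeout(), getReadTimeout());
   ```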



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsAHCHttpOperation.java:
##########
@@ -0,0 +1,374 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.classification.VisibleForTesting;
+import org.apache.hadoop.fs.PathIOException;
+import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants;
+import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations;
+import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsApacheHttpExpect100Exception;
+import org.apache.http.Header;
+import org.apache.http.HttpEntity;
+import org.apache.http.HttpResponse;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpDelete;
+import org.apache.http.client.methods.HttpEntityEnclosingRequestBase;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.methods.HttpHead;
+import org.apache.http.client.methods.HttpPatch;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.client.methods.HttpPut;
+import org.apache.http.client.methods.HttpRequestBase;
+import org.apache.http.entity.ByteArrayEntity;
+import org.apache.http.util.EntityUtils;
+
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.APACHE_IMPL;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.EMPTY_STRING;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_DELETE;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_GET;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_HEAD;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_PATCH;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_POST;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_PUT;
+import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_CLIENT_REQUEST_ID;
+import static org.apache.http.entity.ContentType.TEXT_PLAIN;
+
+/**
+ * Implementation of {@link AbfsHttpOperation} for orchestrating server calls using
+ * Apache Http Client.
+ */
+public class AbfsAHCHttpOperation extends AbfsHttpOperation {
+
+  private static final Logger LOG = LoggerFactory.getLogger(
+      AbfsAHCHttpOperation.class);
+
+  private HttpRequestBase httpRequestBase;
+
+  private HttpResponse httpResponse;
+
+  private boolean connectionDisconnectedOnError = false;
+
+  private final boolean isPayloadRequest;
+
+  public AbfsAHCHttpOperation(final URL url,
+      final String method,
+      final List<AbfsHttpHeader> requestHeaders,
+      final int connectionTimeout,
+      final int readTimeout) {
+    super(LOG, url, method, requestHeaders, connectionTimeout, readTimeout);
+    this.isPayloadRequest = isPayloadRequest(method);

Review Comment:
   log at debug that this will be used, helps to track down what is going on 
when there are network issues
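
   e.g. at the end of the constructor (sketch):

   ```java
   LOG.debug("Using Apache HTTP client for {} request on {}", method, url);
   ```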



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsAHCHttpOperation.java:
##########
@@ -0,0 +1,374 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.classification.VisibleForTesting;
+import org.apache.hadoop.fs.PathIOException;
+import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants;
+import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations;
+import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsApacheHttpExpect100Exception;
+import org.apache.http.Header;
+import org.apache.http.HttpEntity;
+import org.apache.http.HttpResponse;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpDelete;
+import org.apache.http.client.methods.HttpEntityEnclosingRequestBase;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.methods.HttpHead;
+import org.apache.http.client.methods.HttpPatch;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.client.methods.HttpPut;
+import org.apache.http.client.methods.HttpRequestBase;
+import org.apache.http.entity.ByteArrayEntity;
+import org.apache.http.util.EntityUtils;
+
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.APACHE_IMPL;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.EMPTY_STRING;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_DELETE;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_GET;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_HEAD;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_PATCH;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_POST;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_PUT;
+import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_CLIENT_REQUEST_ID;
+import static org.apache.http.entity.ContentType.TEXT_PLAIN;
+
+/**
+ * Implementation of {@link AbfsHttpOperation} for orchestrating server calls using
+ * Apache Http Client.
+ */
+public class AbfsAHCHttpOperation extends AbfsHttpOperation {
+
+  private static final Logger LOG = LoggerFactory.getLogger(
+      AbfsAHCHttpOperation.class);
+
+  private HttpRequestBase httpRequestBase;
+
+  private HttpResponse httpResponse;
+
+  private boolean connectionDisconnectedOnError = false;
+
+  private final boolean isPayloadRequest;
+
+  public AbfsAHCHttpOperation(final URL url,
+      final String method,
+      final List<AbfsHttpHeader> requestHeaders,
+      final int connectionTimeout,
+      final int readTimeout) {
+    super(LOG, url, method, requestHeaders, connectionTimeout, readTimeout);
+    this.isPayloadRequest = isPayloadRequest(method);
+  }
+
+  @VisibleForTesting
+  AbfsManagedHttpClientContext setFinalAbfsClientContext() {
+    return new AbfsManagedHttpClientContext();
+  }
+
+  private boolean isPayloadRequest(final String method) {
+    return HTTP_METHOD_PUT.equals(method) || HTTP_METHOD_PATCH.equals(method)
+        || HTTP_METHOD_POST.equals(method);
+  }
+
+  @Override
+  protected InputStream getErrorStream() throws IOException {
+    HttpEntity entity = httpResponse.getEntity();
+    if (entity == null) {
+      return null;
+    }
+    return entity.getContent();
+  }
+
+  @Override
+  String getConnProperty(final String key) {
+    for (AbfsHttpHeader header : getRequestHeaders()) {
+      if (header.getName().equals(key)) {
+        return header.getValue();
+      }
+    }
+    return null;
+  }
+
+  @Override
+  URL getConnUrl() {
+    return getUrl();
+  }
+
+  @Override
+  String getConnRequestMethod() {
+    return getMethod();
+  }
+
+  @Override
+  Integer getConnResponseCode() throws IOException {
+    return getStatusCode();
+  }
+
+  @Override
+  String getConnResponseMessage() throws IOException {
+    return getStatusDescription();
+  }
+
+  public void processResponse(final byte[] buffer,

Review Comment:
   add @Override



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java:
##########
@@ -1217,7 +1226,7 @@ public AbfsRestOperation deletePath(final String path, final boolean recursive,
             this,
             HTTP_METHOD_DELETE,
             url,
-            requestHeaders);
+            requestHeaders, abfsConfiguration);

Review Comment:
   put on a new line for consistency
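   i.e. keep one argument per line:
   
   ```java
               this,
               HTTP_METHOD_DELETE,
               url,
               requestHeaders,
               abfsConfiguration);
   ```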



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsConnFactory.java:
##########
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import org.apache.http.config.ConnectionConfig;
+import org.apache.http.conn.ManagedHttpClientConnection;
+import org.apache.http.conn.routing.HttpRoute;
+import org.apache.http.impl.conn.ManagedHttpClientConnectionFactory;
+
+/**
+ * Custom implementation of {@link ManagedHttpClientConnectionFactory} and overrides
+ * {@link ManagedHttpClientConnectionFactory#create(HttpRoute, ConnectionConfig)} to return
+ * {@link AbfsManagedApacheHttpConnection}.
+ */
+public class AbfsConnFactory extends ManagedHttpClientConnectionFactory {

Review Comment:
   give it a better name, e.g. AbfsHttpClientConnectionFactory



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsConnectionManager.java:
##########
@@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.io.IOException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.http.HttpClientConnection;
+import org.apache.http.config.Registry;
+import org.apache.http.config.SocketConfig;
+import org.apache.http.conn.ConnectionPoolTimeoutException;
+import org.apache.http.conn.ConnectionRequest;
+import org.apache.http.conn.HttpClientConnectionManager;
+import org.apache.http.conn.HttpClientConnectionOperator;
+import org.apache.http.conn.routing.HttpRoute;
+import org.apache.http.conn.socket.ConnectionSocketFactory;
+import org.apache.http.impl.conn.DefaultHttpClientConnectionOperator;
+import org.apache.http.impl.conn.ManagedHttpClientConnectionFactory;
+import org.apache.http.protocol.HttpContext;
+
+/**
+ * AbfsConnectionManager is a custom implementation of {@link HttpClientConnectionManager}.
+ * This implementation manages connection-pooling heuristics and custom implementation
+ * of {@link ManagedHttpClientConnectionFactory}.
+ */
+class AbfsConnectionManager implements HttpClientConnectionManager {
+
+  private final KeepAliveCache kac = KeepAliveCache.getInstance();
+
+  private final AbfsConnFactory httpConnectionFactory;
+
+  private final HttpClientConnectionOperator connectionOperator;
+
+  AbfsConnectionManager(Registry<ConnectionSocketFactory> socketFactoryRegistry,
+      AbfsConnFactory connectionFactory) {
+    this.httpConnectionFactory = connectionFactory;
+    connectionOperator = new DefaultHttpClientConnectionOperator(
+        socketFactoryRegistry, null, null);
+  }
+
+  @Override
+  public ConnectionRequest requestConnection(final HttpRoute route,
+      final Object state) {
+    return new ConnectionRequest() {
+      @Override
+      public HttpClientConnection get(final long timeout,
+          final TimeUnit timeUnit)
+          throws InterruptedException, ExecutionException,
+          ConnectionPoolTimeoutException {
+        try {
+          HttpClientConnection clientConn = kac.get(route);
+          if (clientConn != null) {
+            return clientConn;
+          }
+          return httpConnectionFactory.create(route, null);
+        } catch (IOException ex) {
+          throw new ExecutionException(ex);
+        }
+      }
+
+      @Override
+      public boolean cancel() {
+        return false;
+      }
+    };
+  }
+
+  /**
+   * Releases a connection for reuse. It can be reused only if validDuration is greater than 0.
+   * This method is called by {@link org.apache.http.impl.execchain} internal class `ConnectionHolder`.
+   * If it wants to reuse the connection, it will send a non-zero validDuration, else it will send 0.
+   * @param conn the connection to release
+   * @param newState the new state of the connection
+   * @param validDuration the duration for which the connection is valid
+   * @param timeUnit the time unit for the validDuration
+   */
+  @Override
+  public void releaseConnection(final HttpClientConnection conn,
+      final Object newState,
+      final long validDuration,
+      final TimeUnit timeUnit) {
+    if (validDuration == 0) {
+      return;
+    }
+    if (conn.isOpen() && conn instanceof AbfsManagedApacheHttpConnection) {
+      HttpRoute route = ((AbfsManagedApacheHttpConnection) conn).getHttpRoute();

Review Comment:
   log at debug that a connection was released
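   e.g. an untested sketch, assuming an slf4j LOG gets added to this class (see the other comment):
   
   ```java
   if (conn.isOpen() && conn instanceof AbfsManagedApacheHttpConnection) {
     HttpRoute route = ((AbfsManagedApacheHttpConnection) conn).getHttpRoute();
     if (route != null) {
       // record reuse decisions so pool behaviour can be traced in the logs
       LOG.debug("Releasing connection for route {} back into the keep-alive cache",
           route);
       kac.put(route, conn);
     }
   }
   ```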



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsConnectionManager.java:
##########
@@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.io.IOException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.http.HttpClientConnection;
+import org.apache.http.config.Registry;
+import org.apache.http.config.SocketConfig;
+import org.apache.http.conn.ConnectionPoolTimeoutException;
+import org.apache.http.conn.ConnectionRequest;
+import org.apache.http.conn.HttpClientConnectionManager;
+import org.apache.http.conn.HttpClientConnectionOperator;
+import org.apache.http.conn.routing.HttpRoute;
+import org.apache.http.conn.socket.ConnectionSocketFactory;
+import org.apache.http.impl.conn.DefaultHttpClientConnectionOperator;
+import org.apache.http.impl.conn.ManagedHttpClientConnectionFactory;
+import org.apache.http.protocol.HttpContext;
+
+/**
+ * AbfsConnectionManager is a custom implementation of {@link HttpClientConnectionManager}.
+ * This implementation manages connection-pooling heuristics and custom implementation
+ * of {@link ManagedHttpClientConnectionFactory}.
+ */
+class AbfsConnectionManager implements HttpClientConnectionManager {

Review Comment:
   add a log and log every callback so that when debugging we stand a chance of 
understanding what is happening
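   something like this, as an untested sketch (slf4j imports assumed):
   
   ```java
   class AbfsConnectionManager implements HttpClientConnectionManager {
   
     private static final Logger LOG =
         LoggerFactory.getLogger(AbfsConnectionManager.class);
   
     @Override
     public void connect(final HttpClientConnection conn,
         final HttpRoute route,
         final int connectTimeout,
         final HttpContext context) throws IOException {
       // one debug line per callback; connect/release/requestConnection etc.
       LOG.debug("Connecting {} to {}; timeout {} ms", conn, route, connectTimeout);
       // ...existing connect logic...
     }
   }
   ```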



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsConnectionManager.java:
##########
@@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.io.IOException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.http.HttpClientConnection;
+import org.apache.http.config.Registry;
+import org.apache.http.config.SocketConfig;
+import org.apache.http.conn.ConnectionPoolTimeoutException;
+import org.apache.http.conn.ConnectionRequest;
+import org.apache.http.conn.HttpClientConnectionManager;
+import org.apache.http.conn.HttpClientConnectionOperator;
+import org.apache.http.conn.routing.HttpRoute;
+import org.apache.http.conn.socket.ConnectionSocketFactory;
+import org.apache.http.impl.conn.DefaultHttpClientConnectionOperator;
+import org.apache.http.impl.conn.ManagedHttpClientConnectionFactory;
+import org.apache.http.protocol.HttpContext;
+
+/**
+ * AbfsConnectionManager is a custom implementation of {@link HttpClientConnectionManager}.

Review Comment:
   this link won't resolve in javadocs, best to say httpclient {@code HttpClientConnectionManager}.
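   i.e.
   
   ```java
   /**
    * AbfsConnectionManager is a custom implementation of the httpclient
    * {@code HttpClientConnectionManager} interface.
    */
   ```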



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsJdkHttpOperation.java:
##########
@@ -0,0 +1,345 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.HttpURLConnection;
+import java.net.ProtocolException;
+import java.net.URL;
+import java.util.List;
+import java.util.Map;
+
+import javax.net.ssl.HttpsURLConnection;
+import javax.net.ssl.SSLSocketFactory;
+
+import org.apache.hadoop.classification.VisibleForTesting;
+import org.apache.hadoop.security.ssl.DelegatingSSLSocketFactory;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants;
+import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations;
+
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.EXPECT_100_JDK_ERROR;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HUNDRED_CONTINUE;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.JDK_FALLBACK;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.JDK_IMPL;
+import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.EXPECT;
+
+/**
+ * Implementation of {@link AbfsHttpOperation} for orchestrating calls using JDK's HttpURLConnection.
+ */
+public class AbfsJdkHttpOperation extends AbfsHttpOperation {
+
+  private static final Logger LOG = LoggerFactory.getLogger(
+      AbfsJdkHttpOperation.class);
+
+  private HttpURLConnection connection;

Review Comment:
   final?



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpOperation.java:
##########
@@ -254,158 +311,19 @@ public String getMaskedEncodedUrl() {
     return maskedEncodedUrl;
   }
 
-  /**
-   * Initializes a new HTTP request and opens the connection.
-   *
-   * @param url The full URL including query string parameters.
-   * @param method The HTTP method (PUT, PATCH, POST, GET, HEAD, or DELETE).
-   * @param requestHeaders The HTTP request headers.READ_TIMEOUT
-   * @param connectionTimeout The Connection Timeout value to be used while establishing http connection
-   * @param readTimeout The Read Timeout value to be used with http connection while making a request
-   * @throws IOException if an error occurs.
-   */
-  public AbfsHttpOperation(final URL url, final String method, final List<AbfsHttpHeader> requestHeaders,

Review Comment:
   1. I'd prefer to split onto separate lines
   2. I'd prefer timeouts to be Duration. If not, at least include unit in 
their name
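   e.g. a sketch of what I'd like, not what the patch currently does (java.time.Duration):
   
   ```java
   public AbfsHttpOperation(final URL url,
       final String method,
       final List<AbfsHttpHeader> requestHeaders,
       final Duration connectionTimeout,
       final Duration readTimeout) throws IOException {
     // convert to int millis only at the JDK/httpclient boundary,
     // e.g. Math.toIntExact(connectionTimeout.toMillis())
   }
   ```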



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpOperation.java:
##########
@@ -20,57 +20,51 @@
 
 import java.io.IOException;
 import java.io.InputStream;
-import java.io.OutputStream;
 import java.net.HttpURLConnection;
-import java.net.ProtocolException;
 import java.net.URL;
+import java.util.ArrayList;
 import java.util.List;
-
-import javax.net.ssl.HttpsURLConnection;
-import javax.net.ssl.SSLSocketFactory;
-
-import org.apache.hadoop.classification.VisibleForTesting;
-import org.apache.hadoop.fs.azurebfs.utils.UriUtils;
-import org.apache.hadoop.security.ssl.DelegatingSSLSocketFactory;
+import java.util.Map;
 
 import com.fasterxml.jackson.core.JsonFactory;
 import com.fasterxml.jackson.core.JsonParser;
 import com.fasterxml.jackson.core.JsonToken;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants;
 import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations;
 import org.apache.hadoop.fs.azurebfs.contracts.services.AbfsPerfLoggable;
 import org.apache.hadoop.fs.azurebfs.contracts.services.ListResultSchema;
-
-import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.EXPECT_100_JDK_ERROR;
-import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HUNDRED_CONTINUE;
-import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.EXPECT;
+import org.apache.hadoop.fs.azurebfs.utils.UriUtils;
 
 /**
- * Represents an HTTP operation.
+ * Base Http operation class for orchestrating server IO calls. Child classes would
+ * define the certain orchestration implementation on the basis of network library used.
+ * <p>
+ * For JDK netlib usage, the child class would be {@link AbfsJdkHttpOperation}. <br>
+ * For ApacheHttpClient netlib usage, the child class would be {@link AbfsAHCHttpOperation}.
+ * </p>
  */
-public class AbfsHttpOperation implements AbfsPerfLoggable {
-  private static final Logger LOG = LoggerFactory.getLogger(AbfsHttpOperation.class);
+public abstract class AbfsHttpOperation implements AbfsPerfLoggable {

Review Comment:
   add javadocs for new fields, methods.
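   e.g. for the new log field (modifiers assumed):
   
   ```java
   /** Logger supplied by the subclass; all logging of this operation goes through it. */
   private final Logger log;
   ```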



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpOperation.java:
##########
@@ -103,13 +111,31 @@ public static AbfsHttpOperation getAbfsHttpOperationWithFixedResult(
   protected AbfsHttpOperation(final URL url,
       final String method,
       final int httpStatus) {
+    this.log = null;

Review Comment:
   should be set to something
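   e.g. fall back to the class's own logger rather than risk an NPE later:
   
   ```java
   this.log = LoggerFactory.getLogger(AbfsHttpOperation.class);
   ```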



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsConnectionManager.java:
##########
@@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.io.IOException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.http.HttpClientConnection;
+import org.apache.http.config.Registry;
+import org.apache.http.config.SocketConfig;
+import org.apache.http.conn.ConnectionPoolTimeoutException;
+import org.apache.http.conn.ConnectionRequest;
+import org.apache.http.conn.HttpClientConnectionManager;
+import org.apache.http.conn.HttpClientConnectionOperator;
+import org.apache.http.conn.routing.HttpRoute;
+import org.apache.http.conn.socket.ConnectionSocketFactory;
+import org.apache.http.impl.conn.DefaultHttpClientConnectionOperator;
+import org.apache.http.impl.conn.ManagedHttpClientConnectionFactory;
+import org.apache.http.protocol.HttpContext;
+
+/**
+ * AbfsConnectionManager is a custom implementation of {@link HttpClientConnectionManager}.
+ * This implementation manages connection-pooling heuristics and custom implementation
+ * of {@link ManagedHttpClientConnectionFactory}.
+ */
+class AbfsConnectionManager implements HttpClientConnectionManager {
+
+  private final KeepAliveCache kac = KeepAliveCache.getInstance();
+
+  private final AbfsConnFactory httpConnectionFactory;
+
+  private final HttpClientConnectionOperator connectionOperator;
+
+  AbfsConnectionManager(Registry<ConnectionSocketFactory> socketFactoryRegistry,
+      AbfsConnFactory connectionFactory) {
+    this.httpConnectionFactory = connectionFactory;
+    connectionOperator = new DefaultHttpClientConnectionOperator(

Review Comment:
   use a `this.` prefix for consistency
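   i.e.
   
   ```java
   this.connectionOperator = new DefaultHttpClientConnectionOperator(
       socketFactoryRegistry, null, null);
   ```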



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsNoOpThrottlingIntercept.java:
##########
@@ -18,6 +18,8 @@
 
 package org.apache.hadoop.fs.azurebfs.services;
 
+import org.apache.hadoop.fs.azurebfs.constants.AbfsRestOperationType;
+
 final class AbfsNoOpThrottlingIntercept implements AbfsThrottlingIntercept {

Review Comment:
   I know this code exists already, but let's document it



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsConnectionManager.java:
##########
@@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.io.IOException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.http.HttpClientConnection;
+import org.apache.http.config.Registry;
+import org.apache.http.config.SocketConfig;
+import org.apache.http.conn.ConnectionPoolTimeoutException;
+import org.apache.http.conn.ConnectionRequest;
+import org.apache.http.conn.HttpClientConnectionManager;
+import org.apache.http.conn.HttpClientConnectionOperator;
+import org.apache.http.conn.routing.HttpRoute;
+import org.apache.http.conn.socket.ConnectionSocketFactory;
+import org.apache.http.impl.conn.DefaultHttpClientConnectionOperator;
+import org.apache.http.impl.conn.ManagedHttpClientConnectionFactory;
+import org.apache.http.protocol.HttpContext;
+
+/**
+ * AbfsConnectionManager is a custom implementation of {@link HttpClientConnectionManager}.
+ * This implementation manages connection-pooling heuristics and custom implementation
+ * of {@link ManagedHttpClientConnectionFactory}.
+ */
+class AbfsConnectionManager implements HttpClientConnectionManager {
+
+  private final KeepAliveCache kac = KeepAliveCache.getInstance();
+
+  private final AbfsConnFactory httpConnectionFactory;
+
+  private final HttpClientConnectionOperator connectionOperator;
+
+  AbfsConnectionManager(Registry<ConnectionSocketFactory> socketFactoryRegistry,
+      AbfsConnFactory connectionFactory) {
+    this.httpConnectionFactory = connectionFactory;
+    connectionOperator = new DefaultHttpClientConnectionOperator(
+        socketFactoryRegistry, null, null);
+  }
+
+  @Override
+  public ConnectionRequest requestConnection(final HttpRoute route,
+      final Object state) {
+    return new ConnectionRequest() {
+      @Override
+      public HttpClientConnection get(final long timeout,
+          final TimeUnit timeUnit)
+          throws InterruptedException, ExecutionException,
+          ConnectionPoolTimeoutException {
+        try {
+          HttpClientConnection clientConn = kac.get(route);
+          if (clientConn != null) {
+            return clientConn;
+          }
+          return httpConnectionFactory.create(route, null);
+        } catch (IOException ex) {
+          throw new ExecutionException(ex);
+        }
+      }
+
+      @Override
+      public boolean cancel() {
+        return false;
+      }
+    };
+  }
+
+  /**
+   * Releases a connection for reuse. It can be reused only if validDuration is greater than 0.
+   * This method is called by {@link org.apache.http.impl.execchain} internal class `ConnectionHolder`.
+   * If it wants to reuse the connection, it will send a non-zero validDuration, else it will send 0.
+   * @param conn the connection to release
+   * @param newState the new state of the connection
+   * @param validDuration the duration for which the connection is valid
+   * @param timeUnit the time unit for the validDuration
+   */
+  @Override
+  public void releaseConnection(final HttpClientConnection conn,
+      final Object newState,
+      final long validDuration,
+      final TimeUnit timeUnit) {
+    if (validDuration == 0) {
+      return;
+    }
+    if (conn.isOpen() && conn instanceof AbfsManagedApacheHttpConnection) {
+      HttpRoute route = ((AbfsManagedApacheHttpConnection) conn).getHttpRoute();
+      if (route != null) {
+        kac.put(route, conn);
+      }
+    }
+  }
+
+  @Override
+  public void connect(final HttpClientConnection conn,
+      final HttpRoute route,
+      final int connectTimeout,
+      final HttpContext context) throws IOException {
+    long start = System.currentTimeMillis();
+    connectionOperator.connect((AbfsManagedApacheHttpConnection) conn,
+        route.getTargetHost(), route.getLocalSocketAddress(),
+        connectTimeout, SocketConfig.DEFAULT, context);
+    if (context instanceof AbfsManagedHttpClientContext) {
+      ((AbfsManagedHttpClientContext) context).setConnectTime(
+          System.currentTimeMillis() - start);
+    }
+  }
+
+  @Override
+  public void upgrade(final HttpClientConnection conn,
+      final HttpRoute route,
+      final HttpContext context) throws IOException {
+    connectionOperator.upgrade((AbfsManagedApacheHttpConnection) conn,
+        route.getTargetHost(), context);
+  }
+
+  @Override
+  public void routeComplete(final HttpClientConnection conn,
+      final HttpRoute route,
+      final HttpContext context) throws IOException {
+
+  }
+
+  @Override
+  public void closeIdleConnections(final long idletime,
+      final TimeUnit timeUnit) {
+
+  }
+
+  @Override
+  public void closeExpiredConnections() {

Review Comment:
   why not do this? are you sure the java KeepAliveCache does the right thing?
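   if the cache is meant to own expiry, at least wire it up here; a sketch, where `evictIdleConnections()` is a hypothetical method KeepAliveCache would need to expose:
   
   ```java
   @Override
   public void closeExpiredConnections() {
     // delegate to the cache so expired entries are actually closed
     kac.evictIdleConnections();
   }
   ```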



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/KeepAliveCache.java:
##########
@@ -0,0 +1,362 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.io.IOException;
+import java.io.NotSerializableException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.classification.VisibleForTesting;
+import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
+import org.apache.http.HttpClientConnection;
+import org.apache.http.conn.routing.HttpRoute;
+
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.DEFAULT_MAX_CONN_SYS_PROP;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_MAX_CONN_SYS_PROP;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.KAC_DEFAULT_CONN_TTL;
+
+/**
+ * Connection-pooling heuristics adapted from JDK's connection pooling `KeepAliveCache`
+ * <p>
+ * Why this implementation is required in comparison to {@link org.apache.http.impl.conn.PoolingHttpClientConnectionManager}
+ * connection-pooling:
+ * <ol>
+ * <li>PoolingHttpClientConnectionManager heuristic caches all the reusable connections it has created.
+ * JDK's implementation only caches limited number of connections. The limit is given by JVM system
+ * property "http.maxConnections". If there is no system-property, it defaults to 5.</li>
+ * <li>In PoolingHttpClientConnectionManager, it expects the application to provide `setMaxPerRoute` and `setMaxTotal`,
+ * which the implementation uses as the total number of connections it can create. For application using ABFS, it is not
+ * feasible to provide a value in the initialisation of the connectionManager. JDK's implementation has no cap on the
+ * number of connections it can create.</li>
+ * </ol>
+ */
+public final class KeepAliveCache
+    extends HashMap<KeepAliveCache.KeepAliveKey, KeepAliveCache.ClientVector>
+    implements Runnable {
+
+  private int maxConn;
+
+  private long connectionIdleTTL = KAC_DEFAULT_CONN_TTL;
+
+  private Thread keepAliveTimer = null;
+
+  private boolean isPaused = false;
+
+  private KeepAliveCache() {
+    setMaxConn();
+  }
+
+  synchronized void pauseThread() {
+    isPaused = true;
+  }
+
+  synchronized void resumeThread() {
+    isPaused = false;
+    notify();
+  }
+
+  private void setMaxConn() {
+    String sysPropMaxConn = System.getProperty(HTTP_MAX_CONN_SYS_PROP);
+    if (sysPropMaxConn == null) {
+      maxConn = DEFAULT_MAX_CONN_SYS_PROP;
+    } else {
+      maxConn = Integer.parseInt(sysPropMaxConn);
+    }
+  }
+
+  public void setAbfsConfig(AbfsConfiguration abfsConfiguration) {
+    this.maxConn = abfsConfiguration.getMaxApacheHttpClientCacheConnections();
+    this.connectionIdleTTL = abfsConfiguration.getMaxApacheHttpClientConnectionIdleTime();
+  }
+
+  public long getConnectionIdleTTL() {
+    return connectionIdleTTL;
+  }
+
+  private static final KeepAliveCache INSTANCE = new KeepAliveCache();
+
+  public static KeepAliveCache getInstance() {
+    return INSTANCE;
+  }
+
+  @VisibleForTesting
+  void clearThread() {
+    clear();
+    setMaxConn();
+  }
+
+  private int getKacSize() {

Review Comment:
   give a better name



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/KeepAliveCache.java:
##########
@@ -0,0 +1,362 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.io.IOException;
+import java.io.NotSerializableException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.classification.VisibleForTesting;
+import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
+import org.apache.http.HttpClientConnection;
+import org.apache.http.conn.routing.HttpRoute;
+
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.DEFAULT_MAX_CONN_SYS_PROP;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_MAX_CONN_SYS_PROP;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.KAC_DEFAULT_CONN_TTL;
+
+/**
+ * Connection-pooling heuristics adapted from JDK's connection pooling `KeepAliveCache`

Review Comment:
   this is not copied from the JDK, is it?



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/KeepAliveCache.java:
##########
@@ -0,0 +1,362 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.io.IOException;
+import java.io.NotSerializableException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.classification.VisibleForTesting;
+import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
+import org.apache.http.HttpClientConnection;
+import org.apache.http.conn.routing.HttpRoute;
+
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.DEFAULT_MAX_CONN_SYS_PROP;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_MAX_CONN_SYS_PROP;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.KAC_DEFAULT_CONN_TTL;
+
+/**
+ * Connection-pooling heuristics adapted from JDK's connection pooling `KeepAliveCache`
+ * <p>
+ * Why this implementation is required in comparison to {@link org.apache.http.impl.conn.PoolingHttpClientConnectionManager}
+ * connection-pooling:
+ * <ol>
+ * <li>PoolingHttpClientConnectionManager heuristic caches all the reusable connections it has created.
+ * JDK's implementation only caches limited number of connections. The limit is given by JVM system
+ * property "http.maxConnections". If there is no system-property, it defaults to 5.</li>
+ * <li>In PoolingHttpClientConnectionManager, it expects the application to provide `setMaxPerRoute` and `setMaxTotal`,
+ * which the implementation uses as the total number of connections it can create. For application using ABFS, it is not
+ * feasible to provide a value in the initialisation of the connectionManager. JDK's implementation has no cap on the
+ * number of connections it can create.</li>
+ * </ol>
+ */
+public final class KeepAliveCache

Review Comment:
   add javadocs all the way down



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: common-issues-unsubscr...@hadoop.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: common-issues-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-issues-h...@hadoop.apache.org

