anmolanmol1234 commented on code in PR #6633:
URL: https://github.com/apache/hadoop/pull/6633#discussion_r1562362652


##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsAHCHttpOperation.java:
##########
@@ -0,0 +1,422 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.classification.VisibleForTesting;
+import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
+import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants;
+import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations;
+import 
org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsApacheHttpExpect100Exception;
+import org.apache.hadoop.security.ssl.DelegatingSSLSocketFactory;
+import org.apache.http.Header;
+import org.apache.http.HttpEntity;
+import org.apache.http.HttpResponse;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpDelete;
+import org.apache.http.client.methods.HttpEntityEnclosingRequestBase;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.methods.HttpHead;
+import org.apache.http.client.methods.HttpPatch;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.client.methods.HttpPut;
+import org.apache.http.client.methods.HttpRequestBase;
+import org.apache.http.entity.ByteArrayEntity;
+import org.apache.http.util.EntityUtils;
+
+import static 
org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.APACHE_IMPL;
+import static 
org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_DELETE;
+import static 
org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_GET;
+import static 
org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_HEAD;
+import static 
org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_PATCH;
+import static 
org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_POST;
+import static 
org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_PUT;
+import static 
org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_CLIENT_REQUEST_ID;
+import static org.apache.http.entity.ContentType.TEXT_PLAIN;
+
+/**
+ * Implementation of {@link HttpOperation} for orchestrating server calls using
+ * Apache Http Client.
+ */
+public class AbfsAHCHttpOperation extends HttpOperation {
+
+  private static final Logger LOG = LoggerFactory.getLogger(
+      AbfsAHCHttpOperation.class);
+
+  /**
+   * Map to store the AbfsApacheHttpClient instances. Each AbfsClient instance
+   * has its own AbfsApacheHttpClient; the key of the map is the UUID of the
+   * client.
+   */
+  private static final Map<String, AbfsApacheHttpClient>
+      ABFS_APACHE_HTTP_CLIENT_MAP = new HashMap<>();
+
+  private AbfsApacheHttpClient abfsApacheHttpClient;
+
+  private HttpRequestBase httpRequestBase;
+
+  private HttpResponse httpResponse;
+
+  private AbfsManagedHttpContext abfsHttpClientContext;
+
+  private final AbfsRestOperationType abfsRestOperationType;
+
+  private boolean connectionDisconnectedOnError = false;
+
+  private AbfsApacheHttpExpect100Exception abfsApacheHttpExpect100Exception;
+
+  private final boolean isPayloadRequest;
+
+  private List<AbfsHttpHeader> requestHeaders;
+
+  /**
+   * Creates an operation that is not bound to an
+   * {@link AbfsApacheHttpClient}.
+   *
+   * @param url target URL, forwarded to the superclass.
+   * @param method HTTP method, forwarded to the superclass and used to
+   *               decide whether the request carries a payload.
+   * @param requestHeaders headers kept for this operation.
+   * @param abfsRestOperationType rest operation type kept for this operation.
+   */
+  private AbfsAHCHttpOperation(final URL url,
+      final String method,
+      final List<AbfsHttpHeader> requestHeaders,
+      final AbfsRestOperationType abfsRestOperationType) {
+    super(LOG, url, method);
+    // The three assignments are independent of each other.
+    this.isPayloadRequest = isPayloadRequest(method);
+    this.requestHeaders = requestHeaders;
+    this.abfsRestOperationType = abfsRestOperationType;
+  }
+
+  /**
+   * Creates an operation bound to the {@link AbfsApacheHttpClient} that is
+   * registered for the given client id.
+   *
+   * @param url target URL.
+   * @param method HTTP method.
+   * @param requestHeaders headers kept for this operation.
+   * @param abfsConfiguration configuration used if a client has to be created.
+   * @param clientId key identifying the client to use.
+   * @param abfsRestOperationType rest operation type kept for this operation.
+   */
+  public AbfsAHCHttpOperation(final URL url,
+      final String method,
+      final List<AbfsHttpHeader> requestHeaders,
+      final AbfsConfiguration abfsConfiguration,
+      final String clientId,
+      final AbfsRestOperationType abfsRestOperationType) {
+    // Shared field initialisation lives in the private constructor.
+    this(url, method, requestHeaders, abfsRestOperationType);
+    setAbfsApacheHttpClient(abfsConfiguration, clientId);
+  }
+
+  /**
+   * Creates an operation whose status code is preset, without binding it to
+   * an {@link AbfsApacheHttpClient} or a rest operation type.
+   *
+   * @param url target URL.
+   * @param method HTTP method.
+   * @param requestHeaders headers kept for this operation.
+   * @param httpStatus status code to record on this operation.
+   */
+  public AbfsAHCHttpOperation(final URL url,
+      final String method,
+      final ArrayList<AbfsHttpHeader> requestHeaders,
+      final int httpStatus) {
+    // No rest operation type is associated with a fixed-result operation.
+    this(url, method, requestHeaders, (AbfsRestOperationType) null);
+    setStatusCode(httpStatus);
+  }
+
+  private void setAbfsApacheHttpClient(final AbfsConfiguration 
abfsConfiguration,
+      final String clientId) {
+    AbfsApacheHttpClient client = ABFS_APACHE_HTTP_CLIENT_MAP.get(clientId);
+    if (client == null) {
+      synchronized (ABFS_APACHE_HTTP_CLIENT_MAP) {
+        client = ABFS_APACHE_HTTP_CLIENT_MAP.get(clientId);
+        if (client == null) {
+          client = new AbfsApacheHttpClient(
+              DelegatingSSLSocketFactory.getDefaultFactory(),
+              abfsConfiguration);
+          ABFS_APACHE_HTTP_CLIENT_MAP.put(clientId, client);
+        }
+      }
+    }
+    abfsApacheHttpClient = client;
+  }
+
+  static void removeClient(final String clientId) throws IOException {
+    AbfsApacheHttpClient client = ABFS_APACHE_HTTP_CLIENT_MAP.remove(clientId);
+    if (client != null) {
+      client.close();
+    }
+  }
+
+  @VisibleForTesting
+  AbfsManagedHttpContext setFinalAbfsClientContext() {
+    return new AbfsManagedHttpContext();
+  }
+
+  /**
+   * Tells whether the given HTTP method is one of PUT, PATCH or POST.
+   *
+   * @param method HTTP method name; a {@code null} value yields {@code false}.
+   * @return {@code true} for PUT, PATCH and POST, {@code false} otherwise.
+   */
+  private boolean isPayloadRequest(final String method) {
+    if (HTTP_METHOD_PUT.equals(method)) {
+      return true;
+    }
+    if (HTTP_METHOD_PATCH.equals(method)) {
+      return true;
+    }
+    return HTTP_METHOD_POST.equals(method);
+  }
+
+
+  /**
+   * Builds an operation with an empty header list and a preset status code.
+   *
+   * @param url target URL.
+   * @param method HTTP method.
+   * @param httpStatus status code to record on the returned operation.
+   * @return the newly created operation.
+   */
+  public static AbfsAHCHttpOperation
+      getAbfsApacheHttpClientHttpOperationWithFixedResult(
+      final URL url,
+      final String method,
+      final int httpStatus) {
+    final ArrayList<AbfsHttpHeader> noHeaders = new ArrayList<>();
+    return new AbfsAHCHttpOperation(url, method, noHeaders, httpStatus);
+  }
+
+  @Override
+  protected InputStream getErrorStream() throws IOException {
+    HttpEntity entity = httpResponse.getEntity();
+    if (entity == null) {
+      return null;
+    }
+    return entity.getContent();
+  }
+
+  /**
+   * Looks up a request header by name.
+   *
+   * @param key header name to search for.
+   * @return value of the first request header whose name equals {@code key},
+   * or {@code null} if no such header exists.
+   */
+  @Override
+  String getConnProperty(final String key) {
+    for (final AbfsHttpHeader candidate : requestHeaders) {
+      if (!candidate.getName().equals(key)) {
+        continue;
+      }
+      return candidate.getValue();
+    }
+    return null;
+  }
+
+  @Override
+  URL getConnUrl() {
+    return getUrl();
+  }
+
+  @Override
+  String getConnRequestMethod() {
+    return getMethod();
+  }
+
+  @Override
+  Integer getConnResponseCode() throws IOException {
+    return getStatusCode();
+  }
+
+  @Override
+  String getConnResponseMessage() throws IOException {
+    return getStatusDescription();
+  }
+
+  /**
+   * Obtains the server response (executing the request first when it is not
+   * a payload request), parses it, and releases the response resources.
+   * <p>
+   * For payload requests (PUT, PATCH, POST) no request is executed here;
+   * NOTE(review): the response is presumably assigned by the payload-sending
+   * path that is outside this view - confirm.
+   *
+   * @param buffer buffer handed to the response parser.
+   * @param offset offset handed to the response parser.
+   * @param length length handed to the response parser.
+   * @throws IOException if executing, parsing, consuming or closing fails.
+   */
+  public void processResponse(final byte[] buffer,
+      final int offset,
+      final int length) throws IOException {
+    try {
+      if (!isPayloadRequest) {
+        prepareRequest();
+        httpResponse = executeRequest();
+      }
+      parseResponseHeaderAndBody(buffer, offset, length);
+    } finally {
+      if (httpResponse != null) {
+        try {
+          EntityUtils.consume(httpResponse.getEntity());
+        } finally {
+          // Close even when consuming the entity fails, so the response is
+          // not leaked. instanceof already rejects null.
+          if (httpResponse instanceof CloseableHttpResponse) {
+            ((CloseableHttpResponse) httpResponse).close();
+          }
+        }
+      }
+    }
+  }
+
+  @VisibleForTesting
+  void parseResponseHeaderAndBody(final byte[] buffer,
+      final int offset,
+      final int length) throws IOException {
+    setStatusCode(httpResponse.getStatusLine().getStatusCode());
+
+    setStatusDescription(httpResponse.getStatusLine().getReasonPhrase());
+
+    String requestId = getResponseHeader(
+        HttpHeaderConfigurations.X_MS_REQUEST_ID);
+    if (requestId == null) {
+      requestId = AbfsHttpConstants.EMPTY_STRING;
+    }
+    setRequestId(requestId);
+
+    // dump the headers
+    AbfsIoUtils.dumpHeadersToDebugLog("Response Headers",
+        getResponseHeaders(httpResponse));
+    parseResponse(buffer, offset, length);
+  }
+
+  @VisibleForTesting
+  HttpResponse executeRequest() throws IOException {

Review Comment:
   I was referring to the client managed context



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: common-issues-unsubscr...@hadoop.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: common-issues-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-issues-h...@hadoop.apache.org

Reply via email to