anmolanmol1234 commented on code in PR #6633:
URL: https://github.com/apache/hadoop/pull/6633#discussion_r1535437602


##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsAHCHttpOperation.java:
##########
@@ -0,0 +1,422 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.classification.VisibleForTesting;
+import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
+import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants;
+import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations;
+import 
org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsApacheHttpExpect100Exception;
+import org.apache.hadoop.security.ssl.DelegatingSSLSocketFactory;
+import org.apache.http.Header;
+import org.apache.http.HttpEntity;
+import org.apache.http.HttpResponse;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpDelete;
+import org.apache.http.client.methods.HttpEntityEnclosingRequestBase;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.methods.HttpHead;
+import org.apache.http.client.methods.HttpPatch;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.client.methods.HttpPut;
+import org.apache.http.client.methods.HttpRequestBase;
+import org.apache.http.entity.ByteArrayEntity;
+import org.apache.http.util.EntityUtils;
+
+import static 
org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.APACHE_IMPL;
+import static 
org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_DELETE;
+import static 
org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_GET;
+import static 
org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_HEAD;
+import static 
org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_PATCH;
+import static 
org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_POST;
+import static 
org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_PUT;
+import static 
org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_CLIENT_REQUEST_ID;
+import static org.apache.http.entity.ContentType.TEXT_PLAIN;
+
+/**
+ * Implementation of {@link HttpOperation} for orchestrating server calls using
+ * Apache Http Client.
+ */
+public class AbfsAHCHttpOperation extends HttpOperation {
+
+  private static final Logger LOG = LoggerFactory.getLogger(
+      AbfsAHCHttpOperation.class);
+
+  /**
+   * Registry of {@link AbfsApacheHttpClient} instances shared by all
+   * operations. Each instance of AbfsClient is meant to have its own
+   * AbfsApacheHttpClient; the key of the map is the UUID of the client.
+   * The backing map is a plain {@link HashMap}, which is not thread-safe
+   * on its own.
+   */
+  private static final Map<String, AbfsApacheHttpClient>
+      ABFS_APACHE_HTTP_CLIENT_MAP = new HashMap<>();
+
+  /** Client that {@code executeRequest()} uses to run the request. */
+  private AbfsApacheHttpClient abfsApacheHttpClient;
+
+  /** Apache HttpClient request object handed to the client for execution. */
+  private HttpRequestBase httpRequestBase;
+
+  /** Response of the executed request; stays null until one is assigned. */
+  private HttpResponse httpResponse;
+
+  /**
+   * Context created for each execution; read afterwards for the connect,
+   * send and read timings.
+   */
+  private AbfsManagedHttpContext abfsHttpClientContext;
+
+  /** REST operation type, exactly as supplied to the constructor. */
+  private final AbfsRestOperationType abfsRestOperationType;
+
+  /**
+   * Set to true when {@code sendPayload} catches an
+   * {@link AbfsApacheHttpExpect100Exception}.
+   */
+  private boolean connectionDisconnectedOnError = false;
+
+  /** The expect-100 exception caught by {@code sendPayload}, if any. */
+  private AbfsApacheHttpExpect100Exception abfsApacheHttpExpect100Exception;
+
+  /** True when the HTTP method is PUT, PATCH or POST. */
+  private final boolean isPayloadRequest;
+
+  /** Request headers, exactly as supplied to the constructor. */
+  private List<AbfsHttpHeader> requestHeaders;
+
+  /**
+   * Creates an operation that is not bound to any
+   * {@link AbfsApacheHttpClient}; only the request description is stored.
+   *
+   * @param url target URL of the request.
+   * @param method HTTP method name.
+   * @param requestHeaders headers to send with the request.
+   * @param abfsRestOperationType type of the REST operation; may be null.
+   */
+  private AbfsAHCHttpOperation(final URL url,
+      final String method,
+      final List<AbfsHttpHeader> requestHeaders,
+      final AbfsRestOperationType abfsRestOperationType) {
+    super(LOG, url, method);
+    this.isPayloadRequest = isPayloadRequest(method);
+    this.requestHeaders = requestHeaders;
+    this.abfsRestOperationType = abfsRestOperationType;
+  }
+
+  /**
+   * Creates an operation bound to the {@link AbfsApacheHttpClient} that is
+   * registered for {@code clientId}; the client is created on first use.
+   *
+   * @param url target URL of the request.
+   * @param method HTTP method name.
+   * @param requestHeaders headers to send with the request.
+   * @param abfsConfiguration configuration used if a client must be created.
+   * @param clientId key under which the shared client is registered.
+   * @param abfsRestOperationType type of the REST operation.
+   */
+  public AbfsAHCHttpOperation(final URL url,
+      final String method,
+      final List<AbfsHttpHeader> requestHeaders,
+      final AbfsConfiguration abfsConfiguration,
+      final String clientId,
+      final AbfsRestOperationType abfsRestOperationType) {
+    super(LOG, url, method);
+    this.requestHeaders = requestHeaders;
+    this.abfsRestOperationType = abfsRestOperationType;
+    this.isPayloadRequest = isPayloadRequest(method);
+    setAbfsApacheHttpClient(abfsConfiguration, clientId);
+  }
+
+  /**
+   * Creates an operation whose status code is preset and which has no
+   * client and no REST operation type.
+   *
+   * @param url target URL of the request.
+   * @param method HTTP method name.
+   * @param requestHeaders headers associated with the request.
+   * @param httpStatus status code to record on the operation.
+   */
+  public AbfsAHCHttpOperation(final URL url,
+      final String method,
+      final ArrayList<AbfsHttpHeader> requestHeaders,
+      final int httpStatus) {
+    // The cast only makes the target (private) constructor explicit.
+    this(url, method, requestHeaders, (AbfsRestOperationType) null);
+    setStatusCode(httpStatus);
+  }
+
+  /**
+   * Binds this operation to the {@link AbfsApacheHttpClient} registered for
+   * {@code clientId}, creating and registering a new client when none exists.
+   * <p>
+   * The complete lookup runs while holding the registry's monitor. The
+   * registry is a plain {@link HashMap}, so even a read that overlaps a
+   * structural modification made by another thread is unsafe.
+   *
+   * @param abfsConfiguration configuration passed to a newly created client.
+   * @param clientId key under which the client is registered.
+   */
+  private void setAbfsApacheHttpClient(
+      final AbfsConfiguration abfsConfiguration,
+      final String clientId) {
+    synchronized (ABFS_APACHE_HTTP_CLIENT_MAP) {
+      AbfsApacheHttpClient client = ABFS_APACHE_HTTP_CLIENT_MAP.get(clientId);
+      if (client == null) {
+        client = new AbfsApacheHttpClient(
+            DelegatingSSLSocketFactory.getDefaultFactory(),
+            abfsConfiguration);
+        ABFS_APACHE_HTTP_CLIENT_MAP.put(clientId, client);
+      }
+      abfsApacheHttpClient = client;
+    }
+  }
+
+  static void removeClient(final String clientId) throws IOException {
+    AbfsApacheHttpClient client = ABFS_APACHE_HTTP_CLIENT_MAP.remove(clientId);
+    if (client != null) {
+      client.close();
+    }
+  }
+
+  @VisibleForTesting
+  AbfsManagedHttpContext setFinalAbfsClientContext() {
+    return new AbfsManagedHttpContext();
+  }
+
+  /**
+   * Tells whether the given HTTP method is one of PUT, PATCH or POST.
+   *
+   * @param method HTTP method name; may be null.
+   * @return true for PUT, PATCH and POST, false otherwise.
+   */
+  private boolean isPayloadRequest(final String method) {
+    if (HTTP_METHOD_PUT.equals(method)) {
+      return true;
+    }
+    if (HTTP_METHOD_PATCH.equals(method)) {
+      return true;
+    }
+    return HTTP_METHOD_POST.equals(method);
+  }
+
+
+  /**
+   * Builds an operation with an empty header list and a preset status code.
+   *
+   * @param url target URL of the request.
+   * @param method HTTP method name.
+   * @param httpStatus status code to record on the operation.
+   * @return the newly created operation.
+   */
+  public static AbfsAHCHttpOperation
+      getAbfsApacheHttpClientHttpOperationWithFixedResult(
+      final URL url,
+      final String method,
+      final int httpStatus) {
+    final ArrayList<AbfsHttpHeader> noHeaders = new ArrayList<>();
+    return new AbfsAHCHttpOperation(url, method, noHeaders, httpStatus);
+  }
+
+  @Override
+  protected InputStream getErrorStream() throws IOException {
+    HttpEntity entity = httpResponse.getEntity();
+    if (entity == null) {
+      return null;
+    }
+    return entity.getContent();
+  }
+
+  /**
+   * Looks up a request header by exact (case-sensitive) name.
+   *
+   * @param key header name to search for.
+   * @return value of the first matching request header, or null if none.
+   */
+  @Override
+  String getConnProperty(final String key) {
+    for (final AbfsHttpHeader candidate : requestHeaders) {
+      if (!candidate.getName().equals(key)) {
+        continue;
+      }
+      return candidate.getValue();
+    }
+    return null;
+  }
+
+  @Override
+  URL getConnUrl() {
+    return getUrl();
+  }
+
+  @Override
+  String getConnRequestMethod() {
+    return getMethod();
+  }
+
+  @Override
+  Integer getConnResponseCode() throws IOException {
+    return getStatusCode();
+  }
+
+  @Override
+  String getConnResponseMessage() throws IOException {
+    return getStatusDescription();
+  }
+
+  /**
+   * Obtains the server response (if not already available) and parses it.
+   * <p>
+   * For methods other than PUT, PATCH and POST the request is prepared and
+   * executed here. For payload methods no request is issued by this method;
+   * the response field is used as it stands. Afterwards the response entity
+   * is always consumed and a {@link CloseableHttpResponse} is always closed.
+   *
+   * @param buffer forwarded to {@code parseResponseHeaderAndBody}.
+   * @param offset forwarded to {@code parseResponseHeaderAndBody}.
+   * @param length forwarded to {@code parseResponseHeaderAndBody}.
+   * @throws IOException if executing, parsing, consuming or closing fails.
+   */
+  public void processResponse(final byte[] buffer,
+      final int offset,
+      final int length) throws IOException {
+    try {
+      if (!isPayloadRequest) {
+        prepareRequest();
+        httpResponse = executeRequest();
+      }
+      parseResponseHeaderAndBody(buffer, offset, length);
+    } finally {
+      if (httpResponse != null) {
+        try {
+          EntityUtils.consume(httpResponse.getEntity());
+        } finally {
+          // Close even when draining the entity fails, otherwise the
+          // response would be left open.
+          if (httpResponse instanceof CloseableHttpResponse) {
+            ((CloseableHttpResponse) httpResponse).close();
+          }
+        }
+      }
+    }
+  }
+
+  @VisibleForTesting
+  void parseResponseHeaderAndBody(final byte[] buffer,
+      final int offset,
+      final int length) throws IOException {
+    setStatusCode(httpResponse.getStatusLine().getStatusCode());
+
+    setStatusDescription(httpResponse.getStatusLine().getReasonPhrase());
+
+    String requestId = getResponseHeader(
+        HttpHeaderConfigurations.X_MS_REQUEST_ID);
+    if (requestId == null) {
+      requestId = AbfsHttpConstants.EMPTY_STRING;
+    }
+    setRequestId(requestId);
+
+    // dump the headers
+    AbfsIoUtils.dumpHeadersToDebugLog("Response Headers",
+        getResponseHeaders(httpResponse));
+    parseResponse(buffer, offset, length);
+  }
+
+  @VisibleForTesting
+  HttpResponse executeRequest() throws IOException {
+    abfsHttpClientContext = setFinalAbfsClientContext();
+    HttpResponse response = abfsApacheHttpClient.execute(httpRequestBase,
+        abfsHttpClientContext);
+    setConnectionTimeMs(abfsHttpClientContext.getConnectTime());
+    setSendRequestTimeMs(abfsHttpClientContext.getSendTime());
+    setRecvResponseTimeMs(abfsHttpClientContext.getReadTime());
+    return response;
+  }
+
+  private Map<String, List<String>> getResponseHeaders(final HttpResponse 
httpResponse) {
+    if (httpResponse == null || httpResponse.getAllHeaders() == null) {
+      return new HashMap<>();
+    }
+    Map<String, List<String>> map = new HashMap<>();
+    for (Header header : httpResponse.getAllHeaders()) {
+      map.put(header.getName(), new ArrayList<String>(
+          Collections.singleton(header.getValue())));
+    }
+    return map;
+  }
+
+  /**
+   * Sets a request property by delegating to {@code setHeader}.
+   *
+   * @param key name of the property.
+   * @param value value of the property.
+   */
+  @Override
+  public void setRequestProperty(final String key, final String value) {
+    // Request properties and headers are the same thing for this client.
+    setHeader(key, value);
+  }
+
+  /**
+   * Returns the request headers as a name-to-values map.
+   * <p>
+   * Each name maps to a single-element mutable list. If a name occurs more
+   * than once in the request headers, the last occurrence wins.
+   *
+   * @return a new mutable map built from the request headers.
+   */
+  @Override
+  Map<String, List<String>> getRequestProperties() {
+    Map<String, List<String>> map = new HashMap<>();
+    for (AbfsHttpHeader header : requestHeaders) {
+      // A plain ArrayList avoids creating an anonymous subclass per header
+      // that would keep a reference to this operation.
+      map.put(header.getName(), new ArrayList<String>(
+          Collections.singletonList(header.getValue())));
+    }
+    return map;
+  }
+
+  /**
+   * Returns the value of the first response header with the given name.
+   *
+   * @param headerName name of the header to look up.
+   * @return the header value, or null when there is no response or the
+   * response has no such header.
+   */
+  @Override
+  public String getResponseHeader(final String headerName) {
+    if (httpResponse == null) {
+      return null;
+    }
+    final Header firstMatch = httpResponse.getFirstHeader(headerName);
+    return firstMatch == null ? null : firstMatch.getValue();
+  }
+
+  @Override
+  InputStream getContentInputStream()
+      throws IOException {
+    if (httpResponse == null) {
+      return null;
+    }
+    HttpEntity entity = httpResponse.getEntity();
+    if (entity != null) {
+      return httpResponse.getEntity().getContent();
+    }
+    return null;
+  }
+
+  public void sendPayload(final byte[] buffer,
+      final int offset,
+      final int length)
+      throws IOException {
+    if (!isPayloadRequest) {
+      return;
+    }
+
+    if (HTTP_METHOD_PUT.equals(getMethod())) {
+      httpRequestBase = new HttpPut(getUri());
+    }
+    if (HTTP_METHOD_PATCH.equals(getMethod())) {
+      httpRequestBase = new HttpPatch(getUri());
+    }
+    if (HTTP_METHOD_POST.equals(getMethod())) {
+      httpRequestBase = new HttpPost(getUri());
+    }
+
+    setExpectedBytesToBeSent(length);
+    if (buffer != null) {
+      HttpEntity httpEntity = new ByteArrayEntity(buffer, offset, length,
+          TEXT_PLAIN);
+      ((HttpEntityEnclosingRequestBase) httpRequestBase).setEntity(
+          httpEntity);
+    }
+
+    translateHeaders(httpRequestBase, requestHeaders);
+    try {
+      httpResponse = executeRequest();
+    } catch (AbfsApacheHttpExpect100Exception ex) {
+      LOG.debug(
+          "Getting output stream failed with expect header enabled, returning 
back ",
+          ex);
+      connectionDisconnectedOnError = true;
+      httpResponse = ex.getHttpResponse();
+      abfsApacheHttpExpect100Exception = ex;
+    } finally {
+      if (!connectionDisconnectedOnError
+          && httpRequestBase instanceof HttpEntityEnclosingRequestBase) {

Review Comment:
   Please also close `httpRequestBase` in this `finally` block.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: common-issues-unsubscr...@hadoop.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: common-issues-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-issues-h...@hadoop.apache.org

Reply via email to