[ https://issues.apache.org/jira/browse/HADOOP-19120?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17829864#comment-17829864 ]
ASF GitHub Bot commented on HADOOP-19120: ----------------------------------------- anmolanmol1234 commented on code in PR #6633: URL: https://github.com/apache/hadoop/pull/6633#discussion_r1535516995 ########## hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/HttpOperation.java: ########## @@ -0,0 +1,510 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.fs.azurebfs.services; + +import java.io.IOException; +import java.io.InputStream; +import java.net.HttpURLConnection; +import java.net.URL; +import java.util.List; +import java.util.Map; + +import com.fasterxml.jackson.core.JsonFactory; +import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.core.JsonToken; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.slf4j.Logger; + +import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants; +import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations; +import org.apache.hadoop.fs.azurebfs.contracts.services.AbfsPerfLoggable; +import org.apache.hadoop.fs.azurebfs.contracts.services.ListResultSchema; +import org.apache.hadoop.fs.azurebfs.utils.UriUtils; + +/** + * Base Http operation class for orchestrating server IO calls. Child classes + * define the specific orchestration implementation based on the network library used. + * <p> + * For JDK netlib usage, the child class would be {@link AbfsHttpOperation}. <br> + * For ApacheHttpClient netlib usage, the child class would be {@link AbfsAHCHttpOperation}. 
+ * </p> + */ +public abstract class HttpOperation implements AbfsPerfLoggable { + + private final Logger log; + + private static final int CLEAN_UP_BUFFER_SIZE = 64 * 1024; + + private static final int ONE_THOUSAND = 1000; + + private static final int ONE_MILLION = ONE_THOUSAND * ONE_THOUSAND; + + private String method; + + private URL url; + + private String maskedUrl; + + private String maskedEncodedUrl; + + private int statusCode; + + private String statusDescription; + + private String storageErrorCode = ""; + + private String storageErrorMessage = ""; + + private String requestId = ""; + + private String expectedAppendPos = ""; + + private ListResultSchema listResultSchema = null; + + // metrics + private int bytesSent; + + private int expectedBytesToBeSent; + + private long bytesReceived; + + private long connectionTimeMs; + + private long sendRequestTimeMs; + + private long recvResponseTimeMs; + + private boolean shouldMask = false; + + public HttpOperation(Logger logger, + final URL url, + final String method, + final int httpStatus) { + this.log = logger; + this.url = url; + this.method = method; + this.statusCode = httpStatus; + } + + public HttpOperation(final Logger log, final URL url, final String method) { + this.log = log; + this.url = url; + this.method = method; + } + + public String getMethod() { + return method; + } + + public String getHost() { + return url.getHost(); + } + + public int getStatusCode() { + return statusCode; + } + + public String getStatusDescription() { + return statusDescription; + } + + public String getStorageErrorCode() { + return storageErrorCode; + } + + public String getStorageErrorMessage() { + return storageErrorMessage; + } + + public abstract String getClientRequestId(); + + public String getExpectedAppendPos() { + return expectedAppendPos; + } + + public String getRequestId() { + return requestId; + } + + public void setMaskForSAS() { + shouldMask = true; + } + + public int getBytesSent() { + return bytesSent; + } 
+ + public int getExpectedBytesToBeSent() { + return expectedBytesToBeSent; + } + + public long getBytesReceived() { + return bytesReceived; + } + + public URL getUrl() { + return url; + } + + public ListResultSchema getListResultSchema() { + return listResultSchema; + } + + public abstract String getResponseHeader(String httpHeader); + + void setExpectedBytesToBeSent(int expectedBytesToBeSent) { + this.expectedBytesToBeSent = expectedBytesToBeSent; + } + + void setStatusCode(int statusCode) { + this.statusCode = statusCode; + } + + void setStatusDescription(String statusDescription) { + this.statusDescription = statusDescription; + } + + void setBytesSent(int bytesSent) { + this.bytesSent = bytesSent; + } + + void setSendRequestTimeMs(long sendRequestTimeMs) { + this.sendRequestTimeMs = sendRequestTimeMs; + } + + void setRecvResponseTimeMs(long recvResponseTimeMs) { + this.recvResponseTimeMs = recvResponseTimeMs; + } + + void setRequestId(String requestId) { + this.requestId = requestId; + } + + void setConnectionTimeMs(long connectionTimeMs) { + this.connectionTimeMs = connectionTimeMs; + } + + // Returns a trace message for the request + @Override + public String toString() { + final StringBuilder sb = new StringBuilder(); + sb.append(statusCode); + sb.append(","); + sb.append(storageErrorCode); + sb.append(","); + sb.append(expectedAppendPos); + sb.append(",cid="); + sb.append(getClientRequestId()); + sb.append(",rid="); + sb.append(requestId); + sb.append(",connMs="); + sb.append(connectionTimeMs); + sb.append(",sendMs="); + sb.append(sendRequestTimeMs); + sb.append(",recvMs="); + sb.append(recvResponseTimeMs); + sb.append(",sent="); + sb.append(bytesSent); + sb.append(",recv="); + sb.append(bytesReceived); + sb.append(","); + sb.append(method); + sb.append(","); + sb.append(getMaskedUrl()); + return sb.toString(); + } + + // Returns a trace message for the ABFS API logging service to consume + public String getLogString() { + + final StringBuilder sb = new 
StringBuilder(); + sb.append("s=") + .append(statusCode) + .append(" e=") + .append(storageErrorCode) + .append(" ci=") + .append(getClientRequestId()) + .append(" ri=") + .append(requestId) + + .append(" ct=") + .append(connectionTimeMs) + .append(" st=") + .append(sendRequestTimeMs) + .append(" rt=") + .append(recvResponseTimeMs) + + .append(" bs=") + .append(bytesSent) + .append(" br=") + .append(bytesReceived) + .append(" m=") + .append(method) + .append(" u=") + .append(getMaskedEncodedUrl()); + + return sb.toString(); + } + + public String getMaskedUrl() { + if (!shouldMask) { + return url.toString(); + } + if (maskedUrl != null) { + return maskedUrl; + } + maskedUrl = UriUtils.getMaskedUrl(url); + return maskedUrl; + } + + public String getMaskedEncodedUrl() { + if (maskedEncodedUrl != null) { + return maskedEncodedUrl; + } + maskedEncodedUrl = UriUtils.encodedUrlStr(getMaskedUrl()); + return maskedEncodedUrl; + } + + public abstract void sendPayload(byte[] buffer, int offset, int length) throws + IOException; + + public abstract void processResponse(byte[] buffer, + int offset, + int length) throws IOException; + + public abstract void setRequestProperty(String key, String value); + + void parseResponse(final byte[] buffer, + final int offset, + final int length) throws IOException { + long startTime; + if (AbfsHttpConstants.HTTP_METHOD_HEAD.equals(this.method)) { + // If it is HEAD, and it is ERROR + return; + } + + startTime = System.nanoTime(); + + if (statusCode >= HttpURLConnection.HTTP_BAD_REQUEST) { + processStorageErrorResponse(); + this.recvResponseTimeMs += elapsedTimeMs(startTime); + String contentLength = getResponseHeader( + HttpHeaderConfigurations.CONTENT_LENGTH); + if (contentLength != null) { + this.bytesReceived = Long.parseLong(contentLength); + } else { + this.bytesReceived = 0L; + } + + } else { + // consume the input stream to release resources + int totalBytesRead = 0; + + try (InputStream stream = getContentInputStream()) { + if 
(isNullInputStream(stream)) { + return; + } + boolean endOfStream = false; + + // this is a list operation and need to retrieve the data + // need a better solution + if (AbfsHttpConstants.HTTP_METHOD_GET.equals(this.method) + && buffer == null) { + parseListFilesResponse(stream); + } else { + if (buffer != null) { + while (totalBytesRead < length) { + int bytesRead = stream.read(buffer, offset + totalBytesRead, + length + - totalBytesRead); + if (bytesRead == -1) { + endOfStream = true; + break; + } + totalBytesRead += bytesRead; + } + } + if (!endOfStream && stream.read() != -1) { Review Comment: didn't understand the purpose of this read > [ABFS]: ApacheHttpClient adaptation as network library > ------------------------------------------------------ > > Key: HADOOP-19120 > URL: https://issues.apache.org/jira/browse/HADOOP-19120 > Project: Hadoop Common > Issue Type: Sub-task > Components: fs/azure > Affects Versions: 3.5.0 > Reporter: Pranav Saxena > Assignee: Pranav Saxena > Priority: Major > Labels: pull-request-available > > Apache HttpClient is more feature-rich and flexible and gives application > more granular control over networking parameter. > ABFS currently relies on the JDK-net library. This library is managed by > OpenJDK and has no performance problem. However, it limits the application's > control over networking, and there are very few APIs and hooks exposed that > the application can use to get metrics, choose which and when a connection > should be reused. ApacheHttpClient will give important hooks to fetch > important metrics and control networking parameters. -- This message was sent by Atlassian Jira (v8.20.10#820010) --------------------------------------------------------------------- To unsubscribe, e-mail: common-issues-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-issues-h...@hadoop.apache.org