[ 
https://issues.apache.org/jira/browse/HADOOP-17511?focusedWorklogId=597753&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-597753
 ]

ASF GitHub Bot logged work on HADOOP-17511:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 17/May/21 16:01
            Start Date: 17/May/21 16:01
    Worklog Time Spent: 10m 
      Work Description: bogthe commented on a change in pull request #2807:
URL: https://github.com/apache/hadoop/pull/2807#discussion_r633141325



##########
File path: hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/store/HttpReferrerAuditHeader.java
##########
@@ -0,0 +1,500 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.store;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.charset.StandardCharsets;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Set;
+import java.util.StringJoiner;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.store.audit.CommonAuditContext;
+import org.apache.http.NameValuePair;
+import org.apache.http.client.utils.URLEncodedUtils;
+
+import static java.util.Objects.requireNonNull;
+import static org.apache.hadoop.fs.store.audit.AuditConstants.PARAM_ID;
+import static org.apache.hadoop.fs.store.audit.AuditConstants.PARAM_OP;
+import static org.apache.hadoop.fs.store.audit.AuditConstants.PARAM_PATH;
+import static org.apache.hadoop.fs.store.audit.AuditConstants.PARAM_PATH2;
+import static org.apache.hadoop.fs.store.audit.AuditConstants.REFERRER_ORIGIN_HOST;
+
+/**
+ * Contains all the logic for generating an HTTP "Referer"
+ * entry; includes escaping query params.
+ * Tests for this are in
+ * {@code org.apache.hadoop.fs.s3a.audit.TestHttpReferrerAuditHeader}
+ * so as to verify that header generation in the S3A auditors, and
+ * S3 log parsing, all work.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public final class HttpReferrerAuditHeader {
+
+  /**
+   * Format of path to build: {@value}.
+   * The params passed in are (context ID, span ID, op).
+   */
+  public static final String REFERRER_PATH_FORMAT = "/%3$s/%2$s/";
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(HttpReferrerAuditHeader.class);
+
+  /**
+   * Log for warning of problems creating headers; will only log
+   * a problem once per process instance.
+   * This is to avoid logs being flooded with errors.
+   */
+  private static final LogExactlyOnce WARN_OF_URL_CREATION =
+      new LogExactlyOnce(LOG);
+
+  /** Context ID. */
+  private final String contextId;
+
+  /** operation name. */
+  private final String operationName;
+
+  /** Span ID. */
+  private final String spanId;
+
+  /** optional first path. */
+  private final String path1;
+
+  /** optional second path. */
+  private final String path2;
+
+  /**
+   * The header as created in the constructor; used in toString().
+   * A new header is built on demand in {@link #buildHttpReferrer()}
+   * so that evaluated attributes are dynamically evaluated
+   * in the correct thread/place.
+   */
+  private final String initialHeader;
+
+  /**
+   * Map of simple attributes.
+   */
+  private final Map<String, String> attributes;
+
+  /**
+   * Parameters dynamically evaluated on the thread just before
+   * the request is made.
+   */
+  private final Map<String, Supplier<String>> evaluated;
+
+  /**
+   * Elements to filter from the final header.
+   */
+  private final Set<String> filter;
+
+  /**
+   * Instantiate.
+   *
+   * Context and operationId are expected to be well formed
+   * numeric/hex strings, at least adequate to be
+   * used as individual path elements in a URL.
+   */
+  private HttpReferrerAuditHeader(
+      final Builder builder) {
+    this.contextId = requireNonNull(builder.contextId);
+    this.evaluated = builder.evaluated;
+    this.filter = builder.filter;
+    this.operationName = requireNonNull(builder.operationName);
+    this.path1 = builder.path1;
+    this.path2 = builder.path2;
+    this.spanId = requireNonNull(builder.spanId);
+
+    // copy the parameters from the builder and extend
+    attributes = builder.attributes;
+
+    addAttribute(PARAM_OP, operationName);
+    addAttribute(PARAM_PATH, path1);
+    addAttribute(PARAM_PATH2, path2);
+    addAttribute(PARAM_ID, spanId);
+
+    // patch in global context values where not set
+    Iterable<Map.Entry<String, String>> globalContextValues
+        = builder.globalContextValues;
+    if (globalContextValues != null) {
+      for (Map.Entry<String, String> entry : globalContextValues) {
+        attributes.putIfAbsent(entry.getKey(), entry.getValue());

Review comment:
       What are the implications of merging multiple `globalContextValues` maps 
into a single one (i.e. `attributes`)? Will there be a situation where 
different contexts have the same `key` but different `values`? It doesn't seem 
too bad; maybe a warning in the comments / documentation for this scenario is 
enough?
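
   As an illustration (a standalone sketch, not the Hadoop classes): with `putIfAbsent`, whichever value is already present for a key wins, so a global entry with the same key but a different value is silently dropped.

```java
import java.util.HashMap;
import java.util.Map;

public class PutIfAbsentDemo {
  public static void main(String[] args) {
    Map<String, String> attributes = new HashMap<>();
    attributes.put("op", "rename");          // set explicitly by the span

    // two "global context" sources that happen to share a key
    Map<String, String> globalA = new HashMap<>();
    globalA.put("principal", "alice");
    Map<String, String> globalB = new HashMap<>();
    globalB.put("principal", "bob");         // same key, different value

    // patch in global values where not already set
    globalA.forEach(attributes::putIfAbsent);
    globalB.forEach(attributes::putIfAbsent);

    // prints "alice"; the conflicting "bob" entry is silently ignored
    System.out.println(attributes.get("principal"));
  }
}
```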

##########
File path: hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/store/audit/TestCommonAuditContext.java
##########
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.store.audit;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+import org.assertj.core.api.AbstractStringAssert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.test.AbstractHadoopTestBase;
+
+import static org.apache.hadoop.fs.store.audit.AuditConstants.PARAM_COMMAND;
+import static org.apache.hadoop.fs.store.audit.AuditConstants.PARAM_PROCESS;
+import static org.apache.hadoop.fs.store.audit.AuditConstants.PARAM_THREAD1;
+import static org.apache.hadoop.fs.store.audit.CommonAuditContext.PROCESS_ID;
+import static org.apache.hadoop.fs.store.audit.CommonAuditContext.clearGlobalContextEntry;
+import static org.apache.hadoop.fs.store.audit.CommonAuditContext.currentAuditContext;
+import static org.apache.hadoop.fs.store.audit.CommonAuditContext.getGlobalContextEntry;
+import static org.apache.hadoop.fs.store.audit.CommonAuditContext.getGlobalContextEntries;
+import static org.apache.hadoop.fs.store.audit.CommonAuditContext.noteEntryPoint;
+import static org.apache.hadoop.fs.store.audit.CommonAuditContext.setGlobalContextEntry;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Tests of the common audit context.
+ */
+public class TestCommonAuditContext extends AbstractHadoopTestBase {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(TestCommonAuditContext.class);
+
+  private final CommonAuditContext context = currentAuditContext();
+  /**
+   * We can set, get and enumerate global context values.
+   */
+  @Test
+  public void testGlobalSetGetEnum() throws Throwable {
+
+    String s = "command";
+    setGlobalContextEntry(PARAM_COMMAND, s);
+    assertGlobalEntry(PARAM_COMMAND)
+        .isEqualTo(s);
+    // and the iterators.
+    List<Map.Entry<String, String>> list = StreamSupport
+        .stream(getGlobalContextEntries().spliterator(),
+            false)
+        .filter(e -> e.getKey().equals(PARAM_COMMAND))
+        .collect(Collectors.toList());
+    assertThat(list)
+        .hasSize(1)
+        .allMatch(e -> e.getValue().equals(s));
+  }
+
+  @Test
+  public void testVerifyProcessID() throws Throwable {
+    assertThat(
+        getGlobalContextEntry(PARAM_PROCESS))
+        .describedAs("global context value of %s", PARAM_PROCESS)
+        .isEqualTo(PROCESS_ID);
+  }
+
+
+  @Test
+  public void testNullValue() throws Throwable {
+    assertThat(context.get(PARAM_PROCESS))
+        .describedAs("Value of context element %s", PARAM_PROCESS)
+        .isNull();
+  }
+
+  @Test
+  public void testThreadId() throws Throwable {
+    String t1 = getContextValue(PARAM_THREAD1);
+    Long tid = Long.valueOf(t1);
+    assertThat(tid).describedAs("thread ID")
+        .isEqualTo(Thread.currentThread().getId());
+  }
+
+  /**
+   * Verify functions are dynamically evaluated.
+   */
+  @Test
+  public void testDynamicEval() throws Throwable {
+    context.reset();
+    final AtomicBoolean ab = new AtomicBoolean(false);
+    context.put("key", () ->
+        Boolean.toString(ab.get()));
+    assertContextValue("key")
+        .isEqualTo("false");
+    // update the reference and the next get call will
+    // pick up the new value.
+    ab.set(true);
+    assertContextValue("key")
+        .isEqualTo("true");
+  }
+
+  private String getContextValue(final String key) {
+    String val = context.get(key);
+    assertThat(val).isNotBlank();
+    return val;
+  }
+
+  /**
+   * Start an assertion on a context value.
+   * @param key key to look up
+   * @return an assert which can be extended with further calls
+   */
+  private AbstractStringAssert<?> assertContextValue(final String key) {
+    String val = context.get(key);
+    return assertThat(val)
+        .describedAs("Value of context element %s", key)
+        .isNotBlank();
+  }
+
+  @Test
+  public void testNoteEntryPoint() throws Throwable {
+    setAndAssertEntryPoint(this).isEqualTo("TestCommonAuditContext");
+

Review comment:
       nit: extra blank line

##########
File path: hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/StoreContext.java
##########
@@ -117,13 +122,17 @@
   /**
    * Source of time.
    */
-  private ITtlTimeProvider timeProvider;
+
+  /** Time source for S3Guard TTLs. */
+  private final ITtlTimeProvider timeProvider;
+
+  /** Operation Auditor. */
+  private final AuditSpanSource<AuditSpanS3A> auditor;
 
   /**
    * Instantiate.
-   * @deprecated as public method: use {@link StoreContextBuilder}.
    */
-  public StoreContext(
+  StoreContext(

Review comment:
       nit: is the access modifier intentionally left out?
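
   For reference, a hedged sketch of the pattern this looks like (a package-private constructor reachable only through a builder in the same package); the names below are illustrative, not the actual `StoreContext`/`StoreContextBuilder` classes.

```java
// Illustrative only: the builder in the same package is the sole way to
// construct the context, so the constructor can stay package-private.
final class Context {
  private final String bucket;

  Context(ContextBuilder builder) {       // package-private on purpose
    this.bucket = builder.bucket;
  }

  String getBucket() {
    return bucket;
  }
}

final class ContextBuilder {
  String bucket;

  ContextBuilder withBucket(String b) {
    this.bucket = b;
    return this;
  }

  Context build() {
    return new Context(this);
  }
}
```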

##########
File path: hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
##########
@@ -2430,13 +2749,16 @@ PutObjectResult putObjectDirect(PutObjectRequest putObjectRequest)
     LOG.debug("PUT {} bytes to {}", len, putObjectRequest.getKey());
     incrementPutStartStatistics(len);
     try {
-      PutObjectResult result = s3.putObject(putObjectRequest);
+      PutObjectResult result = trackDurationOfSupplier(
+          getDurationTrackerFactory(),
+          OBJECT_PUT_REQUESTS.getSymbol(), () ->
+              s3.putObject(putObjectRequest));
       incrementPutCompletedStatistics(true, len);
       // update metadata
       finishedWrite(putObjectRequest.getKey(), len,
           result.getETag(), result.getVersionId(), null);
       return result;
-    } catch (AmazonClientException e) {
+    } catch (SdkBaseException e) {

Review comment:
       Any reason for moving to `SdkBaseException`? I see this 
`putObjectDirect` method declares that it throws `AmazonClientException`; no 
bug, just a small inconsistency.
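
   For context, a minimal sketch assuming the AWS SDK v1 hierarchy, where `AmazonClientException` extends `SdkBaseException`: catching the broader type still handles client exceptions, it just also matches other SDK exceptions.

```java
import com.amazonaws.AmazonClientException;
import com.amazonaws.SdkBaseException;

public class CatchWidthDemo {
  public static void main(String[] args) {
    try {
      throw new AmazonClientException("simulated client-side failure");
    } catch (SdkBaseException e) {
      // AmazonClientException is a subclass of SdkBaseException (SDK v1),
      // so the broader catch still handles it.
      System.out.println("caught: " + e.getMessage());
    }
  }
}
```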

##########
File path: hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/store/audit/AuditingFunctions.java
##########
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.store.audit;
+
+import javax.annotation.Nullable;
+import java.util.concurrent.Callable;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.util.functional.CallableRaisingIOE;
+import org.apache.hadoop.util.functional.FunctionRaisingIOE;
+import org.apache.hadoop.util.functional.InvocationRaisingIOE;
+
+/**
+ * Static methods to assist in working with Audit Spans.
+ * The {@code withinX} calls take a span and a closure/function etc.
+ * and return a new function of the same type which will
+ * activate the span.
+ * They do not deactivate it afterwards to avoid accidentally deactivating
+ * the already-active span during a chain of operations in the same thread.
+ * All they do is ensure that the given span is guaranteed to be
+ * active when the passed in callable/function/invokable is evaluated.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class AuditingFunctions {
+
+  /**
+   * Given a callable, return a new callable which
+   * activates and deactivates the span around the inner invocation.

Review comment:
       Comment out of date. This mentions that the callable `activates` and 
`deactivates` the span, while the class comment says `They do not 
deactivate it afterwards...`. The callable also contains no call to deactivate.
   
   The same comment applies to all methods in this class.
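
   To make the activate-only contract concrete, a minimal sketch with a hypothetical `Span` interface (not the actual Hadoop audit types).

```java
import java.util.concurrent.Callable;

public final class SpanWrapping {

  /** Hypothetical span: only activation matters for this sketch. */
  interface Span {
    void activate();
  }

  /**
   * Wrap a callable so the span is active when it runs.
   * The span is deliberately NOT deactivated afterwards, to avoid
   * deactivating a span still in use on the same thread.
   */
  static <T> Callable<T> withinSpan(Span span, Callable<T> callable) {
    return () -> {
      span.activate();
      return callable.call();
    };
  }

  public static void main(String[] args) throws Exception {
    Span span = () -> System.out.println("span activated");
    Callable<String> wrapped = withinSpan(span, () -> "result");
    System.out.println(wrapped.call());
  }
}
```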

##########
File path: hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/RequestFactoryImpl.java
##########
@@ -0,0 +1,695 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.impl;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+
+import com.amazonaws.AmazonWebServiceRequest;
+import com.amazonaws.services.s3.model.AbortMultipartUploadRequest;
+import com.amazonaws.services.s3.model.CannedAccessControlList;
+import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest;
+import com.amazonaws.services.s3.model.CopyObjectRequest;
+import com.amazonaws.services.s3.model.DeleteObjectRequest;
+import com.amazonaws.services.s3.model.DeleteObjectsRequest;
+import com.amazonaws.services.s3.model.GetObjectMetadataRequest;
+import com.amazonaws.services.s3.model.GetObjectRequest;
+import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
+import com.amazonaws.services.s3.model.ListMultipartUploadsRequest;
+import com.amazonaws.services.s3.model.ListNextBatchOfObjectsRequest;
+import com.amazonaws.services.s3.model.ListObjectsRequest;
+import com.amazonaws.services.s3.model.ListObjectsV2Request;
+import com.amazonaws.services.s3.model.ObjectListing;
+import com.amazonaws.services.s3.model.ObjectMetadata;
+import com.amazonaws.services.s3.model.PartETag;
+import com.amazonaws.services.s3.model.PutObjectRequest;
+import com.amazonaws.services.s3.model.SSEAwsKeyManagementParams;
+import com.amazonaws.services.s3.model.SSECustomerKey;
+import com.amazonaws.services.s3.model.SelectObjectContentRequest;
+import com.amazonaws.services.s3.model.UploadPartRequest;
+import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.fs.PathIOException;
+import org.apache.hadoop.fs.s3a.Retries;
+import org.apache.hadoop.fs.s3a.S3AEncryptionMethods;
+import org.apache.hadoop.fs.s3a.api.RequestFactory;
+import org.apache.hadoop.fs.s3a.auth.delegation.EncryptionSecretOperations;
+import org.apache.hadoop.fs.s3a.auth.delegation.EncryptionSecrets;
+
+import static org.apache.commons.lang3.StringUtils.isNotEmpty;
+import static org.apache.hadoop.fs.s3a.impl.InternalConstants.DEFAULT_UPLOAD_PART_COUNT_LIMIT;
+import static org.apache.hadoop.thirdparty.com.google.common.base.Preconditions.checkArgument;
+import static org.apache.hadoop.thirdparty.com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * The standard implementation of the request factory.
+ * This creates AWS SDK request classes for the specific bucket,
+ * with standard options/headers set.
+ * It is also where custom request parameters can be set.
+ *
+ * All creation of AWS S3 requests MUST be through this class so that
+ * common options (encryption etc.) can be added here,
+ * and so that any chained transformation of requests can be applied.
+ *
+ * This is where audit span information is added to the requests,
+ * until it is done in the AWS SDK itself.
+ *
+ * All created requests will be passed through
+ * {@link PrepareRequest#prepareRequest(AmazonWebServiceRequest)} before
+ * being returned to the caller.
+ */
+public class RequestFactoryImpl implements RequestFactory {
+
+  public static final Logger LOG = LoggerFactory.getLogger(
+      RequestFactoryImpl.class);
+
+  /**
+   * Target bucket.
+   */
+  private final String bucket;
+
+  /**
+   * Encryption secrets.
+   */
+  private EncryptionSecrets encryptionSecrets;
+
+  /**
+   * ACL For new objects.
+   */
+  private final CannedAccessControlList cannedACL;
+
+  /**
+   * Max number of multipart entries allowed in a large
+   * upload. Tunable for testing only.
+   */
+  private final long multipartPartCountLimit;
+
+  /**
+   * Requester Pays.
+   * This is to be wired up in a PR with its
+   * own tests and docs.
+   */
+  private final boolean requesterPays;
+
+  /**
+   * Callback to prepare requests.
+   */
+  private final PrepareRequest requestPreparer;
+
+  /**
+   * Constructor.
+   * @param builder builder with all the configuration.
+   */
+  protected RequestFactoryImpl(
+      final RequestFactoryBuilder builder) {
+    this.bucket = builder.bucket;
+    this.cannedACL = builder.cannedACL;
+    this.encryptionSecrets = builder.encryptionSecrets;
+    this.multipartPartCountLimit = builder.multipartPartCountLimit;
+    this.requesterPays = builder.requesterPays;
+    this.requestPreparer = builder.requestPreparer;
+  }
+
+  /**
+   * Preflight preparation of AWS request.
+   * @param <T> web service request
+   * @return prepared entry.
+   */
+  @Retries.OnceRaw
+  private <T extends AmazonWebServiceRequest> T prepareRequest(T t) {
+    return requestPreparer != null
+        ? requestPreparer.prepareRequest(t)
+        : t;
+  }
+
+  /**
+   * Get the canned ACL of this FS.
+   * @return an ACL, if any
+   */
+  @Override
+  public CannedAccessControlList getCannedACL() {
+    return cannedACL;
+  }
+
+  /**
+   * Get the target bucket.
+   * @return the bucket.
+   */
+  protected String getBucket() {
+    return bucket;
+  }
+
+  /**
+   * Create the AWS SDK structure used to configure SSE,
+   * if the encryption secrets contain the information/settings for this.
+   * @return an optional set of KMS Key settings
+   */
+  @Override
+  public Optional<SSEAwsKeyManagementParams> generateSSEAwsKeyParams() {
+    return EncryptionSecretOperations.createSSEAwsKeyManagementParams(
+        encryptionSecrets);
+  }
+
+  /**
+   * Create the SSE-C structure for the AWS SDK, if the encryption secrets
+   * contain the information/settings for this.
+   * This will contain a secret extracted from the bucket/configuration.
+   * @return an optional customer key.
+   */
+  @Override
+  public Optional<SSECustomerKey> generateSSECustomerKey() {
+    return EncryptionSecretOperations.createSSECustomerKey(
+        encryptionSecrets);
+  }
+
+  /**
+   * Get the encryption algorithm of this endpoint.
+   * @return the encryption algorithm.
+   */
+  @Override
+  public S3AEncryptionMethods getServerSideEncryptionAlgorithm() {
+    return encryptionSecrets.getEncryptionMethod();
+  }
+
+  /**
+   * Sets server side encryption parameters to the part upload
+   * request when encryption is enabled.
+   * @param request upload part request
+   */
+  protected void setOptionalUploadPartRequestParameters(
+      UploadPartRequest request) {
+    generateSSECustomerKey().ifPresent(request::setSSECustomerKey);
+  }
+
+  /**
+   * Sets server side encryption parameters on the GET object metadata
+   * request when encryption is enabled.
+   * @param request get object metadata request
+   */
+  protected void setOptionalGetObjectMetadataParameters(
+      GetObjectMetadataRequest request) {
+    generateSSECustomerKey().ifPresent(request::setSSECustomerKey);
+  }
+
+  /**
+   * Set the optional parameters when initiating the request (encryption,
+   * headers, storage, etc).
+   * @param request request to patch.
+   */
+  protected void setOptionalMultipartUploadRequestParameters(
+      InitiateMultipartUploadRequest request) {
+    generateSSEAwsKeyParams().ifPresent(request::setSSEAwsKeyManagementParams);
+    generateSSECustomerKey().ifPresent(request::setSSECustomerKey);
+  }
+
+  /**
+   * Set the optional parameters for a PUT request.
+   * @param request request to patch.
+   */
+  protected void setOptionalPutRequestParameters(PutObjectRequest request) {
+    generateSSEAwsKeyParams().ifPresent(request::setSSEAwsKeyManagementParams);
+    generateSSECustomerKey().ifPresent(request::setSSECustomerKey);
+  }
+
+  /**
+   * Set the optional metadata for an object being created or copied.
+   * @param metadata to update.
+   */
+  protected void setOptionalObjectMetadata(ObjectMetadata metadata) {
+    final S3AEncryptionMethods algorithm
+        = getServerSideEncryptionAlgorithm();
+    if (S3AEncryptionMethods.SSE_S3 == algorithm) {
+      metadata.setSSEAlgorithm(algorithm.getMethod());
+    }
+  }
+
+  /**
+   * Create a new object metadata instance.
+   * Any standard metadata headers are added here, for example:
+   * encryption.
+   *
+   * @param length length of data to set in header; Ignored if negative
+   * @return a new metadata instance
+   */
+  @Override
+  public ObjectMetadata newObjectMetadata(long length) {
+    final ObjectMetadata om = new ObjectMetadata();
+    setOptionalObjectMetadata(om);
+    if (length >= 0) {
+      om.setContentLength(length);
+    }
+    return om;
+  }
+
+  @Override
+  public CopyObjectRequest newCopyObjectRequest(String srcKey,
+      String dstKey,
+      ObjectMetadata srcom) {
+    CopyObjectRequest copyObjectRequest =
+        new CopyObjectRequest(getBucket(), srcKey, getBucket(), dstKey);
+    ObjectMetadata dstom = newObjectMetadata(srcom.getContentLength());
+    HeaderProcessing.cloneObjectMetadata(srcom, dstom);
+    setOptionalObjectMetadata(dstom);
+    copyEncryptionParameters(srcom, copyObjectRequest);
+    copyObjectRequest.setCannedAccessControlList(cannedACL);
+    copyObjectRequest.setNewObjectMetadata(dstom);
+    Optional.ofNullable(srcom.getStorageClass())
+        .ifPresent(copyObjectRequest::setStorageClass);
+    return prepareRequest(copyObjectRequest);
+  }
+
+  /**
+   * Propagate encryption parameters from source file if set else use the
+   * current filesystem encryption settings.
+   * @param srcom source object metadata.
+   * @param copyObjectRequest copy object request body.
+   */
+  protected void copyEncryptionParameters(
+      ObjectMetadata srcom,
+      CopyObjectRequest copyObjectRequest) {
+    String sourceKMSId = srcom.getSSEAwsKmsKeyId();
+    if (isNotEmpty(sourceKMSId)) {
+      // source KMS ID is propagated
+      LOG.debug("Propagating SSE-KMS settings from source {}",
+          sourceKMSId);
+      copyObjectRequest.setSSEAwsKeyManagementParams(
+          new SSEAwsKeyManagementParams(sourceKMSId));
+    }
+    switch (getServerSideEncryptionAlgorithm()) {
+    case SSE_S3:
+      /* no-op; this is set in destination object metadata */
+      break;
+
+    case SSE_C:
+      generateSSECustomerKey().ifPresent(customerKey -> {
+        copyObjectRequest.setSourceSSECustomerKey(customerKey);
+        copyObjectRequest.setDestinationSSECustomerKey(customerKey);
+      });
+      break;
+
+    case SSE_KMS:
+      generateSSEAwsKeyParams().ifPresent(
+          copyObjectRequest::setSSEAwsKeyManagementParams);
+      break;
+    default:
+    }
+  }
+  /**
+   * Create a putObject request.
+   * Adds the ACL and metadata
+   * @param key key of object
+   * @param metadata metadata header
+   * @param srcfile source file
+   * @return the request
+   */
+  @Override
+  public PutObjectRequest newPutObjectRequest(String key,
+      ObjectMetadata metadata, File srcfile) {
+    Preconditions.checkNotNull(srcfile);
+    PutObjectRequest putObjectRequest = new PutObjectRequest(getBucket(), key,
+        srcfile);
+    setOptionalPutRequestParameters(putObjectRequest);
+    putObjectRequest.setCannedAcl(cannedACL);
+    putObjectRequest.setMetadata(metadata);
+    return prepareRequest(putObjectRequest);
+  }
+
+  /**
+   * Create a {@link PutObjectRequest} request.
+   * The metadata is assumed to have been configured with the size of the
+   * operation.
+   * @param key key of object
+   * @param metadata metadata header
+   * @param inputStream source data.
+   * @return the request
+   */
+  @Override
+  public PutObjectRequest newPutObjectRequest(String key,
+      ObjectMetadata metadata,
+      InputStream inputStream) {
+    Preconditions.checkNotNull(inputStream);
+    Preconditions.checkArgument(isNotEmpty(key), "Null/empty key");
+    PutObjectRequest putObjectRequest = new PutObjectRequest(getBucket(), key,
+        inputStream, metadata);
+    setOptionalPutRequestParameters(putObjectRequest);
+    putObjectRequest.setCannedAcl(cannedACL);
+    return prepareRequest(putObjectRequest);
+  }
+
+  @Override
+  public PutObjectRequest newDirectoryMarkerRequest(String directory) {
+    String key = directory.endsWith("/")
+        ? directory
+        : (directory + "/");
+    // an input stream which is always empty
+    final InputStream im = new InputStream() {
+      @Override
+      public int read() throws IOException {
+        return -1;
+      }
+    };
+    // preparation happens in here
+    final ObjectMetadata md = newObjectMetadata(0L);
+    md.setContentType(HeaderProcessing.CONTENT_TYPE_X_DIRECTORY);
+    PutObjectRequest putObjectRequest =
+        newPutObjectRequest(key, md, im);
+    return putObjectRequest;
+  }
+
+  @Override
+  public ListMultipartUploadsRequest
+      newListMultipartUploadsRequest(String prefix) {
+    ListMultipartUploadsRequest request = new ListMultipartUploadsRequest(
+        getBucket());
+    if (prefix != null) {
+      request.setPrefix(prefix);
+    }
+    return prepareRequest(request);
+  }
+
+  @Override
+  public AbortMultipartUploadRequest newAbortMultipartUploadRequest(
+      String destKey,
+      String uploadId) {
+    return prepareRequest(new AbortMultipartUploadRequest(getBucket(),
+        destKey,
+        uploadId));
+  }
+
+  @Override
+  public InitiateMultipartUploadRequest newMultipartUploadRequest(
+      String destKey) {
+    final InitiateMultipartUploadRequest initiateMPURequest =
+        new InitiateMultipartUploadRequest(getBucket(),
+            destKey,
+            newObjectMetadata(-1));
+    initiateMPURequest.setCannedACL(getCannedACL());
+    setOptionalMultipartUploadRequestParameters(initiateMPURequest);
+    return prepareRequest(initiateMPURequest);
+  }
+
+  @Override
+  public CompleteMultipartUploadRequest newCompleteMultipartUploadRequest(
+      String destKey,
+      String uploadId,
+      List<PartETag> partETags) {
+    // a copy of the list is required, so that the AWS SDK doesn't
+    // attempt to sort an unmodifiable list.
+    return prepareRequest(new CompleteMultipartUploadRequest(bucket,
+        destKey, uploadId, new ArrayList<>(partETags)));
+

Review comment:
       nit: empty line

##########
File path: hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/S3LogParser.java
##########
@@ -0,0 +1,306 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.audit;
+
+import java.util.regex.Pattern;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * Class to help parse AWS S3 Logs.
+ * see https://docs.aws.amazon.com/AmazonS3/latest/userguide/LogFormat.html
+ *
+ * Getting the regexp right is surprisingly hard; this class does it
+ * explicitly and names each group in the process.
+ * All group names are included in {@link #GROUPS} in the order
+ * within the log entries.
+ *
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Unstable
+public class S3LogParser {
+
+  /**
+   * Simple entry: anything up to a space.
+   * {@value}.
+   */
+  private static final String SIMPLE = "[^ ]*";
+
+  /**
+   * Date/Time. Everything within square braces.
+   * {@value}.
+   */
+  private static final String DATETIME = "\\[(.*?)\\]";
+
+  /**
+   * A natural number or "-".
+   * {@value}.
+   */
+  private static final String NUMBER = "(-|[0-9]*)";
+
+  /**
+   * A Quoted field or "-".
+   * {@value}.
+   */
+  private static final String QUOTED = "(-|\"[^\"]*\")";
+
+
+  /**
+   * An entry in the regexp.
+   * @param name name of the group
+   * @param pattern pattern to use in the regexp
+   * @return the pattern for the regexp
+   */
+  private static String e(String name , String pattern) {
+    return String.format("(?<%s>%s) ", name, pattern);
+  }
+
+  /**
+   * An entry in the regexp.
+   * @param name name of the group
+   * @param pattern pattern to use in the regexp
+   * @return the pattern for the regexp
+   */
+  private static String eNoTrailing(String name , String pattern) {
+    return String.format("(?<%s>%s)", name, pattern);
+  }
+
+
+  // simple entry
+
+  /**
+   * Simple entry using the {@link #SIMPLE} pattern.
+   * @param name name of the element (for code clarity only)
+   * @return the pattern for the regexp
+   */
+  private static String e(String name) {
+    return e(name, SIMPLE);
+  }
+
+  /**
+   * Quoted entry using the {@link #QUOTED} pattern.
+   * @param name name of the element (for code clarity only)
+   * @return the pattern for the regexp
+   */
+  private static String Q(String name) {

Review comment:
       nit: Why is this capital `Q` while the other is lowercase `e`?
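
   As an aside, a small standalone sketch of how helpers in this style compose into a pattern and how the named groups are read back (simplified, not the real S3 log grammar).

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class NamedGroupDemo {
  private static final String SIMPLE = "[^ ]*";
  private static final String QUOTED = "(-|\"[^\"]*\")";

  // same shape as the helper under review: named group plus trailing space
  private static String e(String name, String pattern) {
    return String.format("(?<%s>%s) ", name, pattern);
  }

  public static void main(String[] args) {
    // bucket field, then a quoted request field; trailing space trimmed here
    String regexp = (e("bucket", SIMPLE) + e("request", QUOTED)).trim();
    Pattern p = Pattern.compile(regexp);

    Matcher m = p.matcher("mybucket \"GET /mybucket/key HTTP/1.1\"");
    if (m.matches()) {
      System.out.println(m.group("bucket"));   // mybucket
      System.out.println(m.group("request"));  // "GET /mybucket/key HTTP/1.1"
    }
  }
}
```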




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 597753)
    Time Spent: 18h  (was: 17h 50m)

> Add an Audit plugin point for S3A auditing/context
> --------------------------------------------------
>
>                 Key: HADOOP-17511
>                 URL: https://issues.apache.org/jira/browse/HADOOP-17511
>             Project: Hadoop Common
>          Issue Type: Sub-task
>    Affects Versions: 3.3.1
>            Reporter: Steve Loughran
>            Assignee: Steve Loughran
>            Priority: Major
>              Labels: pull-request-available
>          Time Spent: 18h
>  Remaining Estimate: 0h
>
> Add a way for auditing tools to correlate S3 object calls with Hadoop FS API 
> calls.
> Initially just to log/forward to an auditing service.
> Later: let us attach them as parameters in S3 requests, such as OpenTracing 
> headers or (my initial idea) the HTTP referrer header, where it will get 
> into the S3 logs.
> Challenges
> * ensuring the audit span is created for every public entry point. That will 
> have to include those used in S3Guard tools and some de facto public APIs
> * and not re-entered for active spans. S3A code must not call back into the 
> FS API entry points
> * Propagation across worker threads (see the sketch below)
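
As an illustration of the worker-thread propagation challenge, a minimal sketch (hypothetical thread-local map, not the actual `CommonAuditContext`): the submitting thread captures a snapshot of its context and the worker re-applies it before doing any work.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ContextPropagationDemo {
  // per-thread audit context (illustrative only)
  private static final ThreadLocal<Map<String, String>> CONTEXT =
      ThreadLocal.withInitial(HashMap::new);

  public static void main(String[] args) throws Exception {
    CONTEXT.get().put("op", "rename");

    // capture on the submitting thread...
    Map<String, String> snapshot = new HashMap<>(CONTEXT.get());

    ExecutorService pool = Executors.newSingleThreadExecutor();
    pool.submit(() -> {
      // ...and re-apply on the worker thread before doing any work
      CONTEXT.get().putAll(snapshot);
      System.out.println("worker sees op=" + CONTEXT.get().get("op"));
    }).get();
    pool.shutdown();
  }
}
```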



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
