This is an automated email from the ASF dual-hosted git repository.

damccorm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 28a7199e80b [Java] Add job name to GCS custom audit info (#31316)
28a7199e80b is described below

commit 28a7199e80bba9be8eb7f87ae7ab88c343492b40
Author: Shunping Huang <shunp...@google.com>
AuthorDate: Fri May 17 11:21:40 2024 -0400

    [Java] Add job name to GCS custom audit info (#31316)
    
    * Add job name to GCS custom audit info in Java
    
    * Apply spotless
---
 .../gcp/util/RetryHttpRequestInitializer.java      | 11 +++++++
 .../beam/sdk/extensions/gcp/util/Transport.java    | 32 +++++++++++++++-----
 .../sdk/extensions/gcp/util/TransportTest.java     | 35 ++++++++++++++++++----
 3 files changed, 65 insertions(+), 13 deletions(-)

diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/RetryHttpRequestInitializer.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/RetryHttpRequestInitializer.java
index edda1abc25f..cfc925a515e 100644
--- a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/RetryHttpRequestInitializer.java
+++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/RetryHttpRequestInitializer.java
@@ -32,6 +32,7 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
+import java.util.Map;
 import java.util.Set;
 import org.apache.beam.sdk.metrics.Counter;
 import org.apache.beam.sdk.metrics.Metrics;
@@ -208,6 +209,8 @@ public class RetryHttpRequestInitializer implements HttpRequestInitializer {
 
   private Set<Integer> ignoredResponseCodes = new HashSet<>(DEFAULT_IGNORED_RESPONSE_CODES);
 
+  private Map<String, String> httpHeaders = null;
+
   public RetryHttpRequestInitializer() {
     this(Collections.emptyList());
   }
@@ -270,6 +273,10 @@ public class RetryHttpRequestInitializer implements HttpRequestInitializer {
     request.setUnsuccessfulResponseHandler(loggingHttpBackOffHandler);
     request.setIOExceptionHandler(loggingHttpBackOffHandler);
 
+    if (this.httpHeaders != null) {
+      request.getHeaders().putAll(this.httpHeaders);
+    }
+
     // Set response initializer
     if (responseInterceptor != null) {
       request.setResponseInterceptor(responseInterceptor);
@@ -284,4 +291,8 @@ public class RetryHttpRequestInitializer implements HttpRequestInitializer {
   public void setWriteTimeout(int writeTimeout) {
     this.writeTimeout = writeTimeout;
   }
+
+  public void setHttpHeaders(Map<String, String> httpHeaders) {
+    this.httpHeaders = httpHeaders;
+  }
 }
diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/Transport.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/Transport.java
index a789b594f46..f7ecbfeda77 100644
--- a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/Transport.java
+++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/Transport.java
@@ -17,6 +17,8 @@
  */
 package org.apache.beam.sdk.extensions.gcp.util;
 
+import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings.isNullOrEmpty;
+
 import com.google.api.client.googleapis.javanet.GoogleNetHttpTransport;
 import com.google.api.client.http.HttpRequestInitializer;
 import com.google.api.client.http.HttpTransport;
@@ -31,9 +33,12 @@ import java.net.MalformedURLException;
 import java.net.URL;
 import java.nio.file.Paths;
 import java.security.GeneralSecurityException;
+import java.util.Optional;
 import org.apache.beam.sdk.extensions.gcp.auth.NullCredentialInitializer;
 import org.apache.beam.sdk.extensions.gcp.options.GcsOptions;
+import org.apache.beam.sdk.util.ReleaseInfo;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
 
 /** Helpers for cloud communication. */
 public class Transport {
@@ -89,20 +94,31 @@ public class Transport {
 
   /** Returns a Cloud Storage client builder using the specified {@link GcsOptions}. */
   public static Storage.Builder newStorageClient(GcsOptions options) {
-    String applicationNameSuffix = " (GPN:Beam)";
+    String jobName = Optional.ofNullable(options.getJobName()).orElse("UNKNOWN");
+
+    String applicationName =
+        String.format(
+            "%sapache-beam/%s (GPN:Beam)",
+            isNullOrEmpty(options.getAppName()) ? "" : options.getAppName() + " ",
+            ReleaseInfo.getReleaseInfo().getSdkVersion());
+
     String servicePath = options.getGcsEndpoint();
+
+    // Do not log the code 404. Code up the stack will deal with 404's if needed,
+    // and logging it by default clutters the output during file staging.
+    RetryHttpRequestInitializer retryHttpRequestInitializer =
+        new RetryHttpRequestInitializer(ImmutableList.of(404), new UploadIdResponseInterceptor());
+
+    // Set custom audit info in request headers
+    retryHttpRequestInitializer.setHttpHeaders(ImmutableMap.of("x-goog-custom-audit-job", jobName));
+
     Storage.Builder storageBuilder =
         new Storage.Builder(
                 getTransport(),
                 getJsonFactory(),
                 chainHttpRequestInitializer(
-                    options.getGcpCredential(),
-                    // Do not log the code 404. Code up the stack will deal with 404's if needed,
-                    // and
-                    // logging it by default clutters the output during file staging.
-                    new RetryHttpRequestInitializer(
-                        ImmutableList.of(404), new UploadIdResponseInterceptor())))
-            .setApplicationName(options.getAppName() + applicationNameSuffix)
+                    options.getGcpCredential(), retryHttpRequestInitializer))
+            .setApplicationName(applicationName)
             .setGoogleClientRequestInitializer(options.getGoogleApiTrace());
     if (servicePath != null) {
       ApiComponents components = apiComponentsFromUrl(servicePath);
diff --git a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/TransportTest.java b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/TransportTest.java
index 4df3dfb680e..4554e64c237 100644
--- a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/TransportTest.java
+++ b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/TransportTest.java
@@ -19,12 +19,18 @@ package org.apache.beam.sdk.extensions.gcp.util;
 
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.greaterThan;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+import static org.junit.Assert.assertEquals;
 
+import com.google.api.client.http.HttpRequest;
 import com.google.api.services.storage.Storage;
 import java.io.IOException;
 import java.util.Arrays;
+import java.util.Collections;
+import org.apache.beam.sdk.extensions.gcp.auth.TestCredential;
 import org.apache.beam.sdk.extensions.gcp.options.GcsOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.util.ReleaseInfo;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
@@ -33,17 +39,36 @@ import org.junit.runners.JUnit4;
 public class TransportTest {
 
   @Test
-  public void testUserAgentInGcsRequestHeaders() throws IOException {
+  public void testUserAgentAndCustomAuditInGcsRequestHeaders() throws IOException {
     GcsOptions gcsOptions = PipelineOptionsFactory.as(GcsOptions.class);
+    gcsOptions.setGcpCredential(new TestCredential());
+    gcsOptions.setJobName("test-job");
+    gcsOptions.setAppName("test-app");
+
     Storage storageClient = Transport.newStorageClient(gcsOptions).build();
-    Storage.Objects.Get getObject = storageClient.objects().get("testbucket", "testobject");
+    Storage.Objects.Get getObject = storageClient.objects().get("test-bucket", "test-object");
+    HttpRequest request = getObject.buildHttpRequest();
+
     // An example of user agent string will be like
-    // "TransportTest (GPN:Beam) Google-API-Java-Client/2.0.0"
+    // "test-app apache-beam/2.57.0.dev (GPN:Beam) Google-API-Java-Client/2.0.0"
     // For a valid user-agent string, a comment like "(GPN:Beam)" cannot be the first token.
     // https://www.rfc-editor.org/rfc/rfc7231#section-5.5.3
     assertThat(
-        Arrays.asList(getObject.getRequestHeaders().getUserAgent().split(" "))
-            .indexOf("(GPN:Beam)"),
+        Arrays.asList(request.getHeaders().getUserAgent().split(" ")).indexOf("test-app"),
+        greaterThanOrEqualTo(0));
+
+    assertThat(
+        Arrays.asList(request.getHeaders().getUserAgent().split(" "))
+            .indexOf(String.format("apache-beam/%s", ReleaseInfo.getReleaseInfo().getSdkVersion())),
         greaterThan(0));
+
+    assertThat(
+        Arrays.asList(request.getHeaders().getUserAgent().split(" ")).indexOf("(GPN:Beam)"),
+        greaterThan(0));
+
+    // there should be one and only one custom audit entry for job name
+    assertEquals(
+        request.getHeaders().getHeaderStringValues("x-goog-custom-audit-job"),
+        Collections.singletonList("test-job"));
   }
 }

Reply via email to