This is an automated email from the ASF dual-hosted git repository. damccorm pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new 28a7199e80b [Java] Add job name to GCS custom audit info (#31316) 28a7199e80b is described below commit 28a7199e80bba9be8eb7f87ae7ab88c343492b40 Author: Shunping Huang <shunp...@google.com> AuthorDate: Fri May 17 11:21:40 2024 -0400 [Java] Add job name to GCS custom audit info (#31316) * Add job name to GCS custom audit info in Java * Apply spotless --- .../gcp/util/RetryHttpRequestInitializer.java | 11 +++++++ .../beam/sdk/extensions/gcp/util/Transport.java | 32 +++++++++++++++----- .../sdk/extensions/gcp/util/TransportTest.java | 35 ++++++++++++++++++---- 3 files changed, 65 insertions(+), 13 deletions(-) diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/RetryHttpRequestInitializer.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/RetryHttpRequestInitializer.java index edda1abc25f..cfc925a515e 100644 --- a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/RetryHttpRequestInitializer.java +++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/RetryHttpRequestInitializer.java @@ -32,6 +32,7 @@ import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.HashSet; +import java.util.Map; import java.util.Set; import org.apache.beam.sdk.metrics.Counter; import org.apache.beam.sdk.metrics.Metrics; @@ -208,6 +209,8 @@ public class RetryHttpRequestInitializer implements HttpRequestInitializer { private Set<Integer> ignoredResponseCodes = new HashSet<>(DEFAULT_IGNORED_RESPONSE_CODES); + private Map<String, String> httpHeaders = null; + public RetryHttpRequestInitializer() { this(Collections.emptyList()); } @@ -270,6 +273,10 @@ public class RetryHttpRequestInitializer implements HttpRequestInitializer { request.setUnsuccessfulResponseHandler(loggingHttpBackOffHandler); request.setIOExceptionHandler(loggingHttpBackOffHandler); + if (this.httpHeaders != null) { + request.getHeaders().putAll(this.httpHeaders); + } + // Set response initializer if (responseInterceptor != null) { request.setResponseInterceptor(responseInterceptor); @@ -284,4 +291,8 @@ public class RetryHttpRequestInitializer implements HttpRequestInitializer { public void setWriteTimeout(int writeTimeout) { this.writeTimeout = writeTimeout; } + + public void setHttpHeaders(Map<String, String> httpHeaders) { + this.httpHeaders = httpHeaders; + } } diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/Transport.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/Transport.java index a789b594f46..f7ecbfeda77 100644 --- a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/Transport.java +++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/Transport.java @@ -17,6 +17,8 @@ */ package org.apache.beam.sdk.extensions.gcp.util; +import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings.isNullOrEmpty; + import com.google.api.client.googleapis.javanet.GoogleNetHttpTransport; import com.google.api.client.http.HttpRequestInitializer; import com.google.api.client.http.HttpTransport; @@ -31,9 +33,12 @@ import java.net.MalformedURLException; import java.net.URL; import java.nio.file.Paths; import java.security.GeneralSecurityException; +import java.util.Optional; import org.apache.beam.sdk.extensions.gcp.auth.NullCredentialInitializer; import org.apache.beam.sdk.extensions.gcp.options.GcsOptions; +import org.apache.beam.sdk.util.ReleaseInfo; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; /** Helpers for cloud communication. */ public class Transport { @@ -89,20 +94,31 @@ public class Transport { /** Returns a Cloud Storage client builder using the specified {@link GcsOptions}. */ public static Storage.Builder newStorageClient(GcsOptions options) { - String applicationNameSuffix = " (GPN:Beam)"; + String jobName = Optional.ofNullable(options.getJobName()).orElse("UNKNOWN"); + + String applicationName = + String.format( + "%sapache-beam/%s (GPN:Beam)", + isNullOrEmpty(options.getAppName()) ? "" : options.getAppName() + " ", + ReleaseInfo.getReleaseInfo().getSdkVersion()); + String servicePath = options.getGcsEndpoint(); + + // Do not log the code 404. Code up the stack will deal with 404's if needed, + // and logging it by default clutters the output during file staging. + RetryHttpRequestInitializer retryHttpRequestInitializer = + new RetryHttpRequestInitializer(ImmutableList.of(404), new UploadIdResponseInterceptor()); + + // Set custom audit info in request headers + retryHttpRequestInitializer.setHttpHeaders(ImmutableMap.of("x-goog-custom-audit-job", jobName)); + Storage.Builder storageBuilder = new Storage.Builder( getTransport(), getJsonFactory(), chainHttpRequestInitializer( - options.getGcpCredential(), - // Do not log the code 404. Code up the stack will deal with 404's if needed, - // and - // logging it by default clutters the output during file staging. - new RetryHttpRequestInitializer( - ImmutableList.of(404), new UploadIdResponseInterceptor()))) - .setApplicationName(options.getAppName() + applicationNameSuffix) + options.getGcpCredential(), retryHttpRequestInitializer)) + .setApplicationName(applicationName) .setGoogleClientRequestInitializer(options.getGoogleApiTrace()); if (servicePath != null) { ApiComponents components = apiComponentsFromUrl(servicePath); diff --git a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/TransportTest.java b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/TransportTest.java index 4df3dfb680e..4554e64c237 100644 --- a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/TransportTest.java +++ b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/TransportTest.java @@ -19,12 +19,18 @@ package org.apache.beam.sdk.extensions.gcp.util; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.junit.Assert.assertEquals; +import com.google.api.client.http.HttpRequest; import com.google.api.services.storage.Storage; import java.io.IOException; import java.util.Arrays; +import java.util.Collections; +import org.apache.beam.sdk.extensions.gcp.auth.TestCredential; import org.apache.beam.sdk.extensions.gcp.options.GcsOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.util.ReleaseInfo; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; @@ -33,17 +39,36 @@ import org.junit.runners.JUnit4; public class TransportTest { @Test - public void testUserAgentInGcsRequestHeaders() throws IOException { + public void testUserAgentAndCustomAuditInGcsRequestHeaders() throws IOException { GcsOptions gcsOptions = PipelineOptionsFactory.as(GcsOptions.class); + gcsOptions.setGcpCredential(new TestCredential()); + gcsOptions.setJobName("test-job"); + gcsOptions.setAppName("test-app"); + Storage storageClient = Transport.newStorageClient(gcsOptions).build(); - Storage.Objects.Get getObject = storageClient.objects().get("testbucket", "testobject"); + Storage.Objects.Get getObject = storageClient.objects().get("test-bucket", "test-object"); + HttpRequest request = getObject.buildHttpRequest(); + // An example of user agent string will be like - // "TransportTest (GPN:Beam) Google-API-Java-Client/2.0.0" + // "test-app apache-beam/2.57.0.dev (GPN:Beam) Google-API-Java-Client/2.0.0" // For a valid user-agent string, a comment like "(GPN:Beam)" cannot be the first token. // https://www.rfc-editor.org/rfc/rfc7231#section-5.5.3 assertThat( - Arrays.asList(getObject.getRequestHeaders().getUserAgent().split(" ")) - .indexOf("(GPN:Beam)"), + Arrays.asList(request.getHeaders().getUserAgent().split(" ")).indexOf("test-app"), + greaterThanOrEqualTo(0)); + + assertThat( + Arrays.asList(request.getHeaders().getUserAgent().split(" ")) + .indexOf(String.format("apache-beam/%s", ReleaseInfo.getReleaseInfo().getSdkVersion())), greaterThan(0)); + + assertThat( + Arrays.asList(request.getHeaders().getUserAgent().split(" ")).indexOf("(GPN:Beam)"), + greaterThan(0)); + + // there should be one and only one custom audit entry for job name + assertEquals( + request.getHeaders().getHeaderStringValues("x-goog-custom-audit-job"), + Collections.singletonList("test-job")); } }