This is an automated email from the ASF dual-hosted git repository.

scwhittle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new dd5bbb2d77d Add a read timeout and cache BigQueryIOMetadata (#29662)
dd5bbb2d77d is described below

commit dd5bbb2d77d7f3d057baad3d872677671c730345
Author: Sam Whittle <scwhit...@users.noreply.github.com>
AuthorDate: Tue Jan 16 09:56:10 2024 +0100

    Add a read timeout and cache BigQueryIOMetadata (#29662)
---
 .../sdk/extensions/gcp/util/GceMetadataUtil.java   |  1 +
 .../sdk/io/gcp/bigquery/BigQueryIOMetadata.java    | 38 ++++++++++++----------
 2 files changed, 21 insertions(+), 18 deletions(-)

diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GceMetadataUtil.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GceMetadataUtil.java
index fd49b759fd6..e63aa7dc677 100644
--- a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GceMetadataUtil.java
+++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GceMetadataUtil.java
@@ -44,6 +44,7 @@ public class GceMetadataUtil {
     int timeoutMillis = 5000;
     final HttpParams httpParams = new BasicHttpParams();
     HttpConnectionParams.setConnectionTimeout(httpParams, timeoutMillis);
+    HttpConnectionParams.setSoTimeout(httpParams, timeoutMillis);
     String ret = "";
     try {
       HttpClient client = new DefaultHttpClient(httpParams);
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOMetadata.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOMetadata.java
index 9cce436fe35..f8d261d3bf6 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOMetadata.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOMetadata.java
@@ -18,19 +18,25 @@
 package org.apache.beam.sdk.io.gcp.bigquery;
 
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 import org.apache.beam.sdk.extensions.gcp.util.GceMetadataUtil;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Supplier;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Suppliers;
 import org.checkerframework.checker.nullness.qual.Nullable;
 
 /** Metadata class for BigQueryIO. i.e. to use as BQ job labels. */
 final class BigQueryIOMetadata {
 
-  private @Nullable String beamJobId;
+  private final @Nullable String beamJobId;
 
-  private @Nullable String beamJobName;
+  private final @Nullable String beamJobName;
 
-  private @Nullable String beamWorkerId;
+  private final @Nullable String beamWorkerId;
+
+  static final Supplier<BigQueryIOMetadata> INSTANCE =
+      Suppliers.memoizeWithExpiration(() -> refreshInstance(), 5, TimeUnit.MINUTES);
 
   private BigQueryIOMetadata(
       @Nullable String beamJobId, @Nullable String beamJobName, @Nullable String beamWorkerId) {
@@ -47,25 +53,21 @@ final class BigQueryIOMetadata {
    * being used.
    */
   public static BigQueryIOMetadata create() {
-    String dataflowJobId = GceMetadataUtil.fetchDataflowJobId();
-    String dataflowJobName = GceMetadataUtil.fetchDataflowJobName();
-    String dataflowWorkerId = GceMetadataUtil.fetchDataflowWorkerId();
+    return INSTANCE.get();
+  }
 
+  private static BigQueryIOMetadata refreshInstance() {
+    String dataflowJobId = GceMetadataUtil.fetchDataflowJobId();
     // If a Dataflow job id is returned on GCE metadata. Then it means
     // this program is running on a Dataflow GCE VM.
-    boolean isDataflowRunner = !dataflowJobId.isEmpty();
-
-    String beamJobId = null;
-    String beamJobName = null;
-    String beamWorkerId = null;
-    if (isDataflowRunner) {
-      if (BigQueryIOMetadata.isValidCloudLabel(dataflowJobId)) {
-        beamJobId = dataflowJobId;
-        beamJobName = dataflowJobName;
-        beamWorkerId = dataflowWorkerId;
-      }
+    if (dataflowJobId.isEmpty() || !BigQueryIOMetadata.isValidCloudLabel(dataflowJobId)) {
+      return new BigQueryIOMetadata(null, null, null);
     }
-    return new BigQueryIOMetadata(beamJobId, beamJobName, beamWorkerId);
+
+    return new BigQueryIOMetadata(
+        dataflowJobId,
+        GceMetadataUtil.fetchDataflowJobName(),
+        GceMetadataUtil.fetchDataflowWorkerId());
   }
 
   public Map<String, String> addAdditionalJobLabels(Map<String, String> jobLabels) {

Reply via email to