This is an automated email from the ASF dual-hosted git repository.

yhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new cfd9662aebf Update `it` module. (#28113)
cfd9662aebf is described below

commit cfd9662aebfb87497b117f870941ea47c589bbc9
Author: Pranav Bhandari <bhandari.prana...@gmail.com>
AuthorDate: Wed Aug 23 13:16:18 2023 -0400

    Update `it` module. (#28113)
---
 .../it/cassandra/CassandraResourceManager.java     | 28 +++++++----
 .../it/cassandra/CassandraResourceManagerTest.java |  3 ++
 .../ElasticsearchResourceManager.java              | 27 +++++++++--
 .../ElasticsearchResourceManagerTest.java          |  3 ++
 it/google-cloud-platform/build.gradle              | 13 +++---
 .../org/apache/beam/it/gcp/IOLoadTestBase.java     |  2 +-
 .../java/org/apache/beam/it/gcp/LoadTestBase.java  |  6 +--
 .../beam/it/gcp/artifacts/ArtifactClient.java      |  8 ++++
 .../it/gcp/bigtable/BigtableResourceManager.java   | 54 ++++++++++++++++++----
 .../it/gcp/dataflow/DefaultPipelineLauncher.java   | 13 ++----
 .../beam/it/gcp/datagenerator/DataGenerator.java   |  4 +-
 .../beam/it/gcp/datagenerator/package-info.java    |  1 +
 .../it/gcp/datastore/matchers/package-info.java    |  1 +
 .../apache/beam/it/gcp/kms/KMSResourceManager.java |  4 +-
 .../beam/it/gcp/pubsub/PubsubResourceManager.java  |  2 +
 .../beam/it/gcp/storage/GcsResourceManager.java    |  8 +++-
 .../apache/beam/it/gcp/bigquery/BigQueryIOLT.java  |  6 +--
 .../gcp/bigquery/BigQueryResourceManagerTest.java  |  3 +-
 .../gcp/bigtable/BigtableResourceManagerTest.java  | 47 ++++++++++---------
 .../it/gcp/dataflow/ClassicTemplateClientTest.java |  2 +-
 .../gcp/dataflow/DefaultPipelineLauncherTest.java  |  2 +-
 .../it/gcp/dataflow/FlexTemplateClientTest.java    |  2 +-
 .../gcp/datastore/DatastoreResourceManagerIT.java  |  7 ++-
 .../datastream/DatastreamResourceManagerTest.java  |  3 +-
 .../beam/it/gcp/kms/KMSResourceManagerIT.java      |  3 +-
 .../beam/it/gcp/kms/KMSResourceManagerTest.java    |  3 +-
 .../it/gcp/spanner/SpannerResourceManagerTest.java | 18 --------
 .../apache/beam/it/gcp/storage/FileBasedIOLT.java  |  2 +-
 .../beam/it/jdbc/MSSQLResourceManagerTest.java     |  2 +
 .../beam/it/jdbc/MySQLResourceManagerTest.java     |  2 +
 .../beam/it/jdbc/OracleResourceManagerTest.java    |  2 +
 .../beam/it/jdbc/PostgresResourceManagerTest.java  |  2 +
 .../apache/beam/it/kafka/KafkaResourceManager.java |  3 +-
 .../java/org/apache/beam/it/kafka/KafkaIOLT.java   |  2 +-
 .../beam/it/kafka/KafkaResourceManagerTest.java    |  2 +
 .../it/mongodb/MongoDBResourceManagerTest.java     |  2 +
 .../beam/it/neo4j/Neo4jResourceManagerTest.java    |  2 +
 .../beam/it/splunk/conditions/package-info.java    | 22 +++++----
 .../beam/it/splunk/matchers/package-info.java      | 22 +++++----
 .../org/apache/beam/it/splunk/package-info.java    | 22 +++++----
 .../beam/it/splunk/SplunkResourceManagerTest.java  |  2 +
 .../it/splunk/SplunkResourceManagerUtilsTest.java  |  2 +-
 .../TestContainerResourceManager.java              |  7 ++-
 .../TestContainerResourceManagerTest.java          | 11 +++++
 44 files changed, 240 insertions(+), 142 deletions(-)

diff --git 
a/it/cassandra/src/main/java/org/apache/beam/it/cassandra/CassandraResourceManager.java
 
b/it/cassandra/src/main/java/org/apache/beam/it/cassandra/CassandraResourceManager.java
index dc55709fff0..f7bf15eb764 100644
--- 
a/it/cassandra/src/main/java/org/apache/beam/it/cassandra/CassandraResourceManager.java
+++ 
b/it/cassandra/src/main/java/org/apache/beam/it/cassandra/CassandraResourceManager.java
@@ -34,6 +34,7 @@ import java.util.List;
 import java.util.Map;
 import javax.annotation.Nullable;
 import org.apache.beam.it.common.ResourceManager;
+import org.apache.beam.it.common.utils.ExceptionUtils;
 import org.apache.beam.it.testcontainers.TestContainerResourceManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -98,12 +99,7 @@ public class CassandraResourceManager extends 
TestContainerResourceManager<Gener
 
     if (!usingStaticDatabase) {
       // Keyspace request may timeout on a few environments, if Cassandra is 
warming up
-      Failsafe.with(
-              RetryPolicy.builder()
-                  .withMaxRetries(5)
-                  .withDelay(Duration.ofSeconds(1))
-                  .handle(DriverTimeoutException.class)
-                  .build())
+      Failsafe.with(buildRetryPolicy())
           .run(
               () ->
                   this.cassandraClient.execute(
@@ -141,8 +137,11 @@ public class CassandraResourceManager extends 
TestContainerResourceManager<Gener
     LOG.info("Executing statement: {}", statement);
 
     try {
-      return cassandraClient.execute(
-          
SimpleStatement.newInstance(statement).setKeyspace(this.keyspaceName));
+      return Failsafe.with(buildRetryPolicy())
+          .get(
+              () ->
+                  cassandraClient.execute(
+                      
SimpleStatement.newInstance(statement).setKeyspace(this.keyspaceName)));
     } catch (Exception e) {
       throw new CassandraResourceManagerException("Error reading collection.", 
e);
     }
@@ -226,8 +225,9 @@ public class CassandraResourceManager extends 
TestContainerResourceManager<Gener
       } catch (Exception e) {
         LOG.error("Failed to drop Cassandra keyspace {}.", keyspaceName, e);
 
-        // Only bubble exception if the cause is not timeout, as it will be 
dropped with container.
-        if (e.getCause() == null || !(e.getCause() instanceof 
DriverTimeoutException)) {
+        // Only bubble exception if the cause is not timeout or does not exist
+        if (!ExceptionUtils.containsType(e, DriverTimeoutException.class)
+            && !ExceptionUtils.containsMessage(e, "does not exist")) {
           producedError = true;
         }
       }
@@ -277,6 +277,14 @@ public class CassandraResourceManager extends 
TestContainerResourceManager<Gener
     return String.format("INSERT INTO %s (%s) VALUES (%s)", tableName, 
columns, values);
   }
 
+  private static RetryPolicy<Object> buildRetryPolicy() {
+    return RetryPolicy.builder()
+        .withMaxRetries(5)
+        .withDelay(Duration.ofSeconds(1))
+        .handle(DriverTimeoutException.class)
+        .build();
+  }
+
   /** Builder for {@link CassandraResourceManager}. */
   public static final class Builder
       extends TestContainerResourceManager.Builder<CassandraResourceManager> {
diff --git 
a/it/cassandra/src/test/java/org/apache/beam/it/cassandra/CassandraResourceManagerTest.java
 
b/it/cassandra/src/test/java/org/apache/beam/it/cassandra/CassandraResourceManagerTest.java
index ef5e8433434..fe00457159f 100644
--- 
a/it/cassandra/src/test/java/org/apache/beam/it/cassandra/CassandraResourceManagerTest.java
+++ 
b/it/cassandra/src/test/java/org/apache/beam/it/cassandra/CassandraResourceManagerTest.java
@@ -20,6 +20,7 @@ package org.apache.beam.it.cassandra;
 import static com.google.common.truth.Truth.assertThat;
 import static org.junit.Assert.assertThrows;
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.verify;
@@ -57,6 +58,8 @@ public class CassandraResourceManagerTest {
 
   @Before
   public void setUp() {
+    doReturn(container).when(container).withLogConsumer(any());
+
     testManager =
         new CassandraResourceManager(
             cassandraClient, container, 
CassandraResourceManager.builder(TEST_ID));
diff --git 
a/it/elasticsearch/src/main/java/org/apache/beam/it/elasticsearch/ElasticsearchResourceManager.java
 
b/it/elasticsearch/src/main/java/org/apache/beam/it/elasticsearch/ElasticsearchResourceManager.java
index 775f0dd3105..c58875d7df8 100644
--- 
a/it/elasticsearch/src/main/java/org/apache/beam/it/elasticsearch/ElasticsearchResourceManager.java
+++ 
b/it/elasticsearch/src/main/java/org/apache/beam/it/elasticsearch/ElasticsearchResourceManager.java
@@ -20,6 +20,7 @@ package org.apache.beam.it.elasticsearch;
 import static 
org.apache.beam.it.elasticsearch.ElasticsearchUtils.checkValidIndexName;
 
 import java.io.IOException;
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
@@ -48,6 +49,7 @@ import org.elasticsearch.search.builder.SearchSourceBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.wait.strategy.LogMessageWaitStrategy;
 import org.testcontainers.elasticsearch.ElasticsearchContainer;
 import org.testcontainers.utility.DockerImageName;
 
@@ -81,11 +83,28 @@ public class ElasticsearchResourceManager extends 
TestContainerResourceManager<G
   private final List<String> managedIndexNames = new ArrayList<>();
 
   private ElasticsearchResourceManager(Builder builder) {
-    this(
-        /* elasticsearchClient= */ null,
+    this(/* elasticsearchClient= */ null, buildContainer(builder), builder);
+  }
+
+  /**
+   * Create the {@link ElasticsearchContainer} instance for the given builder.
+   *
+   * <p>The method override the wait strategy from the base container using 
the same regex, but
+   * increase the timeout to 2 minutes.
+   */
+  private static ElasticsearchContainer buildContainer(Builder builder) {
+    ElasticsearchContainer container =
         new ElasticsearchContainer(
-            
DockerImageName.parse(builder.containerImageName).withTag(builder.containerImageTag)),
-        builder);
+            
DockerImageName.parse(builder.containerImageName).withTag(builder.containerImageTag));
+
+    //
+    // The regex is based on Elasticsearch container, but it's not exposed 
anywhere.
+    String regex = ".*(\"message\":\\s?\"started[\\s?|\"].*|] started\n$)";
+    Duration startupTimeout = Duration.ofMinutes(2);
+    container.setWaitStrategy(
+        new 
LogMessageWaitStrategy().withRegEx(regex).withStartupTimeout(startupTimeout));
+
+    return container;
   }
 
   @VisibleForTesting
diff --git 
a/it/elasticsearch/src/test/java/org/apache/beam/it/elasticsearch/ElasticsearchResourceManagerTest.java
 
b/it/elasticsearch/src/test/java/org/apache/beam/it/elasticsearch/ElasticsearchResourceManagerTest.java
index 2b233c1793a..5778f24e466 100644
--- 
a/it/elasticsearch/src/test/java/org/apache/beam/it/elasticsearch/ElasticsearchResourceManagerTest.java
+++ 
b/it/elasticsearch/src/test/java/org/apache/beam/it/elasticsearch/ElasticsearchResourceManagerTest.java
@@ -21,6 +21,7 @@ import static com.google.common.truth.Truth.assertThat;
 import static org.junit.Assert.assertThrows;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
@@ -66,6 +67,8 @@ public class ElasticsearchResourceManagerTest {
   @Before
   public void setUp() {
     when(container.getHttpHostAddress()).thenReturn(HOST + ":" + MAPPED_PORT);
+    doReturn(container).when(container).withLogConsumer(any());
+
     testManager =
         new ElasticsearchResourceManager(
             elasticsearchClient, container, 
ElasticsearchResourceManager.builder(TEST_ID));
diff --git a/it/google-cloud-platform/build.gradle 
b/it/google-cloud-platform/build.gradle
index 320310049b9..f43b3f25720 100644
--- a/it/google-cloud-platform/build.gradle
+++ b/it/google-cloud-platform/build.gradle
@@ -44,26 +44,27 @@ dependencies {
     implementation library.java.protobuf_java_util
     implementation library.java.protobuf_java
     implementation library.java.threetenbp
+    implementation 'org.awaitility:awaitility:4.2.0'
     // Google Cloud Dependencies
     implementation library.java.google_api_services_bigquery
     implementation library.java.google_cloud_core
     implementation 'com.google.cloud:google-cloud-storage'
     implementation 'com.google.cloud:google-cloud-bigquery'
     implementation 'com.google.cloud:google-cloud-monitoring'
-    implementation 'com.google.api.grpc:proto-google-cloud-monitoring-v3'
+    provided 'com.google.api.grpc:proto-google-cloud-monitoring-v3'
     implementation 'com.google.cloud:google-cloud-bigtable'
     implementation 'com.google.cloud:google-cloud-spanner'
     implementation 'com.google.cloud:google-cloud-pubsub'
-    implementation 'com.google.api.grpc:proto-google-cloud-pubsub-v1'
+    provided 'com.google.api.grpc:proto-google-cloud-pubsub-v1'
     implementation 'com.google.cloud:google-cloud-pubsublite'
-    implementation 'com.google.api.grpc:proto-google-cloud-pubsublite-v1'
+    provided 'com.google.api.grpc:proto-google-cloud-pubsublite-v1'
     implementation 'com.google.cloud:google-cloud-datastore'
     implementation 'com.google.cloud:google-cloud-datastream'
-    implementation 'com.google.api.grpc:proto-google-cloud-datastream-v1'
+    provided 'com.google.api.grpc:proto-google-cloud-datastream-v1'
     implementation 'com.google.cloud:google-cloud-kms'
-    implementation 'com.google.api.grpc:proto-google-cloud-kms-v1'
+    provided 'com.google.api.grpc:proto-google-cloud-kms-v1'
     implementation 'com.google.cloud:google-cloud-dlp'
-    implementation 'com.google.api.grpc:proto-google-cloud-dlp-v2'
+    provided 'com.google.api.grpc:proto-google-cloud-dlp-v2'
     implementation 'com.google.cloud:google-cloud-secretmanager'
     provided 'com.google.api.grpc:proto-google-cloud-secretmanager-v1'
 
diff --git 
a/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/IOLoadTestBase.java
 
b/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/IOLoadTestBase.java
index b22a0234fc6..32f262f2eac 100644
--- 
a/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/IOLoadTestBase.java
+++ 
b/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/IOLoadTestBase.java
@@ -63,7 +63,7 @@ public class IOLoadTestBase extends LoadTestBase {
 
   @Override
   PipelineLauncher launcher() {
-    return DefaultPipelineLauncher.builder().build();
+    return DefaultPipelineLauncher.builder(CREDENTIALS).build();
   }
 
   /** A utility DoFn that counts elements passed through. */
diff --git 
a/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/LoadTestBase.java
 
b/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/LoadTestBase.java
index 139a485f90e..f6e359fed96 100644
--- 
a/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/LoadTestBase.java
+++ 
b/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/LoadTestBase.java
@@ -123,7 +123,7 @@ public abstract class LoadTestBase {
     monitoringClient.cleanupAll();
   }
 
-  abstract PipelineLauncher launcher() throws IOException;
+  abstract PipelineLauncher launcher();
 
   /**
    * Exports the metrics of given dataflow job to BigQuery.
@@ -131,8 +131,7 @@ public abstract class LoadTestBase {
    * @param launchInfo Job info of the job
    * @param metrics metrics to export
    */
-  protected void exportMetricsToBigQuery(LaunchInfo launchInfo, Map<String, 
Double> metrics)
-      throws IOException {
+  protected void exportMetricsToBigQuery(LaunchInfo launchInfo, Map<String, 
Double> metrics) {
     LOG.info("Exporting metrics:\n{}", formatForLogging(metrics));
     try {
       // either use the user specified project for exporting, or the same 
project
@@ -140,7 +139,6 @@ public abstract class LoadTestBase {
       BigQueryResourceManager bigQueryResourceManager =
           BigQueryResourceManager.builder(testName, exportProject, CREDENTIALS)
               .setDatasetId(TestProperties.exportDataset())
-              .setCredentials(CREDENTIALS)
               .build();
       // exporting metrics to bigQuery table
       Map<String, Object> rowContent = new HashMap<>();
diff --git 
a/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/artifacts/ArtifactClient.java
 
b/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/artifacts/ArtifactClient.java
index 201461f5db8..e463baffeaf 100644
--- 
a/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/artifacts/ArtifactClient.java
+++ 
b/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/artifacts/ArtifactClient.java
@@ -53,6 +53,14 @@ public interface ArtifactClient extends ResourceManager {
   /** Returns the id associated with the particular run of the test class. */
   String runId();
 
+  /**
+   * Returns a path the artifact will be created at.
+   *
+   * @param artifactName Artifact name
+   * @return GCS path where the artifact will be created
+   */
+  String getPathForArtifact(String artifactName);
+
   /**
    * Creates a new artifact in whatever service is being used to store them.
    *
diff --git 
a/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/bigtable/BigtableResourceManager.java
 
b/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/bigtable/BigtableResourceManager.java
index c2adea1e0dc..1e6750cc81e 100644
--- 
a/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/bigtable/BigtableResourceManager.java
+++ 
b/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/bigtable/BigtableResourceManager.java
@@ -21,6 +21,7 @@ import static 
org.apache.beam.it.common.utils.ResourceManagerUtils.checkValidPro
 import static 
org.apache.beam.it.gcp.bigtable.BigtableResourceManagerUtils.checkValidTableId;
 import static 
org.apache.beam.it.gcp.bigtable.BigtableResourceManagerUtils.generateDefaultClusters;
 import static 
org.apache.beam.it.gcp.bigtable.BigtableResourceManagerUtils.generateInstanceId;
+import static org.awaitility.Awaitility.await;
 
 import com.google.api.gax.core.CredentialsProvider;
 import com.google.api.gax.rpc.ServerStream;
@@ -37,6 +38,8 @@ import 
com.google.cloud.bigtable.admin.v2.models.CreateInstanceRequest;
 import com.google.cloud.bigtable.admin.v2.models.CreateTableRequest;
 import com.google.cloud.bigtable.admin.v2.models.GCRules;
 import com.google.cloud.bigtable.admin.v2.models.StorageType;
+import com.google.cloud.bigtable.admin.v2.models.Table;
+import com.google.cloud.bigtable.admin.v2.models.UpdateTableRequest;
 import com.google.cloud.bigtable.data.v2.BigtableDataClient;
 import com.google.cloud.bigtable.data.v2.BigtableDataSettings;
 import com.google.cloud.bigtable.data.v2.models.Query;
@@ -49,6 +52,8 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
+import java.util.Set;
 import javax.annotation.Nullable;
 import org.apache.beam.it.common.ResourceManager;
 import org.apache.commons.lang3.StringUtils;
@@ -72,7 +77,7 @@ import org.threeten.bp.Duration;
 public class BigtableResourceManager implements ResourceManager {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(BigtableResourceManager.class);
-  private static final String DEFAULT_CLUSTER_ZONE = "us-central1-a";
+  private static final String DEFAULT_CLUSTER_ZONE = "us-central1-b";
   private static final int DEFAULT_CLUSTER_NUM_NODES = 1;
   private static final StorageType DEFAULT_CLUSTER_STORAGE_TYPE = 
StorageType.SSD;
 
@@ -84,6 +89,8 @@ public class BigtableResourceManager implements 
ResourceManager {
   private final List<String> createdTables;
   // List to store created app profiles for static RM
   private final List<String> createdAppProfiles;
+  // List of tables we enabled CDC for
+  private final Set<String> cdcEnabledTables;
 
   private boolean hasInstance;
   private final boolean usingStaticInstance;
@@ -103,6 +110,7 @@ public class BigtableResourceManager implements 
ResourceManager {
     this.projectId = builder.projectId;
     this.createdTables = new ArrayList<>();
     this.createdAppProfiles = new ArrayList<>();
+    this.cdcEnabledTables = new HashSet<>();
 
     // Check if RM was configured to use static Bigtable instance.
     if (builder.useStaticInstance) {
@@ -191,8 +199,10 @@ public class BigtableResourceManager implements 
ResourceManager {
 
     // Check to see if instance already exists, and throw error if it does
     if (hasInstance) {
-      throw new IllegalStateException(
-          "Instance " + instanceId + " already exists for project " + 
projectId + ".");
+      LOG.warn(
+          "Skipping instance creation. Instance was already created or static 
instance was passed. Reusing : {}.",
+          instanceId);
+      return;
     }
 
     LOG.info("Creating instance {} in project {}.", instanceId, projectId);
@@ -341,8 +351,22 @@ public class BigtableResourceManager implements 
ResourceManager {
           createTableRequest.addFamily(
               columnFamily, 
GCRules.GCRULES.maxAge(bigtableTableSpec.getMaxAge()));
         }
-        // TODO: Set CDC enabled
+        if (bigtableTableSpec.getCdcEnabled()) {
+          createTableRequest.addChangeStreamRetention(Duration.ofDays(7));
+          cdcEnabledTables.add(tableId);
+        }
         tableAdminClient.createTable(createTableRequest);
+
+        await("Waiting for all tables to be replicated.")
+            .atMost(java.time.Duration.ofMinutes(10))
+            .pollInterval(java.time.Duration.ofSeconds(5))
+            .until(
+                () -> {
+                  Table t = tableAdminClient.getTable(tableId);
+                  Map<String, Table.ReplicationState> rs = 
t.getReplicationStatesByClusterId();
+                  return 
rs.values().stream().allMatch(Table.ReplicationState.READY::equals);
+                });
+
       } else {
         throw new IllegalStateException(
             "Table " + tableId + " already exists for instance " + instanceId 
+ ".");
@@ -534,12 +558,26 @@ public class BigtableResourceManager implements 
ResourceManager {
   public synchronized void cleanupAll() throws 
BigtableResourceManagerException {
     LOG.info("Attempting to cleanup manager.");
 
-    if (usingStaticInstance) {
-      try (BigtableTableAdminClient tableAdminClient =
-          bigtableResourceManagerClientFactory.bigtableTableAdminClient()) {
-        createdTables.forEach(tableAdminClient::deleteTable);
+    try (BigtableTableAdminClient tableAdminClient =
+        bigtableResourceManagerClientFactory.bigtableTableAdminClient()) {
+      // Change streams must be disabled before table or instance can be 
deleted
+      for (String tableId : cdcEnabledTables) {
+        
tableAdminClient.updateTable(UpdateTableRequest.of(tableId).disableChangeStreamRetention());
+      }
+
+      if (usingStaticInstance) {
         LOG.info(
             "This manager was configured to use a static instance that will 
not be cleaned up.");
+
+        // Remove managed tables
+        createdTables.forEach(tableAdminClient::deleteTable);
+
+        // Remove managed app profiles
+        try (BigtableInstanceAdminClient instanceAdminClient =
+            
bigtableResourceManagerClientFactory.bigtableInstanceAdminClient()) {
+          createdAppProfiles.forEach(
+              profile -> instanceAdminClient.deleteAppProfile(instanceId, 
profile, true));
+        }
         return;
       }
     }
diff --git 
a/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/dataflow/DefaultPipelineLauncher.java
 
b/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/dataflow/DefaultPipelineLauncher.java
index 9da7f67d4d7..7918dd6227d 100644
--- 
a/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/dataflow/DefaultPipelineLauncher.java
+++ 
b/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/dataflow/DefaultPipelineLauncher.java
@@ -109,8 +109,8 @@ public class DefaultPipelineLauncher extends 
AbstractPipelineLauncher {
                 : new HttpCredentialsAdapter(builder.getCredentials())));
   }
 
-  public static Builder builder() {
-    return new Builder();
+  public static Builder builder(Credentials credentials) {
+    return new Builder(credentials);
   }
 
   @Override
@@ -462,17 +462,14 @@ public class DefaultPipelineLauncher extends 
AbstractPipelineLauncher {
   public static final class Builder {
     private Credentials credentials;
 
-    private Builder() {}
+    private Builder(Credentials credentials) {
+      this.credentials = credentials;
+    }
 
     public Credentials getCredentials() {
       return credentials;
     }
 
-    public Builder setCredentials(Credentials value) {
-      credentials = value;
-      return this;
-    }
-
     public DefaultPipelineLauncher build() {
       return new DefaultPipelineLauncher(this);
     }
diff --git 
a/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/datagenerator/DataGenerator.java
 
b/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/datagenerator/DataGenerator.java
index 76d7f32286e..99016b5dd3a 100644
--- 
a/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/datagenerator/DataGenerator.java
+++ 
b/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/datagenerator/DataGenerator.java
@@ -51,7 +51,7 @@ public class DataGenerator {
   private final PipelineLauncher pipelineLauncher;
   private final PipelineOperator pipelineOperator;
 
-  private DataGenerator(Builder builder) throws IOException {
+  private DataGenerator(Builder builder) {
     pipelineLauncher = FlexTemplateClient.builder(CREDENTIALS).build();
     pipelineOperator = new PipelineOperator(pipelineLauncher);
     this.dataGeneratorOptions =
@@ -249,7 +249,7 @@ public class DataGenerator {
       return this;
     }
 
-    public DataGenerator build() throws IOException {
+    public DataGenerator build() {
       return new DataGenerator(this);
     }
   }
diff --git 
a/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/datagenerator/package-info.java
 
b/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/datagenerator/package-info.java
index 04e2dd4b73a..d563025ebeb 100644
--- 
a/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/datagenerator/package-info.java
+++ 
b/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/datagenerator/package-info.java
@@ -15,5 +15,6 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 /** Data generator for load tests. */
 package org.apache.beam.it.gcp.datagenerator;
diff --git 
a/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/datastore/matchers/package-info.java
 
b/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/datastore/matchers/package-info.java
index bcdf04716c5..e68f7b244d9 100644
--- 
a/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/datastore/matchers/package-info.java
+++ 
b/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/datastore/matchers/package-info.java
@@ -15,5 +15,6 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 /** Package for Datastore Truth matchers / subjects to have reusable 
assertions. */
 package org.apache.beam.it.gcp.datastore.matchers;
diff --git 
a/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/kms/KMSResourceManager.java
 
b/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/kms/KMSResourceManager.java
index 6b6d7194f49..7e1a403c735 100644
--- 
a/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/kms/KMSResourceManager.java
+++ 
b/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/kms/KMSResourceManager.java
@@ -110,8 +110,8 @@ public class KMSResourceManager implements ResourceManager {
   }
 
   /**
-   * Retrieves a KMS crypto key from GCS, creating it if it does not exist. If 
the given keyring
-   * also does not already exist, it will be created.
+   * Retrieves a KMS crypto key, creating it if it does not exist. If the 
given keyring also does
+   * not already exist, it will be created.
    *
    * @param keyRingId The name of the keyring to insert the key to.
    * @param keyName The name of the KMS crypto key to retrieve.
diff --git 
a/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/pubsub/PubsubResourceManager.java
 
b/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/pubsub/PubsubResourceManager.java
index 2d4d9a7d7de..3a684d34c04 100644
--- 
a/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/pubsub/PubsubResourceManager.java
+++ 
b/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/pubsub/PubsubResourceManager.java
@@ -28,6 +28,7 @@ import com.google.cloud.pubsub.v1.SubscriptionAdminSettings;
 import com.google.cloud.pubsub.v1.TopicAdminClient;
 import com.google.cloud.pubsub.v1.TopicAdminSettings;
 import com.google.protobuf.ByteString;
+import com.google.protobuf.FieldMask;
 import com.google.pubsub.v1.Encoding;
 import com.google.pubsub.v1.ProjectName;
 import com.google.pubsub.v1.PubsubMessage;
@@ -271,6 +272,7 @@ public final class PubsubResourceManager implements 
ResourceManager {
     createdSchemas.add(SchemaName.parse(schema.getName()));
     topicAdminClient.updateTopic(
         UpdateTopicRequest.newBuilder()
+            .setUpdateMask(FieldMask.newBuilder().addPaths("schema_settings"))
             .setTopic(
                 Topic.newBuilder()
                     .setName(schemaTopic.toString())
diff --git 
a/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/storage/GcsResourceManager.java
 
b/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/storage/GcsResourceManager.java
index 1c09f5a6f71..730ca23ee54 100644
--- 
a/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/storage/GcsResourceManager.java
+++ 
b/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/storage/GcsResourceManager.java
@@ -84,8 +84,7 @@ public final class GcsResourceManager implements 
ArtifactClient, ResourceManager
   }
 
   /** Returns a new {@link Builder} for configuring a client. */
-  public static Builder builder(String bucket, String testClassName, 
Credentials credentials)
-      throws IOException {
+  public static Builder builder(String bucket, String testClassName, 
Credentials credentials) {
     checkArgument(!bucket.equals(""));
     checkArgument(!testClassName.equals(""));
 
@@ -97,6 +96,11 @@ public final class GcsResourceManager implements 
ArtifactClient, ResourceManager
     return runId;
   }
 
+  @Override
+  public String getPathForArtifact(String artifactName) {
+    return joinPathParts(testClassName, runId, artifactName);
+  }
+
   @Override
   public Artifact createArtifact(String artifactName, String contents) {
     return this.createArtifact(artifactName, 
contents.getBytes(StandardCharsets.UTF_8));
diff --git 
a/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/bigquery/BigQueryIOLT.java
 
b/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/bigquery/BigQueryIOLT.java
index b179019a162..03f6e8abfd4 100644
--- 
a/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/bigquery/BigQueryIOLT.java
+++ 
b/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/bigquery/BigQueryIOLT.java
@@ -109,11 +109,9 @@ public final class BigQueryIOLT extends IOLoadTestBase {
   @Rule public TestPipeline readPipeline = TestPipeline.create();
 
   @BeforeClass
-  public static void beforeClass() throws IOException {
+  public static void beforeClass() {
     resourceManager =
-        BigQueryResourceManager.builder("io-bigquery-lt", project, CREDENTIALS)
-            .setCredentials(CREDENTIALS)
-            .build();
+        BigQueryResourceManager.builder("io-bigquery-lt", project, 
CREDENTIALS).build();
     resourceManager.createDataset(region);
   }
 
diff --git 
a/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/bigquery/BigQueryResourceManagerTest.java
 
b/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/bigquery/BigQueryResourceManagerTest.java
index 53debe3ffa9..e9cd2523875 100644
--- 
a/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/bigquery/BigQueryResourceManagerTest.java
+++ 
b/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/bigquery/BigQueryResourceManagerTest.java
@@ -39,7 +39,6 @@ import com.google.cloud.bigquery.StandardSQLTypeName;
 import com.google.cloud.bigquery.Table;
 import com.google.cloud.bigquery.TableId;
 import com.google.cloud.bigquery.TableInfo;
-import java.io.IOException;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
 import org.junit.Before;
@@ -85,7 +84,7 @@ public class BigQueryResourceManagerTest {
   }
 
   @Test
-  public void testGetDatasetIdReturnsCorrectValue() throws IOException {
+  public void testGetDatasetIdReturnsCorrectValue() {
     BigQueryResourceManager tm = BigQueryResourceManager.builder(TEST_ID, 
PROJECT_ID, null).build();
 
     assertThat(tm.getDatasetId()).matches(TEST_ID.replace('-', '_') + 
"_\\d{8}_\\d{6}_\\d{6}");
diff --git 
a/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/bigtable/BigtableResourceManagerTest.java
 
b/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/bigtable/BigtableResourceManagerTest.java
index a0f4d963c0b..65745aea49b 100644
--- 
a/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/bigtable/BigtableResourceManagerTest.java
+++ 
b/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/bigtable/BigtableResourceManagerTest.java
@@ -33,11 +33,14 @@ import com.google.api.gax.rpc.ServerStream;
 import com.google.cloud.bigtable.admin.v2.BigtableInstanceAdminClient;
 import com.google.cloud.bigtable.admin.v2.BigtableTableAdminClient;
 import com.google.cloud.bigtable.admin.v2.models.StorageType;
+import com.google.cloud.bigtable.admin.v2.models.Table.ReplicationState;
 import com.google.cloud.bigtable.data.v2.BigtableDataClient;
 import com.google.cloud.bigtable.data.v2.models.Row;
 import com.google.cloud.bigtable.data.v2.models.RowMutation;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
 import org.junit.Before;
 import org.junit.Rule;
@@ -95,13 +98,6 @@ public class BigtableResourceManagerTest {
     assertThat(rm.getProjectId()).matches(PROJECT_ID);
   }
 
-  @Test
-  public void 
testCreateInstanceShouldThrowExceptionWhenInstanceAlreadyExists() {
-    testManager.createInstance(cluster);
-
-    assertThrows(IllegalStateException.class, () -> 
testManager.createInstance(cluster));
-  }
-
   @Test
   public void 
testCreateInstanceShouldThrowExceptionWhenClientFailsToCreateInstance() {
     
when(bigtableResourceManagerClientFactory.bigtableInstanceAdminClient().createInstance(any()))
@@ -119,20 +115,6 @@ public class BigtableResourceManagerTest {
     assertThrows(BigtableResourceManagerException.class, () -> 
testManager.createInstance(cluster));
   }
 
-  @Test
-  public void testCreateInstanceShouldThrowErrorWhenUsingStaticInstance() 
throws IOException {
-    String instanceId = "static-instance";
-    testManager =
-        new BigtableResourceManager(
-            BigtableResourceManager.builder(TEST_ID, PROJECT_ID, null)
-                .setInstanceId(instanceId)
-                .useStaticInstance(),
-            bigtableResourceManagerClientFactory);
-
-    assertThrows(IllegalStateException.class, () -> 
testManager.createInstance(cluster));
-    assertThat(testManager.getInstanceId()).matches(instanceId);
-  }
-
   @Test
   public void testCreateInstanceShouldWorkWhenBigtableDoesNotThrowAnyError() {
     testManager.createInstance(cluster);
@@ -143,6 +125,8 @@ public class BigtableResourceManagerTest {
 
   @Test
   public void 
testCreateTableShouldNotCreateInstanceWhenInstanceAlreadyExists() {
+    setupReadyTable();
+
     testManager.createInstance(cluster);
     Mockito.lenient()
         .when(
@@ -195,6 +179,7 @@ public class BigtableResourceManagerTest {
 
   @Test
   public void 
testCreateTableShouldThrowErrorWhenTableAdminClientFailsToClose() {
+    setupReadyTable();
     BigtableTableAdminClient mockClient =
         bigtableResourceManagerClientFactory.bigtableTableAdminClient();
     doThrow(RuntimeException.class).when(mockClient).close();
@@ -206,6 +191,8 @@ public class BigtableResourceManagerTest {
 
   @Test
   public void testCreateTableShouldWorkWhenBigtableDoesNotThrowAnyError() {
+    setupReadyTable();
+
     
when(bigtableResourceManagerClientFactory.bigtableTableAdminClient().exists(anyString()))
         .thenReturn(false);
 
@@ -427,17 +414,33 @@ public class BigtableResourceManagerTest {
     
when(bigtableResourceManagerClientFactory.bigtableTableAdminClient().exists(anyString()))
         .thenReturn(false);
 
+    setupReadyTable();
+
     testManager.createTable(TABLE_ID, ImmutableList.of("cf1"));
 
     testManager.cleanupAll();
     
verify(bigtableResourceManagerClientFactory.bigtableTableAdminClient()).deleteTable(TABLE_ID);
     verify(bigtableResourceManagerClientFactory.bigtableTableAdminClient(), 
new Times(1))
         .deleteTable(anyString());
-    verify(bigtableResourceManagerClientFactory, 
never()).bigtableInstanceAdminClient();
+    verify(bigtableResourceManagerClientFactory.bigtableInstanceAdminClient(), 
never())
+        .deleteInstance(any());
+  }
+
+  private void setupReadyTable() {
+    Map<String, ReplicationState> allReplicated = new HashMap<>();
+    allReplicated.put(CLUSTER_ID, ReplicationState.READY);
+
+    when(bigtableResourceManagerClientFactory
+            .bigtableTableAdminClient()
+            .getTable(TABLE_ID)
+            .getReplicationStatesByClusterId())
+        .thenReturn(allReplicated);
   }
 
   @Test
   public void testCleanupAllShouldWorkWhenBigtableDoesNotThrowAnyError() {
+    setupReadyTable();
+
     testManager.createTable(TABLE_ID, ImmutableList.of("cf1"));
     
when(bigtableResourceManagerClientFactory.bigtableTableAdminClient().exists(anyString()))
         .thenReturn(true);
diff --git 
a/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/dataflow/ClassicTemplateClientTest.java
 
b/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/dataflow/ClassicTemplateClientTest.java
index 41dc6763738..cfd56e596e5 100644
--- 
a/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/dataflow/ClassicTemplateClientTest.java
+++ 
b/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/dataflow/ClassicTemplateClientTest.java
@@ -79,7 +79,7 @@ public final class ClassicTemplateClientTest {
   @Captor private ArgumentCaptor<CreateJobFromTemplateRequest> requestCaptor;
 
   @Test
-  public void testCreateWithCredentials() throws IOException {
+  public void testCreateWithCredentials() {
     Credentials credentials = mock(Credentials.class);
     ClassicTemplateClient.builder(credentials).build();
     // Lack of exception is all we really can test
diff --git 
a/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/dataflow/DefaultPipelineLauncherTest.java
 
b/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/dataflow/DefaultPipelineLauncherTest.java
index 92d222fee30..b6c0f8cdc58 100644
--- 
a/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/dataflow/DefaultPipelineLauncherTest.java
+++ 
b/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/dataflow/DefaultPipelineLauncherTest.java
@@ -41,7 +41,7 @@ public class DefaultPipelineLauncherTest {
 
   @Test
   public void testPipelineMetrics() throws IOException {
-    DefaultPipelineLauncher launcher = 
DefaultPipelineLauncher.builder().build();
+    DefaultPipelineLauncher launcher = 
DefaultPipelineLauncher.builder(null).build();
     final String timeMetrics = "run_time";
     final String counterMetrics = "counter";
     final long numElements = 1000L;
diff --git 
a/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/dataflow/FlexTemplateClientTest.java
 
b/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/dataflow/FlexTemplateClientTest.java
index ae49972bd06..4088efe6751 100644
--- 
a/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/dataflow/FlexTemplateClientTest.java
+++ 
b/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/dataflow/FlexTemplateClientTest.java
@@ -81,7 +81,7 @@ public final class FlexTemplateClientTest {
   @Captor private ArgumentCaptor<LaunchFlexTemplateRequest> requestCaptor;
 
   @Test
-  public void testCreateWithCredentials() throws IOException {
+  public void testCreateWithCredentials() {
     Credentials credentials = mock(Credentials.class);
     FlexTemplateClient.builder(credentials).build();
     // Lack of exception is all we really can test
diff --git 
a/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/datastore/DatastoreResourceManagerIT.java
 
b/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/datastore/DatastoreResourceManagerIT.java
index ccb0ea0bafe..02bf43f3275 100644
--- 
a/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/datastore/DatastoreResourceManagerIT.java
+++ 
b/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/datastore/DatastoreResourceManagerIT.java
@@ -20,7 +20,6 @@ package org.apache.beam.it.gcp.datastore;
 import static com.google.common.truth.Truth.assertThat;
 
 import com.google.cloud.datastore.Entity;
-import java.io.IOException;
 import java.util.List;
 import org.apache.beam.it.common.TestProperties;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
@@ -30,7 +29,7 @@ import org.junit.Test;
 public class DatastoreResourceManagerIT {
 
   @Test
-  public void testInsert() throws IOException {
+  public void testInsert() {
     DatastoreResourceManager resourceManager =
         DatastoreResourceManager.builder(
                 TestProperties.project(),
@@ -51,7 +50,7 @@ public class DatastoreResourceManagerIT {
   }
 
   @Test
-  public void testInsertQuery() throws IOException {
+  public void testInsertQuery() {
     DatastoreResourceManager resourceManager =
         DatastoreResourceManager.builder(
                 TestProperties.project(),
@@ -75,7 +74,7 @@ public class DatastoreResourceManagerIT {
   }
 
   @Test
-  public void testInsertCleanUp() throws IOException {
+  public void testInsertCleanUp() {
     DatastoreResourceManager resourceManager =
         DatastoreResourceManager.builder(
                 TestProperties.project(),
diff --git 
a/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/datastream/DatastreamResourceManagerTest.java
 
b/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/datastream/DatastreamResourceManagerTest.java
index cb9e9f8f9bc..d0afe8ae90f 100644
--- 
a/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/datastream/DatastreamResourceManagerTest.java
+++ 
b/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/datastream/DatastreamResourceManagerTest.java
@@ -36,7 +36,6 @@ import com.google.cloud.datastream.v1.SourceConfig;
 import com.google.cloud.datastream.v1.Stream;
 import com.google.cloud.datastream.v1.Stream.State;
 import com.google.cloud.datastream.v1.UpdateStreamRequest;
-import java.io.IOException;
 import java.util.concurrent.ExecutionException;
 import org.junit.Before;
 import org.junit.Rule;
@@ -66,7 +65,7 @@ public class DatastreamResourceManagerTest {
   private DatastreamResourceManager testManager;
 
   @Before
-  public void setup() throws IOException {
+  public void setup() {
     testManager =
         new DatastreamResourceManager(
             datastreamClient, DatastreamResourceManager.builder(PROJECT_ID, 
LOCATION, null));
diff --git 
a/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/kms/KMSResourceManagerIT.java
 
b/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/kms/KMSResourceManagerIT.java
index 8cbb13b95f6..1dd0a5eda30 100644
--- 
a/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/kms/KMSResourceManagerIT.java
+++ 
b/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/kms/KMSResourceManagerIT.java
@@ -19,7 +19,6 @@ package org.apache.beam.it.gcp.kms;
 
 import static com.google.common.truth.Truth.assertThat;
 
-import java.io.IOException;
 import org.apache.beam.it.gcp.GCPBaseIT;
 import org.apache.beam.it.gcp.GoogleCloudIntegrationTest;
 import org.apache.commons.lang3.RandomStringUtils;
@@ -41,7 +40,7 @@ public class KMSResourceManagerIT extends GCPBaseIT {
   private KMSResourceManager kmsResourceManager;
 
   @Before
-  public void setUp() throws IOException {
+  public void setUp() {
     kmsResourceManager =
         KMSResourceManager.builder(PROJECT, 
credentialsProvider).setRegion(KMS_REGION).build();
   }
diff --git 
a/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/kms/KMSResourceManagerTest.java
 
b/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/kms/KMSResourceManagerTest.java
index 8a0c98cfbf1..a386eb63fc6 100644
--- 
a/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/kms/KMSResourceManagerTest.java
+++ 
b/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/kms/KMSResourceManagerTest.java
@@ -35,7 +35,6 @@ import com.google.cloud.kms.v1.KeyRing;
 import com.google.cloud.kms.v1.KeyRingName;
 import com.google.cloud.kms.v1.LocationName;
 import com.google.protobuf.ByteString;
-import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.util.Base64;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
@@ -68,7 +67,7 @@ public class KMSResourceManagerTest {
   private KMSResourceManager testManager;
 
   @Before
-  public void setUp() throws IOException {
+  public void setUp() {
     testManager =
         new KMSResourceManager(
             kmsClientFactory, KMSResourceManager.builder(PROJECT_ID, 
null).setRegion(REGION));
diff --git 
a/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/spanner/SpannerResourceManagerTest.java
 
b/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/spanner/SpannerResourceManagerTest.java
index ded5bda4de8..0d3aed34f3a 100644
--- 
a/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/spanner/SpannerResourceManagerTest.java
+++ 
b/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/spanner/SpannerResourceManagerTest.java
@@ -196,7 +196,6 @@ public final class SpannerResourceManagerTest {
     // arrange
     prepareTable();
     
when(spanner.getDatabaseClient(any()).write(any())).thenReturn(Timestamp.now());
-    // spotless:off
     Mutation testMutation =
         Mutation.newInsertOrUpdateBuilder("SingerId")
             .set("SingerId")
@@ -206,7 +205,6 @@ public final class SpannerResourceManagerTest {
             .set("LastName")
             .to("Richards")
             .build();
-    // spotless:on
 
     // act
     testManager.write(testMutation);
@@ -220,7 +218,6 @@ public final class SpannerResourceManagerTest {
   @Test
   public void 
testWriteSingleRecordShouldThrowExceptionWhenCalledBeforeExecuteDdlStatement() {
     // arrange
-    // spotless:off
     Mutation testMutation =
         Mutation.newInsertOrUpdateBuilder("SingerId")
             .set("SingerId")
@@ -230,7 +227,6 @@ public final class SpannerResourceManagerTest {
             .set("LastName")
             .to("Richards")
             .build();
-    // spotless:on
 
     // act & assert
     assertThrows(IllegalStateException.class, () -> 
testManager.write(testMutation));
@@ -242,7 +238,6 @@ public final class SpannerResourceManagerTest {
     // arrange
     prepareTable();
     
when(spanner.getDatabaseClient(any()).write(any())).thenThrow(SpannerException.class);
-    // spotless:off
     Mutation testMutation =
         Mutation.newInsertOrUpdateBuilder("SingerId")
             .set("SingerId")
@@ -252,7 +247,6 @@ public final class SpannerResourceManagerTest {
             .set("LastName")
             .to("Richards")
             .build();
-    // spotless:on
 
     // act & assert
     assertThrows(SpannerResourceManagerException.class, () -> 
testManager.write(testMutation));
@@ -264,7 +258,6 @@ public final class SpannerResourceManagerTest {
     // arrange
     prepareTable();
     
when(spanner.getDatabaseClient(any()).write(any())).thenReturn(Timestamp.now());
-    // spotless:off
     ImmutableList<Mutation> testMutations =
         ImmutableList.of(
             Mutation.newInsertOrUpdateBuilder("SingerId")
@@ -283,7 +276,6 @@ public final class SpannerResourceManagerTest {
                 .set("LastName")
                 .to("Smith")
                 .build());
-    // spotless:on
 
     // act
     testManager.write(testMutations);
@@ -298,7 +290,6 @@ public final class SpannerResourceManagerTest {
   @Test
   public void 
testWriteMultipleRecordsShouldThrowExceptionWhenCalledBeforeExecuteDdlStatement()
 {
     // arrange
-    // spotless:off
     ImmutableList<Mutation> testMutations =
         ImmutableList.of(
             Mutation.newInsertOrUpdateBuilder("SingerId")
@@ -317,7 +308,6 @@ public final class SpannerResourceManagerTest {
                 .set("LastName")
                 .to("Smith")
                 .build());
-    // spotless:on
 
     // act & assert
     assertThrows(IllegalStateException.class, () -> 
testManager.write(testMutations));
@@ -329,7 +319,6 @@ public final class SpannerResourceManagerTest {
     // arrange
     prepareTable();
     
when(spanner.getDatabaseClient(any()).write(any())).thenThrow(SpannerException.class);
-    // spotless:off
     ImmutableList<Mutation> testMutations =
         ImmutableList.of(
             Mutation.newInsertOrUpdateBuilder("SingerId")
@@ -348,7 +337,6 @@ public final class SpannerResourceManagerTest {
                 .set("LastName")
                 .to("Smith")
                 .build());
-    // spotless:on
 
     // act & assert
     assertThrows(SpannerResourceManagerException.class, () -> 
testManager.write(testMutations));
@@ -360,7 +348,6 @@ public final class SpannerResourceManagerTest {
     // arrange
     prepareTable();
     when(resultSet.next()).thenReturn(true).thenReturn(true).thenReturn(false);
-    // spotless:off
     Struct struct1 =
         Struct.newBuilder()
             .set("SingerId")
@@ -379,7 +366,6 @@ public final class SpannerResourceManagerTest {
             .set("LastName")
             .to(string("Smith"))
             .build();
-    // spotless:on
     
when(resultSet.getCurrentRowAsStruct()).thenReturn(struct1).thenReturn(struct2);
     when(spanner.getDatabaseClient(any()).singleUse().read(any(), any(), 
any()))
         .thenReturn(resultSet);
@@ -399,7 +385,6 @@ public final class SpannerResourceManagerTest {
     // arrange
     prepareTable();
     when(resultSet.next()).thenReturn(true).thenReturn(false);
-    // spotless:off
     Struct struct =
         Struct.newBuilder()
             .set("SingerId")
@@ -409,7 +394,6 @@ public final class SpannerResourceManagerTest {
             .set("LastName")
             .to(string("Richards"))
             .build();
-    // spotless:on
     when(resultSet.getCurrentRowAsStruct()).thenReturn(struct);
     when(spanner.getDatabaseClient(any()).singleUse().read(any(), any(), 
any()))
         .thenReturn(resultSet);
@@ -494,7 +478,6 @@ public final class SpannerResourceManagerTest {
             + "  FirstName  STRING(1024),\n"
             + "  LastName   STRING(1024),\n"
             + ") PRIMARY KEY (SingerId)";
-    // spotless:off
     Mutation testMutation =
         Mutation.newInsertOrUpdateBuilder("SingerId")
             .set("SingerId")
@@ -504,7 +487,6 @@ public final class SpannerResourceManagerTest {
             .set("LastName")
             .to("Richards")
             .build();
-    // spotless:on
     ImmutableList<String> columnNames = ImmutableList.of("SingerId");
 
     // act & assert
diff --git 
a/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/storage/FileBasedIOLT.java
 
b/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/storage/FileBasedIOLT.java
index 8de05ddf18c..fd1bc1772f2 100644
--- 
a/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/storage/FileBasedIOLT.java
+++ 
b/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/storage/FileBasedIOLT.java
@@ -117,7 +117,7 @@ public class FileBasedIOLT extends IOLoadTestBase {
   }
 
   @BeforeClass
-  public static void beforeClass() throws IOException {
+  public static void beforeClass() {
     resourceManager =
         GcsResourceManager.builder(TestProperties.artifactBucket(), 
"textiolt", CREDENTIALS)
             .build();
diff --git 
a/it/jdbc/src/test/java/org/apache/beam/it/jdbc/MSSQLResourceManagerTest.java 
b/it/jdbc/src/test/java/org/apache/beam/it/jdbc/MSSQLResourceManagerTest.java
index 40c8fc32ae0..0ed192bf3c4 100644
--- 
a/it/jdbc/src/test/java/org/apache/beam/it/jdbc/MSSQLResourceManagerTest.java
+++ 
b/it/jdbc/src/test/java/org/apache/beam/it/jdbc/MSSQLResourceManagerTest.java
@@ -20,6 +20,7 @@ package org.apache.beam.it.jdbc;
 import static com.google.common.truth.Truth.assertThat;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.when;
 
 import org.junit.Before;
@@ -53,6 +54,7 @@ public class MSSQLResourceManagerTest<
     when(container.withPassword(any())).thenReturn(container);
     when(container.withDatabaseName(anyString())).thenReturn(container);
     when(container.getDatabaseName()).thenReturn(DATABASE_NAME);
+    doReturn(container).when(container).withLogConsumer(any());
     testManager = new MSSQLResourceManager(container, new 
MSSQLResourceManager.Builder(TEST_ID));
   }
 
diff --git 
a/it/jdbc/src/test/java/org/apache/beam/it/jdbc/MySQLResourceManagerTest.java 
b/it/jdbc/src/test/java/org/apache/beam/it/jdbc/MySQLResourceManagerTest.java
index 0f90dfccb71..402c45ec8ce 100644
--- 
a/it/jdbc/src/test/java/org/apache/beam/it/jdbc/MySQLResourceManagerTest.java
+++ 
b/it/jdbc/src/test/java/org/apache/beam/it/jdbc/MySQLResourceManagerTest.java
@@ -20,6 +20,7 @@ package org.apache.beam.it.jdbc;
 import static com.google.common.truth.Truth.assertThat;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.when;
 
 import org.junit.Before;
@@ -49,6 +50,7 @@ public class MySQLResourceManagerTest<T extends 
MySQLContainer<T>> {
     when(container.withUsername(any())).thenReturn(container);
     when(container.withPassword(any())).thenReturn(container);
     when(container.withDatabaseName(anyString())).thenReturn(container);
+    doReturn(container).when(container).withLogConsumer(any());
     testManager = new MySQLResourceManager(container, new 
MySQLResourceManager.Builder(TEST_ID));
   }
 
diff --git 
a/it/jdbc/src/test/java/org/apache/beam/it/jdbc/OracleResourceManagerTest.java 
b/it/jdbc/src/test/java/org/apache/beam/it/jdbc/OracleResourceManagerTest.java
index 6bf7b71c311..a2da44d7849 100644
--- 
a/it/jdbc/src/test/java/org/apache/beam/it/jdbc/OracleResourceManagerTest.java
+++ 
b/it/jdbc/src/test/java/org/apache/beam/it/jdbc/OracleResourceManagerTest.java
@@ -20,6 +20,7 @@ package org.apache.beam.it.jdbc;
 import static com.google.common.truth.Truth.assertThat;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.when;
 
 import org.junit.Before;
@@ -50,6 +51,7 @@ public class OracleResourceManagerTest<T extends 
OracleContainer> {
     when(container.withUsername(any())).thenReturn(container);
     when(container.withPassword(any())).thenReturn(container);
     when(container.withDatabaseName(anyString())).thenReturn(container);
+    doReturn(container).when(container).withLogConsumer(any());
     testManager = new OracleResourceManager(container, new 
OracleResourceManager.Builder(TEST_ID));
   }
 
diff --git 
a/it/jdbc/src/test/java/org/apache/beam/it/jdbc/PostgresResourceManagerTest.java
 
b/it/jdbc/src/test/java/org/apache/beam/it/jdbc/PostgresResourceManagerTest.java
index 3d939758c2f..a8a29816a6e 100644
--- 
a/it/jdbc/src/test/java/org/apache/beam/it/jdbc/PostgresResourceManagerTest.java
+++ 
b/it/jdbc/src/test/java/org/apache/beam/it/jdbc/PostgresResourceManagerTest.java
@@ -20,6 +20,7 @@ package org.apache.beam.it.jdbc;
 import static com.google.common.truth.Truth.assertThat;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.when;
 
 import org.junit.Before;
@@ -49,6 +50,7 @@ public class PostgresResourceManagerTest<T extends 
PostgreSQLContainer<T>> {
     when(container.withUsername(any())).thenReturn(container);
     when(container.withPassword(any())).thenReturn(container);
     when(container.withDatabaseName(anyString())).thenReturn(container);
+    doReturn(container).when(container).withLogConsumer(any());
     testManager =
         new PostgresResourceManager(container, new 
PostgresResourceManager.Builder(TEST_ID));
   }
diff --git 
a/it/kafka/src/main/java/org/apache/beam/it/kafka/KafkaResourceManager.java 
b/it/kafka/src/main/java/org/apache/beam/it/kafka/KafkaResourceManager.java
index e32f73b70f6..d9a647dbeeb 100644
--- a/it/kafka/src/main/java/org/apache/beam/it/kafka/KafkaResourceManager.java
+++ b/it/kafka/src/main/java/org/apache/beam/it/kafka/KafkaResourceManager.java
@@ -19,7 +19,6 @@ package org.apache.beam.it.kafka;
 
 import static 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
 
-import java.io.IOException;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.Set;
@@ -103,7 +102,7 @@ public class KafkaResourceManager extends 
TestContainerResourceManager<GenericCo
             : AdminClient.create(ImmutableMap.of("bootstrap.servers", 
this.connectionString));
   }
 
-  public static Builder builder(String testId) throws IOException {
+  public static Builder builder(String testId) {
     return new Builder(testId);
   }
 
diff --git a/it/kafka/src/test/java/org/apache/beam/it/kafka/KafkaIOLT.java 
b/it/kafka/src/test/java/org/apache/beam/it/kafka/KafkaIOLT.java
index 0fd50e6989a..a03030664de 100644
--- a/it/kafka/src/test/java/org/apache/beam/it/kafka/KafkaIOLT.java
+++ b/it/kafka/src/test/java/org/apache/beam/it/kafka/KafkaIOLT.java
@@ -69,7 +69,7 @@ public final class KafkaIOLT extends IOLoadTestBase {
   @Rule public TestPipeline readPipeline = TestPipeline.create();
 
   @BeforeClass
-  public static void beforeClass() throws IOException {
+  public static void beforeClass() {
     resourceManager = KafkaResourceManager.builder("io-kafka-lt").build();
   }
 
diff --git 
a/it/kafka/src/test/java/org/apache/beam/it/kafka/KafkaResourceManagerTest.java 
b/it/kafka/src/test/java/org/apache/beam/it/kafka/KafkaResourceManagerTest.java
index d830e3d3e67..8c870815efc 100644
--- 
a/it/kafka/src/test/java/org/apache/beam/it/kafka/KafkaResourceManagerTest.java
+++ 
b/it/kafka/src/test/java/org/apache/beam/it/kafka/KafkaResourceManagerTest.java
@@ -23,6 +23,7 @@ import static org.junit.Assert.assertThrows;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyCollection;
 import static org.mockito.ArgumentMatchers.argThat;
+import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
@@ -65,6 +66,7 @@ public final class KafkaResourceManagerTest {
 
   @Before
   public void setUp() throws IOException {
+    doReturn(container).when(container).withLogConsumer(any());
     testManager =
         new KafkaResourceManager(kafkaClient, container, 
KafkaResourceManager.builder(TEST_ID));
   }
diff --git 
a/it/mongodb/src/test/java/org/apache/beam/it/mongodb/MongoDBResourceManagerTest.java
 
b/it/mongodb/src/test/java/org/apache/beam/it/mongodb/MongoDBResourceManagerTest.java
index 6979c168de8..b3ad34b70ff 100644
--- 
a/it/mongodb/src/test/java/org/apache/beam/it/mongodb/MongoDBResourceManagerTest.java
+++ 
b/it/mongodb/src/test/java/org/apache/beam/it/mongodb/MongoDBResourceManagerTest.java
@@ -21,6 +21,7 @@ import static com.google.common.truth.Truth.assertThat;
 import static org.junit.Assert.assertThrows;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.verify;
@@ -65,6 +66,7 @@ public class MongoDBResourceManagerTest {
 
   @Before
   public void setUp() {
+    doReturn(container).when(container).withLogConsumer(any());
     testManager =
         new MongoDBResourceManager(mongoClient, container, 
MongoDBResourceManager.builder(TEST_ID));
   }
diff --git 
a/it/neo4j/src/test/java/org/apache/beam/it/neo4j/Neo4jResourceManagerTest.java 
b/it/neo4j/src/test/java/org/apache/beam/it/neo4j/Neo4jResourceManagerTest.java
index 26042eca023..5014505ece9 100644
--- 
a/it/neo4j/src/test/java/org/apache/beam/it/neo4j/Neo4jResourceManagerTest.java
+++ 
b/it/neo4j/src/test/java/org/apache/beam/it/neo4j/Neo4jResourceManagerTest.java
@@ -23,6 +23,7 @@ import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyMap;
 import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.ArgumentMatchers.startsWith;
+import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
@@ -68,6 +69,7 @@ public class Neo4jResourceManagerTest {
     when(container.getMappedPort(NEO4J_BOLT_PORT)).thenReturn(MAPPED_PORT);
     when(session.run(anyString(), anyMap())).thenReturn(result);
     when(neo4jDriver.session(any())).thenReturn(session);
+    doReturn(container).when(container).withLogConsumer(any());
 
     testManager =
         new Neo4jResourceManager(neo4jDriver, container, 
Neo4jResourceManager.builder(TEST_ID));
diff --git 
a/it/splunk/src/main/java/org/apache/beam/it/splunk/conditions/package-info.java
 
b/it/splunk/src/main/java/org/apache/beam/it/splunk/conditions/package-info.java
index a291d65b7a6..d4ae6db311a 100644
--- 
a/it/splunk/src/main/java/org/apache/beam/it/splunk/conditions/package-info.java
+++ 
b/it/splunk/src/main/java/org/apache/beam/it/splunk/conditions/package-info.java
@@ -1,17 +1,19 @@
 /*
- * Copyright (C) 2023 Google LLC
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
  *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy 
of
- * the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations 
under
- * the License.
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
 
 /** Package that contains reusable Splunk conditions. */
diff --git 
a/it/splunk/src/main/java/org/apache/beam/it/splunk/matchers/package-info.java 
b/it/splunk/src/main/java/org/apache/beam/it/splunk/matchers/package-info.java
index 92777c6ab7c..f56eff15599 100644
--- 
a/it/splunk/src/main/java/org/apache/beam/it/splunk/matchers/package-info.java
+++ 
b/it/splunk/src/main/java/org/apache/beam/it/splunk/matchers/package-info.java
@@ -1,17 +1,19 @@
 /*
- * Copyright (C) 2023 Google LLC
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
  *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy 
of
- * the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations 
under
- * the License.
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
 
 /** Package for Splunk Truth matchers / subjects to have reusable assertions. 
*/
diff --git 
a/it/splunk/src/main/java/org/apache/beam/it/splunk/package-info.java 
b/it/splunk/src/main/java/org/apache/beam/it/splunk/package-info.java
index 858500edc82..6097113cd70 100644
--- a/it/splunk/src/main/java/org/apache/beam/it/splunk/package-info.java
+++ b/it/splunk/src/main/java/org/apache/beam/it/splunk/package-info.java
@@ -1,17 +1,19 @@
 /*
- * Copyright (C) 2023 Google LLC
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
  *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy 
of
- * the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations 
under
- * the License.
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
 
 /** Package for managing Splunk resources within integration tests. */
diff --git 
a/it/splunk/src/test/java/org/apache/beam/it/splunk/SplunkResourceManagerTest.java
 
b/it/splunk/src/test/java/org/apache/beam/it/splunk/SplunkResourceManagerTest.java
index eeada3d0838..ed97bd9f367 100644
--- 
a/it/splunk/src/test/java/org/apache/beam/it/splunk/SplunkResourceManagerTest.java
+++ 
b/it/splunk/src/test/java/org/apache/beam/it/splunk/SplunkResourceManagerTest.java
@@ -21,6 +21,7 @@ import static com.google.common.truth.Truth.assertThat;
 import static org.junit.Assert.assertThrows;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
@@ -79,6 +80,7 @@ public class SplunkResourceManagerTest {
     when(container.withPassword(anyString())).thenReturn(container);
     when(container.getMappedPort(DEFAULT_SPLUNKD_INTERNAL_PORT))
         .thenReturn(MAPPED_SPLUNKD_INTERNAL_PORT);
+    doReturn(container).when(container).withLogConsumer(any());
 
     testManager =
         new SplunkResourceManager(clientFactory, container, 
SplunkResourceManager.builder(TEST_ID));
diff --git 
a/it/splunk/src/test/java/org/apache/beam/it/splunk/SplunkResourceManagerUtilsTest.java
 
b/it/splunk/src/test/java/org/apache/beam/it/splunk/SplunkResourceManagerUtilsTest.java
index 2f27340b6ae..b4aaec3b45c 100644
--- 
a/it/splunk/src/test/java/org/apache/beam/it/splunk/SplunkResourceManagerUtilsTest.java
+++ 
b/it/splunk/src/test/java/org/apache/beam/it/splunk/SplunkResourceManagerUtilsTest.java
@@ -30,7 +30,7 @@ import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
 
-/** Unit tests for {@link SplunkResourceManager}. */
+/** Unit tests for {@link SplunkResourceManagerUtils}. */
 @RunWith(JUnit4.class)
 public class SplunkResourceManagerUtilsTest {
 
diff --git 
a/it/testcontainers/src/main/java/org/apache/beam/it/testcontainers/TestContainerResourceManager.java
 
b/it/testcontainers/src/main/java/org/apache/beam/it/testcontainers/TestContainerResourceManager.java
index b0279303866..098938a291d 100644
--- 
a/it/testcontainers/src/main/java/org/apache/beam/it/testcontainers/TestContainerResourceManager.java
+++ 
b/it/testcontainers/src/main/java/org/apache/beam/it/testcontainers/TestContainerResourceManager.java
@@ -68,7 +68,12 @@ public abstract class TestContainerResourceManager<T extends 
GenericContainer<?>
     }
 
     if (!usingStaticContainer) {
-      container.start();
+      // TODO(pranavbhandari): Change this to use 
log.getUtf8StringWithoutLineEnding() when
+      // testcontainers dependency is updated.
+      container
+          .withLogConsumer(
+              log -> LOG.info("{}: {}", container.getDockerImageName(), 
log.getUtf8String()))
+          .start();
     } else if (builder.host == null || builder.port < 0) {
       throw new TestContainerResourceManagerException(
           "This manager was configured to use a static resource, but the host 
and port were not properly set.");
diff --git 
a/it/testcontainers/src/test/java/org/apache/beam/it/testcontainers/TestContainerResourceManagerTest.java
 
b/it/testcontainers/src/test/java/org/apache/beam/it/testcontainers/TestContainerResourceManagerTest.java
index ad79d6ea848..7e2d686d28a 100644
--- 
a/it/testcontainers/src/test/java/org/apache/beam/it/testcontainers/TestContainerResourceManagerTest.java
+++ 
b/it/testcontainers/src/test/java/org/apache/beam/it/testcontainers/TestContainerResourceManagerTest.java
@@ -19,7 +19,9 @@ package org.apache.beam.it.testcontainers;
 
 import static com.google.common.truth.Truth.assertThat;
 import static org.junit.Assert.assertThrows;
+import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.verify;
@@ -54,6 +56,7 @@ public class TestContainerResourceManagerTest {
     testManagerBuilder =
         new 
TestContainerResourceManager.Builder<TestContainerResourceManagerImpl>(
             TEST_ID, null, null) {
+
           @Override
           public TestContainerResourceManagerImpl build() {
             return new TestContainerResourceManagerImpl(container, this);
@@ -64,6 +67,7 @@ public class TestContainerResourceManagerTest {
   @Test
   public void testCreateResourceManagerSetsCorrectDockerImageName() {
     when(container.getDockerImageName()).thenReturn("container-test:test");
+    doReturn(container).when(container).withLogConsumer(any());
 
     
testManagerBuilder.setContainerImageName("container-test").setContainerImageTag("test").build();
 
@@ -74,6 +78,7 @@ public class TestContainerResourceManagerTest {
 
   @Test
   public void 
testCreateResourceManagerShouldStartContainerWhenNotUsingStaticResource() {
+    doReturn(container).when(container).withLogConsumer(any());
     testManagerBuilder.build();
 
     verify(container).start();
@@ -110,6 +115,7 @@ public class TestContainerResourceManagerTest {
 
   @Test
   public void testGetHostShouldReturnCorrectHostWhenManuallySet() {
+    doReturn(container).when(container).withLogConsumer(any());
     TestContainerResourceManager<?> testManager = 
testManagerBuilder.setHost(HOST).build();
 
     assertThat(testManager.getHost()).matches(HOST);
@@ -117,6 +123,7 @@ public class TestContainerResourceManagerTest {
 
   @Test
   public void testGetHostShouldReturnCorrectHostWhenHostNotSet() {
+    doReturn(container).when(container).withLogConsumer(any());
     String host = TestProperties.hostIp();
     TestContainerResourceManager<?> testManager = testManagerBuilder.build();
 
@@ -125,6 +132,7 @@ public class TestContainerResourceManagerTest {
 
   @Test
   public void testGetPortShouldReturnCorrectPortWhenManuallySet() {
+    doReturn(container).when(container).withLogConsumer(any());
     TestContainerResourceManager<?> testManager =
         testManagerBuilder.setHost(HOST).setPort(PORT).build();
 
@@ -135,6 +143,7 @@ public class TestContainerResourceManagerTest {
   public void testGetPortShouldReturnContainerHostWhenPortNotSet() {
     int mappedPort = 5000;
     when(container.getMappedPort(anyInt())).thenReturn(mappedPort);
+    doReturn(container).when(container).withLogConsumer(any());
 
     TestContainerResourceManager<?> testManager = testManagerBuilder.build();
 
@@ -143,6 +152,7 @@ public class TestContainerResourceManagerTest {
 
   @Test
   public void testCleanupAllShouldCloseContainerWhenNotUsingStaticResource() {
+    doReturn(container).when(container).withLogConsumer(any());
     TestContainerResourceManager<?> testManager = testManagerBuilder.build();
 
     testManager.cleanupAll();
@@ -152,6 +162,7 @@ public class TestContainerResourceManagerTest {
   @Test
   public void testCleanupAllShouldReturnFalseWhenContainerFailsToClose() {
     doThrow(RuntimeException.class).when(container).close();
+    doReturn(container).when(container).withLogConsumer(any());
 
     TestContainerResourceManager<?> testManager = testManagerBuilder.build();
 

Reply via email to