This is an automated email from the ASF dual-hosted git repository.

yhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new b9a7585d7f4 [ErrorProne] Enable StaticAssignmentInConstructor check 
(#37786)
b9a7585d7f4 is described below

commit b9a7585d7f408cb58aee18de91be39723d2526c6
Author: Radosław Stankiewicz <[email protected]>
AuthorDate: Sun Mar 15 02:01:47 2026 +0100

    [ErrorProne] Enable StaticAssignmentInConstructor check (#37786)
    
    * Update static test state to use atomic variables for thread safety and 
re-enable StaticAssignmentInConstructor checkstyle rule.
    
    * checkstyle
---
 .../org/apache/beam/gradle/BeamModulePlugin.groovy |  1 -
 .../direct/BoundedReadEvaluatorFactoryTest.java    | 11 +++---
 .../direct/UnboundedReadEvaluatorFactoryTest.java  | 44 ++++++++++++----------
 .../worker/StreamingDataflowWorkerTest.java        |  7 ++--
 .../aggregators/metrics/sink/InMemoryMetrics.java  | 26 +++++++------
 .../aggregators/metrics/sink/InMemoryMetrics.java  | 26 +++++++------
 .../coders/SparkRunnerKryoRegistratorTest.java     |  3 +-
 .../sdk/io/hcatalog/HiveDatabaseTestHelper.java    | 17 +++++----
 .../java/org/apache/beam/sdk/io/jms/JmsIOTest.java | 10 ++---
 .../io/snowflake/test/FakeSnowflakeDatabase.java   | 10 ++---
 10 files changed, 83 insertions(+), 72 deletions(-)

diff --git 
a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy 
b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
index 27bc588efaa..a6627cda2db 100644
--- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
+++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
@@ -1554,7 +1554,6 @@ class BeamModulePlugin implements Plugin<Project> {
             "NonCanonicalType",
             "Slf4jFormatShouldBeConst",
             "Slf4jSignOnlyFormat",
-            "StaticAssignmentInConstructor",
             "ThreadPriorityCheck",
             "TimeUnitConversionChecker",
             "UndefinedEquals",
diff --git 
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest.java
 
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest.java
index 12f8eae152f..0a8ec4f21b5 100644
--- 
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest.java
+++ 
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest.java
@@ -36,6 +36,7 @@ import java.util.Collection;
 import java.util.List;
 import java.util.NoSuchElementException;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
 import 
org.apache.beam.runners.direct.BoundedReadEvaluatorFactory.BoundedSourceShard;
 import org.apache.beam.sdk.coders.BigEndianLongCoder;
 import org.apache.beam.sdk.coders.Coder;
@@ -306,7 +307,7 @@ public class BoundedReadEvaluatorFactoryTest {
     evaluator.finishBundle();
     CommittedBundle<Long> committed = output.commit(Instant.now());
     assertThat(committed.getElements(), containsInAnyOrder(gw(2L), gw(3L), 
gw(1L)));
-    assertThat(TestSource.readerClosed, is(true));
+    assertThat(TestSource.readerClosed.get(), is(true));
   }
 
   @Test
@@ -326,7 +327,7 @@ public class BoundedReadEvaluatorFactoryTest {
     evaluator.finishBundle();
     CommittedBundle<Long> committed = output.commit(Instant.now());
     assertThat(committed.getElements(), emptyIterable());
-    assertThat(TestSource.readerClosed, is(true));
+    assertThat(TestSource.readerClosed.get(), is(true));
   }
 
   @Test
@@ -336,7 +337,7 @@ public class BoundedReadEvaluatorFactoryTest {
   }
 
   private static class TestSource<T> extends OffsetBasedSource<T> {
-    private static boolean readerClosed;
+    private static final AtomicBoolean readerClosed = new AtomicBoolean(false);
     private final Coder<T> coder;
     private final T[] elems;
     private final int firstSplitIndex;
@@ -352,7 +353,7 @@ public class BoundedReadEvaluatorFactoryTest {
       this.elems = elems;
       this.coder = coder;
       this.firstSplitIndex = firstSplitIndex;
-      readerClosed = false;
+      readerClosed.set(false);
 
       subrangesCompleted = new CountDownLatch(2);
     }
@@ -449,7 +450,7 @@ public class BoundedReadEvaluatorFactoryTest {
 
     @Override
     public void close() throws IOException {
-      TestSource.readerClosed = true;
+      TestSource.readerClosed.set(true);
     }
   }
 
diff --git 
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java
 
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java
index 5413a694e92..ca577aeb034 100644
--- 
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java
+++ 
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java
@@ -339,8 +339,8 @@ public class UnboundedReadEvaluatorFactoryTest {
     } while (!Iterables.isEmpty(residual.getElements()));
 
     verify(output, times(numElements)).add(any());
-    assertThat(TestUnboundedSource.readerCreatedCount, equalTo(1));
-    assertThat(TestUnboundedSource.readerClosedCount, equalTo(1));
+    assertThat(TestUnboundedSource.READER_CREATED_COUNT.get(), equalTo(1));
+    assertThat(TestUnboundedSource.READER_CLOSED_COUNT.get(), equalTo(1));
   }
 
   @Test
@@ -382,7 +382,7 @@ public class UnboundedReadEvaluatorFactoryTest {
     
secondEvaluator.processElement(Iterables.getOnlyElement(residual.getElements()));
     secondEvaluator.finishBundle();
 
-    assertThat(TestUnboundedSource.readerClosedCount, equalTo(2));
+    assertThat(TestUnboundedSource.READER_CLOSED_COUNT.get(), equalTo(2));
     assertThat(
         
Iterables.getOnlyElement(residual.getElements()).getValue().getCheckpoint().isFinalized(),
         is(true));
@@ -421,12 +421,12 @@ public class UnboundedReadEvaluatorFactoryTest {
 
   @Test // before this was throwing a NPE
   public void emptySource() throws Exception {
-    TestUnboundedSource.readerClosedCount = 0;
+    TestUnboundedSource.READER_CLOSED_COUNT.set(0);
     final TestUnboundedSource<String> source = new 
TestUnboundedSource<>(StringUtf8Coder.of());
     source.advanceWatermarkToInfinity = true;
     processElement(source);
-    assertEquals(1, TestUnboundedSource.readerClosedCount);
-    TestUnboundedSource.readerClosedCount = 0; // reset
+    assertEquals(1, TestUnboundedSource.READER_CLOSED_COUNT.get());
+    TestUnboundedSource.READER_CLOSED_COUNT.set(0); // reset
   }
 
   @Test(expected = IOException.class)
@@ -472,7 +472,7 @@ public class UnboundedReadEvaluatorFactoryTest {
     final WindowedValue<UnboundedSourceShard<String, TestCheckpointMark>> 
value =
         WindowedValues.of(
             shard, BoundedWindow.TIMESTAMP_MAX_VALUE, GlobalWindow.INSTANCE, 
PaneInfo.NO_FIRING);
-    TestUnboundedSource.readerClosedCount = 0;
+    TestUnboundedSource.READER_CLOSED_COUNT.set(0);
     evaluator.processElement(value);
   }
 
@@ -492,11 +492,15 @@ public class UnboundedReadEvaluatorFactoryTest {
   }
 
   private static class TestUnboundedSource<T> extends UnboundedSource<T, 
TestCheckpointMark> {
-    private static int getWatermarkCalls = 0;
-
-    static int readerCreatedCount;
-    static int readerClosedCount;
-    static int readerAdvancedCount;
+    private static final java.util.concurrent.atomic.AtomicInteger 
getWatermarkCalls =
+        new java.util.concurrent.atomic.AtomicInteger(0);
+
+    static final java.util.concurrent.atomic.AtomicInteger 
READER_CREATED_COUNT =
+        new java.util.concurrent.atomic.AtomicInteger(0);
+    static final java.util.concurrent.atomic.AtomicInteger READER_CLOSED_COUNT 
=
+        new java.util.concurrent.atomic.AtomicInteger(0);
+    static final java.util.concurrent.atomic.AtomicInteger 
READER_ADVANCED_COUNT =
+        new java.util.concurrent.atomic.AtomicInteger(0);
     private final Coder<T> coder;
     private final List<T> elems;
     private boolean dedupes = false;
@@ -508,9 +512,9 @@ public class UnboundedReadEvaluatorFactoryTest {
     }
 
     private TestUnboundedSource(Coder<T> coder, boolean throwOnClose, List<T> 
elems) {
-      readerCreatedCount = 0;
-      readerClosedCount = 0;
-      readerAdvancedCount = 0;
+      READER_CREATED_COUNT.set(0);
+      READER_CLOSED_COUNT.set(0);
+      READER_ADVANCED_COUNT.set(0);
       this.coder = coder;
       this.elems = elems;
       this.throwOnClose = throwOnClose;
@@ -528,7 +532,7 @@ public class UnboundedReadEvaluatorFactoryTest {
       checkState(
           checkpointMark == null || checkpointMark.decoded,
           "Cannot resume from a checkpoint that has not been decoded");
-      readerCreatedCount++;
+      READER_CREATED_COUNT.incrementAndGet();
       return new TestUnboundedReader(elems, checkpointMark == null ? -1 : 
checkpointMark.index);
     }
 
@@ -568,7 +572,7 @@ public class UnboundedReadEvaluatorFactoryTest {
 
       @Override
       public boolean advance() throws IOException {
-        readerAdvancedCount++;
+        READER_ADVANCED_COUNT.incrementAndGet();
         if (index + 1 < elems.size()) {
           index++;
           return true;
@@ -578,11 +582,11 @@ public class UnboundedReadEvaluatorFactoryTest {
 
       @Override
       public Instant getWatermark() {
-        getWatermarkCalls++;
+        getWatermarkCalls.incrementAndGet();
         if (index + 1 == elems.size() && 
TestUnboundedSource.this.advanceWatermarkToInfinity) {
           return BoundedWindow.TIMESTAMP_MAX_VALUE;
         } else {
-          return new Instant(index + getWatermarkCalls);
+          return new Instant(index + getWatermarkCalls.get());
         }
       }
 
@@ -618,7 +622,7 @@ public class UnboundedReadEvaluatorFactoryTest {
       @Override
       public void close() throws IOException {
         try {
-          readerClosedCount++;
+          READER_CLOSED_COUNT.incrementAndGet();
           // Enforce the AutoCloseable contract. Close is not idempotent.
           assertThat(closed, is(false));
           if (throwOnClose) {
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java
 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java
index fafa27f98fc..ad958dcdf76 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java
@@ -4796,17 +4796,18 @@ public class StreamingDataflowWorkerTest {
 
   private static class FakeSlowDoFn extends DoFn<String, String> {
 
-    private static FakeClock clock; // A static variable keeps this DoFn 
serializable.
+    private static final AtomicReference<FakeClock> clock =
+        new AtomicReference<>(); // A static variable keeps this DoFn 
serializable.
     private final Duration sleep;
 
     FakeSlowDoFn(FakeClock clock, Duration sleep) {
-      FakeSlowDoFn.clock = clock;
+      FakeSlowDoFn.clock.set(clock);
       this.sleep = sleep;
     }
 
     @ProcessElement
     public void processElement(ProcessContext c) throws Exception {
-      clock.sleep(sleep);
+      clock.get().sleep(sleep);
       c.output(c.element());
     }
   }
diff --git 
a/runners/spark/3/src/test/java/org/apache/beam/runners/spark/structuredstreaming/aggregators/metrics/sink/InMemoryMetrics.java
 
b/runners/spark/3/src/test/java/org/apache/beam/runners/spark/structuredstreaming/aggregators/metrics/sink/InMemoryMetrics.java
index 69f6abee1f6..69df5768e5d 100644
--- 
a/runners/spark/3/src/test/java/org/apache/beam/runners/spark/structuredstreaming/aggregators/metrics/sink/InMemoryMetrics.java
+++ 
b/runners/spark/3/src/test/java/org/apache/beam/runners/spark/structuredstreaming/aggregators/metrics/sink/InMemoryMetrics.java
@@ -22,6 +22,7 @@ import com.codahale.metrics.MetricFilter;
 import com.codahale.metrics.MetricRegistry;
 import java.util.Collection;
 import java.util.Properties;
+import java.util.concurrent.atomic.AtomicReference;
 import 
org.apache.beam.runners.spark.structuredstreaming.metrics.WithMetricsSupport;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
 import org.apache.spark.metrics.sink.Sink;
@@ -29,8 +30,10 @@ import org.apache.spark.metrics.sink.Sink;
 /** An in-memory {@link Sink} implementation for tests. */
 public class InMemoryMetrics implements Sink {
 
-  private static WithMetricsSupport extendedMetricsRegistry;
-  private static MetricRegistry internalMetricRegistry;
+  private static final AtomicReference<WithMetricsSupport> 
extendedMetricsRegistry =
+      new AtomicReference<>();
+  private static final AtomicReference<MetricRegistry> internalMetricRegistry =
+      new AtomicReference<>();
 
   // Constructor for Spark 3.1
   @SuppressWarnings("UnusedParameters")
@@ -38,24 +41,24 @@ public class InMemoryMetrics implements Sink {
       final Properties properties,
       final MetricRegistry metricRegistry,
       final org.apache.spark.SecurityManager securityMgr) {
-    extendedMetricsRegistry = WithMetricsSupport.forRegistry(metricRegistry);
-    internalMetricRegistry = metricRegistry;
+    
extendedMetricsRegistry.set(WithMetricsSupport.forRegistry(metricRegistry));
+    internalMetricRegistry.set(metricRegistry);
   }
 
   // Constructor for Spark >= 3.2
   @SuppressWarnings("UnusedParameters")
   public InMemoryMetrics(final Properties properties, final MetricRegistry 
metricRegistry) {
-    extendedMetricsRegistry = WithMetricsSupport.forRegistry(metricRegistry);
-    internalMetricRegistry = metricRegistry;
+    
extendedMetricsRegistry.set(WithMetricsSupport.forRegistry(metricRegistry));
+    internalMetricRegistry.set(metricRegistry);
   }
 
   @SuppressWarnings({"TypeParameterUnusedInFormals", "rawtypes"}) // because 
of getGauges
   public static <T> T valueOf(final String name) {
     // this might fail in case we have multiple aggregators with the same 
suffix after
     // the last dot, but it should be good enough for tests.
-    if (extendedMetricsRegistry != null) {
-      Collection<Gauge> matches =
-          extendedMetricsRegistry.getGauges((n, m) -> 
n.endsWith(name)).values();
+    WithMetricsSupport extended = extendedMetricsRegistry.get();
+    if (extended != null) {
+      Collection<Gauge> matches = extended.getGauges((n, m) -> 
n.endsWith(name)).values();
       return matches.isEmpty() ? null : (T) 
Iterables.getOnlyElement(matches).getValue();
     } else {
       return null;
@@ -64,8 +67,9 @@ public class InMemoryMetrics implements Sink {
 
   @SuppressWarnings("WeakerAccess")
   public static void clearAll() {
-    if (internalMetricRegistry != null) {
-      internalMetricRegistry.removeMatching(MetricFilter.ALL);
+    MetricRegistry internal = internalMetricRegistry.get();
+    if (internal != null) {
+      internal.removeMatching(MetricFilter.ALL);
     }
   }
 
diff --git 
a/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/InMemoryMetrics.java
 
b/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/InMemoryMetrics.java
index db040bbfcc4..2c12c42bbf7 100644
--- 
a/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/InMemoryMetrics.java
+++ 
b/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/InMemoryMetrics.java
@@ -22,6 +22,7 @@ import com.codahale.metrics.MetricFilter;
 import com.codahale.metrics.MetricRegistry;
 import java.util.Collection;
 import java.util.Properties;
+import java.util.concurrent.atomic.AtomicReference;
 import org.apache.beam.runners.spark.metrics.WithMetricsSupport;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
 import org.apache.spark.metrics.sink.Sink;
@@ -29,8 +30,10 @@ import org.apache.spark.metrics.sink.Sink;
 /** An in-memory {@link Sink} implementation for tests. */
 public class InMemoryMetrics implements Sink {
 
-  private static WithMetricsSupport extendedMetricsRegistry;
-  private static MetricRegistry internalMetricRegistry;
+  private static final AtomicReference<WithMetricsSupport> 
extendedMetricsRegistry =
+      new AtomicReference<>();
+  private static final AtomicReference<MetricRegistry> internalMetricRegistry =
+      new AtomicReference<>();
 
   // Constructor for Spark 3.1
   @SuppressWarnings("UnusedParameters")
@@ -38,24 +41,24 @@ public class InMemoryMetrics implements Sink {
       final Properties properties,
       final MetricRegistry metricRegistry,
       final org.apache.spark.SecurityManager securityMgr) {
-    extendedMetricsRegistry = WithMetricsSupport.forRegistry(metricRegistry);
-    internalMetricRegistry = metricRegistry;
+    
extendedMetricsRegistry.set(WithMetricsSupport.forRegistry(metricRegistry));
+    internalMetricRegistry.set(metricRegistry);
   }
 
   // Constructor for Spark >= 3.2
   @SuppressWarnings("UnusedParameters")
   public InMemoryMetrics(final Properties properties, final MetricRegistry 
metricRegistry) {
-    extendedMetricsRegistry = WithMetricsSupport.forRegistry(metricRegistry);
-    internalMetricRegistry = metricRegistry;
+    
extendedMetricsRegistry.set(WithMetricsSupport.forRegistry(metricRegistry));
+    internalMetricRegistry.set(metricRegistry);
   }
 
   @SuppressWarnings({"TypeParameterUnusedInFormals", "rawtypes"})
   public static <T> T valueOf(final String name) {
     // this might fail in case we have multiple aggregators with the same 
suffix after
     // the last dot, but it should be good enough for tests.
-    if (extendedMetricsRegistry != null) {
-      Collection<Gauge> matches =
-          extendedMetricsRegistry.getGauges((n, m) -> 
n.endsWith(name)).values();
+    WithMetricsSupport extended = extendedMetricsRegistry.get();
+    if (extended != null) {
+      Collection<Gauge> matches = extended.getGauges((n, m) -> 
n.endsWith(name)).values();
       return matches.isEmpty() ? null : (T) 
Iterables.getOnlyElement(matches).getValue();
     } else {
       return null;
@@ -64,8 +67,9 @@ public class InMemoryMetrics implements Sink {
 
   @SuppressWarnings("WeakerAccess")
   public static void clearAll() {
-    if (internalMetricRegistry != null) {
-      internalMetricRegistry.removeMatching(MetricFilter.ALL);
+    MetricRegistry internal = internalMetricRegistry.get();
+    if (internal != null) {
+      internal.removeMatching(MetricFilter.ALL);
     }
   }
 
diff --git 
a/runners/spark/src/test/java/org/apache/beam/runners/spark/coders/SparkRunnerKryoRegistratorTest.java
 
b/runners/spark/src/test/java/org/apache/beam/runners/spark/coders/SparkRunnerKryoRegistratorTest.java
index 9cb82f27722..ddd0e74d1c9 100644
--- 
a/runners/spark/src/test/java/org/apache/beam/runners/spark/coders/SparkRunnerKryoRegistratorTest.java
+++ 
b/runners/spark/src/test/java/org/apache/beam/runners/spark/coders/SparkRunnerKryoRegistratorTest.java
@@ -38,7 +38,8 @@ import org.junit.runner.RunWith;
  * tests requiring a different context have to be forked using separate test 
classes.
  */
 @SuppressWarnings({
-  "rawtypes" // TODO(https://github.com/apache/beam/issues/20447)
+  "rawtypes", // TODO(https://github.com/apache/beam/issues/20447)
+  "StaticAssignmentInConstructor" // used for testing purposes
 })
 @RunWith(Enclosed.class)
 public class SparkRunnerKryoRegistratorTest {
diff --git 
a/sdks/java/io/hcatalog/src/test/java/org/apache/beam/sdk/io/hcatalog/HiveDatabaseTestHelper.java
 
b/sdks/java/io/hcatalog/src/test/java/org/apache/beam/sdk/io/hcatalog/HiveDatabaseTestHelper.java
index fb83c0060f4..616ad10edb4 100644
--- 
a/sdks/java/io/hcatalog/src/test/java/org/apache/beam/sdk/io/hcatalog/HiveDatabaseTestHelper.java
+++ 
b/sdks/java/io/hcatalog/src/test/java/org/apache/beam/sdk/io/hcatalog/HiveDatabaseTestHelper.java
@@ -21,12 +21,13 @@ import java.sql.Connection;
 import java.sql.DriverManager;
 import java.sql.SQLException;
 import java.sql.Statement;
+import java.util.concurrent.atomic.AtomicReference;
 import org.apache.beam.sdk.io.common.DatabaseTestHelper;
 
 /** Helper for creating connection and test tables on hive database via JDBC 
driver. */
 class HiveDatabaseTestHelper {
-  private static Connection con;
-  private static Statement stmt;
+  private static final AtomicReference<Connection> con = new 
AtomicReference<>();
+  private static final AtomicReference<Statement> stmt = new 
AtomicReference<>();
 
   HiveDatabaseTestHelper(
       String hiveHost,
@@ -36,24 +37,24 @@ class HiveDatabaseTestHelper {
       String hivePassword)
       throws Exception {
     String hiveUrl = String.format("jdbc:hive2://%s:%s/%s", hiveHost, 
hivePort, hiveDatabase);
-    con = DriverManager.getConnection(hiveUrl, hiveUsername, hivePassword);
-    stmt = con.createStatement();
+    con.set(DriverManager.getConnection(hiveUrl, hiveUsername, hivePassword));
+    stmt.set(con.get().createStatement());
   }
 
   /** Create hive table. */
   String createHiveTable(String testIdentifier) throws Exception {
     String tableName = DatabaseTestHelper.getTestTableName(testIdentifier);
-    stmt.execute(" CREATE TABLE IF NOT EXISTS " + tableName + " (id STRING)");
+    stmt.get().execute(" CREATE TABLE IF NOT EXISTS " + tableName + " (id 
STRING)");
     return tableName;
   }
 
   /** Delete hive table. */
   void dropHiveTable(String tableName) throws SQLException {
-    stmt.execute(" DROP TABLE " + tableName);
+    stmt.get().execute(" DROP TABLE " + tableName);
   }
 
   void closeConnection() throws Exception {
-    stmt.close();
-    con.close();
+    stmt.get().close();
+    con.get().close();
   }
 }
diff --git 
a/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java 
b/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java
index 7f3b394d7f6..b3233f86617 100644
--- a/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java
+++ b/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java
@@ -69,6 +69,7 @@ import java.util.List;
 import java.util.Set;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Function;
 import javax.jms.BytesMessage;
 import javax.jms.Connection;
@@ -1197,17 +1198,14 @@ public class JmsIOTest {
   private static class TextMessageMapperWithErrorCounter
       implements SerializableBiFunction<String, Session, Message> {
 
-    private static int errorCounter;
+    private static final AtomicInteger errorCounter = new AtomicInteger(0);
 
-    TextMessageMapperWithErrorCounter() {
-      errorCounter = 0;
-    }
+    TextMessageMapperWithErrorCounter() {}
 
     @Override
     public Message apply(String value, Session session) {
       try {
-        if (errorCounter == 0) {
-          errorCounter++;
+        if (errorCounter.getAndIncrement() == 0) {
           throw new JMSException("Error!!");
         }
         TextMessage msg = session.createTextMessage();
diff --git 
a/sdks/java/io/snowflake/src/test/java/org/apache/beam/sdk/io/snowflake/test/FakeSnowflakeDatabase.java
 
b/sdks/java/io/snowflake/src/test/java/org/apache/beam/sdk/io/snowflake/test/FakeSnowflakeDatabase.java
index 32e494496be..338a316b1b9 100644
--- 
a/sdks/java/io/snowflake/src/test/java/org/apache/beam/sdk/io/snowflake/test/FakeSnowflakeDatabase.java
+++ 
b/sdks/java/io/snowflake/src/test/java/org/apache/beam/sdk/io/snowflake/test/FakeSnowflakeDatabase.java
@@ -19,19 +19,17 @@ package org.apache.beam.sdk.io.snowflake.test;
 
 import java.io.Serializable;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.stream.Collectors;
 import net.snowflake.client.jdbc.SnowflakeSQLException;
 
 /** Fake implementation of Snowflake warehouse used in test code. */
 public class FakeSnowflakeDatabase implements Serializable {
-  private static Map<String, List<String>> tables = new HashMap<>();
+  private static final Map<String, List<String>> tables = new 
ConcurrentHashMap<>();
 
-  private FakeSnowflakeDatabase() {
-    tables = new HashMap<>();
-  }
+  private FakeSnowflakeDatabase() {}
 
   public static void createTable(String table) {
     FakeSnowflakeDatabase.tables.put(table, Collections.emptyList());
@@ -72,7 +70,7 @@ public class FakeSnowflakeDatabase implements Serializable {
   }
 
   public static void clean() {
-    FakeSnowflakeDatabase.tables = new HashMap<>();
+    tables.clear();
   }
 
   public static void truncateTable(String table) {

Reply via email to