This is an automated email from the ASF dual-hosted git repository.
yhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new b9a7585d7f4 [ErrorProne] Enable StaticAssignmentInConstructor check
(#37786)
b9a7585d7f4 is described below
commit b9a7585d7f408cb58aee18de91be39723d2526c6
Author: Radosław Stankiewicz <[email protected]>
AuthorDate: Sun Mar 15 02:01:47 2026 +0100
[ErrorProne] Enable StaticAssignmentInConstructor check (#37786)
* Update static test state to use atomic variables for thread safety and
re-enable StaticAssignmentInConstructor checkstyle rule.
* checkstyle
---
.../org/apache/beam/gradle/BeamModulePlugin.groovy | 1 -
.../direct/BoundedReadEvaluatorFactoryTest.java | 11 +++---
.../direct/UnboundedReadEvaluatorFactoryTest.java | 44 ++++++++++++----------
.../worker/StreamingDataflowWorkerTest.java | 7 ++--
.../aggregators/metrics/sink/InMemoryMetrics.java | 26 +++++++------
.../aggregators/metrics/sink/InMemoryMetrics.java | 26 +++++++------
.../coders/SparkRunnerKryoRegistratorTest.java | 3 +-
.../sdk/io/hcatalog/HiveDatabaseTestHelper.java | 17 +++++----
.../java/org/apache/beam/sdk/io/jms/JmsIOTest.java | 10 ++---
.../io/snowflake/test/FakeSnowflakeDatabase.java | 10 ++---
10 files changed, 83 insertions(+), 72 deletions(-)
diff --git
a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
index 27bc588efaa..a6627cda2db 100644
--- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
+++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
@@ -1554,7 +1554,6 @@ class BeamModulePlugin implements Plugin<Project> {
"NonCanonicalType",
"Slf4jFormatShouldBeConst",
"Slf4jSignOnlyFormat",
- "StaticAssignmentInConstructor",
"ThreadPriorityCheck",
"TimeUnitConversionChecker",
"UndefinedEquals",
diff --git
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest.java
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest.java
index 12f8eae152f..0a8ec4f21b5 100644
---
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest.java
+++
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest.java
@@ -36,6 +36,7 @@ import java.util.Collection;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
import
org.apache.beam.runners.direct.BoundedReadEvaluatorFactory.BoundedSourceShard;
import org.apache.beam.sdk.coders.BigEndianLongCoder;
import org.apache.beam.sdk.coders.Coder;
@@ -306,7 +307,7 @@ public class BoundedReadEvaluatorFactoryTest {
evaluator.finishBundle();
CommittedBundle<Long> committed = output.commit(Instant.now());
assertThat(committed.getElements(), containsInAnyOrder(gw(2L), gw(3L),
gw(1L)));
- assertThat(TestSource.readerClosed, is(true));
+ assertThat(TestSource.readerClosed.get(), is(true));
}
@Test
@@ -326,7 +327,7 @@ public class BoundedReadEvaluatorFactoryTest {
evaluator.finishBundle();
CommittedBundle<Long> committed = output.commit(Instant.now());
assertThat(committed.getElements(), emptyIterable());
- assertThat(TestSource.readerClosed, is(true));
+ assertThat(TestSource.readerClosed.get(), is(true));
}
@Test
@@ -336,7 +337,7 @@ public class BoundedReadEvaluatorFactoryTest {
}
private static class TestSource<T> extends OffsetBasedSource<T> {
- private static boolean readerClosed;
+ private static final AtomicBoolean readerClosed = new AtomicBoolean(false);
private final Coder<T> coder;
private final T[] elems;
private final int firstSplitIndex;
@@ -352,7 +353,7 @@ public class BoundedReadEvaluatorFactoryTest {
this.elems = elems;
this.coder = coder;
this.firstSplitIndex = firstSplitIndex;
- readerClosed = false;
+ readerClosed.set(false);
subrangesCompleted = new CountDownLatch(2);
}
@@ -449,7 +450,7 @@ public class BoundedReadEvaluatorFactoryTest {
@Override
public void close() throws IOException {
- TestSource.readerClosed = true;
+ TestSource.readerClosed.set(true);
}
}
diff --git
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java
index 5413a694e92..ca577aeb034 100644
---
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java
+++
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java
@@ -339,8 +339,8 @@ public class UnboundedReadEvaluatorFactoryTest {
} while (!Iterables.isEmpty(residual.getElements()));
verify(output, times(numElements)).add(any());
- assertThat(TestUnboundedSource.readerCreatedCount, equalTo(1));
- assertThat(TestUnboundedSource.readerClosedCount, equalTo(1));
+ assertThat(TestUnboundedSource.READER_CREATED_COUNT.get(), equalTo(1));
+ assertThat(TestUnboundedSource.READER_CLOSED_COUNT.get(), equalTo(1));
}
@Test
@@ -382,7 +382,7 @@ public class UnboundedReadEvaluatorFactoryTest {
secondEvaluator.processElement(Iterables.getOnlyElement(residual.getElements()));
secondEvaluator.finishBundle();
- assertThat(TestUnboundedSource.readerClosedCount, equalTo(2));
+ assertThat(TestUnboundedSource.READER_CLOSED_COUNT.get(), equalTo(2));
assertThat(
Iterables.getOnlyElement(residual.getElements()).getValue().getCheckpoint().isFinalized(),
is(true));
@@ -421,12 +421,12 @@ public class UnboundedReadEvaluatorFactoryTest {
@Test // before this was throwing a NPE
public void emptySource() throws Exception {
- TestUnboundedSource.readerClosedCount = 0;
+ TestUnboundedSource.READER_CLOSED_COUNT.set(0);
final TestUnboundedSource<String> source = new
TestUnboundedSource<>(StringUtf8Coder.of());
source.advanceWatermarkToInfinity = true;
processElement(source);
- assertEquals(1, TestUnboundedSource.readerClosedCount);
- TestUnboundedSource.readerClosedCount = 0; // reset
+ assertEquals(1, TestUnboundedSource.READER_CLOSED_COUNT.get());
+ TestUnboundedSource.READER_CLOSED_COUNT.set(0); // reset
}
@Test(expected = IOException.class)
@@ -472,7 +472,7 @@ public class UnboundedReadEvaluatorFactoryTest {
final WindowedValue<UnboundedSourceShard<String, TestCheckpointMark>>
value =
WindowedValues.of(
shard, BoundedWindow.TIMESTAMP_MAX_VALUE, GlobalWindow.INSTANCE,
PaneInfo.NO_FIRING);
- TestUnboundedSource.readerClosedCount = 0;
+ TestUnboundedSource.READER_CLOSED_COUNT.set(0);
evaluator.processElement(value);
}
@@ -492,11 +492,15 @@ public class UnboundedReadEvaluatorFactoryTest {
}
private static class TestUnboundedSource<T> extends UnboundedSource<T,
TestCheckpointMark> {
- private static int getWatermarkCalls = 0;
-
- static int readerCreatedCount;
- static int readerClosedCount;
- static int readerAdvancedCount;
+ private static final java.util.concurrent.atomic.AtomicInteger
getWatermarkCalls =
+ new java.util.concurrent.atomic.AtomicInteger(0);
+
+ static final java.util.concurrent.atomic.AtomicInteger
READER_CREATED_COUNT =
+ new java.util.concurrent.atomic.AtomicInteger(0);
+ static final java.util.concurrent.atomic.AtomicInteger READER_CLOSED_COUNT
=
+ new java.util.concurrent.atomic.AtomicInteger(0);
+ static final java.util.concurrent.atomic.AtomicInteger
READER_ADVANCED_COUNT =
+ new java.util.concurrent.atomic.AtomicInteger(0);
private final Coder<T> coder;
private final List<T> elems;
private boolean dedupes = false;
@@ -508,9 +512,9 @@ public class UnboundedReadEvaluatorFactoryTest {
}
private TestUnboundedSource(Coder<T> coder, boolean throwOnClose, List<T>
elems) {
- readerCreatedCount = 0;
- readerClosedCount = 0;
- readerAdvancedCount = 0;
+ READER_CREATED_COUNT.set(0);
+ READER_CLOSED_COUNT.set(0);
+ READER_ADVANCED_COUNT.set(0);
this.coder = coder;
this.elems = elems;
this.throwOnClose = throwOnClose;
@@ -528,7 +532,7 @@ public class UnboundedReadEvaluatorFactoryTest {
checkState(
checkpointMark == null || checkpointMark.decoded,
"Cannot resume from a checkpoint that has not been decoded");
- readerCreatedCount++;
+ READER_CREATED_COUNT.incrementAndGet();
return new TestUnboundedReader(elems, checkpointMark == null ? -1 :
checkpointMark.index);
}
@@ -568,7 +572,7 @@ public class UnboundedReadEvaluatorFactoryTest {
@Override
public boolean advance() throws IOException {
- readerAdvancedCount++;
+ READER_ADVANCED_COUNT.incrementAndGet();
if (index + 1 < elems.size()) {
index++;
return true;
@@ -578,11 +582,11 @@ public class UnboundedReadEvaluatorFactoryTest {
@Override
public Instant getWatermark() {
- getWatermarkCalls++;
+ getWatermarkCalls.incrementAndGet();
if (index + 1 == elems.size() &&
TestUnboundedSource.this.advanceWatermarkToInfinity) {
return BoundedWindow.TIMESTAMP_MAX_VALUE;
} else {
- return new Instant(index + getWatermarkCalls);
+ return new Instant(index + getWatermarkCalls.get());
}
}
@@ -618,7 +622,7 @@ public class UnboundedReadEvaluatorFactoryTest {
@Override
public void close() throws IOException {
try {
- readerClosedCount++;
+ READER_CLOSED_COUNT.incrementAndGet();
// Enforce the AutoCloseable contract. Close is not idempotent.
assertThat(closed, is(false));
if (throwOnClose) {
diff --git
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java
index fafa27f98fc..ad958dcdf76 100644
---
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java
+++
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java
@@ -4796,17 +4796,18 @@ public class StreamingDataflowWorkerTest {
private static class FakeSlowDoFn extends DoFn<String, String> {
- private static FakeClock clock; // A static variable keeps this DoFn
serializable.
+ private static final AtomicReference<FakeClock> clock =
+ new AtomicReference<>(); // A static variable keeps this DoFn
serializable.
private final Duration sleep;
FakeSlowDoFn(FakeClock clock, Duration sleep) {
- FakeSlowDoFn.clock = clock;
+ FakeSlowDoFn.clock.set(clock);
this.sleep = sleep;
}
@ProcessElement
public void processElement(ProcessContext c) throws Exception {
- clock.sleep(sleep);
+ clock.get().sleep(sleep);
c.output(c.element());
}
}
diff --git
a/runners/spark/3/src/test/java/org/apache/beam/runners/spark/structuredstreaming/aggregators/metrics/sink/InMemoryMetrics.java
b/runners/spark/3/src/test/java/org/apache/beam/runners/spark/structuredstreaming/aggregators/metrics/sink/InMemoryMetrics.java
index 69f6abee1f6..69df5768e5d 100644
---
a/runners/spark/3/src/test/java/org/apache/beam/runners/spark/structuredstreaming/aggregators/metrics/sink/InMemoryMetrics.java
+++
b/runners/spark/3/src/test/java/org/apache/beam/runners/spark/structuredstreaming/aggregators/metrics/sink/InMemoryMetrics.java
@@ -22,6 +22,7 @@ import com.codahale.metrics.MetricFilter;
import com.codahale.metrics.MetricRegistry;
import java.util.Collection;
import java.util.Properties;
+import java.util.concurrent.atomic.AtomicReference;
import
org.apache.beam.runners.spark.structuredstreaming.metrics.WithMetricsSupport;
import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
import org.apache.spark.metrics.sink.Sink;
@@ -29,8 +30,10 @@ import org.apache.spark.metrics.sink.Sink;
/** An in-memory {@link Sink} implementation for tests. */
public class InMemoryMetrics implements Sink {
- private static WithMetricsSupport extendedMetricsRegistry;
- private static MetricRegistry internalMetricRegistry;
+ private static final AtomicReference<WithMetricsSupport>
extendedMetricsRegistry =
+ new AtomicReference<>();
+ private static final AtomicReference<MetricRegistry> internalMetricRegistry =
+ new AtomicReference<>();
// Constructor for Spark 3.1
@SuppressWarnings("UnusedParameters")
@@ -38,24 +41,24 @@ public class InMemoryMetrics implements Sink {
final Properties properties,
final MetricRegistry metricRegistry,
final org.apache.spark.SecurityManager securityMgr) {
- extendedMetricsRegistry = WithMetricsSupport.forRegistry(metricRegistry);
- internalMetricRegistry = metricRegistry;
+
extendedMetricsRegistry.set(WithMetricsSupport.forRegistry(metricRegistry));
+ internalMetricRegistry.set(metricRegistry);
}
// Constructor for Spark >= 3.2
@SuppressWarnings("UnusedParameters")
public InMemoryMetrics(final Properties properties, final MetricRegistry
metricRegistry) {
- extendedMetricsRegistry = WithMetricsSupport.forRegistry(metricRegistry);
- internalMetricRegistry = metricRegistry;
+
extendedMetricsRegistry.set(WithMetricsSupport.forRegistry(metricRegistry));
+ internalMetricRegistry.set(metricRegistry);
}
@SuppressWarnings({"TypeParameterUnusedInFormals", "rawtypes"}) // because
of getGauges
public static <T> T valueOf(final String name) {
// this might fail in case we have multiple aggregators with the same
suffix after
// the last dot, but it should be good enough for tests.
- if (extendedMetricsRegistry != null) {
- Collection<Gauge> matches =
- extendedMetricsRegistry.getGauges((n, m) ->
n.endsWith(name)).values();
+ WithMetricsSupport extended = extendedMetricsRegistry.get();
+ if (extended != null) {
+ Collection<Gauge> matches = extended.getGauges((n, m) ->
n.endsWith(name)).values();
return matches.isEmpty() ? null : (T)
Iterables.getOnlyElement(matches).getValue();
} else {
return null;
@@ -64,8 +67,9 @@ public class InMemoryMetrics implements Sink {
@SuppressWarnings("WeakerAccess")
public static void clearAll() {
- if (internalMetricRegistry != null) {
- internalMetricRegistry.removeMatching(MetricFilter.ALL);
+ MetricRegistry internal = internalMetricRegistry.get();
+ if (internal != null) {
+ internal.removeMatching(MetricFilter.ALL);
}
}
diff --git
a/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/InMemoryMetrics.java
b/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/InMemoryMetrics.java
index db040bbfcc4..2c12c42bbf7 100644
---
a/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/InMemoryMetrics.java
+++
b/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/InMemoryMetrics.java
@@ -22,6 +22,7 @@ import com.codahale.metrics.MetricFilter;
import com.codahale.metrics.MetricRegistry;
import java.util.Collection;
import java.util.Properties;
+import java.util.concurrent.atomic.AtomicReference;
import org.apache.beam.runners.spark.metrics.WithMetricsSupport;
import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
import org.apache.spark.metrics.sink.Sink;
@@ -29,8 +30,10 @@ import org.apache.spark.metrics.sink.Sink;
/** An in-memory {@link Sink} implementation for tests. */
public class InMemoryMetrics implements Sink {
- private static WithMetricsSupport extendedMetricsRegistry;
- private static MetricRegistry internalMetricRegistry;
+ private static final AtomicReference<WithMetricsSupport>
extendedMetricsRegistry =
+ new AtomicReference<>();
+ private static final AtomicReference<MetricRegistry> internalMetricRegistry =
+ new AtomicReference<>();
// Constructor for Spark 3.1
@SuppressWarnings("UnusedParameters")
@@ -38,24 +41,24 @@ public class InMemoryMetrics implements Sink {
final Properties properties,
final MetricRegistry metricRegistry,
final org.apache.spark.SecurityManager securityMgr) {
- extendedMetricsRegistry = WithMetricsSupport.forRegistry(metricRegistry);
- internalMetricRegistry = metricRegistry;
+
extendedMetricsRegistry.set(WithMetricsSupport.forRegistry(metricRegistry));
+ internalMetricRegistry.set(metricRegistry);
}
// Constructor for Spark >= 3.2
@SuppressWarnings("UnusedParameters")
public InMemoryMetrics(final Properties properties, final MetricRegistry
metricRegistry) {
- extendedMetricsRegistry = WithMetricsSupport.forRegistry(metricRegistry);
- internalMetricRegistry = metricRegistry;
+
extendedMetricsRegistry.set(WithMetricsSupport.forRegistry(metricRegistry));
+ internalMetricRegistry.set(metricRegistry);
}
@SuppressWarnings({"TypeParameterUnusedInFormals", "rawtypes"})
public static <T> T valueOf(final String name) {
// this might fail in case we have multiple aggregators with the same
suffix after
// the last dot, but it should be good enough for tests.
- if (extendedMetricsRegistry != null) {
- Collection<Gauge> matches =
- extendedMetricsRegistry.getGauges((n, m) ->
n.endsWith(name)).values();
+ WithMetricsSupport extended = extendedMetricsRegistry.get();
+ if (extended != null) {
+ Collection<Gauge> matches = extended.getGauges((n, m) ->
n.endsWith(name)).values();
return matches.isEmpty() ? null : (T)
Iterables.getOnlyElement(matches).getValue();
} else {
return null;
@@ -64,8 +67,9 @@ public class InMemoryMetrics implements Sink {
@SuppressWarnings("WeakerAccess")
public static void clearAll() {
- if (internalMetricRegistry != null) {
- internalMetricRegistry.removeMatching(MetricFilter.ALL);
+ MetricRegistry internal = internalMetricRegistry.get();
+ if (internal != null) {
+ internal.removeMatching(MetricFilter.ALL);
}
}
diff --git
a/runners/spark/src/test/java/org/apache/beam/runners/spark/coders/SparkRunnerKryoRegistratorTest.java
b/runners/spark/src/test/java/org/apache/beam/runners/spark/coders/SparkRunnerKryoRegistratorTest.java
index 9cb82f27722..ddd0e74d1c9 100644
---
a/runners/spark/src/test/java/org/apache/beam/runners/spark/coders/SparkRunnerKryoRegistratorTest.java
+++
b/runners/spark/src/test/java/org/apache/beam/runners/spark/coders/SparkRunnerKryoRegistratorTest.java
@@ -38,7 +38,8 @@ import org.junit.runner.RunWith;
* tests requiring a different context have to be forked using separate test
classes.
*/
@SuppressWarnings({
- "rawtypes" // TODO(https://github.com/apache/beam/issues/20447)
+ "rawtypes", // TODO(https://github.com/apache/beam/issues/20447)
+ "StaticAssignmentInConstructor" // used for testing purposes
})
@RunWith(Enclosed.class)
public class SparkRunnerKryoRegistratorTest {
diff --git
a/sdks/java/io/hcatalog/src/test/java/org/apache/beam/sdk/io/hcatalog/HiveDatabaseTestHelper.java
b/sdks/java/io/hcatalog/src/test/java/org/apache/beam/sdk/io/hcatalog/HiveDatabaseTestHelper.java
index fb83c0060f4..616ad10edb4 100644
---
a/sdks/java/io/hcatalog/src/test/java/org/apache/beam/sdk/io/hcatalog/HiveDatabaseTestHelper.java
+++
b/sdks/java/io/hcatalog/src/test/java/org/apache/beam/sdk/io/hcatalog/HiveDatabaseTestHelper.java
@@ -21,12 +21,13 @@ import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
+import java.util.concurrent.atomic.AtomicReference;
import org.apache.beam.sdk.io.common.DatabaseTestHelper;
/** Helper for creating connection and test tables on hive database via JDBC
driver. */
class HiveDatabaseTestHelper {
- private static Connection con;
- private static Statement stmt;
+ private static final AtomicReference<Connection> con = new
AtomicReference<>();
+ private static final AtomicReference<Statement> stmt = new
AtomicReference<>();
HiveDatabaseTestHelper(
String hiveHost,
@@ -36,24 +37,24 @@ class HiveDatabaseTestHelper {
String hivePassword)
throws Exception {
String hiveUrl = String.format("jdbc:hive2://%s:%s/%s", hiveHost,
hivePort, hiveDatabase);
- con = DriverManager.getConnection(hiveUrl, hiveUsername, hivePassword);
- stmt = con.createStatement();
+ con.set(DriverManager.getConnection(hiveUrl, hiveUsername, hivePassword));
+ stmt.set(con.get().createStatement());
}
/** Create hive table. */
String createHiveTable(String testIdentifier) throws Exception {
String tableName = DatabaseTestHelper.getTestTableName(testIdentifier);
- stmt.execute(" CREATE TABLE IF NOT EXISTS " + tableName + " (id STRING)");
+ stmt.get().execute(" CREATE TABLE IF NOT EXISTS " + tableName + " (id
STRING)");
return tableName;
}
/** Delete hive table. */
void dropHiveTable(String tableName) throws SQLException {
- stmt.execute(" DROP TABLE " + tableName);
+ stmt.get().execute(" DROP TABLE " + tableName);
}
void closeConnection() throws Exception {
- stmt.close();
- con.close();
+ stmt.get().close();
+ con.get().close();
}
}
diff --git
a/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java
b/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java
index 7f3b394d7f6..b3233f86617 100644
--- a/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java
+++ b/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java
@@ -69,6 +69,7 @@ import java.util.List;
import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import javax.jms.BytesMessage;
import javax.jms.Connection;
@@ -1197,17 +1198,14 @@ public class JmsIOTest {
private static class TextMessageMapperWithErrorCounter
implements SerializableBiFunction<String, Session, Message> {
- private static int errorCounter;
+ private static final AtomicInteger errorCounter = new AtomicInteger(0);
- TextMessageMapperWithErrorCounter() {
- errorCounter = 0;
- }
+ TextMessageMapperWithErrorCounter() {}
@Override
public Message apply(String value, Session session) {
try {
- if (errorCounter == 0) {
- errorCounter++;
+ if (errorCounter.getAndIncrement() == 0) {
throw new JMSException("Error!!");
}
TextMessage msg = session.createTextMessage();
diff --git
a/sdks/java/io/snowflake/src/test/java/org/apache/beam/sdk/io/snowflake/test/FakeSnowflakeDatabase.java
b/sdks/java/io/snowflake/src/test/java/org/apache/beam/sdk/io/snowflake/test/FakeSnowflakeDatabase.java
index 32e494496be..338a316b1b9 100644
---
a/sdks/java/io/snowflake/src/test/java/org/apache/beam/sdk/io/snowflake/test/FakeSnowflakeDatabase.java
+++
b/sdks/java/io/snowflake/src/test/java/org/apache/beam/sdk/io/snowflake/test/FakeSnowflakeDatabase.java
@@ -19,19 +19,17 @@ package org.apache.beam.sdk.io.snowflake.test;
import java.io.Serializable;
import java.util.Collections;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import net.snowflake.client.jdbc.SnowflakeSQLException;
/** Fake implementation of Snowflake warehouse used in test code. */
public class FakeSnowflakeDatabase implements Serializable {
- private static Map<String, List<String>> tables = new HashMap<>();
+ private static final Map<String, List<String>> tables = new
ConcurrentHashMap<>();
- private FakeSnowflakeDatabase() {
- tables = new HashMap<>();
- }
+ private FakeSnowflakeDatabase() {}
public static void createTable(String table) {
FakeSnowflakeDatabase.tables.put(table, Collections.emptyList());
@@ -72,7 +70,7 @@ public class FakeSnowflakeDatabase implements Serializable {
}
public static void clean() {
- FakeSnowflakeDatabase.tables = new HashMap<>();
+ tables.clear();
}
public static void truncateTable(String table) {