This is an automated email from the ASF dual-hosted git repository.
damccorm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 48935a83720 Resolve Error Prone NonCanonicalType warnings across the codebase (#37772)
48935a83720 is described below
commit 48935a8372012851962494534291e396194a94e9
Author: Radosław Stankiewicz <[email protected]>
AuthorDate: Fri Mar 20 18:26:53 2026 +0100
Resolve Error Prone NonCanonicalType warnings across the codebase (#37772)
---
.../beam/examples/complete/game/GameStats.java | 23 +++++++++++-----------
.../beam/examples/complete/game/LeaderBoard.java | 16 +++++++--------
.../examples/complete/game/StatefulTeamScore.java | 12 ++++-------
.../runners/core/MergingActiveWindowSetTest.java | 6 ++++--
.../FlinkStreamingPortablePipelineTranslator.java | 5 +----
.../flink/FlinkPortableClientEntryPoint.java | 3 ++-
.../FlinkStreamingPortablePipelineTranslator.java | 5 +----
.../flink/metrics/FlinkMetricContainerTest.java | 8 ++++----
.../worker/CachingShuffleBatchReaderTest.java | 2 +-
.../samza/runtime/ClassicBundleManagerTest.java | 2 +-
.../sdk/io/googleads/DummyRateLimitPolicy.java | 2 +-
.../sdk/io/gcp/bigquery/BigQueryIOTranslation.java | 5 ++---
.../sdk/io/gcp/testing/FakeBigQueryServices.java | 3 ++-
.../sdk/io/gcp/testing/FakeDatasetService.java | 4 ++--
.../sdk/io/gcp/spanner/SpannerIOWriteTest.java | 18 ++++++++---------
15 files changed, 52 insertions(+), 62 deletions(-)
diff --git
a/examples/java/src/main/java/org/apache/beam/examples/complete/game/GameStats.java
b/examples/java/src/main/java/org/apache/beam/examples/complete/game/GameStats.java
index b8163462624..783c136cf1b 100644
---
a/examples/java/src/main/java/org/apache/beam/examples/complete/game/GameStats.java
+++
b/examples/java/src/main/java/org/apache/beam/examples/complete/game/GameStats.java
@@ -21,6 +21,7 @@ import java.util.HashMap;
import java.util.Map;
import org.apache.beam.examples.common.ExampleUtils;
import org.apache.beam.examples.complete.game.utils.GameConstants;
+import org.apache.beam.examples.complete.game.utils.WriteToBigQuery;
import org.apache.beam.examples.complete.game.utils.WriteWindowedToBigQuery;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
@@ -189,18 +190,17 @@ public class GameStats extends LeaderBoard {
* Create a map of information that describes how to write pipeline output
to BigQuery. This map
* is used to write information about team score sums.
*/
- protected static Map<String, WriteWindowedToBigQuery.FieldInfo<KV<String,
Integer>>>
+ protected static Map<String, WriteToBigQuery.FieldInfo<KV<String, Integer>>>
configureWindowedWrite() {
- Map<String, WriteWindowedToBigQuery.FieldInfo<KV<String, Integer>>>
tableConfigure =
- new HashMap<>();
+ Map<String, WriteToBigQuery.FieldInfo<KV<String, Integer>>> tableConfigure
= new HashMap<>();
tableConfigure.put(
- "team", new WriteWindowedToBigQuery.FieldInfo<>("STRING", (c, w) ->
c.element().getKey()));
+ "team", new WriteToBigQuery.FieldInfo<>("STRING", (c, w) ->
c.element().getKey()));
tableConfigure.put(
"total_score",
- new WriteWindowedToBigQuery.FieldInfo<>("INTEGER", (c, w) ->
c.element().getValue()));
+ new WriteToBigQuery.FieldInfo<>("INTEGER", (c, w) ->
c.element().getValue()));
tableConfigure.put(
"window_start",
- new WriteWindowedToBigQuery.FieldInfo<>(
+ new WriteToBigQuery.FieldInfo<>(
"STRING",
(c, w) -> {
IntervalWindow window = (IntervalWindow) w;
@@ -208,7 +208,7 @@ public class GameStats extends LeaderBoard {
}));
tableConfigure.put(
"processing_time",
- new WriteWindowedToBigQuery.FieldInfo<>(
+ new WriteToBigQuery.FieldInfo<>(
"STRING", (c, w) ->
GameConstants.DATE_TIME_FORMATTER.print(Instant.now())));
return tableConfigure;
}
@@ -217,20 +217,19 @@ public class GameStats extends LeaderBoard {
* Create a map of information that describes how to write pipeline output
to BigQuery. This map
* is used to write information about mean user session time.
*/
- protected static Map<String, WriteWindowedToBigQuery.FieldInfo<Double>>
- configureSessionWindowWrite() {
+ protected static Map<String, WriteToBigQuery.FieldInfo<Double>>
configureSessionWindowWrite() {
- Map<String, WriteWindowedToBigQuery.FieldInfo<Double>> tableConfigure =
new HashMap<>();
+ Map<String, WriteToBigQuery.FieldInfo<Double>> tableConfigure = new
HashMap<>();
tableConfigure.put(
"window_start",
- new WriteWindowedToBigQuery.FieldInfo<>(
+ new WriteToBigQuery.FieldInfo<>(
"STRING",
(c, w) -> {
IntervalWindow window = (IntervalWindow) w;
return GameConstants.DATE_TIME_FORMATTER.print(window.start());
}));
tableConfigure.put(
- "mean_duration", new WriteWindowedToBigQuery.FieldInfo<>("FLOAT", (c,
w) -> c.element()));
+ "mean_duration", new WriteToBigQuery.FieldInfo<>("FLOAT", (c, w) ->
c.element()));
return tableConfigure;
}
diff --git
a/examples/java/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java
b/examples/java/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java
index b8d81056a3e..832c0ad79e7 100644
---
a/examples/java/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java
+++
b/examples/java/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java
@@ -130,19 +130,18 @@ public class LeaderBoard extends HourlyTeamScore {
* Create a map of information that describes how to write pipeline output
to BigQuery. This map
* is used to write team score sums and includes event timing information.
*/
- protected static Map<String, WriteWindowedToBigQuery.FieldInfo<KV<String,
Integer>>>
+ protected static Map<String, WriteToBigQuery.FieldInfo<KV<String, Integer>>>
configureWindowedTableWrite() {
- Map<String, WriteWindowedToBigQuery.FieldInfo<KV<String, Integer>>>
tableConfigure =
- new HashMap<>();
+ Map<String, WriteToBigQuery.FieldInfo<KV<String, Integer>>> tableConfigure
= new HashMap<>();
tableConfigure.put(
- "team", new WriteWindowedToBigQuery.FieldInfo<>("STRING", (c, w) ->
c.element().getKey()));
+ "team", new WriteToBigQuery.FieldInfo<>("STRING", (c, w) ->
c.element().getKey()));
tableConfigure.put(
"total_score",
- new WriteWindowedToBigQuery.FieldInfo<>("INTEGER", (c, w) ->
c.element().getValue()));
+ new WriteToBigQuery.FieldInfo<>("INTEGER", (c, w) ->
c.element().getValue()));
tableConfigure.put(
"window_start",
- new WriteWindowedToBigQuery.FieldInfo<>(
+ new WriteToBigQuery.FieldInfo<>(
"STRING",
(c, w) -> {
IntervalWindow window = (IntervalWindow) w;
@@ -150,12 +149,11 @@ public class LeaderBoard extends HourlyTeamScore {
}));
tableConfigure.put(
"processing_time",
- new WriteWindowedToBigQuery.FieldInfo<>(
+ new WriteToBigQuery.FieldInfo<>(
"STRING", (c, w) ->
GameConstants.DATE_TIME_FORMATTER.print(Instant.now())));
tableConfigure.put(
"timing",
- new WriteWindowedToBigQuery.FieldInfo<>(
- "STRING", (c, w) -> c.pane().getTiming().toString()));
+ new WriteToBigQuery.FieldInfo<>("STRING", (c, w) ->
c.pane().getTiming().toString()));
return tableConfigure;
}
diff --git
a/examples/java/src/main/java/org/apache/beam/examples/complete/game/StatefulTeamScore.java
b/examples/java/src/main/java/org/apache/beam/examples/complete/game/StatefulTeamScore.java
index 2315f1b811b..b28db261ab0 100644
---
a/examples/java/src/main/java/org/apache/beam/examples/complete/game/StatefulTeamScore.java
+++
b/examples/java/src/main/java/org/apache/beam/examples/complete/game/StatefulTeamScore.java
@@ -100,16 +100,12 @@ public class StatefulTeamScore extends LeaderBoard {
*/
private static Map<String, FieldInfo<KV<String, Integer>>>
configureCompleteWindowedTableWrite() {
- Map<String, WriteWindowedToBigQuery.FieldInfo<KV<String, Integer>>>
tableConfigure =
- new HashMap<>();
- tableConfigure.put(
- "team", new WriteWindowedToBigQuery.FieldInfo<>("STRING", (c, w) ->
c.element().getKey()));
- tableConfigure.put(
- "total_score",
- new WriteWindowedToBigQuery.FieldInfo<>("INTEGER", (c, w) ->
c.element().getValue()));
+ Map<String, FieldInfo<KV<String, Integer>>> tableConfigure = new
HashMap<>();
+ tableConfigure.put("team", new FieldInfo<>("STRING", (c, w) ->
c.element().getKey()));
+ tableConfigure.put("total_score", new FieldInfo<>("INTEGER", (c, w) ->
c.element().getValue()));
tableConfigure.put(
"processing_time",
- new WriteWindowedToBigQuery.FieldInfo<>(
+ new FieldInfo<>(
"STRING", (c, w) ->
GameConstants.DATE_TIME_FORMATTER.print(Instant.now())));
return tableConfigure;
}
diff --git
a/runners/core-java/src/test/java/org/apache/beam/runners/core/MergingActiveWindowSetTest.java
b/runners/core-java/src/test/java/org/apache/beam/runners/core/MergingActiveWindowSetTest.java
index c822f3c77d4..be88fd9b4ca 100644
---
a/runners/core-java/src/test/java/org/apache/beam/runners/core/MergingActiveWindowSetTest.java
+++
b/runners/core-java/src/test/java/org/apache/beam/runners/core/MergingActiveWindowSetTest.java
@@ -29,6 +29,7 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.transforms.windowing.Sessions;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
import org.joda.time.Duration;
@@ -68,8 +69,9 @@ public class MergingActiveWindowSetTest {
private void add(long... instants) {
for (final long instant : instants) {
System.out.println("ADD " + instant);
- Sessions.AssignContext context =
- windowFn.new AssignContext() {
+ WindowFn<Object, IntervalWindow> wf = windowFn;
+ WindowFn<Object, IntervalWindow>.AssignContext context =
+ wf.new AssignContext() {
@Override
public Object element() {
return (Object) instant;
diff --git
a/runners/flink/2.0/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java
b/runners/flink/2.0/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java
index e8929b84593..647297394f5 100644
---
a/runners/flink/2.0/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java
+++
b/runners/flink/2.0/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java
@@ -284,10 +284,7 @@ public class FlinkStreamingPortablePipelineTranslator
return context;
}
- private void urnNotFound(
- String id,
- RunnerApi.Pipeline pipeline,
- FlinkStreamingPortablePipelineTranslator.TranslationContext context) {
+ private void urnNotFound(String id, RunnerApi.Pipeline pipeline,
TranslationContext context) {
throw new IllegalArgumentException(
String.format(
"Unknown type of URN %s for PTransform with id %s.",
diff --git
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPortableClientEntryPoint.java
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPortableClientEntryPoint.java
index 8f0ecf3efbd..7d9263678a2 100644
---
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPortableClientEntryPoint.java
+++
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPortableClientEntryPoint.java
@@ -31,6 +31,7 @@ import
org.apache.beam.runners.fnexecution.environment.ProcessManager;
import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
import org.apache.beam.runners.jobsubmission.JobInvocation;
import org.apache.beam.runners.jobsubmission.JobInvoker;
+import org.apache.beam.runners.jobsubmission.JobServerDriver;
import org.apache.beam.runners.jobsubmission.PortablePipelineResult;
import org.apache.beam.runners.jobsubmission.PortablePipelineRunner;
import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
@@ -210,7 +211,7 @@ public class FlinkPortableClientEntryPoint {
}
}
- private class DetachedJobInvokerFactory implements
FlinkJobServerDriver.JobInvokerFactory {
+ private class DetachedJobInvokerFactory implements
JobServerDriver.JobInvokerFactory {
private CountDownLatch latch = new CountDownLatch(1);
private volatile PortablePipelineRunner actualPipelineRunner;
diff --git
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java
index caa5a1788c8..77dc4d79516 100644
---
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java
+++
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java
@@ -284,10 +284,7 @@ public class FlinkStreamingPortablePipelineTranslator
return context;
}
- private void urnNotFound(
- String id,
- RunnerApi.Pipeline pipeline,
- FlinkStreamingPortablePipelineTranslator.TranslationContext context) {
+ private void urnNotFound(String id, RunnerApi.Pipeline pipeline,
TranslationContext context) {
throw new IllegalArgumentException(
String.format(
"Unknown type of URN %s for PTransform with id %s.",
diff --git
a/runners/flink/src/test/java/org/apache/beam/runners/flink/metrics/FlinkMetricContainerTest.java
b/runners/flink/src/test/java/org/apache/beam/runners/flink/metrics/FlinkMetricContainerTest.java
index 663af054085..52c0db92c39 100644
---
a/runners/flink/src/test/java/org/apache/beam/runners/flink/metrics/FlinkMetricContainerTest.java
+++
b/runners/flink/src/test/java/org/apache/beam/runners/flink/metrics/FlinkMetricContainerTest.java
@@ -99,8 +99,8 @@ public class FlinkMetricContainerTest {
@Test
public void testGauge() {
- FlinkMetricContainer.FlinkGauge flinkGauge =
- new FlinkMetricContainer.FlinkGauge(GaugeResult.empty());
+ FlinkMetricContainerBase.FlinkGauge flinkGauge =
+ new FlinkMetricContainerBase.FlinkGauge(GaugeResult.empty());
when(metricGroup.gauge(eq("namespace.name"),
any())).thenReturn(flinkGauge);
MetricsContainer step = container.getMetricsContainer("step");
@@ -249,8 +249,8 @@ public class FlinkMetricContainerTest {
@Test
public void testDistribution() {
- FlinkMetricContainer.FlinkDistributionGauge flinkGauge =
- new
FlinkMetricContainer.FlinkDistributionGauge(DistributionResult.IDENTITY_ELEMENT);
+ FlinkDistributionGauge flinkGauge =
+ new FlinkDistributionGauge(DistributionResult.IDENTITY_ELEMENT);
when(metricGroup.gauge(eq("namespace.name"),
any())).thenReturn(flinkGauge);
MetricsContainer step = container.getMetricsContainer("step");
diff --git
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/common/worker/CachingShuffleBatchReaderTest.java
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/common/worker/CachingShuffleBatchReaderTest.java
index c1db7efdf08..b00c1494e2f 100644
---
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/common/worker/CachingShuffleBatchReaderTest.java
+++
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/common/worker/CachingShuffleBatchReaderTest.java
@@ -82,7 +82,7 @@ public final class CachingShuffleBatchReaderTest {
verify(base, times(1)).read(null, null);
CachingShuffleBatchReader.BatchRange range =
new CachingShuffleBatchReader.BatchRange(null, null);
- CachingShuffleBatchReader.Batch batch = reader.cache.get(range);
+ ShuffleBatchReader.Batch batch = reader.cache.get(range);
assertThat(batch, notNullValue());
reader.cache.invalidateAll();
read = reader.read(null, null);
diff --git
a/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/ClassicBundleManagerTest.java
b/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/ClassicBundleManagerTest.java
index 8ff29ecf383..09b51349aa4 100644
---
a/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/ClassicBundleManagerTest.java
+++
b/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/ClassicBundleManagerTest.java
@@ -50,7 +50,7 @@ public final class ClassicBundleManagerTest {
private FutureCollector<String> mockFutureCollector;
private ClassicBundleManager<String> bundleManager;
- private ClassicBundleManager.BundleProgressListener<String>
bundleProgressListener;
+ private BundleManager.BundleProgressListener<String> bundleProgressListener;
private Scheduler<KeyedTimerData<Void>> mockScheduler;
@Before
diff --git
a/sdks/java/io/google-ads/src/test/java/org/apache/beam/sdk/io/googleads/DummyRateLimitPolicy.java
b/sdks/java/io/google-ads/src/test/java/org/apache/beam/sdk/io/googleads/DummyRateLimitPolicy.java
index 205cba38c11..10d15940037 100644
---
a/sdks/java/io/google-ads/src/test/java/org/apache/beam/sdk/io/googleads/DummyRateLimitPolicy.java
+++
b/sdks/java/io/google-ads/src/test/java/org/apache/beam/sdk/io/googleads/DummyRateLimitPolicy.java
@@ -21,7 +21,7 @@ import com.google.ads.googleads.v23.errors.GoogleAdsError;
import com.google.protobuf.Message;
import org.checkerframework.checker.nullness.qual.Nullable;
-public class DummyRateLimitPolicy implements
GoogleAdsV23.RateLimitPolicy<GoogleAdsError> {
+public class DummyRateLimitPolicy implements
GoogleAdsIO.RateLimitPolicy<GoogleAdsError> {
@Override
public void onBeforeRequest(@Nullable String developerToken, String
customerId, Message request)
throws InterruptedException {}
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTranslation.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTranslation.java
index 4c546f4963a..acc71df14aa 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTranslation.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTranslation.java
@@ -58,7 +58,6 @@ import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.errorhandling.BadRecord;
import org.apache.beam.sdk.transforms.errorhandling.BadRecordRouter;
import org.apache.beam.sdk.transforms.errorhandling.ErrorHandler;
-import
org.apache.beam.sdk.transforms.errorhandling.ErrorHandler.BadRecordErrorHandler;
import
org.apache.beam.sdk.util.construction.PTransformTranslation.TransformPayloadTranslator;
import org.apache.beam.sdk.util.construction.SdkComponents;
import
org.apache.beam.sdk.util.construction.TransformPayloadTranslatorRegistrar;
@@ -353,7 +352,7 @@ public class BigQueryIOTranslation {
// from older Beam versions.
// See https://github.com/apache/beam/issues/30534.
builder.setBadRecordRouter(BadRecordRouter.THROWING_ROUTER);
- builder.setBadRecordErrorHandler(new
BadRecordErrorHandler.DefaultErrorHandler<>());
+ builder.setBadRecordErrorHandler(new
ErrorHandler.DefaultErrorHandler<>());
} else {
byte[] badRecordRouter = configRow.getBytes("bad_record_router");
builder.setBadRecordRouter((BadRecordRouter)
fromByteArray(badRecordRouter));
@@ -903,7 +902,7 @@ public class BigQueryIOTranslation {
// from older Beam versions.
// See https://github.com/apache/beam/issues/30534.
builder.setBadRecordRouter(BadRecordRouter.THROWING_ROUTER);
- builder.setBadRecordErrorHandler(new
BadRecordErrorHandler.DefaultErrorHandler<>());
+ builder.setBadRecordErrorHandler(new
ErrorHandler.DefaultErrorHandler<>());
} else {
byte[] badRecordRouter = configRow.getBytes("bad_record_router");
builder.setBadRecordRouter((BadRecordRouter)
fromByteArray(badRecordRouter));
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeBigQueryServices.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeBigQueryServices.java
index c258ce4ab7f..c9df1e4dad0 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeBigQueryServices.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeBigQueryServices.java
@@ -25,6 +25,7 @@ import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
+import java.util.Map;
import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.ListCoder;
@@ -128,7 +129,7 @@ public class FakeBigQueryServices implements
BigQueryServices {
// Longs tend to get converted back to Integers due to JSON serialization.
Convert them back.
public static TableRow convertNumbers(TableRow tableRow) {
- for (TableRow.Entry<?, Object> entry : tableRow.entrySet()) {
+ for (Map.Entry<?, Object> entry : tableRow.entrySet()) {
if (entry.getValue() instanceof Integer) {
entry.setValue(Long.valueOf((Integer) entry.getValue()));
}
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeDatasetService.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeDatasetService.java
index 6a975d44bcd..3e5f370434c 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeDatasetService.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeDatasetService.java
@@ -19,8 +19,8 @@ package org.apache.beam.sdk.io.gcp.testing;
import static org.junit.Assert.assertEquals;
-import com.google.api.client.googleapis.json.GoogleJsonResponseException;
import com.google.api.client.http.HttpHeaders;
+import com.google.api.client.http.HttpResponseException;
import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
import com.google.api.gax.grpc.GrpcStatusCode;
@@ -811,7 +811,7 @@ public class FakeDatasetService implements DatasetService,
WriteStreamService, S
void throwNotFound(@FormatString String format, Object... args) throws
IOException {
throw new IOException(
String.format(format, args),
- new GoogleJsonResponseException.Builder(404, String.format(format,
args), new HttpHeaders())
+ new HttpResponseException.Builder(404, String.format(format, args),
new HttpHeaders())
.build());
}
}
diff --git
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOWriteTest.java
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOWriteTest.java
index 602ac9ae4f1..dbe017531b2 100644
---
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOWriteTest.java
+++
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOWriteTest.java
@@ -1065,7 +1065,7 @@ public class SpannerIOWriteTest implements Serializable {
BatchableMutationFilterFn testFn =
new BatchableMutationFilterFn(null, null, 10000000, 3 * CELLS_PER_KEY,
1000);
- BatchableMutationFilterFn.ProcessContext mockProcessContext =
+ DoFn<MutationGroup, MutationGroup>.ProcessContext mockProcessContext =
Mockito.mock(ProcessContext.class);
when(mockProcessContext.sideInput(any())).thenReturn(getSchema());
@@ -1195,7 +1195,7 @@ public class SpannerIOWriteTest implements Serializable {
BatchableMutationFilterFn testFn = new BatchableMutationFilterFn(null,
null, 1000, 1000, 3);
- BatchableMutationFilterFn.ProcessContext mockProcessContext =
+ DoFn<MutationGroup, MutationGroup>.ProcessContext mockProcessContext =
Mockito.mock(ProcessContext.class);
when(mockProcessContext.sideInput(any())).thenReturn(getSchema());
@@ -1246,7 +1246,7 @@ public class SpannerIOWriteTest implements Serializable {
BatchableMutationFilterFn testFn = new BatchableMutationFilterFn(null,
null, 0, 0, 0);
- BatchableMutationFilterFn.ProcessContext mockProcessContext =
+ DoFn<MutationGroup, MutationGroup>.ProcessContext mockProcessContext =
Mockito.mock(ProcessContext.class);
when(mockProcessContext.sideInput(any())).thenReturn(getSchema());
@@ -1280,9 +1280,9 @@ public class SpannerIOWriteTest implements Serializable {
100, // groupingFactor
null);
- GatherSortCreateBatchesFn.ProcessContext mockProcessContext =
+ DoFn<MutationGroup, Iterable<MutationGroup>>.ProcessContext
mockProcessContext =
Mockito.mock(ProcessContext.class);
- GatherSortCreateBatchesFn.FinishBundleContext mockFinishBundleContext =
+ DoFn<MutationGroup, Iterable<MutationGroup>>.FinishBundleContext
mockFinishBundleContext =
Mockito.mock(FinishBundleContext.class);
when(mockProcessContext.sideInput(any())).thenReturn(getSchema());
@@ -1355,9 +1355,9 @@ public class SpannerIOWriteTest implements Serializable {
3, // groupingFactor
null);
- GatherSortCreateBatchesFn.ProcessContext mockProcessContext =
+ DoFn<MutationGroup, Iterable<MutationGroup>>.ProcessContext
mockProcessContext =
Mockito.mock(ProcessContext.class);
- GatherSortCreateBatchesFn.FinishBundleContext mockFinishBundleContext =
+ DoFn<MutationGroup, Iterable<MutationGroup>>.FinishBundleContext
mockFinishBundleContext =
Mockito.mock(FinishBundleContext.class);
when(mockProcessContext.sideInput(any())).thenReturn(getSchema());
OutputReceiver<Iterable<MutationGroup>> mockOutputReceiver =
mock(OutputReceiver.class);
@@ -1485,9 +1485,9 @@ public class SpannerIOWriteTest implements Serializable {
}
private void testAndVerifyBatches(GatherSortCreateBatchesFn testFn) throws
Exception {
- GatherSortCreateBatchesFn.ProcessContext mockProcessContext =
+ DoFn<MutationGroup, Iterable<MutationGroup>>.ProcessContext
mockProcessContext =
Mockito.mock(ProcessContext.class);
- GatherSortCreateBatchesFn.FinishBundleContext mockFinishBundleContext =
+ DoFn<MutationGroup, Iterable<MutationGroup>>.FinishBundleContext
mockFinishBundleContext =
Mockito.mock(FinishBundleContext.class);
when(mockProcessContext.sideInput(any())).thenReturn(getSchema());