Repository: beam Updated Branches: refs/heads/master 0b19fb414 -> e5929bd13
Pre read api refactoring. Extract `SpannerConfig` and `AbstractSpannerFn` Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/454f1c42 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/454f1c42 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/454f1c42 Branch: refs/heads/master Commit: 454f1c427353feeb858cdc62185ea3fced8d8a1f Parents: 80c9263 Author: Mairbek Khadikov <mair...@google.com> Authored: Mon Jun 19 13:01:20 2017 -0700 Committer: Eugene Kirpichov <kirpic...@google.com> Committed: Tue Jun 27 18:36:01 2017 -0700 ---------------------------------------------------------------------- .../sdk/io/gcp/spanner/AbstractSpannerFn.java | 41 ++++ .../beam/sdk/io/gcp/spanner/SpannerConfig.java | 118 ++++++++++ .../beam/sdk/io/gcp/spanner/SpannerIO.java | 227 ++++--------------- .../sdk/io/gcp/spanner/SpannerWriteGroupFn.java | 108 +++++++++ .../beam/sdk/io/gcp/spanner/SpannerIOTest.java | 8 +- 5 files changed, 321 insertions(+), 181 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/454f1c42/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/AbstractSpannerFn.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/AbstractSpannerFn.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/AbstractSpannerFn.java new file mode 100644 index 0000000..08f7fa9 --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/AbstractSpannerFn.java @@ -0,0 +1,41 @@ +package org.apache.beam.sdk.io.gcp.spanner; + +import com.google.cloud.spanner.DatabaseClient; +import com.google.cloud.spanner.DatabaseId; +import com.google.cloud.spanner.Spanner; +import com.google.cloud.spanner.SpannerOptions; +import org.apache.beam.sdk.transforms.DoFn; + +/** + * Abstract {@link DoFn} that manages {@link Spanner} lifecycle. Use {@link + * AbstractSpannerFn#databaseClient} to access the Cloud Spanner database client. + */ +abstract class AbstractSpannerFn<InputT, OutputT> extends DoFn<InputT, OutputT> { + private transient Spanner spanner; + private transient DatabaseClient databaseClient; + + abstract SpannerConfig getSpannerConfig(); + + @Setup + public void setup() throws Exception { + SpannerConfig spannerConfig = getSpannerConfig(); + SpannerOptions options = spannerConfig.buildSpannerOptions(); + spanner = options.getService(); + databaseClient = spanner.getDatabaseClient(DatabaseId + .of(options.getProjectId(), spannerConfig.getInstanceId().get(), + spannerConfig.getDatabaseId().get())); + } + + @Teardown + public void teardown() throws Exception { + if (spanner == null) { + return; + } + spanner.close(); + spanner = null; + } + + protected DatabaseClient databaseClient() { + return databaseClient; + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/454f1c42/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerConfig.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerConfig.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerConfig.java new file mode 100644 index 0000000..4cb8aa2 --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerConfig.java @@ -0,0 +1,118 @@ +package org.apache.beam.sdk.io.gcp.spanner; + +import static com.google.common.base.Preconditions.checkNotNull; + +import com.google.auto.value.AutoValue; +import com.google.cloud.ServiceFactory; +import com.google.cloud.spanner.Spanner; +import com.google.cloud.spanner.SpannerOptions; +import com.google.common.annotations.VisibleForTesting; +import java.io.Serializable; +import javax.annotation.Nullable; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.ValueProvider; +import org.apache.beam.sdk.transforms.display.DisplayData; + +/** Configuration for a Cloud Spanner client. */ +@AutoValue +public abstract class SpannerConfig implements Serializable { + + private static final long serialVersionUID = -5680874609304170301L; + + @Nullable + abstract ValueProvider<String> getProjectId(); + + @Nullable + abstract ValueProvider<String> getInstanceId(); + + @Nullable + abstract ValueProvider<String> getDatabaseId(); + + @Nullable + @VisibleForTesting + abstract ServiceFactory<Spanner, SpannerOptions> getServiceFactory(); + + abstract Builder toBuilder(); + + SpannerOptions buildSpannerOptions() { + SpannerOptions.Builder builder = SpannerOptions.newBuilder(); + if (getProjectId() != null) { + builder.setProjectId(getProjectId().get()); + } + if (getServiceFactory() != null) { + builder.setServiceFactory(getServiceFactory()); + } + return builder.build(); + } + + public static SpannerConfig create() { + return builder().build(); + } + + public static Builder builder() { + return new AutoValue_SpannerConfig.Builder(); + } + + public void validate(PipelineOptions options) { + checkNotNull( + getInstanceId(), + "SpannerIO.read() requires instance id to be set with withInstanceId method"); + checkNotNull( + getDatabaseId(), + "SpannerIO.read() requires database id to be set with withDatabaseId method"); + } + + public void populateDisplayData(DisplayData.Builder builder) { + builder + .addIfNotNull(DisplayData.item("projectId", getProjectId()).withLabel("Output Project")) + .addIfNotNull(DisplayData.item("instanceId", getInstanceId()).withLabel("Output Instance")) + .addIfNotNull(DisplayData.item("databaseId", getDatabaseId()).withLabel("Output Database")); + + if (getServiceFactory() != null) { + builder.addIfNotNull( + DisplayData.item("serviceFactory", getServiceFactory().getClass().getName()) + .withLabel("Service Factory")); + } + } + + /** Builder for {@link SpannerConfig}. */ + @AutoValue.Builder + public abstract static class Builder { + + + abstract Builder setProjectId(ValueProvider<String> projectId); + + abstract Builder setInstanceId(ValueProvider<String> instanceId); + + abstract Builder setDatabaseId(ValueProvider<String> databaseId); + + + abstract Builder setServiceFactory(ServiceFactory<Spanner, SpannerOptions> serviceFactory); + + public abstract SpannerConfig build(); + } + + public SpannerConfig withProjectId(ValueProvider<String> projectId) { + return toBuilder().setProjectId(projectId).build(); + } + + public SpannerConfig withProjectId(String projectId) { + return withProjectId(ValueProvider.StaticValueProvider.of(projectId)); + } + + public SpannerConfig withInstanceId(ValueProvider<String> instanceId) { + return toBuilder().setInstanceId(instanceId).build(); + } + + public SpannerConfig withInstanceId(String instanceId) { + return withInstanceId(ValueProvider.StaticValueProvider.of(instanceId)); + } + + public SpannerConfig withDatabaseId(ValueProvider<String> databaseId) { + return toBuilder().setDatabaseId(databaseId).build(); + } + + public SpannerConfig withDatabaseId(String databaseId) { + return withDatabaseId(ValueProvider.StaticValueProvider.of(databaseId)); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/454f1c42/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java index 32bf1d0..791c7e7 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java @@ -17,22 +17,13 @@ */ package org.apache.beam.sdk.io.gcp.spanner; -import static com.google.common.base.Preconditions.checkNotNull; - import com.google.auto.value.AutoValue; import com.google.cloud.ServiceFactory; -import com.google.cloud.ServiceOptions; -import com.google.cloud.spanner.AbortedException; -import com.google.cloud.spanner.DatabaseClient; -import com.google.cloud.spanner.DatabaseId; import com.google.cloud.spanner.Mutation; import com.google.cloud.spanner.Spanner; import com.google.cloud.spanner.SpannerOptions; import com.google.common.annotations.VisibleForTesting; -import com.google.common.collect.Iterables; -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; + import javax.annotation.Nullable; import org.apache.beam.sdk.annotations.Experimental; @@ -42,16 +33,8 @@ import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.display.DisplayData; -import org.apache.beam.sdk.transforms.display.DisplayData.Builder; -import org.apache.beam.sdk.util.BackOff; -import org.apache.beam.sdk.util.BackOffUtils; -import org.apache.beam.sdk.util.FluentBackoff; -import org.apache.beam.sdk.util.Sleeper; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PDone; -import org.joda.time.Duration; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * Experimental {@link PTransform Transforms} for reading from and writing to <a @@ -123,37 +106,23 @@ public class SpannerIO { @AutoValue public abstract static class Write extends PTransform<PCollection<Mutation>, PDone> { - @Nullable - abstract ValueProvider<String> getProjectId(); + private static final long serialVersionUID = 1920175411827980145L; - @Nullable - abstract ValueProvider<String> getInstanceId(); - - @Nullable - abstract ValueProvider<String> getDatabaseId(); + abstract SpannerConfig getSpannerConfig(); abstract long getBatchSizeBytes(); - @Nullable - @VisibleForTesting - abstract ServiceFactory<Spanner, SpannerOptions> getServiceFactory(); - abstract Builder toBuilder(); @AutoValue.Builder abstract static class Builder { - abstract Builder setProjectId(ValueProvider<String> projectId); - - abstract Builder setInstanceId(ValueProvider<String> instanceId); + abstract Builder setSpannerConfig(SpannerConfig spannerConfig); - abstract Builder setDatabaseId(ValueProvider<String> databaseId); + abstract SpannerConfig.Builder spannerConfigBuilder(); abstract Builder setBatchSizeBytes(long batchSizeBytes); - @VisibleForTesting - abstract Builder setServiceFactory(ServiceFactory<Spanner, SpannerOptions> serviceFactory); - abstract Write build(); } @@ -166,8 +135,15 @@ public class SpannerIO { return withProjectId(ValueProvider.StaticValueProvider.of(projectId)); } + /** + * Returns a new {@link SpannerIO.Write} that will write to the specified Cloud Spanner project. + * + * <p>Does not modify this object. + */ public Write withProjectId(ValueProvider<String> projectId) { - return toBuilder().setProjectId(projectId).build(); + Write.Builder builder = toBuilder(); + builder.spannerConfigBuilder().setProjectId(projectId); + return builder.build(); } /** @@ -180,11 +156,30 @@ public class SpannerIO { return withInstanceId(ValueProvider.StaticValueProvider.of(instanceId)); } + /** + * Returns a new {@link SpannerIO.Write} that will write to the specified Cloud Spanner + * instance. + * + * <p>Does not modify this object. + */ public Write withInstanceId(ValueProvider<String> instanceId) { - return toBuilder().setInstanceId(instanceId).build(); + Write.Builder builder = toBuilder(); + builder.spannerConfigBuilder().setInstanceId(instanceId); + return builder.build(); } /** + * Returns a new {@link SpannerIO.Write} that will write to the specified Cloud Spanner + * config. + * + * <p>Does not modify this object. + */ + public Write withSpannerConfig(SpannerConfig spannerConfig) { + return toBuilder().setSpannerConfig(spannerConfig).build(); + } + + + /** * Returns a new {@link SpannerIO.Write} with a new batch size limit. * * <p>Does not modify this object. @@ -203,8 +198,16 @@ public class SpannerIO { return withDatabaseId(ValueProvider.StaticValueProvider.of(databaseId)); } + /** + * Returns a new {@link SpannerIO.Write} that will write to the specified Cloud Spanner + * database. + * + * <p>Does not modify this object. + */ public Write withDatabaseId(ValueProvider<String> databaseId) { - return toBuilder().setDatabaseId(databaseId).build(); + Write.Builder builder = toBuilder(); + builder.spannerConfigBuilder().setDatabaseId(databaseId); + return builder.build(); } /** @@ -216,17 +219,14 @@ public class SpannerIO { @VisibleForTesting Write withServiceFactory(ServiceFactory<Spanner, SpannerOptions> serviceFactory) { - return toBuilder().setServiceFactory(serviceFactory).build(); + Write.Builder builder = toBuilder(); + builder.spannerConfigBuilder().setServiceFactory(serviceFactory); + return builder.build(); } @Override public void validate(PipelineOptions options) { - checkNotNull( - getInstanceId(), - "SpannerIO.write() requires instance id to be set with withInstanceId method"); - checkNotNull( - getDatabaseId(), - "SpannerIO.write() requires database id to be set with withDatabaseId method"); + getSpannerConfig().validate(options); } @Override @@ -237,22 +237,13 @@ public class SpannerIO { return PDone.in(input.getPipeline()); } + @Override public void populateDisplayData(DisplayData.Builder builder) { super.populateDisplayData(builder); - builder - .addIfNotNull(DisplayData.item("projectId", getProjectId()).withLabel("Output Project")) - .addIfNotNull( - DisplayData.item("instanceId", getInstanceId()).withLabel("Output Instance")) - .addIfNotNull( - DisplayData.item("databaseId", getDatabaseId()).withLabel("Output Database")) - .add(DisplayData.item("batchSizeBytes", getBatchSizeBytes()) - .withLabel("Batch Size in Bytes")); - if (getServiceFactory() != null) { - builder.addIfNotNull( - DisplayData.item("serviceFactory", getServiceFactory().getClass().getName()) - .withLabel("Service Factory")); - } + getSpannerConfig().populateDisplayData(builder); + builder.add( + DisplayData.item("batchSizeBytes", getBatchSizeBytes()).withLabel("Batch Size in Bytes")); } } @@ -278,123 +269,5 @@ public class SpannerIO { } } - /** Batches together and writes mutations to Google Cloud Spanner. */ - @VisibleForTesting - static class SpannerWriteGroupFn extends DoFn<MutationGroup, Void> { - private static final Logger LOG = LoggerFactory.getLogger(SpannerWriteGroupFn.class); - private final Write spec; - private transient Spanner spanner; - private transient DatabaseClient dbClient; - // Current batch of mutations to be written. - private List<MutationGroup> mutations; - private long batchSizeBytes = 0; - - private static final int MAX_RETRIES = 5; - private static final FluentBackoff BUNDLE_WRITE_BACKOFF = - FluentBackoff.DEFAULT - .withMaxRetries(MAX_RETRIES) - .withInitialBackoff(Duration.standardSeconds(5)); - - @VisibleForTesting SpannerWriteGroupFn(Write spec) { - this.spec = spec; - } - - @Setup - public void setup() throws Exception { - SpannerOptions spannerOptions = getSpannerOptions(); - spanner = spannerOptions.getService(); - dbClient = spanner.getDatabaseClient( - DatabaseId.of(projectId(), spec.getInstanceId().get(), spec.getDatabaseId().get())); - mutations = new ArrayList<>(); - batchSizeBytes = 0; - } - - @ProcessElement - public void processElement(ProcessContext c) throws Exception { - MutationGroup m = c.element(); - mutations.add(m); - batchSizeBytes += MutationSizeEstimator.sizeOf(m); - if (batchSizeBytes >= spec.getBatchSizeBytes()) { - flushBatch(); - } - } - - private String projectId() { - return spec.getProjectId() == null - ? ServiceOptions.getDefaultProjectId() - : spec.getProjectId().get(); - } - - @FinishBundle - public void finishBundle() throws Exception { - if (!mutations.isEmpty()) { - flushBatch(); - } - } - - @Teardown - public void teardown() throws Exception { - if (spanner == null) { - return; - } - spanner.close(); - spanner = null; - } - - private SpannerOptions getSpannerOptions() { - SpannerOptions.Builder spannerOptionsBuider = SpannerOptions.newBuilder(); - if (spec.getServiceFactory() != null) { - spannerOptionsBuider.setServiceFactory(spec.getServiceFactory()); - } - if (spec.getProjectId() != null) { - spannerOptionsBuider.setProjectId(spec.getProjectId().get()); - } - return spannerOptionsBuider.build(); - } - - /** - * Writes a batch of mutations to Cloud Spanner. - * - * <p>If a commit fails, it will be retried up to {@link #MAX_RETRIES} times. If the retry limit - * is exceeded, the last exception from Cloud Spanner will be thrown. - * - * @throws AbortedException if the commit fails or IOException or InterruptedException if - * backing off between retries fails. - */ - private void flushBatch() throws AbortedException, IOException, InterruptedException { - LOG.debug("Writing batch of {} mutations", mutations.size()); - Sleeper sleeper = Sleeper.DEFAULT; - BackOff backoff = BUNDLE_WRITE_BACKOFF.backoff(); - - while (true) { - // Batch upsert rows. - try { - dbClient.writeAtLeastOnce(Iterables.concat(mutations)); - - // Break if the commit threw no exception. - break; - } catch (AbortedException exception) { - // Only log the code and message for potentially-transient errors. The entire exception - // will be propagated upon the last retry. - LOG.error( - "Error writing to Spanner ({}): {}", exception.getCode(), exception.getMessage()); - if (!BackOffUtils.next(sleeper, backoff)) { - LOG.error("Aborting after {} retries.", MAX_RETRIES); - throw exception; - } - } - } - LOG.debug("Successfully wrote {} mutations", mutations.size()); - mutations = new ArrayList<>(); - batchSizeBytes = 0; - } - - @Override - public void populateDisplayData(Builder builder) { - super.populateDisplayData(builder); - spec.populateDisplayData(builder); - } - } - private SpannerIO() {} // Prevent construction. } http://git-wip-us.apache.org/repos/asf/beam/blob/454f1c42/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerWriteGroupFn.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerWriteGroupFn.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerWriteGroupFn.java new file mode 100644 index 0000000..aed4832 --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerWriteGroupFn.java @@ -0,0 +1,108 @@ +package org.apache.beam.sdk.io.gcp.spanner; + +import com.google.cloud.spanner.AbortedException; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.Iterables; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.beam.sdk.transforms.display.DisplayData; +import org.apache.beam.sdk.util.BackOff; +import org.apache.beam.sdk.util.BackOffUtils; +import org.apache.beam.sdk.util.FluentBackoff; +import org.apache.beam.sdk.util.Sleeper; +import org.joda.time.Duration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** Batches together and writes mutations to Google Cloud Spanner. */ +@VisibleForTesting class SpannerWriteGroupFn extends AbstractSpannerFn<MutationGroup, Void> { + private static final Logger LOG = LoggerFactory.getLogger(SpannerWriteGroupFn.class); + private final SpannerIO.Write spec; + // Current batch of mutations to be written. + private List<MutationGroup> mutations; + private long batchSizeBytes = 0; + + private static final int MAX_RETRIES = 5; + private static final FluentBackoff BUNDLE_WRITE_BACKOFF = + FluentBackoff.DEFAULT + .withMaxRetries(MAX_RETRIES) + .withInitialBackoff(Duration.standardSeconds(5)); + + @VisibleForTesting SpannerWriteGroupFn(SpannerIO.Write spec) { + this.spec = spec; + } + + @Override SpannerConfig getSpannerConfig() { + return spec.getSpannerConfig(); + } + + @Setup + public void setup() throws Exception { + super.setup(); + mutations = new ArrayList<>(); + batchSizeBytes = 0; + } + + @ProcessElement + public void processElement(ProcessContext c) throws Exception { + MutationGroup m = c.element(); + mutations.add(m); + batchSizeBytes += MutationSizeEstimator.sizeOf(m); + if (batchSizeBytes >= spec.getBatchSizeBytes()) { + flushBatch(); + } + } + + @FinishBundle + public void finishBundle() throws Exception { + if (!mutations.isEmpty()) { + flushBatch(); + } + } + + /** + * Writes a batch of mutations to Cloud Spanner. + * + * <p>If a commit fails, it will be retried up to {@link #MAX_RETRIES} times. If the retry limit + * is exceeded, the last exception from Cloud Spanner will be thrown. + * + * @throws AbortedException if the commit fails or IOException or InterruptedException if + * backing off between retries fails. + */ + private void flushBatch() throws AbortedException, IOException, InterruptedException { + LOG.debug("Writing batch of {} mutations", mutations.size()); + Sleeper sleeper = Sleeper.DEFAULT; + BackOff backoff = BUNDLE_WRITE_BACKOFF.backoff(); + + while (true) { + // Batch upsert rows. + try { + databaseClient().writeAtLeastOnce(Iterables.concat(mutations)); + + // Break if the commit threw no exception. + break; + } catch (AbortedException exception) { + // Only log the code and message for potentially-transient errors. The entire exception + // will be propagated upon the last retry. + LOG.error( + "Error writing to Spanner ({}): {}", exception.getCode(), exception.getMessage()); + if (!BackOffUtils.next(sleeper, backoff)) { + LOG.error("Aborting after {} retries.", MAX_RETRIES); + throw exception; + } + } + } + LOG.debug("Successfully wrote {} mutations", mutations.size()); + mutations = new ArrayList<>(); + batchSizeBytes = 0; + } + + @Override + public void populateDisplayData(DisplayData.Builder builder) { + super.populateDisplayData(builder); + spec.populateDisplayData(builder); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/454f1c42/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOTest.java index 0cc08bf..abeac0a 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOTest.java @@ -149,7 +149,7 @@ public class SpannerIOTest implements Serializable { .withDatabaseId("test-database") .withBatchSizeBytes(1000000000) .withServiceFactory(serviceFactory); - SpannerIO.SpannerWriteGroupFn writerFn = new SpannerIO.SpannerWriteGroupFn(write); + SpannerWriteGroupFn writerFn = new SpannerWriteGroupFn(write); DoFnTester<MutationGroup, Void> fnTester = DoFnTester.of(writerFn); fnTester.processBundle(Arrays.asList(one, two)); @@ -175,7 +175,7 @@ public class SpannerIOTest implements Serializable { .withDatabaseId("test-database") .withBatchSizeBytes(batchSize) .withServiceFactory(serviceFactory); - SpannerIO.SpannerWriteGroupFn writerFn = new SpannerIO.SpannerWriteGroupFn(write); + SpannerWriteGroupFn writerFn = new SpannerWriteGroupFn(write); DoFnTester<MutationGroup, Void> fnTester = DoFnTester.of(writerFn); fnTester.processBundle(Arrays.asList(one, two, three)); @@ -198,7 +198,7 @@ public class SpannerIOTest implements Serializable { .withDatabaseId("test-database") .withBatchSizeBytes(0) // turn off batching. .withServiceFactory(serviceFactory); - SpannerIO.SpannerWriteGroupFn writerFn = new SpannerIO.SpannerWriteGroupFn(write); + SpannerWriteGroupFn writerFn = new SpannerWriteGroupFn(write); DoFnTester<MutationGroup, Void> fnTester = DoFnTester.of(writerFn); fnTester.processBundle(Arrays.asList(one, two)); @@ -224,7 +224,7 @@ public class SpannerIOTest implements Serializable { .withDatabaseId("test-database") .withBatchSizeBytes(batchSize) .withServiceFactory(serviceFactory); - SpannerIO.SpannerWriteGroupFn writerFn = new SpannerIO.SpannerWriteGroupFn(write); + SpannerWriteGroupFn writerFn = new SpannerWriteGroupFn(write); DoFnTester<MutationGroup, Void> fnTester = DoFnTester.of(writerFn); fnTester.processBundle(Arrays.asList(g(one, two, three)));