[ 
https://issues.apache.org/jira/browse/BEAM-3008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16297690#comment-16297690
 ] 

ASF GitHub Bot commented on BEAM-3008:
--------------------------------------

chamikaramj closed pull request #4277: [BEAM-3008] Introduces BigtableConfig to 
contain similar logic for Read and Write
URL: https://github.com/apache/beam/pull/4277
 
 
   

This PR was merged from a forked repository. Because GitHub hides the
original diff of a foreign pull request (one from a fork) once it is
merged, the diff is reproduced below for the sake of provenance:

diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableConfig.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableConfig.java
new file mode 100644
index 00000000000..ba633d0cdbb
--- /dev/null
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableConfig.java
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.io.gcp.bigtable;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.auto.value.AutoValue;
+import com.google.cloud.bigtable.config.BigtableOptions;
+import com.google.cloud.bigtable.config.CredentialOptions;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.MoreObjects;
+import java.io.Serializable;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+
+/**
+ * Configuration for a Cloud Bigtable client.
+ */
+@AutoValue
+abstract class BigtableConfig implements Serializable {
+
+  /**
+   * Returns the project id being written to.
+   */
+  @Nullable
+  abstract String getProjectId();
+
+  /**
+   * Returns the instance id being written to.
+   */
+  @Nullable
+  abstract String getInstanceId();
+
+  /**
+   * Returns the table being read from.
+   */
+  @Nullable
+  abstract String getTableId();
+
+  /**
+   * Returns the Google Cloud Bigtable instance being written to, and other 
parameters.
+   *
+   * @deprecated will be replaced by bigtable options configurator.
+   */
+  @Deprecated
+  @Nullable
+  abstract BigtableOptions getBigtableOptions();
+
+  /**
+   * Configurator of the effective Bigtable Options.
+   */
+  @Nullable
+  abstract SerializableFunction<BigtableOptions.Builder,
+    BigtableOptions.Builder> getBigtableOptionsConfigurator();
+
+  /**
+   * Whether to validate that the table exists before writing.
+   */
+  abstract boolean getValidate();
+
+  /**
+   * {@link BigtableService} used only for testing.
+   */
+  @Nullable
+  abstract BigtableService getBigtableService();
+
+  abstract Builder toBuilder();
+
+  static BigtableConfig.Builder builder() {
+    return new AutoValue_BigtableConfig.Builder();
+  }
+
+  @AutoValue.Builder
+  abstract static class Builder {
+
+    abstract Builder setProjectId(String projectId);
+
+    abstract Builder setInstanceId(String instanceId);
+
+    abstract Builder setTableId(String tableId);
+
+    /**
+     * @deprecated will be replaced by bigtable options configurator.
+     */
+    @Deprecated
+    abstract Builder setBigtableOptions(BigtableOptions options);
+
+    abstract Builder setValidate(boolean validate);
+
+    abstract Builder setBigtableOptionsConfigurator(
+      SerializableFunction<BigtableOptions.Builder, BigtableOptions.Builder> 
optionsConfigurator);
+
+    abstract Builder setBigtableService(BigtableService bigtableService);
+
+    abstract BigtableConfig build();
+  }
+
+  BigtableConfig withProjectId(String projectId) {
+    checkNotNull(projectId, "Project Id of BigTable can not be null");
+    return toBuilder().setProjectId(projectId).build();
+  }
+
+  BigtableConfig withInstanceId(String instanceId) {
+    checkNotNull(instanceId, "Instance Id of BigTable can not be null");
+    return toBuilder().setInstanceId(instanceId).build();
+  }
+
+  BigtableConfig withTableId(String tableId) {
+    checkNotNull(tableId, "tableId can not be null");
+    return toBuilder().setTableId(tableId).build();
+  }
+
+  /**
+   * @deprecated will be replaced by bigtable options configurator.
+   */
+  @Deprecated
+  BigtableConfig withBigtableOptions(BigtableOptions options) {
+    checkNotNull(options, "Bigtable options can not be null");
+    return toBuilder().setBigtableOptions(options).build();
+  }
+
+  BigtableConfig withBigtableOptionsConfigurator(
+    SerializableFunction<BigtableOptions.Builder, BigtableOptions.Builder> 
configurator) {
+    checkNotNull(configurator, "configurator can not be null");
+    return toBuilder().setBigtableOptionsConfigurator(configurator).build();
+  }
+
+  BigtableConfig withValidate(boolean isEnabled) {
+    return toBuilder().setValidate(isEnabled).build();
+  }
+
+  @VisibleForTesting
+  BigtableConfig withBigtableService(BigtableService bigtableService) {
+    checkNotNull(bigtableService, "bigtableService can not be null");
+    return toBuilder().setBigtableService(bigtableService).build();
+  }
+
+  void validate() {
+    checkArgument(getProjectId() != null && !getProjectId().isEmpty()
+        || getBigtableOptions() != null && getBigtableOptions().getProjectId() 
!= null
+        && !getBigtableOptions().getProjectId().isEmpty(),
+      "Could not obtain Bigtable project id");
+
+    checkArgument(getInstanceId() != null && !getInstanceId().isEmpty()
+        || getBigtableOptions() != null && 
getBigtableOptions().getInstanceId() != null
+        && !getBigtableOptions().getInstanceId().isEmpty(),
+      "Could not obtain Bigtable instance id");
+
+    checkArgument(getTableId() != null && !getTableId().isEmpty(),
+      "Could not obtain Bigtable table id");
+  }
+
+  void populateDisplayData(DisplayData.Builder builder) {
+    builder.add(DisplayData.item("tableId", getTableId())
+      .withLabel("Table ID"));
+
+    if (getBigtableOptions() != null) {
+      builder.add(DisplayData.item("bigtableOptions", 
getBigtableOptions().toString())
+        .withLabel("Bigtable Options"));
+    }
+
+    if (getProjectId() != null) {
+      builder.add(DisplayData.item("projectId", getProjectId())
+        .withLabel("Bigtable Project Id"));
+    }
+
+    if (getInstanceId() != null) {
+      builder.add(DisplayData.item("instanceId", getInstanceId())
+        .withLabel("Bigtable Instnace Id"));
+    }
+
+    builder.add(DisplayData.item("effectiveBigtableOptions",
+      effectiveUserProvidedBigtableOptions().build().toString())
+      .withLabel("Effective BigtableOptions resulted from configuration of 
given options"));
+  }
+
+  /**
+   * Helper function that either returns the mock Bigtable service supplied by
+   * {@link #withBigtableService} or creates and returns an implementation 
that talks to
+   * {@code Cloud Bigtable}.
+   *
+   * <p>Also populate the credentials option from {@link 
GcpOptions#getGcpCredential()} if the
+   * default credentials are being used on {@link BigtableOptions}.
+   */
+  @VisibleForTesting
+  BigtableService getBigtableService(PipelineOptions pipelineOptions) {
+    if (getBigtableService() != null) {
+      return getBigtableService();
+    }
+
+    BigtableOptions.Builder bigtableOptions = 
effectiveUserProvidedBigtableOptions();
+
+    bigtableOptions.setUserAgent(pipelineOptions.getUserAgent());
+
+    if (bigtableOptions.build().getCredentialOptions().getCredentialType()
+      == CredentialOptions.CredentialType.DefaultCredentials) {
+      bigtableOptions.setCredentialOptions(
+        CredentialOptions.credential(
+          pipelineOptions.as(GcpOptions.class).getGcpCredential()));
+    }
+
+    // Default option that should be forced
+    bigtableOptions.setUseCachedDataPool(true);
+
+    return new BigtableServiceImpl(bigtableOptions.build());
+  }
+
+  @Override
+  public String toString() {
+    return MoreObjects.toStringHelper(BigtableConfig.class)
+      .add("projectId", getProjectId())
+      .add("instanceId", getInstanceId())
+      .add("tableId", getTableId())
+      .add("bigtableOptionsConfigurator",
+        getBigtableOptionsConfigurator() == null ? null : 
getBigtableOptionsConfigurator()
+          .getClass().getName())
+      .add("options", getBigtableOptions())
+      .add("effectiveOptions", effectiveUserProvidedBigtableOptions())
+      .toString();
+  }
+
+  private BigtableOptions.Builder effectiveUserProvidedBigtableOptions() {
+    BigtableOptions.Builder effectiveOptions = getBigtableOptions() != null
+      ? getBigtableOptions().toBuilder()
+      : new BigtableOptions.Builder();
+
+    if (getBigtableOptionsConfigurator() != null) {
+      effectiveOptions = 
getBigtableOptionsConfigurator().apply(effectiveOptions);
+    }
+
+    if (getInstanceId() != null) {
+      effectiveOptions.setInstanceId(getInstanceId());
+    }
+
+    if (getProjectId() != null) {
+      effectiveOptions.setProjectId(getProjectId());
+    }
+
+    return effectiveOptions;
+  }
+}
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
index febdc1f53b0..4199b28833c 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
@@ -28,9 +28,6 @@
 import com.google.bigtable.v2.RowFilter;
 import com.google.bigtable.v2.SampleRowKeysResponse;
 import com.google.cloud.bigtable.config.BigtableOptions;
-import com.google.cloud.bigtable.config.CredentialOptions;
-import com.google.cloud.bigtable.config.CredentialOptions.CredentialType;
-import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.MoreObjects;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
@@ -48,7 +45,6 @@
 import org.apache.beam.sdk.PipelineRunner;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
 import org.apache.beam.sdk.extensions.protobuf.ProtoCoder;
 import org.apache.beam.sdk.io.BoundedSource;
 import org.apache.beam.sdk.io.BoundedSource.BoundedReader;
@@ -161,11 +157,7 @@
    */
   @Experimental
   public static Read read() {
-    return new AutoValue_BigtableIO_Read.Builder()
-        .setKeyRange(ByteKeyRange.ALL_KEYS)
-        .setTableId("")
-        .setValidate(true)
-        .build();
+    return Read.create();
   }
 
   /**
@@ -177,10 +169,7 @@ public static Read read() {
    */
   @Experimental
   public static Write write() {
-    return new AutoValue_BigtableIO_Write.Builder()
-        .setTableId("")
-        .setValidate(true)
-        .build();
+    return Write.create();
   }
 
   /**
@@ -193,13 +182,7 @@ public static Write write() {
   @AutoValue
   public abstract static class Read extends PTransform<PBegin, 
PCollection<Row>> {
 
-    /** Returns the project id being written to. */
-    @Nullable
-    abstract String getProjectId();
-
-    /** Returns the instance id being written to. */
-    @Nullable
-    abstract String getInstanceId();
+    abstract BigtableConfig getBigtableConfig();
 
     @Nullable
     abstract RowFilter getRowFilter();
@@ -210,10 +193,9 @@ public static Write write() {
 
     /** Returns the table being read from. */
     @Nullable
-    public abstract String getTableId();
-
-    @Nullable
-    abstract BigtableService getBigtableService();
+    public String getTableId() {
+      return getBigtableConfig().getTableId();
+    }
 
     /**
      * Returns the Google Cloud Bigtable instance being read from, and other 
parameters.
@@ -221,43 +203,33 @@ public static Write write() {
      */
     @Deprecated
     @Nullable
-    public abstract BigtableOptions getBigtableOptions();
+    public BigtableOptions getBigtableOptions() {
+      return getBigtableConfig().getBigtableOptions();
+    }
 
-    public abstract boolean getValidate();
+    abstract Builder toBuilder();
 
-    /**
-     * Configurator of the effective Bigtable Options.
-     */
-    @Nullable
-    abstract SerializableFunction<BigtableOptions.Builder,
-      BigtableOptions.Builder> getBigtableOptionsConfigurator();
+    static Read create() {
+      BigtableConfig config = BigtableConfig.builder()
+        .setTableId("")
+        .setValidate(true)
+        .build();
 
-    abstract Builder toBuilder();
+      return new AutoValue_BigtableIO_Read.Builder()
+        .setBigtableConfig(config)
+        .setKeyRange(ByteKeyRange.ALL_KEYS)
+        .build();
+    }
 
     @AutoValue.Builder
     abstract static class Builder {
 
-      abstract Builder setProjectId(String projectId);
-
-      abstract Builder setInstanceId(String instanceId);
+      abstract Builder setBigtableConfig(BigtableConfig bigtableConfig);
 
       abstract Builder setRowFilter(RowFilter filter);
 
       abstract Builder setKeyRange(ByteKeyRange keyRange);
 
-      abstract Builder setTableId(String tableId);
-
-      /** @deprecated will be replaced by bigtable options configurator. */
-      @Deprecated
-      abstract Builder setBigtableOptions(BigtableOptions options);
-
-      abstract Builder setBigtableService(BigtableService bigtableService);
-
-      abstract Builder setValidate(boolean validate);
-
-      abstract Builder setBigtableOptionsConfigurator(
-        SerializableFunction<BigtableOptions.Builder, BigtableOptions.Builder> 
optionsConfigurator);
-
       abstract Read build();
     }
 
@@ -269,8 +241,8 @@ abstract Builder setBigtableOptionsConfigurator(
      * <p>Does not modify this object.
      */
     public Read withProjectId(String projectId) {
-      checkNotNull(projectId, "Project Id of BigTable can not be null");
-      return toBuilder().setProjectId(projectId).build();
+      BigtableConfig config = getBigtableConfig();
+      return 
toBuilder().setBigtableConfig(config.withProjectId(projectId)).build();
     }
 
     /**
@@ -281,8 +253,8 @@ public Read withProjectId(String projectId) {
      * <p>Does not modify this object.
      */
     public Read withInstanceId(String instanceId) {
-      checkNotNull(instanceId, "Instance Id of BigTable can not be null");
-      return toBuilder().setInstanceId(instanceId).build();
+      BigtableConfig config = getBigtableConfig();
+      return 
toBuilder().setBigtableConfig(config.withInstanceId(instanceId)).build();
     }
 
     /**
@@ -320,9 +292,11 @@ public Read withBigtableOptions(BigtableOptions options) {
      */
     @Deprecated
     public Read withBigtableOptions(BigtableOptions.Builder optionsBuilder) {
-      checkArgument(optionsBuilder != null, "optionsBuilder can not be null");
+      BigtableConfig config = getBigtableConfig();
       // TODO: is there a better way to clone a Builder? Want it to be immune 
from user changes.
-      return 
toBuilder().setBigtableOptions(optionsBuilder.build().toBuilder().build()).build();
+      return toBuilder()
+        
.setBigtableConfig(config.withBigtableOptions(optionsBuilder.build().toBuilder().build()))
+        .build();
     }
 
     /**
@@ -336,8 +310,10 @@ public Read withBigtableOptions(BigtableOptions.Builder 
optionsBuilder) {
      */
     public Read withBigtableOptionsConfigurator(
       SerializableFunction<BigtableOptions.Builder, BigtableOptions.Builder> 
configurator) {
-      checkArgument(configurator != null, "configurator can not be null");
-      return toBuilder().setBigtableOptionsConfigurator(configurator).build();
+      BigtableConfig config = getBigtableConfig();
+      return toBuilder()
+        
.setBigtableConfig(config.withBigtableOptionsConfigurator(configurator))
+        .build();
     }
 
     /**
@@ -367,25 +343,38 @@ public Read withKeyRange(ByteKeyRange keyRange) {
      * <p>Does not modify this object.
      */
     public Read withTableId(String tableId) {
-      checkArgument(tableId != null, "tableId can not be null");
-      return toBuilder().setTableId(tableId).build();
+      BigtableConfig config = getBigtableConfig();
+      return 
toBuilder().setBigtableConfig(config.withTableId(tableId)).build();
     }
 
     /** Disables validation that the table being read from exists. */
     public Read withoutValidation() {
-      return toBuilder().setValidate(false).build();
+      BigtableConfig config = getBigtableConfig();
+      return toBuilder().setBigtableConfig(config.withValidate(false)).build();
+    }
+
+    /**
+     * Returns a new {@link BigtableIO.Read} that will read using the given 
Cloud Bigtable
+     * service implementation.
+     *
+     * <p>This is used for testing.
+     *
+     * <p>Does not modify this object.
+     */
+    Read withBigtableService(BigtableService bigtableService) {
+      BigtableConfig config = getBigtableConfig();
+      return 
toBuilder().setBigtableConfig(config.withBigtableService(bigtableService)).build();
     }
 
     @Override
     public PCollection<Row> expand(PBegin input) {
-      validateBigtableConfig(getBigtableOptions(), getProjectId(), 
getInstanceId());
-      checkArgument(getTableId() != null && !getTableId().isEmpty(), 
"withTableId() is required");
+      getBigtableConfig().validate();
 
       BigtableSource source =
           new BigtableSource(new SerializableFunction<PipelineOptions, 
BigtableService>() {
             @Override
             public BigtableService apply(PipelineOptions options) {
-              return getBigtableService(options);
+              return getBigtableConfig().getBigtableService(options);
             }
           }, getTableId(), getRowFilter(), getKeyRange(), null);
       return 
input.getPipeline().apply(org.apache.beam.sdk.io.Read.from(source));
@@ -393,39 +382,13 @@ public BigtableService apply(PipelineOptions options) {
 
     @Override
     public void validate(PipelineOptions options) {
-      if (getValidate()) {
-        try {
-          checkArgument(
-              getBigtableService(options).tableExists(getTableId()),
-              "Table %s does not exist",
-              getTableId());
-        } catch (IOException e) {
-          LOG.warn("Error checking whether table {} exists; proceeding.", 
getTableId(), e);
-        }
-      }
+      validateTableExists(getBigtableConfig(), options);
     }
 
     @Override
     public void populateDisplayData(DisplayData.Builder builder) {
       super.populateDisplayData(builder);
-
-      builder.add(DisplayData.item("tableId", getTableId())
-        .withLabel("Table ID"));
-
-      if (getBigtableOptions() != null) {
-        builder.add(DisplayData.item("bigtableOptions", 
getBigtableOptions().toString())
-          .withLabel("Bigtable Options"));
-      }
-
-      if (getProjectId() != null) {
-        builder.add(DisplayData.item("projectId", getProjectId())
-            .withLabel("Bigtable Project Id"));
-      }
-
-      if (getInstanceId() != null) {
-        builder.add(DisplayData.item("instanceId", getInstanceId())
-            .withLabel("Bigtable Instnace Id"));
-      }
+      getBigtableConfig().populateDisplayData(builder);
 
       builder.addIfNotDefault(
           DisplayData.item("keyRange", getKeyRange().toString()), 
ByteKeyRange.ALL_KEYS.toString());
@@ -434,90 +397,15 @@ public void populateDisplayData(DisplayData.Builder 
builder) {
         builder.add(DisplayData.item("rowFilter", getRowFilter().toString())
           .withLabel("Table Row Filter"));
       }
-
-      builder.add(DisplayData.item("effectiveBigtableOptions",
-        effectiveUserProvidedBigtableOptions().build().toString())
-        .withLabel("Effective BigtableOptions resulted from configuration of 
given options"));
     }
 
     @Override
     public String toString() {
       return MoreObjects.toStringHelper(Read.class)
-          .add("options", getBigtableOptions())
-          .add("effectiveOptions", effectiveUserProvidedBigtableOptions())
-          .add("projectId", getProjectId())
-          .add("instanceId", getInstanceId())
-          .add("tableId", getTableId())
-          .add("keyRange", getKeyRange())
-          .add("filter", getRowFilter())
-          .add("bigtableOptionsConfigurator",
-            getBigtableOptionsConfigurator() == null ? null : 
getBigtableOptionsConfigurator()
-              .getClass().getName())
-          .toString();
-    }
-
-    /**
-     * Returns a new {@link BigtableIO.Read} that will read using the given 
Cloud Bigtable
-     * service implementation.
-     *
-     * <p>This is used for testing.
-     *
-     * <p>Does not modify this object.
-     */
-    Read withBigtableService(BigtableService bigtableService) {
-      checkArgument(bigtableService != null, "bigtableService can not be 
null");
-      return toBuilder().setBigtableService(bigtableService).build();
-    }
-
-    /**
-     * Helper function that either returns the mock Bigtable service supplied 
by
-     * {@link #withBigtableService} or creates and returns an implementation 
that talks to
-     * {@code Cloud Bigtable}.
-     *
-     * <p>Also populate the credentials option from {@link 
GcpOptions#getGcpCredential()} if the
-     * default credentials are being used on {@link BigtableOptions}.
-     */
-    @VisibleForTesting
-    BigtableService getBigtableService(PipelineOptions pipelineOptions) {
-      if (getBigtableService() != null) {
-        return getBigtableService();
-      }
-
-      BigtableOptions.Builder bigtableOptions = 
effectiveUserProvidedBigtableOptions();
-
-      bigtableOptions.setUserAgent(pipelineOptions.getUserAgent());
-
-      if (getBigtableOptions() != null && 
getBigtableOptions().getCredentialOptions()
-          .getCredentialType() == CredentialType.DefaultCredentials) {
-        bigtableOptions.setCredentialOptions(
-            CredentialOptions.credential(
-                pipelineOptions.as(GcpOptions.class).getGcpCredential()));
-      }
-
-      // Default option that should be forced
-      bigtableOptions.setUseCachedDataPool(true);
-
-      return new BigtableServiceImpl(bigtableOptions.build());
-    }
-
-    private BigtableOptions.Builder effectiveUserProvidedBigtableOptions() {
-      BigtableOptions.Builder effectiveOptions = getBigtableOptions() != null
-        ? getBigtableOptions().toBuilder()
-        : new BigtableOptions.Builder();
-
-      if (getBigtableOptionsConfigurator() != null) {
-        effectiveOptions = 
getBigtableOptionsConfigurator().apply(effectiveOptions);
-      }
-
-      if (getInstanceId() != null) {
-        effectiveOptions.setInstanceId(getInstanceId());
-      }
-
-      if (getProjectId() != null) {
-        effectiveOptions.setProjectId(getProjectId());
-      }
-
-      return effectiveOptions;
+        .add("config", getBigtableConfig())
+        .add("keyRange", getKeyRange())
+        .add("filter", getRowFilter())
+        .toString();
     }
   }
 
@@ -532,20 +420,26 @@ BigtableService getBigtableService(PipelineOptions 
pipelineOptions) {
   public abstract static class Write
       extends PTransform<PCollection<KV<ByteString, Iterable<Mutation>>>, 
PDone> {
 
-    /** Returns the project id being written to. */
-    @Nullable
-    abstract String getProjectId();
-
-    /** Returns the instance id being written to. */
-    @Nullable
-    abstract String getInstanceId();
+    static SerializableFunction<BigtableOptions.Builder, 
BigtableOptions.Builder>
+    enableBulkApiConfigurator(final @Nullable 
SerializableFunction<BigtableOptions.Builder,
+        BigtableOptions.Builder> userConfigurator) {
+      return new SerializableFunction<BigtableOptions.Builder, 
BigtableOptions.Builder>() {
+        @Override
+        public BigtableOptions.Builder apply(BigtableOptions.Builder 
optionsBuilder) {
+          if (userConfigurator != null) {
+            optionsBuilder = userConfigurator.apply(optionsBuilder);
+          }
 
-    /** Returns the table being written to. */
-    @Nullable
-    abstract String getTableId();
+          return optionsBuilder
+            .setBulkOptions(
+              optionsBuilder.build().getBulkOptions().toBuilder()
+                .setUseBulkApi(true)
+                .build());
+        }
+      };
+    }
 
-    @Nullable
-    abstract BigtableService getBigtableService();
+    abstract BigtableConfig getBigtableConfig();
 
     /**
      * Returns the Google Cloud Bigtable instance being written to, and other 
parameters.
@@ -553,38 +447,26 @@ BigtableService getBigtableService(PipelineOptions 
pipelineOptions) {
      */
     @Deprecated
     @Nullable
-    public abstract BigtableOptions getBigtableOptions();
+    public BigtableOptions getBigtableOptions() {
+      return getBigtableConfig().getBigtableOptions();
+    }
 
-    /**
-     * Configurator of the effective Bigtable Options.
-     */
-    @Nullable
-    abstract SerializableFunction<BigtableOptions.Builder,
-      BigtableOptions.Builder> getBigtableOptionsConfigurator();
+    abstract Builder toBuilder();
 
-    abstract boolean getValidate();
+    static Write create() {
+      BigtableConfig config = BigtableConfig.builder()
+        .setTableId("")
+        .setValidate(true)
+        .setBigtableOptionsConfigurator(enableBulkApiConfigurator(null))
+        .build();
 
-    abstract Builder toBuilder();
+      return new 
AutoValue_BigtableIO_Write.Builder().setBigtableConfig(config).build();
+    }
 
     @AutoValue.Builder
     abstract static class Builder {
 
-      abstract Builder setProjectId(String projectId);
-
-      abstract Builder setInstanceId(String instanceId);
-
-      abstract Builder setTableId(String tableId);
-
-      /** @deprecated will be replaced by bigtable options configurator. */
-      @Deprecated
-      abstract Builder setBigtableOptions(BigtableOptions options);
-
-      abstract Builder setBigtableService(BigtableService bigtableService);
-
-      abstract Builder setValidate(boolean validate);
-
-      abstract Builder setBigtableOptionsConfigurator(
-        SerializableFunction<BigtableOptions.Builder, BigtableOptions.Builder> 
optionsConfigurator);
+      abstract Builder setBigtableConfig(BigtableConfig bigtableConfig);
 
       abstract Write build();
     }
@@ -597,8 +479,8 @@ abstract Builder setBigtableOptionsConfigurator(
      * <p>Does not modify this object.
      */
     public Write withProjectId(String projectId) {
-      checkNotNull(projectId, "Project Id of BigTable can not be null");
-      return toBuilder().setProjectId(projectId).build();
+      BigtableConfig config = getBigtableConfig();
+      return 
toBuilder().setBigtableConfig(config.withProjectId(projectId)).build();
     }
 
     /**
@@ -609,8 +491,8 @@ public Write withProjectId(String projectId) {
      * <p>Does not modify this object.
      */
     public Write withInstanceId(String instanceId) {
-      checkNotNull(instanceId, "Instance Id of BigTable can not be null");
-      return toBuilder().setInstanceId(instanceId).build();
+      BigtableConfig config = getBigtableConfig();
+      return 
toBuilder().setBigtableConfig(config.withInstanceId(instanceId)).build();
     }
 
     /**
@@ -627,6 +509,7 @@ public Write withInstanceId(String instanceId) {
      */
     @Deprecated
     public Write withBigtableOptions(BigtableOptions options) {
+      checkArgument(options != null, "options can not be null");
       return withBigtableOptions(options.toBuilder());
     }
 
@@ -647,9 +530,11 @@ public Write withBigtableOptions(BigtableOptions options) {
      */
     @Deprecated
     public Write withBigtableOptions(BigtableOptions.Builder optionsBuilder) {
-      checkArgument(optionsBuilder != null, "optionsBuilder can not be null");
+      BigtableConfig config = getBigtableConfig();
       // TODO: is there a better way to clone a Builder? Want it to be immune 
from user changes.
-      return 
toBuilder().setBigtableOptions(optionsBuilder.build().toBuilder().build()).build();
+      return toBuilder()
+        
.setBigtableConfig(config.withBigtableOptions(optionsBuilder.build().toBuilder().build()))
+        .build();
     }
 
     /**
@@ -663,13 +548,17 @@ public Write withBigtableOptions(BigtableOptions.Builder 
optionsBuilder) {
      */
     public Write withBigtableOptionsConfigurator(
       SerializableFunction<BigtableOptions.Builder, BigtableOptions.Builder> 
configurator) {
-      checkArgument(configurator != null, "configurator can not be null");
-      return toBuilder().setBigtableOptionsConfigurator(configurator).build();
+      BigtableConfig config = getBigtableConfig();
+      return toBuilder()
+        .setBigtableConfig(config.withBigtableOptionsConfigurator(
+          enableBulkApiConfigurator(configurator)))
+        .build();
     }
 
     /** Disables validation that the table being written to exists. */
     public Write withoutValidation() {
-      return toBuilder().setValidate(false).build();
+      BigtableConfig config = getBigtableConfig();
+      return toBuilder().setBigtableConfig(config.withValidate(false)).build();
     }
 
     /**
@@ -678,20 +567,32 @@ public Write withoutValidation() {
      * <p>Does not modify this object.
      */
     public Write withTableId(String tableId) {
-      checkArgument(tableId != null, "tableId can not be null");
-      return toBuilder().setTableId(tableId).build();
+      BigtableConfig config = getBigtableConfig();
+      return 
toBuilder().setBigtableConfig(config.withTableId(tableId)).build();
+    }
+
+    /**
+     * Returns a new {@link BigtableIO.Write} that will write using the given 
Cloud Bigtable
+     * service implementation.
+     *
+     * <p>This is used for testing.
+     *
+     * <p>Does not modify this object.
+     */
+    Write withBigtableService(BigtableService bigtableService) {
+      BigtableConfig config = getBigtableConfig();
+      return 
toBuilder().setBigtableConfig(config.withBigtableService(bigtableService)).build();
     }
 
     @Override
     public PDone expand(PCollection<KV<ByteString, Iterable<Mutation>>> input) 
{
-      validateBigtableConfig(getBigtableOptions(), getProjectId(), 
getInstanceId());
-      checkArgument(getTableId() != null && !getTableId().isEmpty(), 
"withTableId() is required");
+      getBigtableConfig().validate();
 
-      input.apply(ParDo.of(new BigtableWriterFn(getTableId(),
+      input.apply(ParDo.of(new 
BigtableWriterFn(getBigtableConfig().getTableId(),
           new SerializableFunction<PipelineOptions, BigtableService>() {
             @Override
             public BigtableService apply(PipelineOptions options) {
-              return getBigtableService(options);
+              return getBigtableConfig().getBigtableService(options);
             }
           })));
       return PDone.in(input.getPipeline());
@@ -699,127 +600,22 @@ public BigtableService apply(PipelineOptions options) {
 
     @Override
     public void validate(PipelineOptions options) {
-      if (getValidate()) {
-        try {
-          checkArgument(
-              getBigtableService(options).tableExists(getTableId()),
-              "Table %s does not exist",
-              getTableId());
-        } catch (IOException e) {
-          LOG.warn("Error checking whether table {} exists; proceeding.", getTableId(), e);
-        }
-      }
-    }
-
-    /**
-     * Returns a new {@link BigtableIO.Write} that will write using the given 
Cloud Bigtable
-     * service implementation.
-     *
-     * <p>This is used for testing.
-     *
-     * <p>Does not modify this object.
-     */
-    Write withBigtableService(BigtableService bigtableService) {
-      checkArgument(bigtableService != null, "bigtableService can not be 
null");
-      return toBuilder().setBigtableService(bigtableService).build();
+      validateTableExists(getBigtableConfig(), options);
     }
 
     @Override
     public void populateDisplayData(DisplayData.Builder builder) {
       super.populateDisplayData(builder);
-
-      builder.add(DisplayData.item("tableId", getTableId())
-        .withLabel("Table ID"));
-
-      if (getBigtableOptions() != null) {
-        builder.add(DisplayData.item("bigtableOptions", 
getBigtableOptions().toString())
-          .withLabel("Bigtable Options"));
-      }
-
-      if (getProjectId() != null) {
-        builder.add(DisplayData.item("projectId", getProjectId())
-            .withLabel("Bigtable Project Id"));
-      }
-
-      if (getInstanceId() != null) {
-        builder.add(DisplayData.item("instanceId", getInstanceId())
-            .withLabel("Bigtable Instnace Id"));
-      }
-
-      builder.add(DisplayData.item("effectiveBigtableOptions",
-        effectiveUserProvidedBigtableOptions().build().toString())
-        .withLabel("Effective BigtableOptions resulted from configuration of 
given options"));
+      getBigtableConfig().populateDisplayData(builder);
     }
 
     @Override
     public String toString() {
       return MoreObjects.toStringHelper(Write.class)
-          .add("options", getBigtableOptions())
-          .add("effectiveOptions", effectiveUserProvidedBigtableOptions())
-          .add("tableId", getTableId())
-          .add("projectId", getProjectId())
-          .add("instanceId", getInstanceId())
-          .add("bigtableOptionsConfigurator",
-          getBigtableOptionsConfigurator() == null ? null : 
getBigtableOptionsConfigurator()
-            .getClass().getName())
+          .add("config", getBigtableConfig())
           .toString();
     }
 
-    /**
-     * Helper function that either returns the mock Bigtable service supplied 
by
-     * {@link #withBigtableService} or creates and returns an implementation 
that talks to
-     * {@code Cloud Bigtable}.
-     *
-     * <p>Also populate the credentials option from {@link 
GcpOptions#getGcpCredential()} if the
-     * default credentials are being used on {@link BigtableOptions}.
-     */
-    @VisibleForTesting
-    BigtableService getBigtableService(PipelineOptions pipelineOptions) {
-      if (getBigtableService() != null) {
-        return getBigtableService();
-      }
-
-      BigtableOptions.Builder bigtableOptions = 
effectiveUserProvidedBigtableOptions();
-
-      bigtableOptions.setUserAgent(pipelineOptions.getUserAgent());
-
-      if (getBigtableOptions() != null && 
getBigtableOptions().getCredentialOptions()
-        .getCredentialType() == CredentialType.DefaultCredentials) {
-        bigtableOptions.setCredentialOptions(
-          CredentialOptions.credential(
-            pipelineOptions.as(GcpOptions.class).getGcpCredential()));
-      }
-
-      // Set useBulkApi to true for enabling bulk writes
-      bigtableOptions
-        .setUseCachedDataPool(true)
-        .setBulkOptions(
-          
effectiveUserProvidedBigtableOptions().build().getBulkOptions().toBuilder()
-            .setUseBulkApi(true)
-            .build());
-
-      return new BigtableServiceImpl(bigtableOptions.build());
-    }
-
-    private BigtableOptions.Builder effectiveUserProvidedBigtableOptions() {
-      BigtableOptions.Builder effectiveOptions = getBigtableOptions() != null
-        ? getBigtableOptions().toBuilder()
-        : new BigtableOptions.Builder();
-
-      if (getBigtableOptionsConfigurator() != null) {
-        effectiveOptions = 
getBigtableOptionsConfigurator().apply(effectiveOptions);
-      }
-
-      if (getInstanceId() != null) {
-        effectiveOptions.setInstanceId(getInstanceId());
-      }
-      if (getProjectId() != null) {
-        effectiveOptions.setProjectId(getProjectId());
-      }
-
-      return effectiveOptions;
-    }
-
     private class BigtableWriterFn extends DoFn<KV<ByteString, Iterable<Mutation>>, Void> {
 
       public BigtableWriterFn(String tableId,
@@ -1321,15 +1117,17 @@ public BigtableWriteException(KV<ByteString, Iterable<Mutation>> record, Throwab
     }
   }
 
-  static void validateBigtableConfig(BigtableOptions options, String projectId, String instanceId) {
-    checkArgument(projectId != null && !projectId.isEmpty()
-            || options != null && options.getProjectId() != null
-            && !options.getProjectId().isEmpty(),
-        "Could not obtain Bigtable project id");
-
-    checkArgument(instanceId != null && !instanceId.isEmpty()
-            || options != null && options.getInstanceId() != null
-            && !options.getInstanceId().isEmpty(),
-        "Could not obtain Bigtable instance id");
+  static void validateTableExists(BigtableConfig config, PipelineOptions options) {
+    if (config.getValidate()) {
+      String tableId = config.getTableId();
+      try {
+        checkArgument(
+          config.getBigtableService(options).tableExists(tableId),
+          "Table %s does not exist",
+          tableId);
+      } catch (IOException e) {
+        LOG.warn("Error checking whether table {} exists; proceeding.", tableId, e);
+      }
+    }
   }
 }
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
index 418db92c4bf..87999034ac9 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
@@ -114,11 +114,6 @@
   @Rule public ExpectedException thrown = ExpectedException.none();
   @Rule public ExpectedLogs logged = ExpectedLogs.none(BigtableIO.class);
 
-  /**
-   * These tests requires a static instance of the {@link FakeBigtableService} because the writers
-   * go through a serialization step when executing the test and would not affect passed-in objects
-   * otherwise.
-   */
   private static FakeBigtableService service;
   private static SerializableFunction<PipelineOptions, BigtableService> serviceFactory =
       new SerializableFunction<PipelineOptions, BigtableService>() {
@@ -171,10 +166,10 @@ public void testReadBuildsCorrectly() {
             .withBigtableOptionsConfigurator(PORT_CONFIGURATOR);
     assertEquals("options_project", read.getBigtableOptions().getProjectId());
     assertEquals("options_instance", read.getBigtableOptions().getInstanceId());
-    assertEquals("instance", read.getInstanceId());
-    assertEquals("project", read.getProjectId());
+    assertEquals("instance", read.getBigtableConfig().getInstanceId());
+    assertEquals("project", read.getBigtableConfig().getProjectId());
     assertEquals("table", read.getTableId());
-    assertEquals(PORT_CONFIGURATOR, read.getBigtableOptionsConfigurator());
+    assertEquals(PORT_CONFIGURATOR, read.getBigtableConfig().getBigtableOptionsConfigurator());
   }
 
   @Test
@@ -225,14 +220,12 @@ public void testWriteBuildsCorrectly() {
         BigtableIO.write().withBigtableOptions(BIGTABLE_OPTIONS)
             .withTableId("table")
             .withInstanceId("instance")
-            .withProjectId("project")
-            .withBigtableOptionsConfigurator(PORT_CONFIGURATOR);
-    assertEquals("table", write.getTableId());
+            .withProjectId("project");
+    assertEquals("table", write.getBigtableConfig().getTableId());
     assertEquals("options_project", write.getBigtableOptions().getProjectId());
     assertEquals("options_instance", write.getBigtableOptions().getInstanceId());
-    assertEquals("instance", write.getInstanceId());
-    assertEquals("project", write.getProjectId());
-    assertEquals(PORT_CONFIGURATOR, write.getBigtableOptionsConfigurator());
+    assertEquals("instance", write.getBigtableConfig().getInstanceId());
+    assertEquals("project", write.getBigtableConfig().getProjectId());
   }
 
   @Test
@@ -305,10 +298,12 @@ public void testUsePipelineOptionsCredentialsIfNotSpecifiedInBigtableOptions() t
     BigtableService readService = BigtableIO.read()
         .withBigtableOptions(options)
         .withTableId("TEST-TABLE")
+        .getBigtableConfig()
         .getBigtableService(pipelineOptions);
     BigtableService writeService = BigtableIO.write()
         .withBigtableOptions(options)
         .withTableId("TEST-TABLE")
+        .getBigtableConfig()
         .getBigtableService(pipelineOptions);
     assertEquals(CredentialType.SuppliedCredentials,
         
readService.getBigtableOptions().getCredentialOptions().getCredentialType());
@@ -327,10 +322,12 @@ public void testDontUsePipelineOptionsCredentialsIfSpecifiedInBigtableOptions()
     BigtableService readService = BigtableIO.read()
         .withBigtableOptions(options)
         .withTableId("TEST-TABLE")
+        .getBigtableConfig()
         .getBigtableService(pipelineOptions);
     BigtableService writeService = BigtableIO.write()
         .withBigtableOptions(options)
         .withTableId("TEST-TABLE")
+        .getBigtableConfig()
         .getBigtableService(pipelineOptions);
     assertEquals(CredentialType.None,
         
readService.getBigtableOptions().getCredentialOptions().getCredentialType());


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> BigtableIO should use ValueProviders 
> -------------------------------------
>
>                 Key: BEAM-3008
>                 URL: https://issues.apache.org/jira/browse/BEAM-3008
>             Project: Beam
>          Issue Type: New Feature
>          Components: sdk-java-gcp
>            Reporter: Solomon Duskis
>            Assignee: Solomon Duskis
>
> [https://github.com/apache/beam/pull/2057] is an effort towards BigtableIO 
> templatization.  This Issue is a request to get a fully featured template for 
> BigtableIO.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Reply via email to