igorbernstein2 commented on code in PR #24015:
URL: https://github.com/apache/beam/pull/24015#discussion_r1093413943


##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableClientWrapper.java:
##########
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigtable;
+
+import static org.apache.beam.sdk.io.gcp.bigtable.RowUtils.byteString;
+import static org.apache.beam.sdk.io.gcp.bigtable.RowUtils.byteStringUtf8;
+
+import com.google.auth.Credentials;
+import com.google.cloud.bigtable.admin.v2.BigtableTableAdminClient;
+import com.google.cloud.bigtable.admin.v2.models.CreateTableRequest;
+import com.google.cloud.bigtable.config.BigtableOptions;
+import com.google.cloud.bigtable.config.CredentialOptions;
+import com.google.cloud.bigtable.data.v2.BigtableDataClient;
+import com.google.cloud.bigtable.data.v2.models.RowMutation;
+import java.io.IOException;
+import java.io.Serializable;
+import org.apache.beam.sdk.annotations.Internal;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+@Internal
+public class BigtableClientWrapper implements Serializable {
+  private final BigtableTableAdminClient tableAdminClient;
+  private final BigtableDataClient dataClient;
+
+  private final BigtableOptions bigtableOptions;
+
+  public BigtableClientWrapper(
+      String project,
+      String instanceId,
+      @Nullable Integer emulatorPort,
+      @Nullable Credentials gcpCredentials)
+      throws IOException {
+    BigtableOptions.Builder optionsBuilder =
+        BigtableOptions.builder()
+            .setProjectId(project)
+            .setInstanceId(instanceId)
+            .setUserAgent("apache-beam-test");
+    if (emulatorPort != null) {
+      optionsBuilder.enableEmulator("localhost", emulatorPort);
+    }
+    if (gcpCredentials != null) {
+      
optionsBuilder.setCredentialOptions(CredentialOptions.credential(gcpCredentials));
+    }
+    bigtableOptions = optionsBuilder.build();
+
+    BigtableHBaseVeneeringSettings settings =
+        BigtableHBaseVeneeringSettings.create(bigtableOptions);

Review Comment:
   Why do you need all of this? Can't you use the idiomatic settings directly?



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableConfig.java:
##########
@@ -115,6 +120,11 @@ BigtableConfig withTableId(ValueProvider<String> tableId) {
     return toBuilder().setTableId(tableId).build();
   }
 
+  BigtableConfig withAppProfileId(@Nullable ValueProvider<String> 
appProfileId) {
+    checkArgument(appProfileId != null, "App profile id can not be null");

Review Comment:
   Should this be checkNotNull?



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableHBaseVeneeringSettings.java:
##########
@@ -0,0 +1,758 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigtable;
+
+import com.google.api.core.ApiFunction;
+import com.google.api.gax.batching.BatchingSettings;
+import com.google.api.gax.batching.FlowControlSettings;
+import com.google.api.gax.core.FixedCredentialsProvider;
+import com.google.api.gax.core.NoCredentialsProvider;
+import com.google.api.gax.grpc.InstantiatingGrpcChannelProvider;
+import com.google.api.gax.rpc.FixedHeaderProvider;
+import com.google.api.gax.rpc.ServerStreamingCallSettings;
+import com.google.api.gax.rpc.StatusCode;
+import com.google.api.gax.rpc.StubSettings;
+import com.google.api.gax.rpc.UnaryCallSettings;
+import com.google.auth.Credentials;
+import com.google.auth.oauth2.GoogleCredentials;
+import com.google.auth.oauth2.ServiceAccountCredentials;
+import com.google.auth.oauth2.ServiceAccountJwtAccessCredentials;
+import com.google.cloud.bigtable.Version;
+import com.google.cloud.bigtable.admin.v2.BigtableInstanceAdminSettings;
+import com.google.cloud.bigtable.admin.v2.BigtableTableAdminSettings;
+import com.google.cloud.bigtable.config.BigtableOptions;
+import com.google.cloud.bigtable.config.CredentialOptions;
+import com.google.cloud.bigtable.data.v2.BigtableDataSettings;
+import com.google.cloud.bigtable.data.v2.models.Query;
+import com.google.cloud.bigtable.data.v2.models.Row;
+import com.google.cloud.bigtable.data.v2.stub.BigtableBatchingCallSettings;
+import com.google.cloud.bigtable.data.v2.stub.BigtableBulkReadRowsCallSettings;
+import io.grpc.CallOptions;
+import io.grpc.Channel;
+import io.grpc.ClientCall;
+import io.grpc.ClientInterceptor;
+import io.grpc.Deadline;
+import io.grpc.ManagedChannelBuilder;
+import io.grpc.MethodDescriptor;
+import io.grpc.Status;
+import io.grpc.internal.GrpcUtil;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.security.GeneralSecurityException;
+import java.security.KeyStore;
+import java.security.PrivateKey;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import javax.annotation.Nonnull;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Joiner;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Optional;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
+import org.checkerframework.checker.initialization.qual.UnderInitialization;
+import org.threeten.bp.Duration;
+
+/** Helper class to translate {@link BigtableOptions} to Bigtable Veneer 
settings. */
+class BigtableHBaseVeneeringSettings {
+  private static final String DEFAULT_BIGTABLE_BATCH_DATA_ENDPOINT =
+      "batch-bigtable.googleapis.com:443";
+  private static final Duration DEFAULT_UNARY_ATTEMPT_TIMEOUTS = 
Duration.ofSeconds(20);
+  private static final Duration DEFAULT_BULK_MUTATE_ATTEMPT_TIMEOUTS = 
Duration.ofMinutes(6);
+
+  private final BigtableDataSettings dataSettings;
+  private final BigtableTableAdminSettings tableAdminSettings;
+  private final BigtableInstanceAdminSettings instanceAdminSettings;
+
+  private final BigtableIOOperationTimeouts clientTimeouts;
+
+  static BigtableHBaseVeneeringSettings create(@Nonnull BigtableOptions 
options)
+      throws IOException {
+    return new BigtableHBaseVeneeringSettings(options);
+  }
+
+  private BigtableHBaseVeneeringSettings(@Nonnull BigtableOptions options) 
throws IOException {
+    // Build configs for veneer
+    this.clientTimeouts = buildCallSettings(options);
+
+    this.dataSettings = buildBigtableDataSettings(clientTimeouts, options);
+    this.tableAdminSettings = buildBigtableTableAdminSettings(options);
+    this.instanceAdminSettings = buildBigtableInstanceAdminSettings(options);
+  }
+
+  // ************** Getters **************
+  /** Utility to convert {@link BigtableOptions} to {@link 
BigtableDataSettings}. */
+  BigtableDataSettings getDataSettings() {
+    return dataSettings;
+  }
+
+  /** Utility to convert {@link BigtableOptions} to {@link 
BigtableTableAdminSettings}. */
+  BigtableTableAdminSettings getTableAdminSettings() {
+    return tableAdminSettings;
+  }
+
+  BigtableIOOperationTimeouts getOperationTimeouts() {
+    return clientTimeouts;
+  }
+
+  /** Utility to convert {@link BigtableOptions} to {@link 
BigtableInstanceAdminSettings}. */
+  BigtableInstanceAdminSettings getInstanceAdminSettings() {
+    return instanceAdminSettings;
+  }
+
+  // ************** Private Helpers **************
+  private BigtableDataSettings buildBigtableDataSettings(
+      @UnderInitialization BigtableHBaseVeneeringSettings this,
+      BigtableIOOperationTimeouts clientTimeouts,
+      BigtableOptions options)
+      throws IOException {
+    BigtableDataSettings.Builder dataBuilder;
+
+    // Configure the Data connection
+    dataBuilder = BigtableDataSettings.newBuilder();
+    if (options.useBatch()) {
+      configureConnection(
+          dataBuilder.stubSettings(), DEFAULT_BIGTABLE_BATCH_DATA_ENDPOINT, 
options);
+    } else {
+      configureConnection(
+          dataBuilder.stubSettings(), options.getDataHost() + ":" + 
options.getPort(), options);
+    }
+    configureCredentialProvider(dataBuilder.stubSettings(), options);
+    configureHeaderProvider(dataBuilder.stubSettings(), options);
+
+    // Configure the target
+    
dataBuilder.setProjectId(options.getProjectId()).setInstanceId(options.getInstanceId());
+    if (options.getAppProfileId() != null) {
+      dataBuilder.setAppProfileId(options.getAppProfileId());
+    }
+
+    // Configure RPCs - this happens in multiple parts:
+    // - retry settings are configured here
+    // - timeouts are split into multiple places:
+    //   - timeouts for retries are configured here
+    //   - if USE_TIMEOUTS is explicitly disabled, then an interceptor is 
added to force all
+    // deadlines to 6 minutes
+    configureConnectionCallTimeouts(dataBuilder.stubSettings(), 
clientTimeouts);
+
+    // Complex RPC method settings
+    configureBulkMutationSettings(
+        dataBuilder.stubSettings().bulkMutateRowsSettings(),
+        clientTimeouts.getBulkMutateTimeouts(),
+        options);
+    configureBulkReadRowsSettings(
+        dataBuilder.stubSettings().bulkReadRowsSettings(),
+        clientTimeouts.getBulkReadRowsTimeouts(),
+        options);
+    configureReadRowsSettings(
+        dataBuilder.stubSettings().readRowsSettings(),
+        clientTimeouts.getBulkReadRowsTimeouts(),
+        options);
+
+    // RPC methods - simple
+    configureNonRetryableCallSettings(
+        dataBuilder.stubSettings().checkAndMutateRowSettings(), 
clientTimeouts.getUnaryTimeouts());
+    configureNonRetryableCallSettings(
+        dataBuilder.stubSettings().readModifyWriteRowSettings(), 
clientTimeouts.getUnaryTimeouts());
+
+    configureRetryableCallSettings(
+        dataBuilder.stubSettings().mutateRowSettings(), 
clientTimeouts.getUnaryTimeouts(), options);
+    configureRetryableCallSettings(
+        dataBuilder.stubSettings().readRowSettings(), 
clientTimeouts.getUnaryTimeouts(), options);
+    configureRetryableCallSettings(
+        dataBuilder.stubSettings().sampleRowKeysSettings(),
+        clientTimeouts.getUnaryTimeouts(),
+        options);
+
+    return dataBuilder.build();
+  }
+
+  private BigtableTableAdminSettings buildBigtableTableAdminSettings(
+      @UnderInitialization BigtableHBaseVeneeringSettings this, 
BigtableOptions options)
+      throws IOException {
+    BigtableTableAdminSettings.Builder adminBuilder;
+
+    // Configure connection
+    adminBuilder = BigtableTableAdminSettings.newBuilder();
+    configureConnection(
+        adminBuilder.stubSettings(), options.getAdminHost() + ":" + 
options.getPort(), options);
+    configureCredentialProvider(adminBuilder.stubSettings(), options);
+
+    configureHeaderProvider(adminBuilder.stubSettings(), options);
+
+    
adminBuilder.setProjectId(options.getProjectId()).setInstanceId(options.getInstanceId());
+
+    // timeout/retry settings don't apply to admin operations
+    // v1 used to use RetryOptions for:
+    // - createTable
+    // - getTable
+    // - listTables
+    // - deleteTable
+    // - modifyColumnFamilies
+    // - dropRowRange
+    // However data latencies are very different from data latencies and end 
users shouldn't need to
+    // change the defaults
+    // if it turns out that the timeout & retry behavior needs to be 
configurable, we will expose
+    // separate settings
+
+    return adminBuilder.build();
+  }
+
+  private BigtableInstanceAdminSettings buildBigtableInstanceAdminSettings(
+      @UnderInitialization BigtableHBaseVeneeringSettings this, 
BigtableOptions options)
+      throws IOException {
+    BigtableInstanceAdminSettings.Builder adminBuilder;
+
+    // Configure connection
+    adminBuilder = BigtableInstanceAdminSettings.newBuilder();
+    configureConnection(
+        adminBuilder.stubSettings(), options.getAdminHost() + ":" + 
options.getPort(), options);
+    configureCredentialProvider(adminBuilder.stubSettings(), options);
+
+    configureHeaderProvider(adminBuilder.stubSettings(), options);
+
+    adminBuilder.setProjectId(options.getProjectId());
+
+    return adminBuilder.build();
+  }

Review Comment:
   I don't think we care about the instance admin API?



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableConfig.java:
##########
@@ -196,66 +222,123 @@ BigtableService getBigtableService(PipelineOptions 
pipelineOptions) {
       return getBigtableService();
     }
 
-    BigtableOptions.Builder bigtableOptions = 
effectiveUserProvidedBigtableOptions();
+    BigtableConfig.Builder config = toBuilder();
 
-    bigtableOptions.setUserAgent(pipelineOptions.getUserAgent());
+    if (pipelineOptions instanceof GcpOptions) {
+      config.setCredentials(((GcpOptions) pipelineOptions).getGcpCredential());
+    }
 
-    if (bigtableOptions.build().getCredentialOptions().getCredentialType()
-        == CredentialOptions.CredentialType.DefaultCredentials) {
-      bigtableOptions.setCredentialOptions(
-          
CredentialOptions.credential(pipelineOptions.as(GcpOptions.class).getGcpCredential()));
+    try {
+      translateBigtableOptions(config);
+    } catch (IOException e) {
+      throw new RuntimeException(e);

Review Comment:
   Consider throwing a more specific exception than a bare RuntimeException.



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableHBaseVeneeringSettings.java:
##########
@@ -0,0 +1,758 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigtable;
+
+import com.google.api.core.ApiFunction;
+import com.google.api.gax.batching.BatchingSettings;
+import com.google.api.gax.batching.FlowControlSettings;
+import com.google.api.gax.core.FixedCredentialsProvider;
+import com.google.api.gax.core.NoCredentialsProvider;
+import com.google.api.gax.grpc.InstantiatingGrpcChannelProvider;
+import com.google.api.gax.rpc.FixedHeaderProvider;
+import com.google.api.gax.rpc.ServerStreamingCallSettings;
+import com.google.api.gax.rpc.StatusCode;
+import com.google.api.gax.rpc.StubSettings;
+import com.google.api.gax.rpc.UnaryCallSettings;
+import com.google.auth.Credentials;
+import com.google.auth.oauth2.GoogleCredentials;
+import com.google.auth.oauth2.ServiceAccountCredentials;
+import com.google.auth.oauth2.ServiceAccountJwtAccessCredentials;
+import com.google.cloud.bigtable.Version;
+import com.google.cloud.bigtable.admin.v2.BigtableInstanceAdminSettings;
+import com.google.cloud.bigtable.admin.v2.BigtableTableAdminSettings;
+import com.google.cloud.bigtable.config.BigtableOptions;
+import com.google.cloud.bigtable.config.CredentialOptions;
+import com.google.cloud.bigtable.data.v2.BigtableDataSettings;
+import com.google.cloud.bigtable.data.v2.models.Query;
+import com.google.cloud.bigtable.data.v2.models.Row;
+import com.google.cloud.bigtable.data.v2.stub.BigtableBatchingCallSettings;
+import com.google.cloud.bigtable.data.v2.stub.BigtableBulkReadRowsCallSettings;
+import io.grpc.CallOptions;
+import io.grpc.Channel;
+import io.grpc.ClientCall;
+import io.grpc.ClientInterceptor;
+import io.grpc.Deadline;
+import io.grpc.ManagedChannelBuilder;
+import io.grpc.MethodDescriptor;
+import io.grpc.Status;
+import io.grpc.internal.GrpcUtil;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.security.GeneralSecurityException;
+import java.security.KeyStore;
+import java.security.PrivateKey;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import javax.annotation.Nonnull;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Joiner;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Optional;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
+import org.checkerframework.checker.initialization.qual.UnderInitialization;
+import org.threeten.bp.Duration;
+
+/** Helper class to translate {@link BigtableOptions} to Bigtable Veneer 
settings. */
+class BigtableHBaseVeneeringSettings {
+  private static final String DEFAULT_BIGTABLE_BATCH_DATA_ENDPOINT =
+      "batch-bigtable.googleapis.com:443";
+  private static final Duration DEFAULT_UNARY_ATTEMPT_TIMEOUTS = 
Duration.ofSeconds(20);
+  private static final Duration DEFAULT_BULK_MUTATE_ATTEMPT_TIMEOUTS = 
Duration.ofMinutes(6);
+
+  private final BigtableDataSettings dataSettings;
+  private final BigtableTableAdminSettings tableAdminSettings;
+  private final BigtableInstanceAdminSettings instanceAdminSettings;
+
+  private final BigtableIOOperationTimeouts clientTimeouts;
+
+  static BigtableHBaseVeneeringSettings create(@Nonnull BigtableOptions 
options)
+      throws IOException {
+    return new BigtableHBaseVeneeringSettings(options);
+  }
+
+  private BigtableHBaseVeneeringSettings(@Nonnull BigtableOptions options) 
throws IOException {
+    // Build configs for veneer
+    this.clientTimeouts = buildCallSettings(options);
+
+    this.dataSettings = buildBigtableDataSettings(clientTimeouts, options);
+    this.tableAdminSettings = buildBigtableTableAdminSettings(options);
+    this.instanceAdminSettings = buildBigtableInstanceAdminSettings(options);
+  }
+
+  // ************** Getters **************
+  /** Utility to convert {@link BigtableOptions} to {@link 
BigtableDataSettings}. */
+  BigtableDataSettings getDataSettings() {
+    return dataSettings;
+  }
+
+  /** Utility to convert {@link BigtableOptions} to {@link 
BigtableTableAdminSettings}. */
+  BigtableTableAdminSettings getTableAdminSettings() {
+    return tableAdminSettings;
+  }
+
+  BigtableIOOperationTimeouts getOperationTimeouts() {
+    return clientTimeouts;
+  }
+
+  /** Utility to convert {@link BigtableOptions} to {@link 
BigtableInstanceAdminSettings}. */
+  BigtableInstanceAdminSettings getInstanceAdminSettings() {
+    return instanceAdminSettings;
+  }
+
+  // ************** Private Helpers **************
+  private BigtableDataSettings buildBigtableDataSettings(
+      @UnderInitialization BigtableHBaseVeneeringSettings this,
+      BigtableIOOperationTimeouts clientTimeouts,
+      BigtableOptions options)
+      throws IOException {
+    BigtableDataSettings.Builder dataBuilder;
+
+    // Configure the Data connection
+    dataBuilder = BigtableDataSettings.newBuilder();
+    if (options.useBatch()) {
+      configureConnection(
+          dataBuilder.stubSettings(), DEFAULT_BIGTABLE_BATCH_DATA_ENDPOINT, 
options);
+    } else {
+      configureConnection(
+          dataBuilder.stubSettings(), options.getDataHost() + ":" + 
options.getPort(), options);
+    }
+    configureCredentialProvider(dataBuilder.stubSettings(), options);
+    configureHeaderProvider(dataBuilder.stubSettings(), options);
+
+    // Configure the target
+    
dataBuilder.setProjectId(options.getProjectId()).setInstanceId(options.getInstanceId());
+    if (options.getAppProfileId() != null) {
+      dataBuilder.setAppProfileId(options.getAppProfileId());
+    }
+
+    // Configure RPCs - this happens in multiple parts:
+    // - retry settings are configured here
+    // - timeouts are split into multiple places:
+    //   - timeouts for retries are configured here
+    //   - if USE_TIMEOUTS is explicitly disabled, then an interceptor is 
added to force all
+    // deadlines to 6 minutes
+    configureConnectionCallTimeouts(dataBuilder.stubSettings(), 
clientTimeouts);
+
+    // Complex RPC method settings
+    configureBulkMutationSettings(
+        dataBuilder.stubSettings().bulkMutateRowsSettings(),
+        clientTimeouts.getBulkMutateTimeouts(),
+        options);
+    configureBulkReadRowsSettings(
+        dataBuilder.stubSettings().bulkReadRowsSettings(),
+        clientTimeouts.getBulkReadRowsTimeouts(),
+        options);
+    configureReadRowsSettings(
+        dataBuilder.stubSettings().readRowsSettings(),
+        clientTimeouts.getBulkReadRowsTimeouts(),
+        options);
+
+    // RPC methods - simple
+    configureNonRetryableCallSettings(
+        dataBuilder.stubSettings().checkAndMutateRowSettings(), 
clientTimeouts.getUnaryTimeouts());
+    configureNonRetryableCallSettings(
+        dataBuilder.stubSettings().readModifyWriteRowSettings(), 
clientTimeouts.getUnaryTimeouts());
+
+    configureRetryableCallSettings(
+        dataBuilder.stubSettings().mutateRowSettings(), 
clientTimeouts.getUnaryTimeouts(), options);
+    configureRetryableCallSettings(
+        dataBuilder.stubSettings().readRowSettings(), 
clientTimeouts.getUnaryTimeouts(), options);
+    configureRetryableCallSettings(
+        dataBuilder.stubSettings().sampleRowKeysSettings(),
+        clientTimeouts.getUnaryTimeouts(),
+        options);
+
+    return dataBuilder.build();
+  }
+
+  private BigtableTableAdminSettings buildBigtableTableAdminSettings(
+      @UnderInitialization BigtableHBaseVeneeringSettings this, 
BigtableOptions options)
+      throws IOException {
+    BigtableTableAdminSettings.Builder adminBuilder;
+
+    // Configure connection
+    adminBuilder = BigtableTableAdminSettings.newBuilder();
+    configureConnection(
+        adminBuilder.stubSettings(), options.getAdminHost() + ":" + 
options.getPort(), options);
+    configureCredentialProvider(adminBuilder.stubSettings(), options);
+
+    configureHeaderProvider(adminBuilder.stubSettings(), options);
+
+    
adminBuilder.setProjectId(options.getProjectId()).setInstanceId(options.getInstanceId());
+
+    // timeout/retry settings don't apply to admin operations
+    // v1 used to use RetryOptions for:
+    // - createTable
+    // - getTable
+    // - listTables
+    // - deleteTable
+    // - modifyColumnFamilies
+    // - dropRowRange
+    // However data latencies are very different from data latencies and end 
users shouldn't need to
+    // change the defaults
+    // if it turns out that the timeout & retry behavior needs to be 
configurable, we will expose
+    // separate settings
+
+    return adminBuilder.build();
+  }
+
+  private BigtableInstanceAdminSettings buildBigtableInstanceAdminSettings(
+      @UnderInitialization BigtableHBaseVeneeringSettings this, 
BigtableOptions options)
+      throws IOException {
+    BigtableInstanceAdminSettings.Builder adminBuilder;
+
+    // Configure connection
+    adminBuilder = BigtableInstanceAdminSettings.newBuilder();
+    configureConnection(
+        adminBuilder.stubSettings(), options.getAdminHost() + ":" + 
options.getPort(), options);
+    configureCredentialProvider(adminBuilder.stubSettings(), options);
+
+    configureHeaderProvider(adminBuilder.stubSettings(), options);
+
+    adminBuilder.setProjectId(options.getProjectId());
+
+    return adminBuilder.build();
+  }
+
+  @SuppressWarnings("rawtypes")
+  private void configureConnection(
+      @UnderInitialization BigtableHBaseVeneeringSettings this,
+      StubSettings.Builder<?, ?> stubSettings,
+      String endpoint,
+      BigtableOptions options) {
+    final InstantiatingGrpcChannelProvider.Builder channelProvider =
+        ((InstantiatingGrpcChannelProvider) 
stubSettings.getTransportChannelProvider()).toBuilder();
+
+    stubSettings.setEndpoint(endpoint);
+
+    if (options.usePlaintextNegotiation()) {
+      // Make sure to avoid clobbering the old Configurator
+      @SuppressWarnings("rawtypes")
+      final ApiFunction<ManagedChannelBuilder, ManagedChannelBuilder> 
prevConfigurator =
+          channelProvider.getChannelConfigurator();
+      //noinspection rawtypes
+      channelProvider.setChannelConfigurator(
+          new ApiFunction<ManagedChannelBuilder, ManagedChannelBuilder>() {
+            @Override
+            public ManagedChannelBuilder apply(ManagedChannelBuilder 
channelBuilder) {
+              if (prevConfigurator != null) {
+                channelBuilder = prevConfigurator.apply(channelBuilder);
+              }
+              return channelBuilder.usePlaintext();
+            }
+          });
+    }
+
+    channelProvider.setPoolSize(options.getChannelCount());
+
+    stubSettings.setTransportChannelProvider(channelProvider.build());
+  }
+
+  private void configureHeaderProvider(
+      @UnderInitialization BigtableHBaseVeneeringSettings this,
+      StubSettings.Builder<?, ?> stubSettings,
+      BigtableOptions options) {
+
+    ImmutableMap.Builder<String, String> headersBuilder = 
ImmutableMap.<String, String>builder();
+    List<String> userAgentParts = Lists.newArrayList();
+    userAgentParts.add("bigtable-" + Version.VERSION);
+    userAgentParts.add("jdk-" + 
System.getProperty("java.specification.version"));
+
+    String customUserAgent = options.getUserAgent();
+    if (customUserAgent != null) {
+      userAgentParts.add(customUserAgent);
+    }
+
+    String userAgent = Joiner.on(",").join(userAgentParts);
+    headersBuilder.put(GrpcUtil.USER_AGENT_KEY.name(), userAgent);
+
+    String tracingCookie = options.getTracingCookie();
+    if (tracingCookie != null) {
+      headersBuilder.put("cookie", tracingCookie);
+    }

Review Comment:
   We can probably drop this.



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableConfig.java:
##########
@@ -196,66 +222,123 @@ BigtableService getBigtableService(PipelineOptions 
pipelineOptions) {
       return getBigtableService();
     }
 
-    BigtableOptions.Builder bigtableOptions = 
effectiveUserProvidedBigtableOptions();
+    BigtableConfig.Builder config = toBuilder();
 
-    bigtableOptions.setUserAgent(pipelineOptions.getUserAgent());
+    if (pipelineOptions instanceof GcpOptions) {
+      config.setCredentials(((GcpOptions) pipelineOptions).getGcpCredential());
+    }
 
-    if (bigtableOptions.build().getCredentialOptions().getCredentialType()
-        == CredentialOptions.CredentialType.DefaultCredentials) {
-      bigtableOptions.setCredentialOptions(
-          
CredentialOptions.credential(pipelineOptions.as(GcpOptions.class).getGcpCredential()));
+    try {
+      translateBigtableOptions(config);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
     }
 
-    return new BigtableServiceImpl(bigtableOptions.build());
+    config.setUserAgent(pipelineOptions.getUserAgent());
+
+    return new BigtableServiceImpl(config.build());
   }
 
   boolean isDataAccessible() {
-    return getTableId().isAccessible()
-        && (getProjectId() == null || getProjectId().isAccessible())
+    return (getProjectId() == null || getProjectId().isAccessible())
         && (getInstanceId() == null || getInstanceId().isAccessible());
   }
 
-  private BigtableOptions.Builder effectiveUserProvidedBigtableOptions() {
-    BigtableOptions.Builder effectiveOptions =
-        getBigtableOptions() != null
-            ? getBigtableOptions().toBuilder()
-            : new BigtableOptions.Builder();
+  private void translateBigtableOptions(BigtableConfig.Builder builder) throws 
IOException {
+    BigtableOptions.Builder effectiveOptionsBuilder = null;
+
+    if (getBigtableOptions() != null) {
+      effectiveOptionsBuilder = getBigtableOptions().toBuilder();
+    }
 
     if (getBigtableOptionsConfigurator() != null) {
-      effectiveOptions = 
getBigtableOptionsConfigurator().apply(effectiveOptions);
+      effectiveOptionsBuilder = 
getBigtableOptionsConfigurator().apply(BigtableOptions.builder());
     }
 
-    // Default option that should be forced in most cases
-    effectiveOptions.setUseCachedDataPool(true);
+    if (effectiveOptionsBuilder == null) {
+      return;
+    }
+
+    BigtableOptions effectiveOptions = effectiveOptionsBuilder.build();
 
-    if (getInstanceId() != null) {
-      effectiveOptions.setInstanceId(getInstanceId().get());
+    // Todo decided if we should implement cached channel pool
+
+    if (effectiveOptions.getInstanceId() != null && getInstanceId() == null) {
+      
builder.setInstanceId(ValueProvider.StaticValueProvider.of(effectiveOptions.getInstanceId()));
     }
 
-    if (getProjectId() != null) {
-      effectiveOptions.setProjectId(getProjectId().get());
+    if (effectiveOptions.getProjectId() != null && getProjectId() == null) {
+      
builder.setProjectId(ValueProvider.StaticValueProvider.of(effectiveOptions.getProjectId()));
     }
 
-    if (getEmulatorHost() != null) {
-      effectiveOptions.enableEmulator(getEmulatorHost());
-      effectiveOptions.setUseCachedDataPool(false);
+    if (!effectiveOptions.getDataHost().equals("bigtable.googleapis.com")
+        && getEmulatorHost() == null) {
+      builder.setEmulatorHost(
+          String.format("%s:%s", effectiveOptions.getDataHost(), 
effectiveOptions.getPort()));
     }
 
-    return effectiveOptions;
+    if (effectiveOptions.getCredentialOptions() != null) {
+      CredentialOptions credOptions = effectiveOptions.getCredentialOptions();
+      switch (credOptions.getCredentialType()) {
+        case DefaultCredentials:
+          GoogleCredentials credentials = 
GoogleCredentials.getApplicationDefault();
+          builder.setCredentials(credentials);
+          break;
+        case P12:
+          String keyFile = ((CredentialOptions.P12CredentialOptions) 
credOptions).getKeyFile();
+          String serviceAccount =
+              ((CredentialOptions.P12CredentialOptions) 
credOptions).getServiceAccount();
+          try {
+            KeyStore keyStore = KeyStore.getInstance("PKCS12");
+
+            try (FileInputStream fin = new FileInputStream(keyFile)) {
+              keyStore.load(fin, "notasecret".toCharArray());
+            }
+            PrivateKey privateKey =
+                (PrivateKey) keyStore.getKey("privatekey", 
"notasecret".toCharArray());
+
+            if (privateKey == null) {
+              throw new IllegalStateException("private key cannot be null");
+            }
+            builder.setCredentials(
+                ServiceAccountJwtAccessCredentials.newBuilder()
+                    .setClientEmail(serviceAccount)
+                    .setPrivateKey(privateKey)
+                    .build());
+          } catch (GeneralSecurityException exception) {
+            throw new RuntimeException("exception while retrieving 
credentials", exception);
+          }
+          break;
+        case SuppliedCredentials:
+          builder.setCredentials(
+              ((CredentialOptions.UserSuppliedCredentialOptions) 
credOptions).getCredential());
+          break;
+        case SuppliedJson:
+          CredentialOptions.JsonCredentialsOptions jsonCredentialsOptions =
+              (CredentialOptions.JsonCredentialsOptions) credOptions;
+          synchronized (jsonCredentialsOptions) {

Review Comment:
   I think we can drop credential caching for BigtableOptions. If anything, we 
should add it at a higher level.



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableConfig.java:
##########
@@ -39,14 +48,22 @@
 })
 abstract class BigtableConfig implements Serializable {
 
+  enum CredentialType {
+    DEFAULT,
+    P12,
+    SUPPLIED,
+    JSON,
+    NONE
+  }

Review Comment:
   is this stale?



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java:
##########
@@ -992,25 +1248,31 @@ public String toString() {
     private final BigtableReadOptions readOptions;
     private @Nullable Long estimatedSizeBytes;
 
+    private final BigtableServiceFactory.ConfigId configId;
+
     /** Creates a new {@link BigtableSource} with just one {@link 
ByteKeyRange}. */
     protected BigtableSource withSingleRange(ByteKeyRange range) {
       checkArgument(range != null, "range can not be null");
-      return new BigtableSource(config, readOptions.withKeyRange(range), 
estimatedSizeBytes);
+      return new BigtableSource(
+          configId, config, readOptions.withKeyRange(range), 
estimatedSizeBytes);
     }
 
     protected BigtableSource withEstimatedSizeBytes(Long estimatedSizeBytes) {
       checkArgument(estimatedSizeBytes != null, "estimatedSizeBytes can not be 
null");
-      return new BigtableSource(config, readOptions, estimatedSizeBytes);
+      return new BigtableSource(configId, config, readOptions, 
estimatedSizeBytes);
     }
 
     /**
      * Makes an API call to the Cloud Bigtable service that gives information 
about tablet key
      * boundaries and estimated sizes. We can use these samples to ensure that 
splits are on
      * different tablets, and possibly generate sub-splits within tablets.
      */
-    private List<SampleRowKeysResponse> getSampleRowKeys(PipelineOptions 
pipelineOptions)
-        throws IOException {
-      return config.getBigtableService(pipelineOptions).getSampleRowKeys(this);
+    private List<KeyOffset> getSampleRowKeys(PipelineOptions pipelineOptions) 
throws IOException {
+      BigtableServiceFactory.BigtableServiceEntry serviceEntry =
+          FACTORY_INSTANCE.getServiceForReading(configId, config, readOptions, 
pipelineOptions);
+      List<KeyOffset> keyOffsets = 
serviceEntry.getService().getSampleRowKeys(this);
+      FACTORY_INSTANCE.releaseReadService(serviceEntry);
+      return keyOffsets;
     }

Review Comment:
   Consider making BigtableServiceEntry AutoCloseable and using try-with-resources



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableConfig.java:
##########
@@ -115,6 +120,11 @@ BigtableConfig withTableId(ValueProvider<String> tableId) {
     return toBuilder().setTableId(tableId).build();
   }
 
+  BigtableConfig withAppProfileId(@Nullable ValueProvider<String> 
appProfileId) {

Review Comment:
   If you have a non-null check, then this can't be Nullable



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java:
##########
@@ -697,6 +813,17 @@ public Write withTableId(String tableId) {
       return withTableId(StaticValueProvider.of(tableId));
     }
 
+    /**
+     * Returns a new {@link BigtableIO.Write} with the provided credentials.
+     *
+     * <p>Does not modify this object.
+     */
+    public Write withCredentialsProvider(CredentialsProvider 
credentialsProvider) {

Review Comment:
   It looks like Beam has its own abstraction for a credential provider... take a 
look at org.apache.beam.sdk.extensions.gcp.auth.CredentialFactory. Take a look 
at GcpOptions and how other services use getGcpCredential there



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableHBaseVeneeringSettings.java:
##########
@@ -0,0 +1,758 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigtable;
+
+import com.google.api.core.ApiFunction;
+import com.google.api.gax.batching.BatchingSettings;
+import com.google.api.gax.batching.FlowControlSettings;
+import com.google.api.gax.core.FixedCredentialsProvider;
+import com.google.api.gax.core.NoCredentialsProvider;
+import com.google.api.gax.grpc.InstantiatingGrpcChannelProvider;
+import com.google.api.gax.rpc.FixedHeaderProvider;
+import com.google.api.gax.rpc.ServerStreamingCallSettings;
+import com.google.api.gax.rpc.StatusCode;
+import com.google.api.gax.rpc.StubSettings;
+import com.google.api.gax.rpc.UnaryCallSettings;
+import com.google.auth.Credentials;
+import com.google.auth.oauth2.GoogleCredentials;
+import com.google.auth.oauth2.ServiceAccountCredentials;
+import com.google.auth.oauth2.ServiceAccountJwtAccessCredentials;
+import com.google.cloud.bigtable.Version;
+import com.google.cloud.bigtable.admin.v2.BigtableInstanceAdminSettings;
+import com.google.cloud.bigtable.admin.v2.BigtableTableAdminSettings;
+import com.google.cloud.bigtable.config.BigtableOptions;
+import com.google.cloud.bigtable.config.CredentialOptions;
+import com.google.cloud.bigtable.data.v2.BigtableDataSettings;
+import com.google.cloud.bigtable.data.v2.models.Query;
+import com.google.cloud.bigtable.data.v2.models.Row;
+import com.google.cloud.bigtable.data.v2.stub.BigtableBatchingCallSettings;
+import com.google.cloud.bigtable.data.v2.stub.BigtableBulkReadRowsCallSettings;
+import io.grpc.CallOptions;
+import io.grpc.Channel;
+import io.grpc.ClientCall;
+import io.grpc.ClientInterceptor;
+import io.grpc.Deadline;
+import io.grpc.ManagedChannelBuilder;
+import io.grpc.MethodDescriptor;
+import io.grpc.Status;
+import io.grpc.internal.GrpcUtil;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.security.GeneralSecurityException;
+import java.security.KeyStore;
+import java.security.PrivateKey;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import javax.annotation.Nonnull;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Joiner;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Optional;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
+import org.checkerframework.checker.initialization.qual.UnderInitialization;
+import org.threeten.bp.Duration;
+
+/** Helper class to translate {@link BigtableOptions} to Bigtable Veneer 
settings. */
+class BigtableHBaseVeneeringSettings {
+  private static final String DEFAULT_BIGTABLE_BATCH_DATA_ENDPOINT =
+      "batch-bigtable.googleapis.com:443";
+  private static final Duration DEFAULT_UNARY_ATTEMPT_TIMEOUTS = 
Duration.ofSeconds(20);
+  private static final Duration DEFAULT_BULK_MUTATE_ATTEMPT_TIMEOUTS = 
Duration.ofMinutes(6);
+
+  private final BigtableDataSettings dataSettings;
+  private final BigtableTableAdminSettings tableAdminSettings;
+  private final BigtableInstanceAdminSettings instanceAdminSettings;
+
+  private final BigtableIOOperationTimeouts clientTimeouts;
+
+  static BigtableHBaseVeneeringSettings create(@Nonnull BigtableOptions 
options)
+      throws IOException {
+    return new BigtableHBaseVeneeringSettings(options);
+  }
+
+  private BigtableHBaseVeneeringSettings(@Nonnull BigtableOptions options) 
throws IOException {
+    // Build configs for veneer
+    this.clientTimeouts = buildCallSettings(options);
+
+    this.dataSettings = buildBigtableDataSettings(clientTimeouts, options);
+    this.tableAdminSettings = buildBigtableTableAdminSettings(options);
+    this.instanceAdminSettings = buildBigtableInstanceAdminSettings(options);
+  }
+
+  // ************** Getters **************
+  /** Utility to convert {@link BigtableOptions} to {@link 
BigtableDataSettings}. */
+  BigtableDataSettings getDataSettings() {
+    return dataSettings;
+  }
+
+  /** Utility to convert {@link BigtableOptions} to {@link 
BigtableTableAdminSettings}. */
+  BigtableTableAdminSettings getTableAdminSettings() {
+    return tableAdminSettings;
+  }
+
+  BigtableIOOperationTimeouts getOperationTimeouts() {
+    return clientTimeouts;
+  }
+
+  /** Utility to convert {@link BigtableOptions} to {@link 
BigtableInstanceAdminSettings}. */
+  BigtableInstanceAdminSettings getInstanceAdminSettings() {
+    return instanceAdminSettings;
+  }
+
+  // ************** Private Helpers **************
+  private BigtableDataSettings buildBigtableDataSettings(
+      @UnderInitialization BigtableHBaseVeneeringSettings this,
+      BigtableIOOperationTimeouts clientTimeouts,
+      BigtableOptions options)
+      throws IOException {
+    BigtableDataSettings.Builder dataBuilder;
+
+    // Configure the Data connection
+    dataBuilder = BigtableDataSettings.newBuilder();
+    if (options.useBatch()) {
+      configureConnection(
+          dataBuilder.stubSettings(), DEFAULT_BIGTABLE_BATCH_DATA_ENDPOINT, 
options);
+    } else {
+      configureConnection(
+          dataBuilder.stubSettings(), options.getDataHost() + ":" + 
options.getPort(), options);
+    }
+    configureCredentialProvider(dataBuilder.stubSettings(), options);
+    configureHeaderProvider(dataBuilder.stubSettings(), options);
+
+    // Configure the target
+    
dataBuilder.setProjectId(options.getProjectId()).setInstanceId(options.getInstanceId());
+    if (options.getAppProfileId() != null) {
+      dataBuilder.setAppProfileId(options.getAppProfileId());
+    }
+
+    // Configure RPCs - this happens in multiple parts:
+    // - retry settings are configured here
+    // - timeouts are split into multiple places:
+    //   - timeouts for retries are configured here
+    //   - if USE_TIMEOUTS is explicitly disabled, then an interceptor is 
added to force all
+    // deadlines to 6 minutes
+    configureConnectionCallTimeouts(dataBuilder.stubSettings(), 
clientTimeouts);
+
+    // Complex RPC method settings
+    configureBulkMutationSettings(
+        dataBuilder.stubSettings().bulkMutateRowsSettings(),
+        clientTimeouts.getBulkMutateTimeouts(),
+        options);
+    configureBulkReadRowsSettings(
+        dataBuilder.stubSettings().bulkReadRowsSettings(),
+        clientTimeouts.getBulkReadRowsTimeouts(),
+        options);
+    configureReadRowsSettings(
+        dataBuilder.stubSettings().readRowsSettings(),
+        clientTimeouts.getBulkReadRowsTimeouts(),
+        options);
+
+    // RPC methods - simple
+    configureNonRetryableCallSettings(
+        dataBuilder.stubSettings().checkAndMutateRowSettings(), 
clientTimeouts.getUnaryTimeouts());
+    configureNonRetryableCallSettings(
+        dataBuilder.stubSettings().readModifyWriteRowSettings(), 
clientTimeouts.getUnaryTimeouts());
+
+    configureRetryableCallSettings(
+        dataBuilder.stubSettings().mutateRowSettings(), 
clientTimeouts.getUnaryTimeouts(), options);
+    configureRetryableCallSettings(
+        dataBuilder.stubSettings().readRowSettings(), 
clientTimeouts.getUnaryTimeouts(), options);
+    configureRetryableCallSettings(
+        dataBuilder.stubSettings().sampleRowKeysSettings(),
+        clientTimeouts.getUnaryTimeouts(),
+        options);
+
+    return dataBuilder.build();
+  }
+
+  private BigtableTableAdminSettings buildBigtableTableAdminSettings(
+      @UnderInitialization BigtableHBaseVeneeringSettings this, 
BigtableOptions options)
+      throws IOException {
+    BigtableTableAdminSettings.Builder adminBuilder;
+
+    // Configure connection
+    adminBuilder = BigtableTableAdminSettings.newBuilder();
+    configureConnection(
+        adminBuilder.stubSettings(), options.getAdminHost() + ":" + 
options.getPort(), options);
+    configureCredentialProvider(adminBuilder.stubSettings(), options);
+
+    configureHeaderProvider(adminBuilder.stubSettings(), options);
+
+    
adminBuilder.setProjectId(options.getProjectId()).setInstanceId(options.getInstanceId());
+
+    // timeout/retry settings don't apply to admin operations
+    // v1 used to use RetryOptions for:
+    // - createTable
+    // - getTable
+    // - listTables
+    // - deleteTable
+    // - modifyColumnFamilies
+    // - dropRowRange
+    // However data latencies are very different from data latencies and end 
users shouldn't need to
+    // change the defaults
+    // if it turns out that the timeout & retry behavior needs to be 
configurable, we will expose
+    // separate settings
+
+    return adminBuilder.build();
+  }
+
+  private BigtableInstanceAdminSettings buildBigtableInstanceAdminSettings(
+      @UnderInitialization BigtableHBaseVeneeringSettings this, 
BigtableOptions options)
+      throws IOException {
+    BigtableInstanceAdminSettings.Builder adminBuilder;
+
+    // Configure connection
+    adminBuilder = BigtableInstanceAdminSettings.newBuilder();
+    configureConnection(
+        adminBuilder.stubSettings(), options.getAdminHost() + ":" + 
options.getPort(), options);
+    configureCredentialProvider(adminBuilder.stubSettings(), options);
+
+    configureHeaderProvider(adminBuilder.stubSettings(), options);
+
+    adminBuilder.setProjectId(options.getProjectId());
+
+    return adminBuilder.build();
+  }
+
+  @SuppressWarnings("rawtypes")
+  private void configureConnection(
+      @UnderInitialization BigtableHBaseVeneeringSettings this,
+      StubSettings.Builder<?, ?> stubSettings,
+      String endpoint,
+      BigtableOptions options) {
+    final InstantiatingGrpcChannelProvider.Builder channelProvider =
+        ((InstantiatingGrpcChannelProvider) 
stubSettings.getTransportChannelProvider()).toBuilder();
+
+    stubSettings.setEndpoint(endpoint);
+
+    if (options.usePlaintextNegotiation()) {
+      // Make sure to avoid clobbering the old Configurator
+      @SuppressWarnings("rawtypes")
+      final ApiFunction<ManagedChannelBuilder, ManagedChannelBuilder> 
prevConfigurator =
+          channelProvider.getChannelConfigurator();
+      //noinspection rawtypes
+      channelProvider.setChannelConfigurator(
+          new ApiFunction<ManagedChannelBuilder, ManagedChannelBuilder>() {
+            @Override
+            public ManagedChannelBuilder apply(ManagedChannelBuilder 
channelBuilder) {
+              if (prevConfigurator != null) {
+                channelBuilder = prevConfigurator.apply(channelBuilder);
+              }
+              return channelBuilder.usePlaintext();
+            }
+          });
+    }
+
+    channelProvider.setPoolSize(options.getChannelCount());
+
+    stubSettings.setTransportChannelProvider(channelProvider.build());
+  }
+
+  private void configureHeaderProvider(
+      @UnderInitialization BigtableHBaseVeneeringSettings this,
+      StubSettings.Builder<?, ?> stubSettings,
+      BigtableOptions options) {
+
+    ImmutableMap.Builder<String, String> headersBuilder = 
ImmutableMap.<String, String>builder();
+    List<String> userAgentParts = Lists.newArrayList();
+    userAgentParts.add("bigtable-" + Version.VERSION);
+    userAgentParts.add("jdk-" + 
System.getProperty("java.specification.version"));

Review Comment:
   Do we need to mess with the header in this connector? I can see adding beam 
into the user agent, but the rest of it seems a bit extra



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableConfig.java:
##########
@@ -196,66 +222,123 @@ BigtableService getBigtableService(PipelineOptions 
pipelineOptions) {
       return getBigtableService();
     }
 
-    BigtableOptions.Builder bigtableOptions = 
effectiveUserProvidedBigtableOptions();
+    BigtableConfig.Builder config = toBuilder();
 
-    bigtableOptions.setUserAgent(pipelineOptions.getUserAgent());
+    if (pipelineOptions instanceof GcpOptions) {
+      config.setCredentials(((GcpOptions) pipelineOptions).getGcpCredential());
+    }
 
-    if (bigtableOptions.build().getCredentialOptions().getCredentialType()
-        == CredentialOptions.CredentialType.DefaultCredentials) {
-      bigtableOptions.setCredentialOptions(
-          
CredentialOptions.credential(pipelineOptions.as(GcpOptions.class).getGcpCredential()));
+    try {
+      translateBigtableOptions(config);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
     }
 
-    return new BigtableServiceImpl(bigtableOptions.build());
+    config.setUserAgent(pipelineOptions.getUserAgent());
+
+    return new BigtableServiceImpl(config.build());
   }
 
   boolean isDataAccessible() {
-    return getTableId().isAccessible()
-        && (getProjectId() == null || getProjectId().isAccessible())
+    return (getProjectId() == null || getProjectId().isAccessible())
         && (getInstanceId() == null || getInstanceId().isAccessible());
   }
 
-  private BigtableOptions.Builder effectiveUserProvidedBigtableOptions() {
-    BigtableOptions.Builder effectiveOptions =
-        getBigtableOptions() != null
-            ? getBigtableOptions().toBuilder()
-            : new BigtableOptions.Builder();
+  private void translateBigtableOptions(BigtableConfig.Builder builder) throws 
IOException {
+    BigtableOptions.Builder effectiveOptionsBuilder = null;
+
+    if (getBigtableOptions() != null) {
+      effectiveOptionsBuilder = getBigtableOptions().toBuilder();
+    }
 
     if (getBigtableOptionsConfigurator() != null) {
-      effectiveOptions = 
getBigtableOptionsConfigurator().apply(effectiveOptions);
+      effectiveOptionsBuilder = 
getBigtableOptionsConfigurator().apply(BigtableOptions.builder());

Review Comment:
   I think the argument to the configurator should be `effectiveOptionsBuilder 
|| BigtableOptions.builder()`



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableHBaseVeneeringSettings.java:
##########
@@ -0,0 +1,758 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigtable;
+
+import com.google.api.core.ApiFunction;
+import com.google.api.gax.batching.BatchingSettings;
+import com.google.api.gax.batching.FlowControlSettings;
+import com.google.api.gax.core.FixedCredentialsProvider;
+import com.google.api.gax.core.NoCredentialsProvider;
+import com.google.api.gax.grpc.InstantiatingGrpcChannelProvider;
+import com.google.api.gax.rpc.FixedHeaderProvider;
+import com.google.api.gax.rpc.ServerStreamingCallSettings;
+import com.google.api.gax.rpc.StatusCode;
+import com.google.api.gax.rpc.StubSettings;
+import com.google.api.gax.rpc.UnaryCallSettings;
+import com.google.auth.Credentials;
+import com.google.auth.oauth2.GoogleCredentials;
+import com.google.auth.oauth2.ServiceAccountCredentials;
+import com.google.auth.oauth2.ServiceAccountJwtAccessCredentials;
+import com.google.cloud.bigtable.Version;
+import com.google.cloud.bigtable.admin.v2.BigtableInstanceAdminSettings;
+import com.google.cloud.bigtable.admin.v2.BigtableTableAdminSettings;
+import com.google.cloud.bigtable.config.BigtableOptions;
+import com.google.cloud.bigtable.config.CredentialOptions;
+import com.google.cloud.bigtable.data.v2.BigtableDataSettings;
+import com.google.cloud.bigtable.data.v2.models.Query;
+import com.google.cloud.bigtable.data.v2.models.Row;
+import com.google.cloud.bigtable.data.v2.stub.BigtableBatchingCallSettings;
+import com.google.cloud.bigtable.data.v2.stub.BigtableBulkReadRowsCallSettings;
+import io.grpc.CallOptions;
+import io.grpc.Channel;
+import io.grpc.ClientCall;
+import io.grpc.ClientInterceptor;
+import io.grpc.Deadline;
+import io.grpc.ManagedChannelBuilder;
+import io.grpc.MethodDescriptor;
+import io.grpc.Status;
+import io.grpc.internal.GrpcUtil;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.security.GeneralSecurityException;
+import java.security.KeyStore;
+import java.security.PrivateKey;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import javax.annotation.Nonnull;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Joiner;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Optional;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
+import org.checkerframework.checker.initialization.qual.UnderInitialization;
+import org.threeten.bp.Duration;
+
+/** Helper class to translate {@link BigtableOptions} to Bigtable Veneer 
settings. */
+class BigtableHBaseVeneeringSettings {
+  private static final String DEFAULT_BIGTABLE_BATCH_DATA_ENDPOINT =
+      "batch-bigtable.googleapis.com:443";
+  private static final Duration DEFAULT_UNARY_ATTEMPT_TIMEOUTS = 
Duration.ofSeconds(20);
+  private static final Duration DEFAULT_BULK_MUTATE_ATTEMPT_TIMEOUTS = 
Duration.ofMinutes(6);
+
+  private final BigtableDataSettings dataSettings;
+  private final BigtableTableAdminSettings tableAdminSettings;
+  private final BigtableInstanceAdminSettings instanceAdminSettings;
+
+  private final BigtableIOOperationTimeouts clientTimeouts;
+
+  static BigtableHBaseVeneeringSettings create(@Nonnull BigtableOptions 
options)
+      throws IOException {
+    return new BigtableHBaseVeneeringSettings(options);
+  }
+
+  private BigtableHBaseVeneeringSettings(@Nonnull BigtableOptions options) 
throws IOException {
+    // Build configs for veneer
+    this.clientTimeouts = buildCallSettings(options);
+
+    this.dataSettings = buildBigtableDataSettings(clientTimeouts, options);
+    this.tableAdminSettings = buildBigtableTableAdminSettings(options);
+    this.instanceAdminSettings = buildBigtableInstanceAdminSettings(options);
+  }
+
+  // ************** Getters **************
+  /** Utility to convert {@link BigtableOptions} to {@link 
BigtableDataSettings}. */
+  BigtableDataSettings getDataSettings() {
+    return dataSettings;
+  }
+
+  /** Utility to convert {@link BigtableOptions} to {@link 
BigtableTableAdminSettings}. */
+  BigtableTableAdminSettings getTableAdminSettings() {
+    return tableAdminSettings;
+  }
+
+  BigtableIOOperationTimeouts getOperationTimeouts() {
+    return clientTimeouts;
+  }
+
+  /** Utility to convert {@link BigtableOptions} to {@link 
BigtableInstanceAdminSettings}. */
+  BigtableInstanceAdminSettings getInstanceAdminSettings() {
+    return instanceAdminSettings;
+  }
+
+  // ************** Private Helpers **************
+  private BigtableDataSettings buildBigtableDataSettings(
+      @UnderInitialization BigtableHBaseVeneeringSettings this,
+      BigtableIOOperationTimeouts clientTimeouts,
+      BigtableOptions options)
+      throws IOException {
+    BigtableDataSettings.Builder dataBuilder;
+
+    // Configure the Data connection
+    dataBuilder = BigtableDataSettings.newBuilder();
+    if (options.useBatch()) {
+      configureConnection(
+          dataBuilder.stubSettings(), DEFAULT_BIGTABLE_BATCH_DATA_ENDPOINT, 
options);
+    } else {
+      configureConnection(
+          dataBuilder.stubSettings(), options.getDataHost() + ":" + 
options.getPort(), options);
+    }
+    configureCredentialProvider(dataBuilder.stubSettings(), options);
+    configureHeaderProvider(dataBuilder.stubSettings(), options);
+
+    // Configure the target
+    
dataBuilder.setProjectId(options.getProjectId()).setInstanceId(options.getInstanceId());
+    if (options.getAppProfileId() != null) {
+      dataBuilder.setAppProfileId(options.getAppProfileId());
+    }
+
+    // Configure RPCs - this happens in multiple parts:
+    // - retry settings are configured here
+    // - timeouts are split into multiple places:
+    //   - timeouts for retries are configured here
+    //   - if USE_TIMEOUTS is explicitly disabled, then an interceptor is 
added to force all
+    // deadlines to 6 minutes
+    configureConnectionCallTimeouts(dataBuilder.stubSettings(), 
clientTimeouts);
+
+    // Complex RPC method settings
+    configureBulkMutationSettings(
+        dataBuilder.stubSettings().bulkMutateRowsSettings(),
+        clientTimeouts.getBulkMutateTimeouts(),
+        options);
+    configureBulkReadRowsSettings(
+        dataBuilder.stubSettings().bulkReadRowsSettings(),
+        clientTimeouts.getBulkReadRowsTimeouts(),
+        options);
+    configureReadRowsSettings(
+        dataBuilder.stubSettings().readRowsSettings(),
+        clientTimeouts.getBulkReadRowsTimeouts(),
+        options);
+
+    // RPC methods - simple
+    configureNonRetryableCallSettings(
+        dataBuilder.stubSettings().checkAndMutateRowSettings(), 
clientTimeouts.getUnaryTimeouts());
+    configureNonRetryableCallSettings(
+        dataBuilder.stubSettings().readModifyWriteRowSettings(), 
clientTimeouts.getUnaryTimeouts());
+
+    configureRetryableCallSettings(
+        dataBuilder.stubSettings().mutateRowSettings(), 
clientTimeouts.getUnaryTimeouts(), options);
+    configureRetryableCallSettings(
+        dataBuilder.stubSettings().readRowSettings(), 
clientTimeouts.getUnaryTimeouts(), options);

Review Comment:
   Do we care about these? Unless I'm mistaken, BigtableIO only cares about 
ReadRows, MutateRows and SampleRowKeys. If I'm correct, then we can remove a lot 
of this noise?



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableConfig.java:
##########
@@ -196,66 +222,123 @@ BigtableService getBigtableService(PipelineOptions 
pipelineOptions) {
       return getBigtableService();
     }
 
-    BigtableOptions.Builder bigtableOptions = 
effectiveUserProvidedBigtableOptions();
+    BigtableConfig.Builder config = toBuilder();
 
-    bigtableOptions.setUserAgent(pipelineOptions.getUserAgent());
+    if (pipelineOptions instanceof GcpOptions) {
+      config.setCredentials(((GcpOptions) pipelineOptions).getGcpCredential());
+    }
 
-    if (bigtableOptions.build().getCredentialOptions().getCredentialType()
-        == CredentialOptions.CredentialType.DefaultCredentials) {
-      bigtableOptions.setCredentialOptions(
-          
CredentialOptions.credential(pipelineOptions.as(GcpOptions.class).getGcpCredential()));
+    try {
+      translateBigtableOptions(config);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
     }
 
-    return new BigtableServiceImpl(bigtableOptions.build());
+    config.setUserAgent(pipelineOptions.getUserAgent());
+
+    return new BigtableServiceImpl(config.build());
   }
 
   boolean isDataAccessible() {
-    return getTableId().isAccessible()
-        && (getProjectId() == null || getProjectId().isAccessible())
+    return (getProjectId() == null || getProjectId().isAccessible())
         && (getInstanceId() == null || getInstanceId().isAccessible());
   }
 
-  private BigtableOptions.Builder effectiveUserProvidedBigtableOptions() {
-    BigtableOptions.Builder effectiveOptions =
-        getBigtableOptions() != null
-            ? getBigtableOptions().toBuilder()
-            : new BigtableOptions.Builder();
+  private void translateBigtableOptions(BigtableConfig.Builder builder) throws 
IOException {
+    BigtableOptions.Builder effectiveOptionsBuilder = null;
+
+    if (getBigtableOptions() != null) {
+      effectiveOptionsBuilder = getBigtableOptions().toBuilder();
+    }
 
     if (getBigtableOptionsConfigurator() != null) {
-      effectiveOptions = 
getBigtableOptionsConfigurator().apply(effectiveOptions);
+      effectiveOptionsBuilder = 
getBigtableOptionsConfigurator().apply(BigtableOptions.builder());
     }
 
-    // Default option that should be forced in most cases
-    effectiveOptions.setUseCachedDataPool(true);
+    if (effectiveOptionsBuilder == null) {
+      return;
+    }
+
+    BigtableOptions effectiveOptions = effectiveOptionsBuilder.build();
 
-    if (getInstanceId() != null) {
-      effectiveOptions.setInstanceId(getInstanceId().get());
+    // Todo decided if we should implement cached channel pool
+
+    if (effectiveOptions.getInstanceId() != null && getInstanceId() == null) {
+      
builder.setInstanceId(ValueProvider.StaticValueProvider.of(effectiveOptions.getInstanceId()));
     }
 
-    if (getProjectId() != null) {
-      effectiveOptions.setProjectId(getProjectId().get());
+    if (effectiveOptions.getProjectId() != null && getProjectId() == null) {
+      
builder.setProjectId(ValueProvider.StaticValueProvider.of(effectiveOptions.getProjectId()));
     }
 
-    if (getEmulatorHost() != null) {
-      effectiveOptions.enableEmulator(getEmulatorHost());
-      effectiveOptions.setUseCachedDataPool(false);
+    if (!effectiveOptions.getDataHost().equals("bigtable.googleapis.com")
+        && getEmulatorHost() == null) {
+      builder.setEmulatorHost(
+          String.format("%s:%s", effectiveOptions.getDataHost(), 
effectiveOptions.getPort()));

Review Comment:
   I think this looks strange. If someone sets the endpoint to 
batch-bigtable.googleapis.com or to a test endpoint, then we assume it's an 
emulator?



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableConfig.java:
##########
@@ -115,6 +120,11 @@ BigtableConfig withTableId(ValueProvider<String> tableId) {
     return toBuilder().setTableId(tableId).build();
   }
 
+  BigtableConfig withAppProfileId(@Nullable ValueProvider<String> 
appProfileId) {
+    checkArgument(appProfileId != null, "App profile id can not be null");
+    return toBuilder().setAppProfileId(appProfileId).build();
+  }
+
   /** @deprecated will be replaced by bigtable options configurator. */
   @Deprecated
   BigtableConfig withBigtableOptions(BigtableOptions options) {

Review Comment:
   Please deprecate the configurator for BigtableOptions as well. We want to 
move away from BigtableOptions entirely. Instead, provide a different mechanism 
for setting BigtableIO-relevant options



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableConfigToVeneerSettings.java:
##########
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigtable;
+
+import com.google.api.core.ApiFunction;
+import com.google.api.gax.core.FixedCredentialsProvider;
+import com.google.api.gax.grpc.ChannelPoolSettings;
+import com.google.api.gax.grpc.InstantiatingGrpcChannelProvider;
+import com.google.api.gax.rpc.FixedHeaderProvider;
+import com.google.api.gax.rpc.StubSettings;
+import com.google.cloud.bigtable.admin.v2.BigtableTableAdminSettings;
+import com.google.cloud.bigtable.config.BigtableOptions;
+import com.google.cloud.bigtable.data.v2.BigtableDataSettings;
+import io.grpc.ManagedChannelBuilder;
+import io.grpc.internal.GrpcUtil;
+import java.io.IOException;
+import java.util.Objects;
+import javax.annotation.Nonnull;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.checkerframework.checker.initialization.qual.UnderInitialization;
+
+/** Helper class to translate {@link BigtableOptions} to Bigtable Veneer 
settings. */
+@SuppressWarnings({
+  "nullness" // TODO(https://github.com/apache/beam/issues/20497)
+})
+class BigtableConfigToVeneerSettings {
+  private static final String DEFAULT_DATA_ENDPOINT = 
"bigtable.googleapis.com:443";
+  private static final String DEFAULT_ADMIN_ENDPOINT = 
"bigtableadmin.googleapis.com:443";
+
+  private final BigtableDataSettings dataSettings;
+  private final BigtableTableAdminSettings tableAdminSettings;
+
+  static BigtableConfigToVeneerSettings create(@Nonnull BigtableConfig config) 
throws IOException {
+    return new BigtableConfigToVeneerSettings(config);
+  }
+
+  private BigtableConfigToVeneerSettings(@Nonnull BigtableConfig config) 
throws IOException {
+    if (config.getProjectId() == null || config.getInstanceId() == null) {
+      throw new IOException("can't find project or instance id");
+    }
+
+    // Build configs for veneer
+    this.dataSettings = buildBigtableDataSettings(config);
+    this.tableAdminSettings = buildBigtableTableAdminSettings(config);
+  }
+
+  // ************** Getters **************
+  /** Utility to convert {@link BigtableOptions} to {@link 
BigtableDataSettings}. */
+  BigtableDataSettings getDataSettings() {
+    return dataSettings;
+  }
+
+  /** Utility to convert {@link BigtableOptions} to {@link 
BigtableTableAdminSettings}. */
+  BigtableTableAdminSettings getTableAdminSettings() {
+    return tableAdminSettings;
+  }
+
+  // ************** Private Helpers **************
+  private BigtableDataSettings buildBigtableDataSettings(
+      @UnderInitialization BigtableConfigToVeneerSettings this, BigtableConfig 
config)
+      throws IOException {
+    BigtableDataSettings.Builder dataBuilder;
+
+    // Configure the Data connection
+    dataBuilder = BigtableDataSettings.newBuilder();
+    if (config.getEmulatorHost() != null) {
+      configureConnection(
+          dataBuilder.stubSettings(), config, 
Objects.requireNonNull(config.getEmulatorHost()));
+    } else {
+      configureConnection(dataBuilder.stubSettings(), config, 
DEFAULT_DATA_ENDPOINT);

Review Comment:
   Don't we want this to be batch-bigtable.googleapis.com?



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java:
##########
@@ -530,48 +595,135 @@ public void onFailure(Throwable throwable) {
 
   @Override
   public String toString() {
-    return 
MoreObjects.toStringHelper(BigtableServiceImpl.class).add("options", 
options).toString();
+    return 
MoreObjects.toStringHelper(BigtableServiceImpl.class).add("options", 
config).toString();
   }
 
   @Override
   public Reader createReader(BigtableSource source) throws IOException {
-    BigtableSession session = new BigtableSession(options);
+    dataSettings = configureReadSettings(dataSettings.toBuilder(), 
source.getReadOptions());
+
+    RetrySettings retrySettings =
+        
dataSettings.getStubSettings().bulkReadRowsSettings().getRetrySettings();
+    LOG.info("Creating a Reader for Bigtable with settings: " + dataSettings);
+    BigtableDataClient client = BigtableDataClient.create(dataSettings);
     if (source.getMaxBufferElementCount() != null) {
-      return BigtableSegmentReaderImpl.create(session, source);
+      return BigtableSegmentReaderImpl.create(
+          client,
+          dataSettings.getProjectId(),
+          dataSettings.getInstanceId(),
+          source.getTableId().get(),
+          source.getRanges(),
+          source.getRowFilter(),
+          source.getMaxBufferElementCount(),
+          retrySettings.getInitialRpcTimeout(),
+          retrySettings.getTotalTimeout());
     } else {
-      return new BigtableReaderImpl(session, source);
+      return new BigtableReaderImpl(
+          client,
+          dataSettings.getProjectId(),
+          dataSettings.getInstanceId(),
+          source.getTableId().get(),
+          source.getRanges(),
+          source.getRowFilter(),
+          retrySettings.getInitialRpcTimeout(),
+          retrySettings.getTotalTimeout());
     }
   }
 
+  private BigtableDataSettings configureReadSettings(
+      BigtableDataSettings.Builder settings, BigtableReadOptions readOptions) {
+
+    RetrySettings.Builder retrySettings =
+        
settings.stubSettings().bulkReadRowsSettings().getRetrySettings().toBuilder();
+
+    if (readOptions.getAttemptTimeout() != null) {
+      
retrySettings.setInitialRpcTimeout(Duration.ofMillis(readOptions.getAttemptTimeout()));

Review Comment:
   I think you want to set the MaxRpcTimeout as well



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java:
##########
@@ -656,4 +697,118 @@ public int compareTo(@Nonnull EndPoint o) {
           .result();
     }
   }
+
+  static class BeamRowAdapter implements 
RowAdapter<com.google.bigtable.v2.Row> {
+    @Override
+    public RowBuilder<com.google.bigtable.v2.Row> createRowBuilder() {
+      return new DefaultRowBuilder();
+    }
+
+    @Override
+    public boolean isScanMarkerRow(com.google.bigtable.v2.Row row) {
+      return Objects.equals(row, 
com.google.bigtable.v2.Row.getDefaultInstance());
+    }
+
+    @Override
+    public ByteString getKey(com.google.bigtable.v2.Row row) {
+      return row.getKey();
+    }
+
+    private static class DefaultRowBuilder
+        implements RowAdapter.RowBuilder<com.google.bigtable.v2.Row> {
+      private com.google.bigtable.v2.Row.Builder protoBuilder =
+          com.google.bigtable.v2.Row.newBuilder();
+
+      private TreeMap<String, TreeMap<ByteString, ImmutableList.Builder<Cell>>>
+          cellsByFamilyColumn = new TreeMap<>();
+      private TreeMap<ByteString, ImmutableList.Builder<Cell>> cellsByColumn =
+          new TreeMap<>(Comparator.comparing(o -> 
o.toString(StandardCharsets.UTF_8)));
+      private ImmutableList.Builder<Cell> currentColumnCells;

Review Comment:
   I think you are doing a lot of unnecessary work here... all of the cells will 
be in contiguous chunks, and you are also creating parallel lists that you are 
throwing away.
   
   Why not just write to proto directly?
   
   ```java
   startRow() {
     builder = Row.newBuilder().setRow(key);
     lastFamilyName = null;
     lastFamily = null;
     lastColumnName = null;
     lastColumn = null;
   }
   startCell() {
     bool familyChanged = false;
     if (newFamilyName != lastFamilyName) {
       familyChanged = true;
       lastFamily = builder.newFamilyBuilder().setName(newFamilyName);
       lastFamilyName = newFamilyName;
     }
     if (lastColumnName != newColumnName || familyChanged) {
       lastColumn = lastFamily.newColumnBuilder().setName(newColumnName);
       lastColumnName = newColumnName;
     }
     lastCell = 
lastColumn.newCellBuilder().setTimestamp(timestamp).addAllLabels(labels);
     currentValue = null;
   }
   cellValue() {
     if (currentValue == null) { currentValue = newValue; }
     else currentValue = currentValue.append(newValue);
   }
   ...
   ```
   
   



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java:
##########
@@ -656,4 +697,118 @@ public int compareTo(@Nonnull EndPoint o) {
           .result();
     }
   }
+
+  static class BeamRowAdapter implements 
RowAdapter<com.google.bigtable.v2.Row> {

Review Comment:
   BeamRow is not quite the correct name for this... Someone reading this would 
assume this is the Beam SQL Row. I would call this RowProtoAdapter or similar.



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceFactory.java:
##########
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigtable;
+
+import com.google.api.gax.retrying.RetrySettings;
+import com.google.auto.value.AutoValue;
+import com.google.cloud.bigtable.config.BigtableOptions;
+import com.google.cloud.bigtable.data.v2.BigtableDataSettings;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.beam.sdk.options.PipelineOptions;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
+import org.threeten.bp.Duration;
+
+/**
+ * Factory class that caches {@link BigtableService} to share between workers 
with the same {@link
+ * BigtableConfig} and read / write options.
+ */
+@SuppressWarnings({
+  "nullness" // TODO(https://github.com/apache/beam/issues/20497)
+})
+class BigtableServiceFactory {
+
+  static final BigtableServiceFactory FACTORY_INSTANCE = new 
BigtableServiceFactory();
+
+  private int nextId = 0;
+
+  private final Map<ConfigId, BigtableServiceEntry> readEntries = new 
HashMap<>();
+  private final Map<ConfigId, BigtableServiceEntry> writeEntries = new 
HashMap<>();
+
+  @AutoValue
+  abstract static class ConfigId implements Serializable {
+
+    abstract int id();
+
+    static ConfigId create(int id) {
+      return new AutoValue_BigtableServiceFactory_ConfigId(id);
+    }
+  }
+
+  @AutoValue
+  abstract static class BigtableServiceEntry {
+
+    abstract ConfigId getConfigId();
+
+    abstract BigtableService getService();
+
+    abstract AtomicInteger getRefCount();
+
+    // Workaround for ReadRows requests which requires to pass the timeouts in
+    // ApiContext. Can be removed later once it's fixed in Veneer.
+    abstract Duration getAttemptTimeout();
+
+    abstract Duration getOperationTimeout();

Review Comment:
   This is definitely the wrong place for this...BigtableServiceEntry should 
only be used for ref counting 
   



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableHBaseVeneeringSettings.java:
##########
@@ -0,0 +1,758 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigtable;
+
+import com.google.api.core.ApiFunction;
+import com.google.api.gax.batching.BatchingSettings;
+import com.google.api.gax.batching.FlowControlSettings;
+import com.google.api.gax.core.FixedCredentialsProvider;
+import com.google.api.gax.core.NoCredentialsProvider;
+import com.google.api.gax.grpc.InstantiatingGrpcChannelProvider;
+import com.google.api.gax.rpc.FixedHeaderProvider;
+import com.google.api.gax.rpc.ServerStreamingCallSettings;
+import com.google.api.gax.rpc.StatusCode;
+import com.google.api.gax.rpc.StubSettings;
+import com.google.api.gax.rpc.UnaryCallSettings;
+import com.google.auth.Credentials;
+import com.google.auth.oauth2.GoogleCredentials;
+import com.google.auth.oauth2.ServiceAccountCredentials;
+import com.google.auth.oauth2.ServiceAccountJwtAccessCredentials;
+import com.google.cloud.bigtable.Version;
+import com.google.cloud.bigtable.admin.v2.BigtableInstanceAdminSettings;
+import com.google.cloud.bigtable.admin.v2.BigtableTableAdminSettings;
+import com.google.cloud.bigtable.config.BigtableOptions;
+import com.google.cloud.bigtable.config.CredentialOptions;
+import com.google.cloud.bigtable.data.v2.BigtableDataSettings;
+import com.google.cloud.bigtable.data.v2.models.Query;
+import com.google.cloud.bigtable.data.v2.models.Row;
+import com.google.cloud.bigtable.data.v2.stub.BigtableBatchingCallSettings;
+import com.google.cloud.bigtable.data.v2.stub.BigtableBulkReadRowsCallSettings;
+import io.grpc.CallOptions;
+import io.grpc.Channel;
+import io.grpc.ClientCall;
+import io.grpc.ClientInterceptor;
+import io.grpc.Deadline;
+import io.grpc.ManagedChannelBuilder;
+import io.grpc.MethodDescriptor;
+import io.grpc.Status;
+import io.grpc.internal.GrpcUtil;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.security.GeneralSecurityException;
+import java.security.KeyStore;
+import java.security.PrivateKey;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import javax.annotation.Nonnull;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Joiner;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Optional;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
+import org.checkerframework.checker.initialization.qual.UnderInitialization;
+import org.threeten.bp.Duration;
+
+/** Helper class to translate {@link BigtableOptions} to Bigtable Veneer 
settings. */
+class BigtableHBaseVeneeringSettings {
+  private static final String DEFAULT_BIGTABLE_BATCH_DATA_ENDPOINT =
+      "batch-bigtable.googleapis.com:443";
+  private static final Duration DEFAULT_UNARY_ATTEMPT_TIMEOUTS = 
Duration.ofSeconds(20);
+  private static final Duration DEFAULT_BULK_MUTATE_ATTEMPT_TIMEOUTS = 
Duration.ofMinutes(6);
+
+  private final BigtableDataSettings dataSettings;
+  private final BigtableTableAdminSettings tableAdminSettings;
+  private final BigtableInstanceAdminSettings instanceAdminSettings;
+
+  private final BigtableIOOperationTimeouts clientTimeouts;
+
+  static BigtableHBaseVeneeringSettings create(@Nonnull BigtableOptions 
options)
+      throws IOException {
+    return new BigtableHBaseVeneeringSettings(options);
+  }
+
+  private BigtableHBaseVeneeringSettings(@Nonnull BigtableOptions options) 
throws IOException {
+    // Build configs for veneer
+    this.clientTimeouts = buildCallSettings(options);
+
+    this.dataSettings = buildBigtableDataSettings(clientTimeouts, options);
+    this.tableAdminSettings = buildBigtableTableAdminSettings(options);
+    this.instanceAdminSettings = buildBigtableInstanceAdminSettings(options);
+  }
+
+  // ************** Getters **************
+  /** Utility to convert {@link BigtableOptions} to {@link 
BigtableDataSettings}. */
+  BigtableDataSettings getDataSettings() {
+    return dataSettings;
+  }
+
+  /** Utility to convert {@link BigtableOptions} to {@link 
BigtableTableAdminSettings}. */
+  BigtableTableAdminSettings getTableAdminSettings() {
+    return tableAdminSettings;
+  }
+
+  BigtableIOOperationTimeouts getOperationTimeouts() {
+    return clientTimeouts;
+  }
+
+  /** Utility to convert {@link BigtableOptions} to {@link 
BigtableInstanceAdminSettings}. */
+  BigtableInstanceAdminSettings getInstanceAdminSettings() {
+    return instanceAdminSettings;
+  }
+
+  // ************** Private Helpers **************
+  private BigtableDataSettings buildBigtableDataSettings(
+      @UnderInitialization BigtableHBaseVeneeringSettings this,
+      BigtableIOOperationTimeouts clientTimeouts,
+      BigtableOptions options)
+      throws IOException {
+    BigtableDataSettings.Builder dataBuilder;
+
+    // Configure the Data connection
+    dataBuilder = BigtableDataSettings.newBuilder();
+    if (options.useBatch()) {
+      configureConnection(
+          dataBuilder.stubSettings(), DEFAULT_BIGTABLE_BATCH_DATA_ENDPOINT, 
options);
+    } else {
+      configureConnection(
+          dataBuilder.stubSettings(), options.getDataHost() + ":" + 
options.getPort(), options);
+    }
+    configureCredentialProvider(dataBuilder.stubSettings(), options);
+    configureHeaderProvider(dataBuilder.stubSettings(), options);
+
+    // Configure the target
+    
dataBuilder.setProjectId(options.getProjectId()).setInstanceId(options.getInstanceId());
+    if (options.getAppProfileId() != null) {
+      dataBuilder.setAppProfileId(options.getAppProfileId());
+    }
+
+    // Configure RPCs - this happens in multiple parts:
+    // - retry settings are configured here
+    // - timeouts are split into multiple places:
+    //   - timeouts for retries are configured here
+    //   - if USE_TIMEOUTS is explicitly disabled, then an interceptor is 
added to force all
+    // deadlines to 6 minutes
+    configureConnectionCallTimeouts(dataBuilder.stubSettings(), 
clientTimeouts);
+
+    // Complex RPC method settings
+    configureBulkMutationSettings(
+        dataBuilder.stubSettings().bulkMutateRowsSettings(),
+        clientTimeouts.getBulkMutateTimeouts(),
+        options);
+    configureBulkReadRowsSettings(
+        dataBuilder.stubSettings().bulkReadRowsSettings(),
+        clientTimeouts.getBulkReadRowsTimeouts(),
+        options);
+    configureReadRowsSettings(
+        dataBuilder.stubSettings().readRowsSettings(),
+        clientTimeouts.getBulkReadRowsTimeouts(),
+        options);
+
+    // RPC methods - simple
+    configureNonRetryableCallSettings(
+        dataBuilder.stubSettings().checkAndMutateRowSettings(), 
clientTimeouts.getUnaryTimeouts());
+    configureNonRetryableCallSettings(
+        dataBuilder.stubSettings().readModifyWriteRowSettings(), 
clientTimeouts.getUnaryTimeouts());
+
+    configureRetryableCallSettings(
+        dataBuilder.stubSettings().mutateRowSettings(), 
clientTimeouts.getUnaryTimeouts(), options);
+    configureRetryableCallSettings(
+        dataBuilder.stubSettings().readRowSettings(), 
clientTimeouts.getUnaryTimeouts(), options);
+    configureRetryableCallSettings(
+        dataBuilder.stubSettings().sampleRowKeysSettings(),
+        clientTimeouts.getUnaryTimeouts(),
+        options);
+
+    return dataBuilder.build();
+  }
+
+  private BigtableTableAdminSettings buildBigtableTableAdminSettings(
+      @UnderInitialization BigtableHBaseVeneeringSettings this, 
BigtableOptions options)
+      throws IOException {
+    BigtableTableAdminSettings.Builder adminBuilder;
+
+    // Configure connection
+    adminBuilder = BigtableTableAdminSettings.newBuilder();
+    configureConnection(
+        adminBuilder.stubSettings(), options.getAdminHost() + ":" + 
options.getPort(), options);
+    configureCredentialProvider(adminBuilder.stubSettings(), options);
+
+    configureHeaderProvider(adminBuilder.stubSettings(), options);
+
+    
adminBuilder.setProjectId(options.getProjectId()).setInstanceId(options.getInstanceId());
+
+    // timeout/retry settings don't apply to admin operations
+    // v1 used to use RetryOptions for:
+    // - createTable
+    // - getTable
+    // - listTables
+    // - deleteTable
+    // - modifyColumnFamilies
+    // - dropRowRange
+    // However admin latencies are very different from data latencies and end 
users shouldn't need to
+    // change the defaults
+    // if it turns out that the timeout & retry behavior needs to be 
configurable, we will expose
+    // separate settings
+
+    return adminBuilder.build();
+  }
+
+  private BigtableInstanceAdminSettings buildBigtableInstanceAdminSettings(
+      @UnderInitialization BigtableHBaseVeneeringSettings this, 
BigtableOptions options)
+      throws IOException {
+    BigtableInstanceAdminSettings.Builder adminBuilder;
+
+    // Configure connection
+    adminBuilder = BigtableInstanceAdminSettings.newBuilder();
+    configureConnection(
+        adminBuilder.stubSettings(), options.getAdminHost() + ":" + 
options.getPort(), options);
+    configureCredentialProvider(adminBuilder.stubSettings(), options);
+
+    configureHeaderProvider(adminBuilder.stubSettings(), options);
+
+    adminBuilder.setProjectId(options.getProjectId());
+
+    return adminBuilder.build();
+  }
+
+  @SuppressWarnings("rawtypes")
+  private void configureConnection(
+      @UnderInitialization BigtableHBaseVeneeringSettings this,
+      StubSettings.Builder<?, ?> stubSettings,
+      String endpoint,
+      BigtableOptions options) {
+    final InstantiatingGrpcChannelProvider.Builder channelProvider =
+        ((InstantiatingGrpcChannelProvider) 
stubSettings.getTransportChannelProvider()).toBuilder();
+
+    stubSettings.setEndpoint(endpoint);
+
+    if (options.usePlaintextNegotiation()) {
+      // Make sure to avoid clobbering the old Configurator
+      @SuppressWarnings("rawtypes")
+      final ApiFunction<ManagedChannelBuilder, ManagedChannelBuilder> 
prevConfigurator =
+          channelProvider.getChannelConfigurator();
+      //noinspection rawtypes
+      channelProvider.setChannelConfigurator(
+          new ApiFunction<ManagedChannelBuilder, ManagedChannelBuilder>() {
+            @Override
+            public ManagedChannelBuilder apply(ManagedChannelBuilder 
channelBuilder) {
+              if (prevConfigurator != null) {
+                channelBuilder = prevConfigurator.apply(channelBuilder);
+              }
+              return channelBuilder.usePlaintext();
+            }
+          });
+    }
+
+    channelProvider.setPoolSize(options.getChannelCount());
+
+    stubSettings.setTransportChannelProvider(channelProvider.build());
+  }
+
+  private void configureHeaderProvider(
+      @UnderInitialization BigtableHBaseVeneeringSettings this,
+      StubSettings.Builder<?, ?> stubSettings,
+      BigtableOptions options) {
+
+    ImmutableMap.Builder<String, String> headersBuilder = 
ImmutableMap.<String, String>builder();
+    List<String> userAgentParts = Lists.newArrayList();
+    userAgentParts.add("bigtable-" + Version.VERSION);
+    userAgentParts.add("jdk-" + 
System.getProperty("java.specification.version"));

Review Comment:
   Just add the Beam user agent and be done with it :)



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java:
##########
@@ -521,43 +561,96 @@ public Read withoutValidation() {
     }
 
     /**
-     * Returns a new {@link BigtableIO.Read} that will read using the given 
Cloud Bigtable service
-     * implementation.
+     * Returns a new {@link BigtableIO.Read} that will use an official 
Bigtable emulator.
      *
      * <p>This is used for testing.
-     *
-     * <p>Does not modify this object.
      */
     @VisibleForTesting
-    Read withBigtableService(BigtableService bigtableService) {
+    public Read withEmulator(String emulatorHost) {
       BigtableConfig config = getBigtableConfig();
-      return 
toBuilder().setBigtableConfig(config.withBigtableService(bigtableService)).build();
+      return 
toBuilder().setBigtableConfig(config.withEmulator(emulatorHost)).build();
     }
 
     /**
-     * Returns a new {@link BigtableIO.Read} that will use an official 
Bigtable emulator.
+     * Configures the attempt timeout in milliseconds of the reads.
      *
-     * <p>This is used for testing.
+     * <p>Does not modify this object.
+     */
+    public Read withAttemptTimeout(long timeoutMs) {
+      checkArgument(timeoutMs > 0, "attempt timeout must be positive");
+      BigtableReadOptions readOptions = getBigtableReadOptions();
+      return toBuilder()
+          
.setBigtableReadOptions(readOptions.toBuilder().setAttemptTimeout(timeoutMs).build())
+          .build();
+    }
+
+    /**
+     * Configures the operation timeout in milliseconds of the reads.
+     *
+     * <p>Does not modify this object.
+     */
+    public Read withOperationTimeout(long timeoutMs) {
+      checkArgument(timeoutMs > 0, "operation timeout must be positive");
+      BigtableReadOptions readOptions = getBigtableReadOptions();
+      return toBuilder()
+          
.setBigtableReadOptions(readOptions.toBuilder().setOperationTimeout(timeoutMs).build())
+          .build();
+    }
+
+    /**
+     * Configures the initial retry delay in milliseconds.
+     *
+     * <p>Does not modify this object.
+     */
+    public Read withRetryInitialDelay(long initialDelayMs) {
+      checkArgument(initialDelayMs > 0, "initial delay must be positive");
+      BigtableReadOptions readOptions = getBigtableReadOptions();
+      return toBuilder()
+          .setBigtableReadOptions(
+              
readOptions.toBuilder().setRetryInitialDelay(initialDelayMs).build())
+          .build();
+    }
+
+    /**
+     * Configures the delay multiplier.
+     *
+     * <p>Does not modify this object.
      */
+    public Read withRetryDelayMultiplier(double multiplier) {
+      checkArgument(multiplier > 0, "delay multiplier must be positive");
+      BigtableReadOptions readOptions = getBigtableReadOptions();
+      return toBuilder()
+          .setBigtableReadOptions(
+              
readOptions.toBuilder().setRetryDelayMultiplier(multiplier).build())
+          .build();
+    }
+
+    /** Set the configId for testing. */
     @VisibleForTesting
-    public Read withEmulator(String emulatorHost) {
-      BigtableConfig config = getBigtableConfig();
-      return 
toBuilder().setBigtableConfig(config.withEmulator(emulatorHost)).build();
+    Read withConfigId(int id) {
+      return 
toBuilder().setBigtableConfig(getBigtableConfig().withConfigId(id)).build();

Review Comment:
   I don't think Read should store the ConfigId; that should be stored in the 
source, similar to your Write.
   
   For testing, you should have an AutoValue setter for the BigtableServiceFactory 
instance.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to