[ https://issues.apache.org/jira/browse/BEAM-3008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16286771#comment-16286771 ]
ASF GitHub Bot commented on BEAM-3008: -------------------------------------- chamikaramj closed pull request #4205: [BEAM-3008] Adds BigtableOptions configurator to the BigtableIO URL: https://github.com/apache/beam/pull/4205 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java index 8b4609da224..febdc1f53b0 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java @@ -122,43 +122,19 @@ * idempotent transformation to that row. 
* * <p>To configure a Cloud Bigtable sink, you must supply a table id, a project id, an instance id - * and optionally and optionally a {@link BigtableOptions} to provide more specific connection - * configuration, for example: + * and optionally a configuration function for {@link BigtableOptions} to provide more specific + * connection configuration, for example: * * <pre>{@code * PCollection<KV<ByteString, Iterable<Mutation>>> data = ...; * * data.apply("write", * BigtableIO.write() - * .setProjectId("project") - * .setInstanceId("instance") + * .withProjectId("project") + * .withInstanceId("instance") * .withTableId("table")); * }</pre> * - * <h3>Using local emulator</h3> - * - * <p>In order to use local emulator for Bigtable you should use: - * - * <pre>{@code - * BigtableOptions.Builder optionsBuilder = - * new BigtableOptions.Builder() - * .setUsePlaintextNegotiation(true) - * .setCredentialOptions(CredentialOptions.nullCredential()) - * .setDataHost("127.0.0.1") // network interface where Bigtable emulator is bound - * .setInstanceAdminHost("127.0.0.1") - * .setTableAdminHost("127.0.0.1") - * .setPort(LOCAL_EMULATOR_PORT)) - * - * PCollection<KV<ByteString, Iterable<Mutation>>> data = ...; - * - * data.apply("write", - * BigtableIO.write() - * .withBigtableOptions(optionsBuilder) - * .setProjectId("project") - * .setInstanceId("instance") - * .withTableId("table"); - * }</pre> - * * <h3>Experimental</h3> * * <p>This connector for Cloud Bigtable is considered experimental and may break or receive @@ -239,12 +215,23 @@ public static Write write() { @Nullable abstract BigtableService getBigtableService(); - /** Returns the Google Cloud Bigtable instance being read from, and other parameters. */ + /** + * Returns the Google Cloud Bigtable instance being read from, and other parameters. + * @deprecated will be replaced by bigtable options configurator. 
+ */ + @Deprecated @Nullable public abstract BigtableOptions getBigtableOptions(); public abstract boolean getValidate(); + /** + * Configurator of the effective Bigtable Options. + */ + @Nullable + abstract SerializableFunction<BigtableOptions.Builder, + BigtableOptions.Builder> getBigtableOptionsConfigurator(); + abstract Builder toBuilder(); @AutoValue.Builder @@ -260,12 +247,17 @@ public static Write write() { abstract Builder setTableId(String tableId); + /** @deprecated will be replaced by bigtable options configurator. */ + @Deprecated abstract Builder setBigtableOptions(BigtableOptions options); abstract Builder setBigtableService(BigtableService bigtableService); abstract Builder setValidate(boolean validate); + abstract Builder setBigtableOptionsConfigurator( + SerializableFunction<BigtableOptions.Builder, BigtableOptions.Builder> optionsConfigurator); + abstract Read build(); } @@ -302,7 +294,10 @@ public Read withInstanceId(String instanceId) { * indicated by {@link #withProjectId(String)}, and using any other specified customizations. * * <p>Does not modify this object. + * + * @deprecated will be replaced by bigtable options configurator. */ + @Deprecated public Read withBigtableOptions(BigtableOptions options) { checkArgument(options != null, "options can not be null"); return withBigtableOptions(options.toBuilder()); @@ -320,17 +315,29 @@ public Read withBigtableOptions(BigtableOptions options) { * will have no effect on the returned {@link BigtableIO.Read}. * * <p>Does not modify this object. + * + * @deprecated will be replaced by bigtable options configurator. */ + @Deprecated public Read withBigtableOptions(BigtableOptions.Builder optionsBuilder) { checkArgument(optionsBuilder != null, "optionsBuilder can not be null"); // TODO: is there a better way to clone a Builder? Want it to be immune from user changes. 
- BigtableOptions options = optionsBuilder.build(); - - BigtableOptions.Builder clonedBuilder = options.toBuilder() - .setUseCachedDataPool(true); - BigtableOptions clonedOptions = clonedBuilder.build(); + return toBuilder().setBigtableOptions(optionsBuilder.build().toBuilder().build()).build(); + } - return toBuilder().setBigtableOptions(clonedOptions).build(); + /** + * Returns a new {@link BigtableIO.Read} that will read from the Cloud Bigtable instance + * with customized options provided by given configurator. + * + * <p>WARNING: instanceId and projectId should not be provided here and should be provided over + * {@link #withProjectId(String)} and {@link #withInstanceId(String)}. + * + * <p>Does not modify this object. + */ + public Read withBigtableOptionsConfigurator( + SerializableFunction<BigtableOptions.Builder, BigtableOptions.Builder> configurator) { + checkArgument(configurator != null, "configurator can not be null"); + return toBuilder().setBigtableOptionsConfigurator(configurator).build(); } /** @@ -427,17 +434,25 @@ public void populateDisplayData(DisplayData.Builder builder) { builder.add(DisplayData.item("rowFilter", getRowFilter().toString()) .withLabel("Table Row Filter")); } + + builder.add(DisplayData.item("effectiveBigtableOptions", + effectiveUserProvidedBigtableOptions().build().toString()) + .withLabel("Effective BigtableOptions resulted from configuration of given options")); } @Override public String toString() { return MoreObjects.toStringHelper(Read.class) .add("options", getBigtableOptions()) + .add("effectiveOptions", effectiveUserProvidedBigtableOptions()) .add("projectId", getProjectId()) .add("instanceId", getInstanceId()) .add("tableId", getTableId()) .add("keyRange", getKeyRange()) .add("filter", getRowFilter()) + .add("bigtableOptionsConfigurator", + getBigtableOptionsConfigurator() == null ? 
null : getBigtableOptionsConfigurator() + .getClass().getName()) .toString(); } @@ -468,25 +483,41 @@ BigtableService getBigtableService(PipelineOptions pipelineOptions) { return getBigtableService(); } - BigtableOptions.Builder clonedOptions = getBigtableOptions() != null - ? getBigtableOptions().toBuilder() - : new BigtableOptions.Builder(); + BigtableOptions.Builder bigtableOptions = effectiveUserProvidedBigtableOptions(); - clonedOptions.setUserAgent(pipelineOptions.getUserAgent()); - if (getInstanceId() != null) { - clonedOptions.setInstanceId(getInstanceId()); - } - if (getProjectId() != null) { - clonedOptions.setProjectId(getProjectId()); - } + bigtableOptions.setUserAgent(pipelineOptions.getUserAgent()); if (getBigtableOptions() != null && getBigtableOptions().getCredentialOptions() .getCredentialType() == CredentialType.DefaultCredentials) { - clonedOptions.setCredentialOptions( + bigtableOptions.setCredentialOptions( CredentialOptions.credential( pipelineOptions.as(GcpOptions.class).getGcpCredential())); } - return new BigtableServiceImpl(clonedOptions.build()); + + // Default option that should be forced + bigtableOptions.setUseCachedDataPool(true); + + return new BigtableServiceImpl(bigtableOptions.build()); + } + + private BigtableOptions.Builder effectiveUserProvidedBigtableOptions() { + BigtableOptions.Builder effectiveOptions = getBigtableOptions() != null + ? 
getBigtableOptions().toBuilder() + : new BigtableOptions.Builder(); + + if (getBigtableOptionsConfigurator() != null) { + effectiveOptions = getBigtableOptionsConfigurator().apply(effectiveOptions); + } + + if (getInstanceId() != null) { + effectiveOptions.setInstanceId(getInstanceId()); + } + + if (getProjectId() != null) { + effectiveOptions.setProjectId(getProjectId()); + } + + return effectiveOptions; } } @@ -516,10 +547,21 @@ BigtableService getBigtableService(PipelineOptions pipelineOptions) { @Nullable abstract BigtableService getBigtableService(); - /** Returns the Google Cloud Bigtable instance being written to, and other parameters. */ + /** + * Returns the Google Cloud Bigtable instance being written to, and other parameters. + * @deprecated will be replaced by bigtable options configurator. + */ + @Deprecated @Nullable public abstract BigtableOptions getBigtableOptions(); + /** + * Configurator of the effective Bigtable Options. + */ + @Nullable + abstract SerializableFunction<BigtableOptions.Builder, + BigtableOptions.Builder> getBigtableOptionsConfigurator(); + abstract boolean getValidate(); abstract Builder toBuilder(); @@ -533,12 +575,17 @@ BigtableService getBigtableService(PipelineOptions pipelineOptions) { abstract Builder setTableId(String tableId); + /** @deprecated will be replaced by bigtable options configurator. */ + @Deprecated abstract Builder setBigtableOptions(BigtableOptions options); abstract Builder setBigtableService(BigtableService bigtableService); abstract Builder setValidate(boolean validate); + abstract Builder setBigtableOptionsConfigurator( + SerializableFunction<BigtableOptions.Builder, BigtableOptions.Builder> optionsConfigurator); + abstract Write build(); } @@ -575,7 +622,10 @@ public Write withInstanceId(String instanceId) { * indicated by the given options, and using any other specified customizations. * * <p>Does not modify this object. + * + * @deprecated will be replaced by bigtable options configurator. 
*/ + @Deprecated public Write withBigtableOptions(BigtableOptions options) { return withBigtableOptions(options.toBuilder()); } @@ -592,21 +642,29 @@ public Write withBigtableOptions(BigtableOptions options) { * will have no effect on the returned {@link BigtableIO.Write}. * * <p>Does not modify this object. + * + * @deprecated will be replaced by bigtable options configurator. */ + @Deprecated public Write withBigtableOptions(BigtableOptions.Builder optionsBuilder) { checkArgument(optionsBuilder != null, "optionsBuilder can not be null"); // TODO: is there a better way to clone a Builder? Want it to be immune from user changes. - BigtableOptions options = optionsBuilder.build(); + return toBuilder().setBigtableOptions(optionsBuilder.build().toBuilder().build()).build(); + } - // Set useBulkApi to true for enabling bulk writes - BigtableOptions.Builder clonedBuilder = options.toBuilder() - .setBulkOptions( - options.getBulkOptions().toBuilder() - .setUseBulkApi(true) - .build()) - .setUseCachedDataPool(true); - BigtableOptions clonedOptions = clonedBuilder.build(); - return toBuilder().setBigtableOptions(clonedOptions).build(); + /** + * Returns a new {@link BigtableIO.Write} that will read from the Cloud Bigtable instance + * with customized options provided by given configurator. + * + * <p>WARNING: instanceId and projectId should not be provided here and should be provided over + * {@link #withProjectId(String)} and {@link #withInstanceId(String)}. + * + * <p>Does not modify this object. + */ + public Write withBigtableOptionsConfigurator( + SerializableFunction<BigtableOptions.Builder, BigtableOptions.Builder> configurator) { + checkArgument(configurator != null, "configurator can not be null"); + return toBuilder().setBigtableOptionsConfigurator(configurator).build(); } /** Disables validation that the table being written to exists. 
*/ @@ -687,15 +745,23 @@ public void populateDisplayData(DisplayData.Builder builder) { builder.add(DisplayData.item("instanceId", getInstanceId()) .withLabel("Bigtable Instnace Id")); } + + builder.add(DisplayData.item("effectiveBigtableOptions", + effectiveUserProvidedBigtableOptions().build().toString()) + .withLabel("Effective BigtableOptions resulted from configuration of given options")); } @Override public String toString() { return MoreObjects.toStringHelper(Write.class) .add("options", getBigtableOptions()) + .add("effectiveOptions", effectiveUserProvidedBigtableOptions()) .add("tableId", getTableId()) .add("projectId", getProjectId()) .add("instanceId", getInstanceId()) + .add("bigtableOptionsConfigurator", + getBigtableOptionsConfigurator() == null ? null : getBigtableOptionsConfigurator() + .getClass().getName()) .toString(); } @@ -713,25 +779,45 @@ BigtableService getBigtableService(PipelineOptions pipelineOptions) { return getBigtableService(); } - BigtableOptions.Builder clonedOptions = getBigtableOptions() != null - ? 
getBigtableOptions().toBuilder() - : new BigtableOptions.Builder(); + BigtableOptions.Builder bigtableOptions = effectiveUserProvidedBigtableOptions(); + + bigtableOptions.setUserAgent(pipelineOptions.getUserAgent()); + + if (getBigtableOptions() != null && getBigtableOptions().getCredentialOptions() + .getCredentialType() == CredentialType.DefaultCredentials) { + bigtableOptions.setCredentialOptions( + CredentialOptions.credential( + pipelineOptions.as(GcpOptions.class).getGcpCredential())); + } + + // Set useBulkApi to true for enabling bulk writes + bigtableOptions + .setUseCachedDataPool(true) + .setBulkOptions( + effectiveUserProvidedBigtableOptions().build().getBulkOptions().toBuilder() + .setUseBulkApi(true) + .build()); + + return new BigtableServiceImpl(bigtableOptions.build()); + } + + private BigtableOptions.Builder effectiveUserProvidedBigtableOptions() { + BigtableOptions.Builder effectiveOptions = getBigtableOptions() != null + ? getBigtableOptions().toBuilder() + : new BigtableOptions.Builder(); + + if (getBigtableOptionsConfigurator() != null) { + effectiveOptions = getBigtableOptionsConfigurator().apply(effectiveOptions); + } - clonedOptions.setUserAgent(pipelineOptions.getUserAgent()); if (getInstanceId() != null) { - clonedOptions.setInstanceId(getInstanceId()); + effectiveOptions.setInstanceId(getInstanceId()); } if (getProjectId() != null) { - clonedOptions.setProjectId(getProjectId()); + effectiveOptions.setProjectId(getProjectId()); } - if (getBigtableOptions() != null && getBigtableOptions().getCredentialOptions() - .getCredentialType() == CredentialType.DefaultCredentials) { - clonedOptions.setCredentialOptions( - CredentialOptions.credential( - pipelineOptions.as(GcpOptions.class).getGcpCredential())); - } - return new BigtableServiceImpl(clonedOptions.build()); + return effectiveOptions; } private class BigtableWriterFn extends DoFn<KV<ByteString, Iterable<Mutation>>, Void> { diff --git 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java index a976e4ad351..418db92c4bf 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java @@ -140,6 +140,15 @@ public BigtableService apply(PipelineOptions input) { private static final TypeDescriptor<KV<ByteString, Iterable<Mutation>>> BIGTABLE_WRITE_TYPE = new TypeDescriptor<KV<ByteString, Iterable<Mutation>>>() {}; + private static final SerializableFunction<BigtableOptions.Builder, BigtableOptions.Builder> + PORT_CONFIGURATOR = + new SerializableFunction<BigtableOptions.Builder, BigtableOptions.Builder>() { + @Override + public BigtableOptions.Builder apply(BigtableOptions.Builder input) { + return input.setPort(1234); + } + }; + @Before public void setup() throws Exception { service = new FakeBigtableService(); @@ -158,12 +167,14 @@ public void testReadBuildsCorrectly() { BigtableIO.read().withBigtableOptions(BIGTABLE_OPTIONS) .withTableId("table") .withInstanceId("instance") - .withProjectId("project"); + .withProjectId("project") + .withBigtableOptionsConfigurator(PORT_CONFIGURATOR); assertEquals("options_project", read.getBigtableOptions().getProjectId()); assertEquals("options_instance", read.getBigtableOptions().getInstanceId()); assertEquals("instance", read.getInstanceId()); assertEquals("project", read.getProjectId()); assertEquals("table", read.getTableId()); + assertEquals(PORT_CONFIGURATOR, read.getBigtableOptionsConfigurator()); } @Test @@ -214,12 +225,14 @@ public void testWriteBuildsCorrectly() { BigtableIO.write().withBigtableOptions(BIGTABLE_OPTIONS) .withTableId("table") .withInstanceId("instance") - .withProjectId("project"); + .withProjectId("project") + 
.withBigtableOptionsConfigurator(PORT_CONFIGURATOR); assertEquals("table", write.getTableId()); assertEquals("options_project", write.getBigtableOptions().getProjectId()); assertEquals("options_instance", write.getBigtableOptions().getInstanceId()); assertEquals("instance", write.getInstanceId()); assertEquals("project", write.getProjectId()); + assertEquals(PORT_CONFIGURATOR, write.getBigtableOptionsConfigurator()); } @Test ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > BigtableIO should use ValueProviders > ------------------------------------- > > Key: BEAM-3008 > URL: https://issues.apache.org/jira/browse/BEAM-3008 > Project: Beam > Issue Type: New Feature > Components: sdk-java-gcp > Reporter: Solomon Duskis > Assignee: Solomon Duskis > > [https://github.com/apache/beam/pull/2057] is an effort towards BigtableIO > templatization. This Issue is a request to get a fully featured template for > BigtableIO. -- This message was sent by Atlassian JIRA (v6.4.14#64029)