lukecwik commented on code in PR #23615:
URL: https://github.com/apache/beam/pull/23615#discussion_r1001155738


##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java:
##########
@@ -3041,6 +3081,21 @@ private <DestinationT> WriteResult expandTyped(
                   new PrepareWrite<>(dynamicDestinations, SerializableFunctions.identity()))
               .setCoder(KvCoder.of(destinationCoder, input.getCoder()));
 
+      if (getSchemaRefreshFrequency().isLongerThan(Duration.ZERO)) {
+        // Create a slowly-updating side input to periodically update the schema and inject that
+        // into the
+        // DynamicDestinations object.

Review Comment:
   ```suggestion
           // into the DynamicDestinations object.
   ```



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java:
##########
@@ -3041,6 +3081,21 @@ private <DestinationT> WriteResult expandTyped(
                   new PrepareWrite<>(dynamicDestinations, SerializableFunctions.identity()))
               .setCoder(KvCoder.of(destinationCoder, input.getCoder()));
 
+      if (getSchemaRefreshFrequency().isLongerThan(Duration.ZERO)) {
+        // Create a slowly-updating side input to periodically update the schema and inject that
+        // into the
+        // DynamicDestinations object.
+      PCollectionView<List<TimestampedValue<Map<String, String>>>> tableSchemasView =
+            rowsWithDestination
+                .apply(
+                    new PeriodicallyRefreshTableSchema<>(
                        getSchemaRefreshFrequency(), dynamicDestinations, getBigQueryServices()))
+                .apply(View.asList());

Review Comment:
   View.asList is backed by a map indexed by numerical indices underneath; use
   View.asIterable instead.
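   
   A minimal sketch of the swap, reusing the names from the diff above (untested
   against this branch):
   ```java
   // View.asIterable avoids materializing the index map that backs View.asList;
   // the consumer only scans for the latest value anyway.
   PCollectionView<Iterable<TimestampedValue<Map<String, String>>>> tableSchemasView =
       rowsWithDestination
           .apply(
               new PeriodicallyRefreshTableSchema<>(
                   getSchemaRefreshFrequency(), dynamicDestinations, getBigQueryServices()))
           .apply(View.asIterable());
   ```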



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java:
##########
@@ -2056,7 +2059,9 @@ public enum Method {
 
     abstract Boolean getPropagateSuccessful();
 
-    abstract Boolean getAutoSchemaUpdate();
+    abstract Boolean getAutoSchemaRefresh();
+
+    abstract Duration getSchemaRefreshFrequency();

Review Comment:
   Should we set two properties here, or can we rely on the duration being
   non-zero or non-null to turn the feature on?
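   
   For example, a fragment sketching a single nullable duration as the only knob
   (the helper method is hypothetical, not something in this PR):
   ```java
   // Single property: null or zero frequency means the feature is off.
   abstract @Nullable Duration getSchemaRefreshFrequency();
   
   // Hypothetical convenience accessor.
   final boolean schemaRefreshEnabled() {
     Duration frequency = getSchemaRefreshFrequency();
     return frequency != null && frequency.isLongerThan(Duration.ZERO);
   }
   ```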



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/PeriodicallyRefreshTableSchema.java:
##########
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import com.google.api.services.bigquery.model.Table;
+import com.google.api.services.bigquery.model.TableReference;
+import java.io.IOException;
+import java.util.Map;
+import java.util.Set;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.coders.InstantCoder;
+import org.apache.beam.sdk.coders.MapCoder;
+import org.apache.beam.sdk.coders.SnappyCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.state.StateSpec;
+import org.apache.beam.sdk.state.StateSpecs;
+import org.apache.beam.sdk.state.TimeDomain;
+import org.apache.beam.sdk.state.Timer;
+import org.apache.beam.sdk.state.TimerSpec;
+import org.apache.beam.sdk.state.TimerSpecs;
+import org.apache.beam.sdk.state.ValueState;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
+import org.apache.beam.sdk.transforms.windowing.Repeatedly;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.util.Preconditions;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TimestampedValue;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+
+/** This transform periodically recalculates the schema for each destination table. */
+@SuppressWarnings({"unused"})
+class PeriodicallyRefreshTableSchema<DestinationT, ElementT>
+    extends PTransform<
+        PCollection<KV<DestinationT, ElementT>>,
+        PCollection<TimestampedValue<Map<String, String>>>> {
+  private final Duration refreshDuration;
+  private final DynamicDestinations<?, DestinationT> dynamicDestinations;
+  private final BigQueryServices bqServices;
+
+  public PeriodicallyRefreshTableSchema(
+      Duration refreshDuration,
+      DynamicDestinations<?, DestinationT> dynamicDestinations,
+      BigQueryServices bqServices) {
+    this.refreshDuration = refreshDuration;
+    this.dynamicDestinations = dynamicDestinations;
+    this.bqServices = bqServices;
+  }
+
+  @Override
+  public PCollection<TimestampedValue<Map<String, String>>> expand(
+      PCollection<KV<DestinationT, ElementT>> input) {
+    PCollection<TimestampedValue<Map<String, String>>> tableSchemas =
+        input
+            .apply("rewindow", Window.into(new GlobalWindows()))
+            .apply("dedup", ParDo.of(new DedupDoFn()))
+            .apply("refresh schema", ParDo.of(new RefreshTableSchemaDoFn()))
+            .apply(
+                "addTrigger",
+                Window.<TimestampedValue<Map<String, String>>>configure()
+                    .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()))
+                    .discardingFiredPanes());
+    tableSchemas.setCoder(
+        SnappyCoder.of(
+            TimestampedValue.TimestampedValueCoder.of(
+                MapCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()))));
+    return tableSchemas;
+  }
+
+  private class DedupDoFn extends DoFn<KV<DestinationT, ElementT>, KV<Void, DestinationT>> {
+    Set<DestinationT> destinations = Sets.newHashSet();
+
+    @StartBundle
+    public void startBundle() {
+      destinations.clear();
+    }
+
+    @ProcessElement
+    public void process(@Element KV<DestinationT, ElementT> element) {
+      destinations.add(element.getKey());
+    }
+
+    @FinishBundle
+    public void finishBundle(FinishBundleContext context) {
+      for (DestinationT destination : destinations) {
+        context.output(
+            KV.of(null, destination), GlobalWindow.INSTANCE.maxTimestamp(), GlobalWindow.INSTANCE);
+      }

Review Comment:
   ```suggestion
         }
         destinations.clear();
   ```



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/PeriodicallyRefreshTableSchema.java:
##########
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import com.google.api.services.bigquery.model.Table;
+import com.google.api.services.bigquery.model.TableReference;
+import java.io.IOException;
+import java.util.Map;
+import java.util.Set;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.coders.InstantCoder;
+import org.apache.beam.sdk.coders.MapCoder;
+import org.apache.beam.sdk.coders.SnappyCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.state.StateSpec;
+import org.apache.beam.sdk.state.StateSpecs;
+import org.apache.beam.sdk.state.TimeDomain;
+import org.apache.beam.sdk.state.Timer;
+import org.apache.beam.sdk.state.TimerSpec;
+import org.apache.beam.sdk.state.TimerSpecs;
+import org.apache.beam.sdk.state.ValueState;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
+import org.apache.beam.sdk.transforms.windowing.Repeatedly;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.util.Preconditions;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TimestampedValue;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+
+/** This transform periodically recalculates the schema for each destination table. */
+@SuppressWarnings({"unused"})
+class PeriodicallyRefreshTableSchema<DestinationT, ElementT>
+    extends PTransform<
+        PCollection<KV<DestinationT, ElementT>>,
+        PCollection<TimestampedValue<Map<String, String>>>> {
+  private final Duration refreshDuration;
+  private final DynamicDestinations<?, DestinationT> dynamicDestinations;
+  private final BigQueryServices bqServices;
+
+  public PeriodicallyRefreshTableSchema(
+      Duration refreshDuration,
+      DynamicDestinations<?, DestinationT> dynamicDestinations,
+      BigQueryServices bqServices) {
+    this.refreshDuration = refreshDuration;
+    this.dynamicDestinations = dynamicDestinations;
+    this.bqServices = bqServices;
+  }
+
+  @Override
+  public PCollection<TimestampedValue<Map<String, String>>> expand(
+      PCollection<KV<DestinationT, ElementT>> input) {
+    PCollection<TimestampedValue<Map<String, String>>> tableSchemas =
+        input
+            .apply("rewindow", Window.into(new GlobalWindows()))
+            .apply("dedup", ParDo.of(new DedupDoFn()))
+            .apply("refresh schema", ParDo.of(new RefreshTableSchemaDoFn()))
+            .apply(
+                "addTrigger",
+                Window.<TimestampedValue<Map<String, String>>>configure()
+                    .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()))
+                    .discardingFiredPanes());
+    tableSchemas.setCoder(
+        SnappyCoder.of(
+            TimestampedValue.TimestampedValueCoder.of(
+                MapCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()))));
+    return tableSchemas;
+  }
+
+  private class DedupDoFn extends DoFn<KV<DestinationT, ElementT>, KV<Void, DestinationT>> {
+    Set<DestinationT> destinations = Sets.newHashSet();
+
+    @StartBundle
+    public void startBundle() {
+      destinations.clear();
+    }
+

Review Comment:
   ```suggestion
   ```



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java:
##########
@@ -155,13 +155,13 @@ public PCollection<Void> expand(PCollection<KV<DestinationT, StorageApiWritePayl
         .apply("Finalize writes", ParDo.of(new 
StorageApiFinalizeWritesDoFn(bqServices)));
   }
 
-  static class WriteRecordsDoFn<DestinationT extends @NonNull Object, ElementT>
+  static class WriteRecordsDoFn<DestinationT extends @NonNull Object>
       extends DoFn<KV<DestinationT, StorageApiWritePayload>, KV<String, String>> {
     private final Counter forcedFlushes = Metrics.counter(WriteRecordsDoFn.class, "forcedFlushes");
 
-    class DestinationState {
+    static class DestinationState {

Review Comment:
   Keep this parameterized?
   ```suggestion
       static class DestinationState<ElementT> {
   ```



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinationsHelpers.java:
##########
@@ -297,16 +299,63 @@ public String toString() {
     }
   }
 
+  static class SchemaFromViewDestinationsListOfMaps<T, DestinationT>
+      extends DelegatingDynamicDestinations<T, DestinationT> {
+    PCollectionView<List<TimestampedValue<Map<String, String>>>> schemaView;
+
+    SchemaFromViewDestinationsListOfMaps(
+        DynamicDestinations<T, DestinationT> inner,
+        PCollectionView<List<TimestampedValue<Map<String, String>>>> schemaView) {
+      super(inner);
+      checkArgument(schemaView != null, "schemaView can not be null");
+      this.schemaView = schemaView;
+    }
+
+    @Override
+    public List<PCollectionView<?>> getSideInputs() {
+      return ImmutableList.<PCollectionView<?>>builder().add(schemaView).build();
+    }
+
+    @Override
+    public @Nullable TableSchema getSchema(DestinationT destination) {
+      List<TimestampedValue<Map<String, String>>> mapValues = sideInput(schemaView);
+      Optional<Map<String, String>> mapValue =
+          mapValues.stream()

Review Comment:
   It turns out that using Java streams is about 2x slower than a plain for
   loop. It seems like this method will be invoked a lot, so consider using a
   simple for loop.
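   
   A sketch of the loop-based equivalent (local variable names are illustrative):
   ```java
   // Track the latest-timestamped schema map directly instead of stream().max().
   @Nullable Map<String, String> latestValue = null;
   @Nullable Instant latestTimestamp = null;
   for (TimestampedValue<Map<String, String>> value : mapValues) {
     if (latestTimestamp == null || value.getTimestamp().isAfter(latestTimestamp)) {
       latestTimestamp = value.getTimestamp();
       latestValue = value.getValue();
     }
   }
   ```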



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/PeriodicallyRefreshTableSchema.java:
##########
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import com.google.api.services.bigquery.model.Table;
+import com.google.api.services.bigquery.model.TableReference;
+import java.io.IOException;
+import java.util.Map;
+import java.util.Set;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.coders.InstantCoder;
+import org.apache.beam.sdk.coders.MapCoder;
+import org.apache.beam.sdk.coders.SnappyCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.state.StateSpec;
+import org.apache.beam.sdk.state.StateSpecs;
+import org.apache.beam.sdk.state.TimeDomain;
+import org.apache.beam.sdk.state.Timer;
+import org.apache.beam.sdk.state.TimerSpec;
+import org.apache.beam.sdk.state.TimerSpecs;
+import org.apache.beam.sdk.state.ValueState;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
+import org.apache.beam.sdk.transforms.windowing.Repeatedly;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.util.Preconditions;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TimestampedValue;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+
+/** This transform periodically recalculates the schema for each destination table. */
+@SuppressWarnings({"unused"})
+class PeriodicallyRefreshTableSchema<DestinationT, ElementT>
+    extends PTransform<
+        PCollection<KV<DestinationT, ElementT>>,
+        PCollection<TimestampedValue<Map<String, String>>>> {
+  private final Duration refreshDuration;
+  private final DynamicDestinations<?, DestinationT> dynamicDestinations;
+  private final BigQueryServices bqServices;
+
+  public PeriodicallyRefreshTableSchema(
+      Duration refreshDuration,
+      DynamicDestinations<?, DestinationT> dynamicDestinations,
+      BigQueryServices bqServices) {
+    this.refreshDuration = refreshDuration;
+    this.dynamicDestinations = dynamicDestinations;
+    this.bqServices = bqServices;
+  }
+
+  @Override
+  public PCollection<TimestampedValue<Map<String, String>>> expand(
+      PCollection<KV<DestinationT, ElementT>> input) {
+    PCollection<TimestampedValue<Map<String, String>>> tableSchemas =
+        input
+            .apply("rewindow", Window.into(new GlobalWindows()))
+            .apply("dedup", ParDo.of(new DedupDoFn()))
+            .apply("refresh schema", ParDo.of(new RefreshTableSchemaDoFn()))
+            .apply(
+                "addTrigger",
+                Window.<TimestampedValue<Map<String, String>>>configure()
+                    .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()))
+                    .discardingFiredPanes());
+    tableSchemas.setCoder(
+        SnappyCoder.of(
+            TimestampedValue.TimestampedValueCoder.of(
+                MapCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()))));
+    return tableSchemas;
+  }
+
+  private class DedupDoFn extends DoFn<KV<DestinationT, ElementT>, KV<Void, DestinationT>> {
+    Set<DestinationT> destinations = Sets.newHashSet();
+
+    @StartBundle
+    public void startBundle() {
+      destinations.clear();
+    }
+
+    @ProcessElement
+    public void process(@Element KV<DestinationT, ElementT> element) {
+      destinations.add(element.getKey());
+    }
+
+    @FinishBundle
+    public void finishBundle(FinishBundleContext context) {
+      for (DestinationT destination : destinations) {
+        context.output(
+            KV.of(null, destination), GlobalWindow.INSTANCE.maxTimestamp(), GlobalWindow.INSTANCE);
+      }
+    }
+  }
+
+  private class RefreshTableSchemaDoFn
+      extends DoFn<KV<Void, DestinationT>, TimestampedValue<Map<String, String>>> {
+    private static final String DESTINATIONS_TAG = "destinations";
+    private static final String DESTINATIONS_TIMER_TIME = "dest_timer_time";
+    private static final String DESTINATIONS_TIMER = "destinationsTimer";
+
+    @StateId(DESTINATIONS_TAG)
+    private final StateSpec<ValueState<Set<String>>> destinationsSpec = StateSpecs.value();
+
+    @StateId(DESTINATIONS_TIMER_TIME)
+    private final StateSpec<ValueState<Instant>> timerTimeSpec =
+        StateSpecs.value(InstantCoder.of());
+
+    @TimerId(DESTINATIONS_TIMER)
+    private final TimerSpec destinationsTimer = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);
+
+    private @Nullable Map<String, String> currentOutput;

Review Comment:
   ```suggestion
       private Map<String, String> currentOutput = new HashMap<>();
   ```



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.java:
##########
@@ -150,4 +150,12 @@
   Integer getStorageApiAppendThresholdRecordCount();
 
   void setStorageApiAppendThresholdRecordCount(Integer value);
+
+  @Description(
+      "Disable use of the storage API failed row collection; any failing rows 
will retry indefinitely if set."

Review Comment:
   Do you want to expose this as an option instead of passing a magic string in
   experiments to trigger the desired failure handling?
   
   If you do swap to using a magic string in experiments, you might want to
   cache whether the magic string exists as a member variable so you don't
   check the experiments list all the time.
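   
   A hypothetical sketch of the caching approach (the experiment name below is
   made up for illustration):
   ```java
   // Cache the experiment lookup once per DoFn instance instead of scanning
   // the experiments list on every element.
   private transient boolean failedRowsCollectionDisabled;
   
   @Setup
   public void setup(PipelineOptions options) {
     failedRowsCollectionDisabled =
         ExperimentalOptions.hasExperiment(options, "disable_storage_api_failed_rows");
   }
   ```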



##########
sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java:
##########
@@ -1932,6 +1937,141 @@ public TableRow apply(Long input) {
                 TableRow.class)));
   }
 
+  @Test
+  public void testUpdateTableSchemaPeriodicRefresh() throws Exception {
+    if (!useStreaming || !useStorageApi) {
+      return;
+    }
+    // Side inputs only update in between bundles. So we set schemaUpdateRetries to 1, to ensure
+    // that the conversion
+    // step gives up when the side input is disabled. We then have to disable the failed-rows
+    // collection (dead letter)
+    // to ensure that the bundle actually fails and is retried.
+    BigQueryOptions bqOptions = p.getOptions().as(BigQueryOptions.class);
+    bqOptions.setSchemaUpdateRetries(1);
+    bqOptions.setDisableStorageApiFailedRowsCollection(true);
+
+    BigQueryIO.Write.Method method =
+        useStorageApiApproximate ? Method.STORAGE_API_AT_LEAST_ONCE : Method.STORAGE_WRITE_API;
+    p.enableAbandonedNodeEnforcement(false);
+
+    final String tableSpec1 = "project-id:dataset-id.table1";
+    final String tableSpec2 = "project-id:dataset-id.table2";
+    final String tableSpec3 = "project-id:dataset-id.table3";
+    final TableReference tableRef1 = BigQueryHelpers.parseTableSpec(tableSpec1);
+    final TableReference tableRef2 = BigQueryHelpers.parseTableSpec(tableSpec2);
+    final TableReference tableRef3 = BigQueryHelpers.parseTableSpec(tableSpec3);
+
+    TableSchema tableSchema =
+        new TableSchema()
+            .setFields(
+                ImmutableList.of(
+                    new TableFieldSchema().setName("name").setType("STRING"),
+                    new TableFieldSchema().setName("number").setType("INTEGER"),
+                    new TableFieldSchema().setName("tablespec").setType("STRING")));
+    fakeDatasetService.createTable(new Table().setTableReference(tableRef1).setSchema(tableSchema));
+    fakeDatasetService.createTable(new Table().setTableReference(tableRef2).setSchema(tableSchema));
+    fakeDatasetService.createTable(new Table().setTableReference(tableRef3).setSchema(tableSchema));
+
+    LongFunction<Iterable<TableRow>> getRow =
+        (LongFunction<Iterable<TableRow>> & Serializable)
+            (long i) -> {
+              TableRow tableRow;
+              if (i < 5) {
+                tableRow = new TableRow().set("name", "name" + i).set("number", Long.toString(i));
+              } else {
+                tableRow =
+                    new TableRow()
+                        .set("name", "name" + i)
+                        .set("number", Long.toString(i))
+                        .set("double_number", Long.toString(i * 2));
+              }
+              return ImmutableList.of(
+                  tableRow.clone().set("tablespec", tableSpec1),
+                  tableRow.clone().set("tablespec", tableSpec2),
+                  tableRow.clone().set("tablespec", tableSpec3));
+            };
+
+    SerializableFunction<ValueInSingleWindow<TableRow>, TableDestination> tableFunction =
+        value -> new TableDestination((String) value.getValue().get("tablespec"), "");
+
+    final int numRowsPerTable = 1;
+    TestStream.Builder<Long> testStream =
+        TestStream.create(VarLongCoder.of()).advanceWatermarkTo(new Instant(0));
+    for (long i = 0; i < numRowsPerTable; ++i) {
+      testStream = testStream.addElements(i);
+    }
+    testStream = testStream.advanceProcessingTime(Duration.standardSeconds(5));
+

Review Comment:
   This does not seem to test that the schema change happened: based on the
   current logic you only add the element `0` to the test stream, which maps to
   one row per table, and that row doesn't contain the `double_number` field.
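   
   One way to actually exercise the new field, sketched from the `getRow`
   function above (untested; reuses `numRowsPerTable` for symmetry):
   ```java
   // Also emit elements with i >= 5 after the refresh interval so rows written
   // post-refresh carry the double_number field.
   for (long i = 5; i < 5 + numRowsPerTable; ++i) {
     testStream = testStream.addElements(i);
   }
   testStream = testStream.advanceProcessingTime(Duration.standardSeconds(5));
   ```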



##########
sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java:
##########
@@ -1932,6 +1937,141 @@ public TableRow apply(Long input) {
                 TableRow.class)));
   }
 
+  @Test
+  public void testUpdateTableSchemaPeriodicRefresh() throws Exception {
+    if (!useStreaming || !useStorageApi) {
+      return;
+    }
+    // Side inputs only update in between bundles. So we set schemaUpdateRetries to 1, to ensure
+    // that the conversion
+    // step gives up when the side input is disabled. We then have to disable the failed-rows
+    // collection (dead letter)
+    // to ensure that the bundle actually fails and is retried.
+    BigQueryOptions bqOptions = p.getOptions().as(BigQueryOptions.class);
+    bqOptions.setSchemaUpdateRetries(1);
+    bqOptions.setDisableStorageApiFailedRowsCollection(true);
+
+    BigQueryIO.Write.Method method =
+        useStorageApiApproximate ? Method.STORAGE_API_AT_LEAST_ONCE : Method.STORAGE_WRITE_API;
+    p.enableAbandonedNodeEnforcement(false);
+
+    final String tableSpec1 = "project-id:dataset-id.table1";
+    final String tableSpec2 = "project-id:dataset-id.table2";
+    final String tableSpec3 = "project-id:dataset-id.table3";
+    final TableReference tableRef1 = BigQueryHelpers.parseTableSpec(tableSpec1);
+    final TableReference tableRef2 = BigQueryHelpers.parseTableSpec(tableSpec2);
+    final TableReference tableRef3 = BigQueryHelpers.parseTableSpec(tableSpec3);
+
+    TableSchema tableSchema =
+        new TableSchema()
+            .setFields(
+                ImmutableList.of(
+                    new TableFieldSchema().setName("name").setType("STRING"),
+                    new TableFieldSchema().setName("number").setType("INTEGER"),
+                    new TableFieldSchema().setName("tablespec").setType("STRING")));
+    fakeDatasetService.createTable(new Table().setTableReference(tableRef1).setSchema(tableSchema));
+    fakeDatasetService.createTable(new Table().setTableReference(tableRef2).setSchema(tableSchema));
+    fakeDatasetService.createTable(new Table().setTableReference(tableRef3).setSchema(tableSchema));
+
+    LongFunction<Iterable<TableRow>> getRow =
+        (LongFunction<Iterable<TableRow>> & Serializable)
+            (long i) -> {
+              TableRow tableRow;
+              if (i < 5) {
+                tableRow = new TableRow().set("name", "name" + i).set("number", Long.toString(i));
+              } else {
+                tableRow =
+                    new TableRow()
+                        .set("name", "name" + i)
+                        .set("number", Long.toString(i))
+                        .set("double_number", Long.toString(i * 2));
+              }
+              return ImmutableList.of(
+                  tableRow.clone().set("tablespec", tableSpec1),
+                  tableRow.clone().set("tablespec", tableSpec2),
+                  tableRow.clone().set("tablespec", tableSpec3));
+            };
+
+    SerializableFunction<ValueInSingleWindow<TableRow>, TableDestination> tableFunction =
+        value -> new TableDestination((String) value.getValue().get("tablespec"), "");
+
+    final int numRowsPerTable = 1;
+    TestStream.Builder<Long> testStream =
+        TestStream.create(VarLongCoder.of()).advanceWatermarkTo(new Instant(0));
+    for (long i = 0; i < numRowsPerTable; ++i) {
+      testStream = testStream.addElements(i);
+    }
+    testStream = testStream.advanceProcessingTime(Duration.standardSeconds(5));
+
+    PCollection<TableRow> tableRows =
+        p.apply(testStream.advanceWatermarkToInfinity())
+            .apply(
+                MapElements.via(
+                    new SimpleFunction<Long, Iterable<TableRow>>() {
+                      @Override
+                      public Iterable<TableRow> apply(Long input) {
+                        return getRow.apply(input);
+                      }
+                    }))
+            .setCoder(IterableCoder.of(TableRowJsonCoder.of()))
+            .apply(Flatten.iterables());
+    tableRows.apply(
+        BigQueryIO.writeTableRows()
+            .to(tableFunction)
+            .withMethod(method)
+            .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
+            .withSchemaRefreshFrequency(Duration.standardSeconds(2))
+            .withTestServices(fakeBqServices)
+            .withoutValidation());
+
+    Thread thread =
+        new Thread(
+            () -> {
+              try {
+                Thread.sleep(5000);

Review Comment:
   Is there a way we can reliably advance the schema using TestStream,
   injecting a special element that causes the fakeDatasetService to be
   updated, instead of relying on Thread.sleep?
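   
   A hypothetical sketch of the sentinel-element idea (the schema-update hook is
   assumed, not an existing FakeDatasetService API):
   ```java
   // A sentinel value (-1) triggers the schema update inline, so ordering comes
   // from TestStream element order rather than wall-clock sleeps.
   PCollection<Long> inputs =
       p.apply(testStream.advanceWatermarkToInfinity())
           .apply(
               ParDo.of(
                   new DoFn<Long, Long>() {
                     @ProcessElement
                     public void process(@Element Long i, OutputReceiver<Long> out) {
                       if (i == -1L) {
                         updateFakeTableSchemas(); // assumed hook mutating the fake service
                       } else {
                         out.output(i);
                       }
                     }
                   }));
   ```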



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinationsHelpers.java:
##########
@@ -297,16 +299,63 @@ public String toString() {
     }
   }
 
+  static class SchemaFromViewDestinationsListOfMaps<T, DestinationT>
+      extends DelegatingDynamicDestinations<T, DestinationT> {
+    PCollectionView<List<TimestampedValue<Map<String, String>>>> schemaView;
+
+    SchemaFromViewDestinationsListOfMaps(
+        DynamicDestinations<T, DestinationT> inner,
+        PCollectionView<List<TimestampedValue<Map<String, String>>>> schemaView) {
+      super(inner);
+      checkArgument(schemaView != null, "schemaView can not be null");
+      this.schemaView = schemaView;
+    }
+
+    @Override
+    public List<PCollectionView<?>> getSideInputs() {
+      return ImmutableList.<PCollectionView<?>>builder().add(schemaView).build();
+    }
+
+    @Override
+    public @Nullable TableSchema getSchema(DestinationT destination) {
+      List<TimestampedValue<Map<String, String>>> mapValues = sideInput(schemaView);
+      Optional<Map<String, String>> mapValue =
+          mapValues.stream()
+              .max(Comparator.comparing(TimestampedValue::getTimestamp))
+              .map(TimestampedValue::getValue);
+
+      TableDestination tableDestination = inner.getTable(destination);
+      @Nullable
+      String schema = mapValue.map(m -> m.get(tableDestination.getTableSpec())).orElse(null);

Review Comment:
   Ditto on a simple null check vs. relying on Optional.
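   
   A sketch of the null-check version, assuming `latestValue` comes from a plain
   loop like the one sketched earlier (the most recent schema map, possibly null):
   ```java
   // Plain null checks instead of Optional.map(...).orElse(null).
   TableDestination tableDestination = inner.getTable(destination);
   @Nullable String schema =
       latestValue == null ? null : latestValue.get(tableDestination.getTableSpec());
   ```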



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.java:
##########
@@ -150,4 +150,12 @@
   Integer getStorageApiAppendThresholdRecordCount();
 
   void setStorageApiAppendThresholdRecordCount(Integer value);
+
+  @Description(
+      "Disable use of the storage API failed row collection; any failing rows will retry indefinitely if set."
+          + "For internal testing only.")
+  @Default.Boolean(false)

Review Comment:
   ```suggestion
     @Hidden
     @Default.Boolean(false)
   ```



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/PeriodicallyRefreshTableSchema.java:
##########
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import com.google.api.services.bigquery.model.Table;
+import com.google.api.services.bigquery.model.TableReference;
+import java.io.IOException;
+import java.util.Map;
+import java.util.Set;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.coders.InstantCoder;
+import org.apache.beam.sdk.coders.MapCoder;
+import org.apache.beam.sdk.coders.SnappyCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.state.StateSpec;
+import org.apache.beam.sdk.state.StateSpecs;
+import org.apache.beam.sdk.state.TimeDomain;
+import org.apache.beam.sdk.state.Timer;
+import org.apache.beam.sdk.state.TimerSpec;
+import org.apache.beam.sdk.state.TimerSpecs;
+import org.apache.beam.sdk.state.ValueState;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
+import org.apache.beam.sdk.transforms.windowing.Repeatedly;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.util.Preconditions;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TimestampedValue;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+
+/** This transform periodically recalculates the schema for each destination table. */
+@SuppressWarnings({"unused"})
+class PeriodicallyRefreshTableSchema<DestinationT, ElementT>
+    extends PTransform<
+        PCollection<KV<DestinationT, ElementT>>,
+        PCollection<TimestampedValue<Map<String, String>>>> {
+  private final Duration refreshDuration;
+  private final DynamicDestinations<?, DestinationT> dynamicDestinations;
+  private final BigQueryServices bqServices;
+
+  public PeriodicallyRefreshTableSchema(
+      Duration refreshDuration,
+      DynamicDestinations<?, DestinationT> dynamicDestinations,
+      BigQueryServices bqServices) {
+    this.refreshDuration = refreshDuration;
+    this.dynamicDestinations = dynamicDestinations;
+    this.bqServices = bqServices;
+  }
+
+  @Override
+  public PCollection<TimestampedValue<Map<String, String>>> expand(
+      PCollection<KV<DestinationT, ElementT>> input) {
+    PCollection<TimestampedValue<Map<String, String>>> tableSchemas =
+        input
+            .apply("rewindow", Window.into(new GlobalWindows()))
+            .apply("dedup", ParDo.of(new DedupDoFn()))
+            .apply("refresh schema", ParDo.of(new RefreshTableSchemaDoFn()))
+            .apply(
+                "addTrigger",
+                Window.<TimestampedValue<Map<String, String>>>configure()
+                    .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()))
+                    .discardingFiredPanes());
+    tableSchemas.setCoder(
+        SnappyCoder.of(
+            TimestampedValue.TimestampedValueCoder.of(
+                MapCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()))));
+    return tableSchemas;
+  }
+
+  private class DedupDoFn extends DoFn<KV<DestinationT, ElementT>, KV<Void, DestinationT>> {
+    Set<DestinationT> destinations = Sets.newHashSet();

Review Comment:
   ```suggestion
       private final Set<DestinationT> destinations = Sets.newHashSet();
   ```



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/PeriodicallyRefreshTableSchema.java:
##########
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import com.google.api.services.bigquery.model.Table;
+import com.google.api.services.bigquery.model.TableReference;
+import java.io.IOException;
+import java.util.Map;
+import java.util.Set;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.coders.InstantCoder;
+import org.apache.beam.sdk.coders.MapCoder;
+import org.apache.beam.sdk.coders.SnappyCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.state.StateSpec;
+import org.apache.beam.sdk.state.StateSpecs;
+import org.apache.beam.sdk.state.TimeDomain;
+import org.apache.beam.sdk.state.Timer;
+import org.apache.beam.sdk.state.TimerSpec;
+import org.apache.beam.sdk.state.TimerSpecs;
+import org.apache.beam.sdk.state.ValueState;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
+import org.apache.beam.sdk.transforms.windowing.Repeatedly;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.util.Preconditions;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TimestampedValue;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+
+/** This transform periodically recalculates the schema for each destination table. */
+@SuppressWarnings({"unused"})
+class PeriodicallyRefreshTableSchema<DestinationT, ElementT>
+    extends PTransform<
+        PCollection<KV<DestinationT, ElementT>>,
+        PCollection<TimestampedValue<Map<String, String>>>> {
+  private final Duration refreshDuration;
+  private final DynamicDestinations<?, DestinationT> dynamicDestinations;
+  private final BigQueryServices bqServices;
+
+  public PeriodicallyRefreshTableSchema(
+      Duration refreshDuration,
+      DynamicDestinations<?, DestinationT> dynamicDestinations,
+      BigQueryServices bqServices) {
+    this.refreshDuration = refreshDuration;
+    this.dynamicDestinations = dynamicDestinations;
+    this.bqServices = bqServices;
+  }
+
+  @Override
+  public PCollection<TimestampedValue<Map<String, String>>> expand(
+      PCollection<KV<DestinationT, ElementT>> input) {
+    PCollection<TimestampedValue<Map<String, String>>> tableSchemas =
+        input
+            .apply("rewindow", Window.into(new GlobalWindows()))
+            .apply("dedup", ParDo.of(new DedupDoFn()))
+            .apply("refresh schema", ParDo.of(new RefreshTableSchemaDoFn()))
+            .apply(
+                "addTrigger",
+                Window.<TimestampedValue<Map<String, String>>>configure()
+                    .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()))
+                    .discardingFiredPanes());
+    tableSchemas.setCoder(
+        SnappyCoder.of(
+            TimestampedValue.TimestampedValueCoder.of(
+                MapCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()))));
+    return tableSchemas;
+  }
+
+  private class DedupDoFn extends DoFn<KV<DestinationT, ElementT>, KV<Void, DestinationT>> {
+    Set<DestinationT> destinations = Sets.newHashSet();
+
+    @StartBundle
+    public void startBundle() {
+      destinations.clear();
+    }
+
+    @ProcessElement
+    public void process(@Element KV<DestinationT, ElementT> element) {
+      destinations.add(element.getKey());
+    }
+
+    @FinishBundle
+    public void finishBundle(FinishBundleContext context) {
+      for (DestinationT destination : destinations) {
+        context.output(
+            KV.of(null, destination), GlobalWindow.INSTANCE.maxTimestamp(), GlobalWindow.INSTANCE);
+      }
+    }
+  }
+
+  private class RefreshTableSchemaDoFn
+      extends DoFn<KV<Void, DestinationT>, TimestampedValue<Map<String, String>>> {
+    private static final String DESTINATIONS_TAG = "destinations";
+    private static final String DESTINATIONS_TIMER_TIME = "dest_timer_time";
+    private static final String DESTINATIONS_TIMER = "destinationsTimer";
+
+    @StateId(DESTINATIONS_TAG)
+    private final StateSpec<ValueState<Set<String>>> destinationsSpec = StateSpecs.value();
+
+    @StateId(DESTINATIONS_TIMER_TIME)
+    private final StateSpec<ValueState<Instant>> timerTimeSpec =
+        StateSpecs.value(InstantCoder.of());
+
+    @TimerId(DESTINATIONS_TIMER)
+    private final TimerSpec destinationsTimer = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);
+
+    private @Nullable Map<String, String> currentOutput;
+    private @Nullable Instant currentOutputTs;
+
+    private transient @Nullable BigQueryServices.DatasetService datasetServiceInternal = null;
+
+    @StartBundle
+    public void startBundle() {
+      initOutputMap();
+    }
+
+    public void initOutputMap() {
+      if (currentOutput == null) {
+        currentOutput = Maps.newHashMap();
+      } else {
+        currentOutput.clear();
+      }
+    }
+

Review Comment:
   ```suggestion
   ```



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/PeriodicallyRefreshTableSchema.java:
##########
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import com.google.api.services.bigquery.model.Table;
+import com.google.api.services.bigquery.model.TableReference;
+import java.io.IOException;
+import java.util.Map;
+import java.util.Set;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.coders.InstantCoder;
+import org.apache.beam.sdk.coders.MapCoder;
+import org.apache.beam.sdk.coders.SnappyCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.state.StateSpec;
+import org.apache.beam.sdk.state.StateSpecs;
+import org.apache.beam.sdk.state.TimeDomain;
+import org.apache.beam.sdk.state.Timer;
+import org.apache.beam.sdk.state.TimerSpec;
+import org.apache.beam.sdk.state.TimerSpecs;
+import org.apache.beam.sdk.state.ValueState;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
+import org.apache.beam.sdk.transforms.windowing.Repeatedly;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.util.Preconditions;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TimestampedValue;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+
+/** This transform periodically recalculates the schema for each destination table. */
+@SuppressWarnings({"unused"})
+class PeriodicallyRefreshTableSchema<DestinationT, ElementT>
+    extends PTransform<
+        PCollection<KV<DestinationT, ElementT>>,
+        PCollection<TimestampedValue<Map<String, String>>>> {
+  private final Duration refreshDuration;
+  private final DynamicDestinations<?, DestinationT> dynamicDestinations;
+  private final BigQueryServices bqServices;
+
+  public PeriodicallyRefreshTableSchema(
+      Duration refreshDuration,
+      DynamicDestinations<?, DestinationT> dynamicDestinations,
+      BigQueryServices bqServices) {
+    this.refreshDuration = refreshDuration;
+    this.dynamicDestinations = dynamicDestinations;
+    this.bqServices = bqServices;
+  }
+
+  @Override
+  public PCollection<TimestampedValue<Map<String, String>>> expand(
+      PCollection<KV<DestinationT, ElementT>> input) {
+    PCollection<TimestampedValue<Map<String, String>>> tableSchemas =
+        input
+            .apply("rewindow", Window.into(new GlobalWindows()))
+            .apply("dedup", ParDo.of(new DedupDoFn()))
+            .apply("refresh schema", ParDo.of(new RefreshTableSchemaDoFn()))
+            .apply(
+                "addTrigger",
+                Window.<TimestampedValue<Map<String, String>>>configure()
+                    .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()))
+                    .discardingFiredPanes());
+    tableSchemas.setCoder(
+        SnappyCoder.of(
+            TimestampedValue.TimestampedValueCoder.of(
+                MapCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()))));
+    return tableSchemas;
+  }
+
+  private class DedupDoFn extends DoFn<KV<DestinationT, ElementT>, KV<Void, DestinationT>> {
+    Set<DestinationT> destinations = Sets.newHashSet();
+
+    @StartBundle
+    public void startBundle() {
+      destinations.clear();
+    }
+
+    @ProcessElement
+    public void process(@Element KV<DestinationT, ElementT> element) {
+      destinations.add(element.getKey());
+    }
+
+    @FinishBundle
+    public void finishBundle(FinishBundleContext context) {
+      for (DestinationT destination : destinations) {
+        context.output(
+            KV.of(null, destination), GlobalWindow.INSTANCE.maxTimestamp(), GlobalWindow.INSTANCE);
+      }
+    }
+  }
+
+  private class RefreshTableSchemaDoFn
+      extends DoFn<KV<Void, DestinationT>, TimestampedValue<Map<String, String>>> {
+    private static final String DESTINATIONS_TAG = "destinations";
+    private static final String DESTINATIONS_TIMER_TIME = "dest_timer_time";
+    private static final String DESTINATIONS_TIMER = "destinationsTimer";
+
+    @StateId(DESTINATIONS_TAG)
+    private final StateSpec<ValueState<Set<String>>> destinationsSpec = StateSpecs.value();
+
+    @StateId(DESTINATIONS_TIMER_TIME)
+    private final StateSpec<ValueState<Instant>> timerTimeSpec =
+        StateSpecs.value(InstantCoder.of());
+
+    @TimerId(DESTINATIONS_TIMER)
+    private final TimerSpec destinationsTimer = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);
+
+    private @Nullable Map<String, String> currentOutput;
+    private @Nullable Instant currentOutputTs;
+
+    private transient @Nullable BigQueryServices.DatasetService datasetServiceInternal = null;
+
+    @StartBundle
+    public void startBundle() {
+      initOutputMap();
+    }
+
+    public void initOutputMap() {
+      if (currentOutput == null) {
+        currentOutput = Maps.newHashMap();
+      } else {
+        currentOutput.clear();
+      }
+    }
+
+    @ProcessElement
+    public void process(
+        @Element KV<Void, DestinationT> destination,
+        @AlwaysFetched @StateId(DESTINATIONS_TAG) ValueState<Set<String>> destinations,
+        @AlwaysFetched @StateId(DESTINATIONS_TIMER_TIME) ValueState<Instant> timerTime,
+        @TimerId(DESTINATIONS_TIMER) Timer destinationsTimer)
+        throws IOException, InterruptedException {
+      Set<String> newSet = destinations.read();
+      if (newSet == null) {
+        newSet = Sets.newHashSet();
+      }
+      if (newSet.add(dynamicDestinations.getTable(destination.getValue()).getTableSpec())) {
+        destinations.write(newSet);
+        if (timerTime.read() == null) {
+          timerTime.write(destinationsTimer.getCurrentRelativeTime().plus(refreshDuration));
+          destinationsTimer.set(Preconditions.checkStateNotNull(timerTime.read()));
+        }
+      }
+    }
+
+    @OnTimer(DESTINATIONS_TIMER)
+    public void onTimer(
+        @AlwaysFetched @StateId(DESTINATIONS_TAG) ValueState<Set<String>> destinations,
+        @AlwaysFetched @StateId(DESTINATIONS_TIMER_TIME) ValueState<Instant> timerTime,
+        @TimerId(DESTINATIONS_TIMER) Timer destinationsTimer,
+        OnTimerContext onTimerContext,
+        PipelineOptions pipelineOptions)
+        throws IOException, InterruptedException {
+      Map<String, String> outputMap = Preconditions.checkStateNotNull(currentOutput);
+      currentOutputTs = timerTime.read();
+
+      Set<String> allDestinations = destinations.read();
+      BigQueryServices.DatasetService datasetService = getDatasetService(pipelineOptions);
+      for (String tableSpec : allDestinations) {
+        TableReference tableReference = BigQueryHelpers.parseTableSpec(tableSpec);
+        Table table = datasetService.getTable(tableReference);
+        if (table != null) {
+          outputMap.put(tableSpec, BigQueryHelpers.toJsonString(table.getSchema()));

Review Comment:
   ```suggestion
         currentOutputTs = timerTime.read();
   
         Set<String> allDestinations = destinations.read();
      BigQueryServices.DatasetService datasetService = getDatasetService(pipelineOptions);
      for (String tableSpec : allDestinations) {
        TableReference tableReference = BigQueryHelpers.parseTableSpec(tableSpec);
        Table table = datasetService.getTable(tableReference);
        if (table != null) {
          currentOutput.put(tableSpec, BigQueryHelpers.toJsonString(table.getSchema()));
   ```



##########
sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java:
##########
@@ -1932,6 +1937,141 @@ public TableRow apply(Long input) {
                 TableRow.class)));
   }
 
+  @Test
+  public void testUpdateTableSchemaPeriodicRefresh() throws Exception {
+    if (!useStreaming || !useStorageApi) {
+      return;
+    }

Review Comment:
   ```suggestion
       assumeTrue(useStreaming);
       assumeTrue(useStorageApi);
   ```



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/PeriodicallyRefreshTableSchema.java:
##########
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import com.google.api.services.bigquery.model.Table;
+import com.google.api.services.bigquery.model.TableReference;
+import java.io.IOException;
+import java.util.Map;
+import java.util.Set;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.coders.InstantCoder;
+import org.apache.beam.sdk.coders.MapCoder;
+import org.apache.beam.sdk.coders.SnappyCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.state.StateSpec;
+import org.apache.beam.sdk.state.StateSpecs;
+import org.apache.beam.sdk.state.TimeDomain;
+import org.apache.beam.sdk.state.Timer;
+import org.apache.beam.sdk.state.TimerSpec;
+import org.apache.beam.sdk.state.TimerSpecs;
+import org.apache.beam.sdk.state.ValueState;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
+import org.apache.beam.sdk.transforms.windowing.Repeatedly;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.util.Preconditions;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TimestampedValue;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+
+/** This transform periodically recalculates the schema for each destination table. */
+@SuppressWarnings({"unused"})
+class PeriodicallyRefreshTableSchema<DestinationT, ElementT>
+    extends PTransform<
+        PCollection<KV<DestinationT, ElementT>>,
+        PCollection<TimestampedValue<Map<String, String>>>> {
+  private final Duration refreshDuration;
+  private final DynamicDestinations<?, DestinationT> dynamicDestinations;
+  private final BigQueryServices bqServices;
+
+  public PeriodicallyRefreshTableSchema(
+      Duration refreshDuration,
+      DynamicDestinations<?, DestinationT> dynamicDestinations,
+      BigQueryServices bqServices) {
+    this.refreshDuration = refreshDuration;
+    this.dynamicDestinations = dynamicDestinations;
+    this.bqServices = bqServices;
+  }
+
+  @Override
+  public PCollection<TimestampedValue<Map<String, String>>> expand(
+      PCollection<KV<DestinationT, ElementT>> input) {
+    PCollection<TimestampedValue<Map<String, String>>> tableSchemas =
+        input
+            .apply("rewindow", Window.into(new GlobalWindows()))
+            .apply("dedup", ParDo.of(new DedupDoFn()))
+            .apply("refresh schema", ParDo.of(new RefreshTableSchemaDoFn()))
+            .apply(
+                "addTrigger",
+                Window.<TimestampedValue<Map<String, String>>>configure()
+                    .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()))
+                    .discardingFiredPanes());
+    tableSchemas.setCoder(
+        SnappyCoder.of(
+            TimestampedValue.TimestampedValueCoder.of(
+                MapCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()))));
+    return tableSchemas;
+  }
+
+  private class DedupDoFn extends DoFn<KV<DestinationT, ElementT>, KV<Void, DestinationT>> {
+    Set<DestinationT> destinations = Sets.newHashSet();
+
+    @StartBundle
+    public void startBundle() {
+      destinations.clear();
+    }
+
+    @ProcessElement
+    public void process(@Element KV<DestinationT, ElementT> element) {
+      destinations.add(element.getKey());
+    }
+
+    @FinishBundle
+    public void finishBundle(FinishBundleContext context) {
+      for (DestinationT destination : destinations) {
+        context.output(
+            KV.of(null, destination), GlobalWindow.INSTANCE.maxTimestamp(), GlobalWindow.INSTANCE);
+      }
+    }
+  }
+
+  private class RefreshTableSchemaDoFn
+      extends DoFn<KV<Void, DestinationT>, TimestampedValue<Map<String, String>>> {
+    private static final String DESTINATIONS_TAG = "destinations";
+    private static final String DESTINATIONS_TIMER_TIME = "dest_timer_time";
+    private static final String DESTINATIONS_TIMER = "destinationsTimer";
+
+    @StateId(DESTINATIONS_TAG)
+    private final StateSpec<ValueState<Set<String>>> destinationsSpec = StateSpecs.value();
+
+    @StateId(DESTINATIONS_TIMER_TIME)
+    private final StateSpec<ValueState<Instant>> timerTimeSpec =
+        StateSpecs.value(InstantCoder.of());
+
+    @TimerId(DESTINATIONS_TIMER)
+    private final TimerSpec destinationsTimer = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);
+
+    private @Nullable Map<String, String> currentOutput;
+    private @Nullable Instant currentOutputTs;
+
+    private transient @Nullable BigQueryServices.DatasetService datasetServiceInternal = null;
+
+    @StartBundle
+    public void startBundle() {
+      initOutputMap();
+    }
+
+    public void initOutputMap() {
+      if (currentOutput == null) {
+        currentOutput = Maps.newHashMap();
+      } else {
+        currentOutput.clear();
+      }
+    }
+
+    @ProcessElement
+    public void process(
+        @Element KV<Void, DestinationT> destination,
+        @AlwaysFetched @StateId(DESTINATIONS_TAG) ValueState<Set<String>> destinations,
+        @AlwaysFetched @StateId(DESTINATIONS_TIMER_TIME) ValueState<Instant> timerTime,
+        @TimerId(DESTINATIONS_TIMER) Timer destinationsTimer)
+        throws IOException, InterruptedException {
+      Set<String> newSet = destinations.read();
+      if (newSet == null) {
+        newSet = Sets.newHashSet();
+      }
+      if (newSet.add(dynamicDestinations.getTable(destination.getValue()).getTableSpec())) {
+        destinations.write(newSet);
+        if (timerTime.read() == null) {
+          timerTime.write(destinationsTimer.getCurrentRelativeTime().plus(refreshDuration));
+          destinationsTimer.set(Preconditions.checkStateNotNull(timerTime.read()));
+        }
+      }
+    }
+
+    @OnTimer(DESTINATIONS_TIMER)
+    public void onTimer(
+        @AlwaysFetched @StateId(DESTINATIONS_TAG) ValueState<Set<String>> destinations,
+        @AlwaysFetched @StateId(DESTINATIONS_TIMER_TIME) ValueState<Instant> timerTime,
+        @TimerId(DESTINATIONS_TIMER) Timer destinationsTimer,
+        OnTimerContext onTimerContext,
+        PipelineOptions pipelineOptions)
+        throws IOException, InterruptedException {
+      Map<String, String> outputMap = Preconditions.checkStateNotNull(currentOutput);
+      currentOutputTs = timerTime.read();
+
+      Set<String> allDestinations = destinations.read();
+      BigQueryServices.DatasetService datasetService = getDatasetService(pipelineOptions);
+      for (String tableSpec : allDestinations) {
+        TableReference tableReference = BigQueryHelpers.parseTableSpec(tableSpec);
+        Table table = datasetService.getTable(tableReference);
+        if (table != null) {
+          outputMap.put(tableSpec, BigQueryHelpers.toJsonString(table.getSchema()));
+        }
+      }
+
+      timerTime.write(timerTime.read().plus(refreshDuration));
+      destinationsTimer.set(timerTime.read());
+    }
+
+    @FinishBundle
+    public void finishBundle(FinishBundleContext c) {
+      Map<String, String> outputMap = Preconditions.checkStateNotNull(currentOutput);
+      if (!outputMap.isEmpty()) {
+        c.output(
+            TimestampedValue.of(outputMap, Preconditions.checkStateNotNull(currentOutputTs)),
+            GlobalWindow.INSTANCE.maxTimestamp(),
+            GlobalWindow.INSTANCE);
+      }

Review Comment:
   ```suggestion
         if (!currentOutput.isEmpty()) {
           c.output(
            TimestampedValue.of(currentOutput, Preconditions.checkStateNotNull(currentOutputTs)),
               GlobalWindow.INSTANCE.maxTimestamp(),
               GlobalWindow.INSTANCE);
         }
         currentOutput = new HashMap<>();
         currentOutputTs = null;
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
