Repository: beam
Updated Branches:
  refs/heads/master 17f0843eb -> a1d82c203


http://git-wip-us.apache.org/repos/asf/beam/blob/35db7457/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/PrepareWrite.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/PrepareWrite.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/PrepareWrite.java
index 73d8eb7..1e451cc 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/PrepareWrite.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/PrepareWrite.java
@@ -17,9 +17,7 @@
  */
 package org.apache.beam.sdk.io.gcp.bigquery;
 
-import com.google.api.services.bigquery.model.TableReference;
 import com.google.api.services.bigquery.model.TableRow;
-import com.google.common.base.Strings;
 import java.io.IOException;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.PTransform;
@@ -35,46 +33,34 @@ import org.apache.beam.sdk.values.ValueInSingleWindow;
  * which tables each element is written to, and format the element into a {@link TableRow} using the
  * user-supplied format function.
  */
-public class PrepareWrite<T>
-    extends PTransform<PCollection<T>, PCollection<KV<TableDestination, TableRow>>> {
-  private SerializableFunction<ValueInSingleWindow<T>, TableDestination> tableFunction;
+public class PrepareWrite<T, DestinationT>
+    extends PTransform<PCollection<T>, PCollection<KV<DestinationT, TableRow>>> {
+  private DynamicDestinations<T, DestinationT> dynamicDestinations;
   private SerializableFunction<T, TableRow> formatFunction;
 
   public PrepareWrite(
-      SerializableFunction<ValueInSingleWindow<T>, TableDestination> tableFunction,
+      DynamicDestinations<T, DestinationT> dynamicDestinations,
       SerializableFunction<T, TableRow> formatFunction) {
-    this.tableFunction = tableFunction;
+    this.dynamicDestinations = dynamicDestinations;
     this.formatFunction = formatFunction;
   }
 
   @Override
-  public PCollection<KV<TableDestination, TableRow>> expand(PCollection<T> input) {
+  public PCollection<KV<DestinationT, TableRow>> expand(PCollection<T> input) {
     return input.apply(
         ParDo.of(
-            new DoFn<T, KV<TableDestination, TableRow>>() {
+            new DoFn<T, KV<DestinationT, TableRow>>() {
               @ProcessElement
               public void processElement(ProcessContext context, BoundedWindow window)
                   throws IOException {
-                TableDestination tableDestination =
-                    tableSpecFromWindowedValue(
-                        context.getPipelineOptions().as(BigQueryOptions.class),
+                dynamicDestinations.setSideInputAccessorFromProcessContext(context);
+                DestinationT tableDestination =
+                    dynamicDestinations.getDestination(
                         ValueInSingleWindow.of(
                            context.element(), context.timestamp(), window, context.pane()));
                 TableRow tableRow = formatFunction.apply(context.element());
                 context.output(KV.of(tableDestination, tableRow));
               }
-            }));
-  }
-
-  private TableDestination tableSpecFromWindowedValue(
-      BigQueryOptions options, ValueInSingleWindow<T> value) {
-    TableDestination tableDestination = tableFunction.apply(value);
-    TableReference tableReference = tableDestination.getTableReference();
-    if (Strings.isNullOrEmpty(tableReference.getProjectId())) {
-      tableReference.setProjectId(options.getProject());
-      tableDestination =
-          new TableDestination(tableReference, tableDestination.getTableDescription());
-    }
-    return tableDestination;
+            }).withSideInputs(dynamicDestinations.getSideInputs()));
   }
 }
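
For readers following the API change: PrepareWrite is now generic in a destination type and consults a DynamicDestinations object instead of a table function. A minimal sketch of a user-defined DynamicDestinations, modeled on the test added to BigQueryIOTest further down in this mail; the class name, table names, and field names here are illustrative only, not part of the commit:

    import com.google.api.services.bigquery.model.TableFieldSchema;
    import com.google.api.services.bigquery.model.TableRow;
    import com.google.api.services.bigquery.model.TableSchema;
    import com.google.common.collect.ImmutableList;
    import org.apache.beam.sdk.io.gcp.bigquery.DynamicDestinations;
    import org.apache.beam.sdk.io.gcp.bigquery.TableDestination;
    import org.apache.beam.sdk.values.ValueInSingleWindow;

    // Routes each row to a per-user table; the destination type is a plain String key.
    class PerUserDestinations extends DynamicDestinations<TableRow, String> {
      @Override
      public String getDestination(ValueInSingleWindow<TableRow> element) {
        // The logical destination is computed from the element (the window is also available).
        return (String) element.getValue().get("userId");
      }

      @Override
      public TableDestination getTable(String userId) {
        return new TableDestination("project-id:dataset-id.user_" + userId,
            "table for user " + userId);
      }

      @Override
      public TableSchema getSchema(String userId) {
        return new TableSchema().setFields(ImmutableList.of(
            new TableFieldSchema().setName("userId").setType("STRING"),
            new TableFieldSchema().setName("value").setType("INTEGER")));
      }
    }

PrepareWrite then keys each formatted TableRow by whatever getDestination() returns, and the later stages resolve the physical table and schema from the same object.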

http://git-wip-us.apache.org/repos/asf/beam/blob/35db7457/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingInserts.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingInserts.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingInserts.java
index efd9c31..9cb0027 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingInserts.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingInserts.java
@@ -19,42 +19,33 @@
 package org.apache.beam.sdk.io.gcp.bigquery;
 
 import com.google.api.services.bigquery.model.TableRow;
-import com.google.api.services.bigquery.model.TableSchema;
-import javax.annotation.Nullable;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.VoidCoder;
-import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
 import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 
 /**
-* PTransform that performs streaming BigQuery write. To increase consistency,
-* it leverages BigQuery best effort de-dup mechanism.
+ * PTransform that performs streaming BigQuery write. To increase consistency, it leverages
+ * BigQuery's best effort de-dup mechanism.
  */
-public class StreamingInserts extends
-    PTransform<PCollection<KV<TableDestination, TableRow>>, WriteResult> {
-  private final Write<?> write;
+public class StreamingInserts<DestinationT>
+    extends PTransform<PCollection<KV<DestinationT, TableRow>>, WriteResult> {
+  private BigQueryServices bigQueryServices;
+  private final CreateDisposition createDisposition;
+  private final DynamicDestinations<?, DestinationT> dynamicDestinations;
 
-  private static class ConstantSchemaFunction implements
-      SerializableFunction<TableDestination, TableSchema> {
-    private final @Nullable String jsonSchema;
-
-    ConstantSchemaFunction(TableSchema schema) {
-      this.jsonSchema = BigQueryHelpers.toJsonString(schema);
-    }
-
-    @Override
-    @Nullable
-    public TableSchema apply(TableDestination table) {
-      return BigQueryHelpers.fromJsonString(jsonSchema, TableSchema.class);
-    }
+  /** Constructor. */
+  StreamingInserts(CreateDisposition createDisposition,
+                   DynamicDestinations<?, DestinationT> dynamicDestinations) {
+    this.createDisposition = createDisposition;
+    this.dynamicDestinations = dynamicDestinations;
+    this.bigQueryServices = new BigQueryServicesImpl();
   }
 
-  /** Constructor. */
-  StreamingInserts(Write<?> write) {
-    this.write = write;
+  void setTestServices(BigQueryServices bigQueryServices) {
+    this.bigQueryServices = bigQueryServices;
   }
 
   @Override
@@ -63,17 +54,13 @@ public class StreamingInserts extends
   }
 
   @Override
-  public WriteResult expand(PCollection<KV<TableDestination, TableRow>> input) {
-    // Since BigQueryIO.java does not yet have support for per-table schemas, inject a constant
-    // schema function here. If no schema is specified, this function will return null.
-    SerializableFunction<TableDestination, TableSchema> schemaFunction =
-        new ConstantSchemaFunction(write.getSchema());
-
-    PCollection<KV<TableDestination, TableRow>> writes = input
-        .apply("CreateTables", new CreateTables(write.getCreateDisposition(), 
schemaFunction)
-                .withTestServices(write.getBigQueryServices()));
+  public WriteResult expand(PCollection<KV<DestinationT, TableRow>> input) {
+    PCollection<KV<TableDestination, TableRow>> writes =
+        input.apply(
+            "CreateTables",
+            new CreateTables<DestinationT>(createDisposition, dynamicDestinations)
+                .withTestServices(bigQueryServices));
 
-    return writes.apply(new StreamingWriteTables()
-        .withTestServices(write.getBigQueryServices()));
+    return writes.apply(new StreamingWriteTables().withTestServices(bigQueryServices));
   }
 }
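
With this change the streaming path composes PrepareWrite and StreamingInserts around a single DynamicDestinations object rather than the whole Write<?> spec. A rough sketch of that composition, assuming it lives in the same package (both classes are package-private) and is driven from BigQueryIO.Write for unbounded input; the helper method itself is an illustration, not code from this commit:

    static <T, DestinationT> WriteResult expandStreaming(
        PCollection<T> input,
        DynamicDestinations<T, DestinationT> dynamicDestinations,
        SerializableFunction<T, TableRow> formatFunction) {
      // Tag each element with its destination key and convert it to a TableRow.
      PCollection<KV<DestinationT, TableRow>> rows =
          input.apply("PrepareWrite", new PrepareWrite<>(dynamicDestinations, formatFunction));
      // Create any missing tables, then stream the rows into them.
      return rows.apply("StreamingInserts",
          new StreamingInserts<>(CreateDisposition.CREATE_IF_NEEDED, dynamicDestinations));
    }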

http://git-wip-us.apache.org/repos/asf/beam/blob/35db7457/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteTables.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteTables.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteTables.java
index 4d130b6..20b47e1 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteTables.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteTables.java
@@ -22,6 +22,9 @@ import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.windowing.DefaultTrigger;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
+import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.util.Reshuffle;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
@@ -77,6 +80,11 @@ public class StreamingWriteTables extends PTransform<
     tagged
         .setCoder(KvCoder.of(ShardedKeyCoder.of(StringUtf8Coder.of()), TableRowInfoCoder.of()))
         .apply(Reshuffle.<ShardedKey<String>, TableRowInfo>of())
+        // Put in the global window to ensure that DynamicDestinations side inputs are accessed
+        // correctly.
+        .apply("GlobalWindow",
+            Window.<KV<ShardedKey<String>, TableRowInfo>>into(new GlobalWindows())
+            .triggering(DefaultTrigger.of()).discardingFiredPanes())
         .apply("StreamingWrite",
             ParDo.of(
                 new StreamingWriteFn(bigQueryServices)));

http://git-wip-us.apache.org/repos/asf/beam/blob/35db7457/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestinationCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestinationCoder.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestinationCoder.java
index c418804..8a06d13 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestinationCoder.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestinationCoder.java
@@ -41,7 +41,7 @@ public class TableDestinationCoder extends CustomCoder<TableDestination> {
       throw new CoderException("cannot encode a null value");
     }
     stringCoder.encode(value.getTableSpec(), outStream, context.nested());
-    stringCoder.encode(value.getTableDescription(), outStream, context);
+    stringCoder.encode(value.getTableDescription(), outStream, context.nested());
   }
 
   @Override
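
For reference, the encode method reads as follows after the one-line fix above; both components are now written with the nested (length-delimited) encoding, so the result does not depend on whether the TableDestination is the outermost value in the stream. This is a sketch only, with the rest of the class assumed unchanged:

    @Override
    public void encode(TableDestination value, OutputStream outStream, Context context)
        throws IOException {
      if (value == null) {
        throw new CoderException("cannot encode a null value");
      }
      // Encode both fields in the nested context so a containing coder
      // (e.g. KvCoder, or the ResultCoder in WriteBundlesToFiles) can append more data.
      stringCoder.encode(value.getTableSpec(), outStream, context.nested());
      stringCoder.encode(value.getTableDescription(), outStream, context.nested());
    }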

http://git-wip-us.apache.org/repos/asf/beam/blob/35db7457/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java
index 5f89067..4f609b2 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java
@@ -26,6 +26,7 @@ import java.io.OutputStream;
 import java.io.Serializable;
 import java.util.Map;
 import java.util.UUID;
+import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
 import org.apache.beam.sdk.coders.CustomCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
@@ -37,64 +38,66 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * Writes each bundle of {@link TableRow} elements out to a separate file using
- * {@link TableRowWriter}.
+ * Writes each bundle of {@link TableRow} elements out to a separate file using {@link
+ * TableRowWriter}.
  */
-class WriteBundlesToFiles extends DoFn<KV<TableDestination, TableRow>, WriteBundlesToFiles.Result> {
+class WriteBundlesToFiles<DestinationT>
+    extends DoFn<KV<DestinationT, TableRow>, WriteBundlesToFiles.Result<DestinationT>> {
   private static final Logger LOG = LoggerFactory.getLogger(WriteBundlesToFiles.class);
 
   // Map from tablespec to a writer for that table.
-  private transient Map<TableDestination, TableRowWriter> writers;
+  private transient Map<DestinationT, TableRowWriter> writers;
   private final String tempFilePrefix;
 
   /**
   * The result of the {@link WriteBundlesToFiles} transform. Corresponds to a single output file,
   * and encapsulates the table it is destined to as well as the file byte size.
    */
-  public static final class Result implements Serializable {
+  public static final class Result<DestinationT> implements Serializable {
     private static final long serialVersionUID = 1L;
     public final String filename;
     public final Long fileByteSize;
-    public final TableDestination tableDestination;
+    public final DestinationT destination;
 
-    public Result(String filename, Long fileByteSize, TableDestination tableDestination) {
+    public Result(String filename, Long fileByteSize, DestinationT destination) {
       this.filename = filename;
       this.fileByteSize = fileByteSize;
-      this.tableDestination = tableDestination;
+      this.destination = destination;
     }
   }
 
-  /**
-   * a coder for the {@link Result} class.
-   */
-  public static class ResultCoder extends CustomCoder<Result> {
-    private static final ResultCoder INSTANCE = new ResultCoder();
+  /** a coder for the {@link Result} class. */
+  public static class ResultCoder<DestinationT> extends CustomCoder<Result<DestinationT>> {
     private static final StringUtf8Coder stringCoder = StringUtf8Coder.of();
     private static final VarLongCoder longCoder = VarLongCoder.of();
-    private static final TableDestinationCoder tableDestinationCoder = TableDestinationCoder.of();
+    private final Coder<DestinationT> destinationCoder;
 
-    public static ResultCoder of() {
-      return INSTANCE;
+    public static <DestinationT> ResultCoder<DestinationT> of(
+        Coder<DestinationT> destinationCoder) {
+      return new ResultCoder<>(destinationCoder);
+    }
+
+    ResultCoder(Coder<DestinationT> destinationCoder) {
+      this.destinationCoder = destinationCoder;
     }
 
     @Override
-    public void encode(Result value, OutputStream outStream, Context context)
+    public void encode(Result<DestinationT> value, OutputStream outStream, Context context)
         throws IOException {
       if (value == null) {
         throw new CoderException("cannot encode a null value");
       }
       stringCoder.encode(value.filename, outStream, context.nested());
       longCoder.encode(value.fileByteSize, outStream, context.nested());
-      tableDestinationCoder.encode(value.tableDestination, outStream, context.nested());
+      destinationCoder.encode(value.destination, outStream, context.nested());
     }
 
     @Override
-    public Result decode(InputStream inStream, Context context)
-        throws IOException {
+    public Result<DestinationT> decode(InputStream inStream, Context context) 
throws IOException {
       String filename = stringCoder.decode(inStream, context.nested());
       long fileByteSize = longCoder.decode(inStream, context.nested());
-      TableDestination tableDestination = tableDestinationCoder.decode(inStream, context.nested());
-      return new Result(filename, fileByteSize, tableDestination);
+      DestinationT destination = destinationCoder.decode(inStream, context.nested());
+      return new Result<>(filename, fileByteSize, destination);
     }
 
     @Override
@@ -138,9 +141,9 @@ class WriteBundlesToFiles extends DoFn<KV<TableDestination, TableRow>, WriteBund
 
   @FinishBundle
   public void finishBundle(Context c) throws Exception {
-    for (Map.Entry<TableDestination, TableRowWriter> entry : writers.entrySet()) {
+    for (Map.Entry<DestinationT, TableRowWriter> entry : writers.entrySet()) {
       TableRowWriter.Result result = entry.getValue().close();
-      c.output(new Result(result.resourceId.toString(), result.byteSize, entry.getKey()));
+      c.output(new Result<>(result.resourceId.toString(), result.byteSize, entry.getKey()));
     }
     writers.clear();
   }
@@ -149,8 +152,7 @@ class WriteBundlesToFiles extends DoFn<KV<TableDestination, TableRow>, WriteBund
   public void populateDisplayData(DisplayData.Builder builder) {
     super.populateDisplayData(builder);
 
-    builder
-        .addIfNotNull(DisplayData.item("tempFilePrefix", tempFilePrefix)
-            .withLabel("Temporary File Prefix"));
+    builder.addIfNotNull(
+        DisplayData.item("tempFilePrefix", 
tempFilePrefix).withLabel("Temporary File Prefix"));
   }
 }
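
Since Result is now generic, ResultCoder can no longer be a singleton; it is built from a coder for the destination type. A short sketch of the intended usage, following the pattern in the updated BigQueryIOTest below; the surrounding pipeline variables and the single-argument constructor are assumptions for illustration:

    // Destination keys are table specs (Strings) in this example.
    Coder<WriteBundlesToFiles.Result<String>> resultCoder =
        WriteBundlesToFiles.ResultCoder.of(StringUtf8Coder.of());

    PCollection<WriteBundlesToFiles.Result<String>> results =
        rows.apply("WriteBundlesToFiles",
                // Assumption: the constructor still takes only the temporary file prefix.
                ParDo.of(new WriteBundlesToFiles<String>(tempFilePrefix)))
            .setCoder(resultCoder);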

http://git-wip-us.apache.org/repos/asf/beam/blob/35db7457/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WritePartition.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WritePartition.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WritePartition.java
index 0ae1768..4136fa0 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WritePartition.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WritePartition.java
@@ -18,7 +18,6 @@
 
 package org.apache.beam.sdk.io.gcp.bigquery;
 
-import com.google.api.services.bigquery.model.TableReference;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import java.util.List;
@@ -27,7 +26,6 @@ import java.util.UUID;
 
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write;
 import org.apache.beam.sdk.io.gcp.bigquery.WriteBundlesToFiles.Result;
-import org.apache.beam.sdk.options.ValueProvider;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollectionView;
@@ -37,12 +35,12 @@ import org.apache.beam.sdk.values.TupleTag;
  * Partitions temporary files based on number of files and file sizes. Output key is a pair of
  * tablespec and the list of files corresponding to each partition of that table.
  */
-class WritePartition extends DoFn<String, KV<ShardedKey<TableDestination>, List<String>>> {
-  private final ValueProvider<String> singletonOutputJsonTableRef;
-  private final String singletonOutputTableDescription;
-  private final PCollectionView<Iterable<WriteBundlesToFiles.Result>> resultsView;
-  private TupleTag<KV<ShardedKey<TableDestination>, List<String>>> multiPartitionsTag;
-  private TupleTag<KV<ShardedKey<TableDestination>, List<String>>> singlePartitionTag;
+class WritePartition<DestinationT>
+    extends DoFn<String, KV<ShardedKey<DestinationT>, List<String>>> {
+  private final boolean singletonTable;
+  private final PCollectionView<Iterable<WriteBundlesToFiles.Result<DestinationT>>> resultsView;
+  private TupleTag<KV<ShardedKey<DestinationT>, List<String>>> multiPartitionsTag;
+  private TupleTag<KV<ShardedKey<DestinationT>, List<String>>> singlePartitionTag;
 
   private static class PartitionData {
     private int numFiles = 0;
@@ -66,7 +64,7 @@ class WritePartition extends DoFn<String, KV<ShardedKey<TableDestination>, List<
     }
 
     List<String> getFilenames() {
-      return  filenames;
+      return filenames;
     }
 
     void addFilename(String filename) {
@@ -98,18 +96,16 @@ class WritePartition extends DoFn<String, KV<ShardedKey<TableDestination>, List<
     }
 
     void addPartition(PartitionData partition) {
-       partitions.add(partition);
+      partitions.add(partition);
     }
   }
 
   WritePartition(
-      ValueProvider<String> singletonOutputJsonTableRef,
-      String singletonOutputTableDescription,
-      PCollectionView<Iterable<WriteBundlesToFiles.Result>> resultsView,
-      TupleTag<KV<ShardedKey<TableDestination>, List<String>>> multiPartitionsTag,
-      TupleTag<KV<ShardedKey<TableDestination>, List<String>>> singlePartitionTag) {
-    this.singletonOutputJsonTableRef = singletonOutputJsonTableRef;
-    this.singletonOutputTableDescription = singletonOutputTableDescription;
+      boolean singletonTable,
+      PCollectionView<Iterable<WriteBundlesToFiles.Result<DestinationT>>> resultsView,
+      TupleTag<KV<ShardedKey<DestinationT>, List<String>>> multiPartitionsTag,
+      TupleTag<KV<ShardedKey<DestinationT>, List<String>>> singlePartitionTag) {
+    this.singletonTable = singletonTable;
+    this.singletonTable = singletonTable;
     this.resultsView = resultsView;
     this.multiPartitionsTag = multiPartitionsTag;
     this.singlePartitionTag = singlePartitionTag;
@@ -117,30 +113,31 @@ class WritePartition extends DoFn<String, KV<ShardedKey<TableDestination>, List<
 
   @ProcessElement
   public void processElement(ProcessContext c) throws Exception {
-    List<WriteBundlesToFiles.Result> results = Lists.newArrayList(c.sideInput(resultsView));
+    List<WriteBundlesToFiles.Result<DestinationT>> results =
+        Lists.newArrayList(c.sideInput(resultsView));
 
     // If there are no elements to write _and_ the user specified a constant output table, then
     // generate an empty table of that name.
-    if (results.isEmpty() && singletonOutputJsonTableRef != null) {
-      TableReference singletonTable = BigQueryHelpers.fromJsonString(
-          singletonOutputJsonTableRef.get(), TableReference.class);
-      if (singletonTable != null) {
+    if (results.isEmpty() && singletonTable) {
         TableRowWriter writer = new TableRowWriter(c.element());
         writer.open(UUID.randomUUID().toString());
         TableRowWriter.Result writerResult = writer.close();
-        results.add(new Result(writerResult.resourceId.toString(), writerResult.byteSize,
-            new TableDestination(singletonTable, singletonOutputTableDescription)));
-      }
+        // Return a null destination in this case - the constant DynamicDestinations class will
+        // resolve it to the singleton output table.
+        results.add(
+            new Result<DestinationT>(
+                writerResult.resourceId.toString(),
+                writerResult.byteSize,
+                null));
     }
 
-
-    Map<TableDestination, DestinationData> currentResults = Maps.newHashMap();
-    for (WriteBundlesToFiles.Result fileResult : results) {
-      TableDestination tableDestination = fileResult.tableDestination;
-      DestinationData destinationData = currentResults.get(tableDestination);
+    Map<DestinationT, DestinationData> currentResults = Maps.newHashMap();
+    for (WriteBundlesToFiles.Result<DestinationT> fileResult : results) {
+      DestinationT destination = fileResult.destination;
+      DestinationData destinationData = currentResults.get(destination);
       if (destinationData == null) {
         destinationData = new DestinationData();
-        currentResults.put(tableDestination, destinationData);
+        currentResults.put(destination, destinationData);
       }
 
       PartitionData latestPartition = destinationData.getLatestPartition();
@@ -156,18 +153,18 @@ class WritePartition extends DoFn<String, KV<ShardedKey<TableDestination>, List<
 
     // Now that we've figured out which tables and partitions to write out, emit this information
     // to the next stage.
-    for (Map.Entry<TableDestination, DestinationData> entry : currentResults.entrySet()) {
-      TableDestination tableDestination = entry.getKey();
+    for (Map.Entry<DestinationT, DestinationData> entry : currentResults.entrySet()) {
+      DestinationT destination = entry.getKey();
       DestinationData destinationData = entry.getValue();
       // In the fast-path case where we only output one table, the transform loads it directly
       // to the final table. In this case, we output on a special TupleTag so the enclosing
       // transform knows to skip the rename step.
-      TupleTag<KV<ShardedKey<TableDestination>, List<String>>> outputTag =
+      TupleTag<KV<ShardedKey<DestinationT>, List<String>>> outputTag =
           (destinationData.getPartitions().size() == 1) ? singlePartitionTag : multiPartitionsTag;
       for (int i = 0; i < destinationData.getPartitions().size(); ++i) {
         PartitionData partitionData = destinationData.getPartitions().get(i);
-        c.output(outputTag, KV.of(ShardedKey.of(tableDestination, i + 1),
-            partitionData.getFilenames()));
+        c.output(
+            outputTag, KV.of(ShardedKey.of(destination, i + 1), partitionData.getFilenames()));
       }
     }
   }
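
The empty-input/singleton path above now emits a Result with a null destination key and relies on a "constant" DynamicDestinations (mentioned in the comment but not included in this mail) to map that key back to the one user-specified table. A hypothetical sketch of what such a class could look like; none of these names come from the commit:

    // Hypothetical: every element, and the null key from the empty-input path,
    // resolves to one fixed table.
    class ConstantDestinations<T> extends DynamicDestinations<T, String> {
      private final String tableSpec;

      ConstantDestinations(String tableSpec) {
        this.tableSpec = tableSpec;
      }

      @Override
      public String getDestination(ValueInSingleWindow<T> element) {
        return tableSpec;
      }

      @Override
      public TableDestination getTable(String destination) {
        // Ignore the (possibly null) key and always return the singleton table.
        return new TableDestination(tableSpec, null);
      }

      @Override
      public TableSchema getSchema(String destination) {
        return null; // schema supplied separately, e.g. via the schema side input
      }
    }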

http://git-wip-us.apache.org/repos/asf/beam/blob/35db7457/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java
index f336849..b299244 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java
@@ -23,12 +23,14 @@ import com.google.api.services.bigquery.model.JobConfigurationLoad;
 import com.google.api.services.bigquery.model.JobReference;
 import com.google.api.services.bigquery.model.TableReference;
 import com.google.api.services.bigquery.model.TableSchema;
+import com.google.common.base.Strings;
 import com.google.common.collect.Lists;
 import java.io.IOException;
 import java.nio.file.Files;
 import java.nio.file.Paths;
 import java.util.Collection;
 import java.util.List;
+import java.util.Map;
 import javax.annotation.Nullable;
 
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.Status;
@@ -39,7 +41,6 @@ import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.JobService;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.util.FileIOChannelFactory;
 import org.apache.beam.sdk.util.GcsIOChannelFactory;
@@ -52,72 +53,86 @@ import org.apache.beam.sdk.values.PCollectionView;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-
 /**
  * Writes partitions to BigQuery tables.
  *
- * <p>The input is a list of files corresponding to each partition of a table. These files are
- * load into a temporary table (or into the final table if there is only one partition). The output
- * is a {@link KV} mapping each final table to a list of the temporary tables containing its data.
+ * <p>The input is a list of files corresponding to each partition of a table. These files are
+ * loaded into a temporary table (or into the final table if there is only one partition). The
+ * output is a {@link KV} mapping each final table to a list of the temporary tables containing its
+ * data.
  *
  * <p>In the case where all the data in the files fit into a single load job, this transform loads
  * the data directly into the final table, skipping temporary tables. In this case, the output
  * {@link KV} maps the final table to itself.
  */
-class WriteTables extends DoFn<KV<ShardedKey<TableDestination>, List<String>>,
-    KV<TableDestination, String>> {
+class WriteTables<DestinationT>
+    extends DoFn<KV<ShardedKey<DestinationT>, List<String>>, KV<TableDestination, String>> {
   private static final Logger LOG = LoggerFactory.getLogger(WriteTables.class);
 
   private final boolean singlePartition;
   private final BigQueryServices bqServices;
   private final PCollectionView<String> jobIdToken;
+  private final PCollectionView<Map<DestinationT, String>> schemasView;
   private final String tempFilePrefix;
   private final WriteDisposition writeDisposition;
   private final CreateDisposition createDisposition;
-  private final SerializableFunction<TableDestination, TableSchema> schemaFunction;
+  private final DynamicDestinations<?, DestinationT> dynamicDestinations;
 
   public WriteTables(
       boolean singlePartition,
       BigQueryServices bqServices,
       PCollectionView<String> jobIdToken,
+      PCollectionView<Map<DestinationT, String>> schemasView,
       String tempFilePrefix,
       WriteDisposition writeDisposition,
       CreateDisposition createDisposition,
-      SerializableFunction<TableDestination, TableSchema> schemaFunction) {
+      DynamicDestinations<?, DestinationT> dynamicDestinations) {
     this.singlePartition = singlePartition;
     this.bqServices = bqServices;
     this.jobIdToken = jobIdToken;
+    this.schemasView = schemasView;
     this.tempFilePrefix = tempFilePrefix;
     this.writeDisposition = writeDisposition;
     this.createDisposition = createDisposition;
-    this.schemaFunction = schemaFunction;
+    this.dynamicDestinations = dynamicDestinations;
   }
 
   @ProcessElement
   public void processElement(ProcessContext c) throws Exception {
-    TableDestination tableDestination = c.element().getKey().getKey();
+    dynamicDestinations.setSideInputAccessorFromProcessContext(c);
+    DestinationT destination = c.element().getKey().getKey();
+    TableSchema tableSchema =
+        BigQueryHelpers.fromJsonString(
+            c.sideInput(schemasView).get(destination), TableSchema.class);
+    TableDestination tableDestination = dynamicDestinations.getTable(destination);
+    TableReference tableReference = tableDestination.getTableReference();
+    if (Strings.isNullOrEmpty(tableReference.getProjectId())) {
+      tableReference.setProjectId(
+          c.getPipelineOptions().as(BigQueryOptions.class).getProject());
+      tableDestination = new TableDestination(
+          tableReference, tableDestination.getTableDescription());
+    }
+
     Integer partition = c.element().getKey().getShardNumber();
     List<String> partitionFiles = Lists.newArrayList(c.element().getValue());
-    String jobIdPrefix = BigQueryHelpers.createJobId(
-        c.sideInput(jobIdToken), tableDestination, partition);
+    String jobIdPrefix =
+        BigQueryHelpers.createJobId(c.sideInput(jobIdToken), tableDestination, partition);
 
-    TableReference ref = tableDestination.getTableReference();
     if (!singlePartition) {
-      ref.setTableId(jobIdPrefix);
+      tableReference.setTableId(jobIdPrefix);
     }
 
-    TableSchema schema = (schemaFunction != null) ? schemaFunction.apply(tableDestination) : null;
     load(
         bqServices.getJobService(c.getPipelineOptions().as(BigQueryOptions.class)),
         bqServices.getDatasetService(c.getPipelineOptions().as(BigQueryOptions.class)),
         jobIdPrefix,
-        ref,
-        schema,
+        tableReference,
+        tableSchema,
         partitionFiles,
         writeDisposition,
         createDisposition,
         tableDestination.getTableDescription());
-    c.output(KV.of(tableDestination, BigQueryHelpers.toJsonString(ref)));
+    c.output(KV.of(tableDestination, BigQueryHelpers.toJsonString(tableReference)));
 
     removeTemporaryFiles(c.getPipelineOptions(), tempFilePrefix, partitionFiles);
   }
@@ -131,22 +146,22 @@ class WriteTables extends DoFn<KV<ShardedKey<TableDestination>, List<String>>,
       List<String> gcsUris,
       WriteDisposition writeDisposition,
       CreateDisposition createDisposition,
-      @Nullable String tableDescription) throws InterruptedException, IOException {
-    JobConfigurationLoad loadConfig = new JobConfigurationLoad()
-        .setDestinationTable(ref)
-        .setSchema(schema)
-        .setSourceUris(gcsUris)
-        .setWriteDisposition(writeDisposition.name())
-        .setCreateDisposition(createDisposition.name())
-        .setSourceFormat("NEWLINE_DELIMITED_JSON");
+      @Nullable String tableDescription)
+      throws InterruptedException, IOException {
+    JobConfigurationLoad loadConfig =
+        new JobConfigurationLoad()
+            .setDestinationTable(ref)
+            .setSchema(schema)
+            .setSourceUris(gcsUris)
+            .setWriteDisposition(writeDisposition.name())
+            .setCreateDisposition(createDisposition.name())
+            .setSourceFormat("NEWLINE_DELIMITED_JSON");
 
     String projectId = ref.getProjectId();
     Job lastFailedLoadJob = null;
     for (int i = 0; i < Write.MAX_RETRY_JOBS; ++i) {
       String jobId = jobIdPrefix + "-" + i;
-      JobReference jobRef = new JobReference()
-          .setProjectId(projectId)
-          .setJobId(jobId);
+      JobReference jobRef = new JobReference().setProjectId(projectId).setJobId(jobId);
       jobService.startLoadJob(jobRef, loadConfig);
       Job loadJob = jobService.pollJob(jobRef, Write.LOAD_JOB_POLL_MAX_RETRIES);
       Status jobStatus = BigQueryHelpers.parseStatus(loadJob);
@@ -157,31 +172,31 @@ class WriteTables extends DoFn<KV<ShardedKey<TableDestination>, List<String>>,
           }
           return;
         case UNKNOWN:
-          throw new RuntimeException(String.format(
-              "UNKNOWN status of load job [%s]: %s.", jobId,
-              BigQueryHelpers.jobToPrettyString(loadJob)));
+          throw new RuntimeException(
+              String.format(
+                  "UNKNOWN status of load job [%s]: %s.",
+                  jobId, BigQueryHelpers.jobToPrettyString(loadJob)));
         case FAILED:
           lastFailedLoadJob = loadJob;
           continue;
         default:
-          throw new IllegalStateException(String.format(
-              "Unexpected status [%s] of load job: %s.",
-              jobStatus, BigQueryHelpers.jobToPrettyString(loadJob)));
+          throw new IllegalStateException(
+              String.format(
+                  "Unexpected status [%s] of load job: %s.",
+                  jobStatus, BigQueryHelpers.jobToPrettyString(loadJob)));
       }
     }
-    throw new RuntimeException(String.format(
-        "Failed to create load job with id prefix %s, "
-            + "reached max retries: %d, last failed load job: %s.",
-        jobIdPrefix,
-        Write.MAX_RETRY_JOBS,
-        BigQueryHelpers.jobToPrettyString(lastFailedLoadJob)));
+    throw new RuntimeException(
+        String.format(
+            "Failed to create load job with id prefix %s, "
+                + "reached max retries: %d, last failed load job: %s.",
+            jobIdPrefix,
+            Write.MAX_RETRY_JOBS,
+            BigQueryHelpers.jobToPrettyString(lastFailedLoadJob)));
   }
 
   static void removeTemporaryFiles(
-      PipelineOptions options,
-      String tempFilePrefix,
-      Collection<String> files)
-      throws IOException {
+      PipelineOptions options, String tempFilePrefix, Collection<String> files) throws IOException {
     IOChannelFactory factory = IOChannelUtils.getFactory(tempFilePrefix);
     if (factory instanceof GcsIOChannelFactory) {
       GcsUtil gcsUtil = new GcsUtilFactory().create(options);
@@ -203,8 +218,7 @@ class WriteTables extends DoFn<KV<ShardedKey<TableDestination>, List<String>>,
   public void populateDisplayData(DisplayData.Builder builder) {
     super.populateDisplayData(builder);
 
-    builder
-        .addIfNotNull(DisplayData.item("tempFilePrefix", tempFilePrefix)
-            .withLabel("Temporary File Prefix"));
+    builder.addIfNotNull(
+        DisplayData.item("tempFilePrefix", 
tempFilePrefix).withLabel("Temporary File Prefix"));
   }
 }
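
WriteTables now resolves the physical table through DynamicDestinations.getTable() and the schema through a map-valued side input of JSON schemas keyed by destination. On the user-facing side, the reworked test below feeds that map in through withSchemaFromView; a condensed sketch of that wiring, using names from the test (schemas is a Map<String, String> of table spec to JSON schema, and the other variables are taken from the test's pipeline):

    PCollectionView<Map<String, String>> schemasView =
        p.apply("CreateSchemaMap", Create.of(schemas))
            .apply("ViewSchemaAsMap", View.<String, String>asMap());

    input.apply(BigQueryIO.<Integer>write()
        .to(tableFunction)                    // per-element table routing
        .withFormatFunction(formatFunction)   // element -> TableRow
        .withSchemaFromView(schemasView)      // per-table schema, looked up at load time
        .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
        .withTestServices(fakeBqServices)
        .withoutValidation());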

http://git-wip-us.apache.org/repos/asf/beam/blob/35db7457/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
index ef3419e..e267dab 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
@@ -24,7 +24,10 @@ import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.createJobIdTok
 import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.createTempTableReference;
 import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.toJsonString;
 import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
+import static org.hamcrest.Matchers.allOf;
 import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.hasEntry;
 import static org.hamcrest.Matchers.hasItem;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
@@ -44,6 +47,7 @@ import com.google.api.services.bigquery.model.TableRow;
 import com.google.api.services.bigquery.model.TableSchema;
 import com.google.common.collect.HashBasedTable;
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
@@ -66,6 +70,8 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.Coder.Context;
@@ -206,7 +212,13 @@ public class BigQueryIOTest implements Serializable {
     assertEquals(project, write.getTable().get().getProjectId());
     assertEquals(dataset, write.getTable().get().getDatasetId());
     assertEquals(table, write.getTable().get().getTableId());
-    assertEquals(schema, write.getSchema());
+    if (schema == null) {
+      assertNull(write.getJsonSchema());
+      assertNull(write.getSchemaFromView());
+    } else {
+      assertEquals(schema, BigQueryHelpers.fromJsonString(
+          write.getJsonSchema().get(), TableSchema.class));
+    }
     assertEquals(createDisposition, write.getCreateDisposition());
     assertEquals(writeDisposition, write.getWriteDisposition());
     assertEquals(tableDescription, write.getTableDescription());
@@ -443,6 +455,127 @@ public class BigQueryIOTest implements Serializable {
     p.run();
   }
 
+  // Create an intermediate type to ensure that coder inference up the inheritance tree is tested.
+  abstract static class StringIntegerDestinations extends DynamicDestinations<String, Integer> {
+  }
+
+  @Test
+  public void testWriteDynamicDestinationsBatch() throws Exception {
+    writeDynamicDestinations(false);
+  }
+
+  @Test
+  public void testWriteDynamicDestinationsStreaming() throws Exception {
+    writeDynamicDestinations(true);
+  }
+
+  public void writeDynamicDestinations(boolean streaming) throws Exception {
+    BigQueryOptions bqOptions = TestPipeline.testingPipelineOptions().as(BigQueryOptions.class);
+    bqOptions.setProject("project-id");
+    bqOptions.setTempLocation(testFolder.newFolder("BigQueryIOTest").getAbsolutePath());
+
+    FakeDatasetService datasetService = new FakeDatasetService();
+    FakeBigQueryServices fakeBqServices = new FakeBigQueryServices()
+        .withJobService(new FakeJobService())
+        .withDatasetService(datasetService);
+
+    datasetService.createDataset("project-id", "dataset-id", "", "");
+
+    final Pattern userPattern = Pattern.compile("([a-z]+)([0-9]+)");
+    Pipeline p = TestPipeline.create(bqOptions);
+
+    final PCollectionView<List<String>> sideInput1 =
+        p.apply("Create SideInput 1", Create.of("a", "b", 
"c").withCoder(StringUtf8Coder.of()))
+            .apply("asList", View.<String>asList());
+    final PCollectionView<Map<String, String>> sideInput2 =
+    p.apply("Create SideInput2", Create.of(KV.of("a", "a"), KV.of("b", "b"), 
KV.of("c", "c")))
+        .apply("AsMap", View.<String, String>asMap());
+
+    PCollection<String> users = p.apply("CreateUsers",
+        Create.of("bill1", "sam2", "laurence3")
+            .withCoder(StringUtf8Coder.of()))
+        .apply(Window.into(new PartitionedGlobalWindows<>(
+            new SerializableFunction<String, String>() {
+              @Override
+              public String apply(String arg) {
+                return arg;
+              }
+        })));
+
+    if (streaming) {
+      users = users.setIsBoundedInternal(PCollection.IsBounded.UNBOUNDED);
+    }
+    users.apply("WriteBigQuery", BigQueryIO.<String>write()
+            .withTestServices(fakeBqServices)
+            .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
+            .withFormatFunction(new SerializableFunction<String, TableRow>() {
+              @Override
+              public TableRow apply(String user) {
+                Matcher matcher = userPattern.matcher(user);
+                if (matcher.matches()) {
+                  return new TableRow().set("name", matcher.group(1))
+                      .set("id", Integer.valueOf(matcher.group(2)));
+                }
+                throw new RuntimeException("Unmatching element " + user);
+              }
+            })
+            .to(new StringIntegerDestinations() {
+              @Override
+              public Integer getDestination(ValueInSingleWindow<String> element) {
+                assertThat(element.getWindow(), Matchers.instanceOf(PartitionedGlobalWindow.class));
+                Matcher matcher = userPattern.matcher(element.getValue());
+                if (matcher.matches()) {
+                  // Since we name tables by userid, we can simply store an Integer to represent
+                  // a table.
+                  return Integer.valueOf(matcher.group(2));
+                }
+                throw new RuntimeException("Unmatching destination " + element.getValue());
+              }
+
+              @Override
+              public TableDestination getTable(Integer userId) {
+                verifySideInputs();
+                // Each user in its own table.
+                return new TableDestination("dataset-id.userid-" + userId,
+                    "table for userid " + userId);
+              }
+
+              @Override
+              public TableSchema getSchema(Integer userId) {
+                verifySideInputs();
+                return new TableSchema().setFields(
+                    ImmutableList.of(
+                        new TableFieldSchema().setName("name").setType("STRING"),
+                        new TableFieldSchema().setName("id").setType("INTEGER")));
+              }
+
+              @Override
+              public List<PCollectionView<?>> getSideInputs() {
+                return ImmutableList.of(sideInput1, sideInput2);
+              }
+
+              private void verifySideInputs() {
+                assertThat(sideInput(sideInput1), containsInAnyOrder("a", "b", "c"));
+                Map<String, String> mapSideInput = sideInput(sideInput2);
+                assertEquals(3, mapSideInput.size());
+                assertThat(mapSideInput,
+                    allOf(hasEntry("a", "a"), hasEntry("b", "b"), hasEntry("c", "c")));
+              }
+            })
+            .withoutValidation());
+    p.run();
+
+    File tempDir = new File(bqOptions.getTempLocation());
+    testNumFiles(tempDir, 0);
+
+    assertThat(datasetService.getAllRows("project-id", "dataset-id", "userid-1"),
+        containsInAnyOrder(new TableRow().set("name", "bill").set("id", 1)));
+    assertThat(datasetService.getAllRows("project-id", "dataset-id", "userid-2"),
+        containsInAnyOrder(new TableRow().set("name", "sam").set("id", 2)));
+    assertThat(datasetService.getAllRows("project-id", "dataset-id", "userid-3"),
+        containsInAnyOrder(new TableRow().set("name", "laurence").set("id", 3)));
+  }
+
   @Test
   public void testWrite() throws Exception {
     BigQueryOptions bqOptions = TestPipeline.testingPipelineOptions().as(BigQueryOptions.class);
@@ -571,8 +704,6 @@ public class BigQueryIOTest implements Serializable {
       return GlobalWindow.INSTANCE.maxTimestamp();
     }
 
-    // The following methods are only needed due to BEAM-1022. Once this issue is fixed, we will
-    // no longer need these.
     @Override
     public boolean equals(Object other) {
       if (other instanceof PartitionedGlobalWindow) {
@@ -635,7 +766,7 @@ public class BigQueryIOTest implements Serializable {
 
     // Create a windowing strategy that puts the input into five different windows depending on
     // record value.
-    WindowFn<Integer, PartitionedGlobalWindow> window = new PartitionedGlobalWindows(
+    WindowFn<Integer, PartitionedGlobalWindow> windowFn = new PartitionedGlobalWindows(
         new SerializableFunction<Integer, String>() {
           @Override
           public String apply(Integer i) {
@@ -644,24 +775,44 @@ public class BigQueryIOTest implements Serializable {
         }
     );
 
+    final Map<Integer, TableDestination> targetTables = Maps.newHashMap();
+    Map<String, String> schemas = Maps.newHashMap();
+    for (int i = 0; i < 5; i++) {
+      TableDestination destination = new TableDestination("project-id:dataset-id"
+          + ".table-id-" + i, "");
+      targetTables.put(i, destination);
+      // Make sure each target table has its own custom schema.
+      schemas.put(destination.getTableSpec(),
+          BigQueryHelpers.toJsonString(new TableSchema().setFields(
+              ImmutableList.of(
+                  new TableFieldSchema().setName("name").setType("STRING"),
+                  new TableFieldSchema().setName("number").setType("INTEGER"),
+                  new TableFieldSchema().setName("custom_" + i).setType("STRING")))));
+    }
+
     SerializableFunction<ValueInSingleWindow<Integer>, TableDestination> tableFunction =
         new SerializableFunction<ValueInSingleWindow<Integer>, TableDestination>() {
           @Override
           public TableDestination apply(ValueInSingleWindow<Integer> input) {
             PartitionedGlobalWindow window = (PartitionedGlobalWindow) input.getWindow();
-            // Check that we can access the element as well here.
+            // Check that we can access the element as well here and that it matches the window.
             checkArgument(window.value.equals(Integer.toString(input.getValue() % 5)),
                 "Incorrect element");
-            return new TableDestination("project-id:dataset-id.table-id-" + window.value, "");
+            return targetTables.get(input.getValue() % 5);
           }
     };
 
     Pipeline p = TestPipeline.create(bqOptions);
-    PCollection<Integer> input = p.apply(Create.of(inserts));
+    PCollection<Integer> input = p.apply("CreateSource", Create.of(inserts));
     if (streaming) {
       input = input.setIsBoundedInternal(PCollection.IsBounded.UNBOUNDED);
     }
-    input.apply(Window.<Integer>into(window))
+
+    PCollectionView<Map<String, String>> schemasView =
+        p.apply("CreateSchemaMap", Create.of(schemas))
+        .apply("ViewSchemaAsMap", View.<String, String>asMap());
+
+    input.apply(Window.<Integer>into(windowFn))
         .apply(BigQueryIO.<Integer>write()
             .to(tableFunction)
             .withFormatFunction(new SerializableFunction<Integer, TableRow>() {
@@ -670,35 +821,27 @@ public class BigQueryIOTest implements Serializable {
                 return new TableRow().set("name", "number" + i).set("number", 
i);
               }})
             .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
-            .withSchema(new TableSchema().setFields(
-                ImmutableList.of(
-                    new TableFieldSchema().setName("name").setType("STRING"),
-                    new TableFieldSchema().setName("number").setType("INTEGER"))))
+            .withSchemaFromView(schemasView)
             .withTestServices(fakeBqServices)
             .withoutValidation());
     p.run();
 
-
-    assertThat(datasetService.getAllRows("project-id", "dataset-id", "table-id-0"),
-        containsInAnyOrder(
-            new TableRow().set("name", "number0").set("number", 0),
-            new TableRow().set("name", "number5").set("number", 5)));
-    assertThat(datasetService.getAllRows("project-id", "dataset-id", "table-id-1"),
-        containsInAnyOrder(
-            new TableRow().set("name", "number1").set("number", 1),
-            new TableRow().set("name", "number6").set("number", 6)));
-    assertThat(datasetService.getAllRows("project-id", "dataset-id", "table-id-2"),
-        containsInAnyOrder(
-            new TableRow().set("name", "number2").set("number", 2),
-            new TableRow().set("name", "number7").set("number", 7)));
-    assertThat(datasetService.getAllRows("project-id", "dataset-id", "table-id-3"),
-        containsInAnyOrder(
-            new TableRow().set("name", "number3").set("number", 3),
-            new TableRow().set("name", "number8").set("number", 8)));
-    assertThat(datasetService.getAllRows("project-id", "dataset-id", "table-id-4"),
-        containsInAnyOrder(
-            new TableRow().set("name", "number4").set("number", 4),
-            new TableRow().set("name", "number9").set("number", 9)));
+    for (int i = 0; i < 5; ++i) {
+      String tableId = String.format("table-id-%d", i);
+      String tableSpec = String.format("project-id:dataset-id.%s", tableId);
+
+      // Verify that table was created with the correct schema.
+      assertThat(BigQueryHelpers.toJsonString(
+          datasetService.getTable(new TableReference().setProjectId("project-id")
+              .setDatasetId("dataset-id").setTableId(tableId)).getSchema()),
+          equalTo(schemas.get(tableSpec)));
+
+      // Verify that the table has the expected contents.
+      assertThat(datasetService.getAllRows("project-id", "dataset-id", tableId),
+          containsInAnyOrder(
+              new TableRow().set("name", String.format("number%d", i)).set("number", i),
+              new TableRow().set("name", String.format("number%d", i + 5)).set("number", i + 5)));
+    }
   }
 
   @Test
@@ -1585,72 +1728,59 @@ public class BigQueryIOTest implements Serializable {
     // In the case where a static destination is specified (i.e. not through a dynamic table
     // function) and there is no input data, WritePartition will generate an empty table. This
     // code is to test that path.
-    TableReference singletonReference = new TableReference()
-        .setProjectId("projectid")
-        .setDatasetId("dataset")
-        .setTableId("table");
-    String singletonDescription = "singleton";
     boolean isSingleton = numTables == 1 && numFilesPerTable == 0;
 
-    List<ShardedKey<TableDestination>> expectedPartitions = Lists.newArrayList();
+    List<ShardedKey<String>> expectedPartitions = Lists.newArrayList();
     if (isSingleton) {
-      expectedPartitions.add(ShardedKey.of(
-          new TableDestination(singletonReference, singletonDescription), 1));
+      expectedPartitions.add(ShardedKey.<String>of(null, 1));
     } else {
       for (int i = 0; i < numTables; ++i) {
         for (int j = 1; j <= expectedNumPartitionsPerTable; ++j) {
           String tableName = String.format("project-id:dataset-id.tables%05d", 
i);
-          TableDestination destination = new TableDestination(tableName, 
tableName);
-          expectedPartitions.add(ShardedKey.of(destination, j));
+          expectedPartitions.add(ShardedKey.of(tableName, j));
         }
       }
     }
 
-    List<WriteBundlesToFiles.Result> files = Lists.newArrayList();
-    Map<TableDestination, List<String>> filenamesPerTable = Maps.newHashMap();
+    List<WriteBundlesToFiles.Result<String>> files = Lists.newArrayList();
+    Map<String, List<String>> filenamesPerTable = Maps.newHashMap();
     for (int i = 0; i < numTables; ++i) {
       String tableName = String.format("project-id:dataset-id.tables%05d", i);
-      TableDestination destination = new TableDestination(tableName, tableName);
-      List<String> filenames = filenamesPerTable.get(destination);
+      List<String> filenames = filenamesPerTable.get(tableName);
       if (filenames == null) {
         filenames = Lists.newArrayList();
-        filenamesPerTable.put(destination, filenames);
+        filenamesPerTable.put(tableName, filenames);
       }
       for (int j = 0; j < numFilesPerTable; ++j) {
         String fileName = String.format("%s_files%05d", tableName, j);
         filenames.add(fileName);
-        files.add(new Result(fileName, fileSize, destination));
+        files.add(new Result<>(fileName, fileSize, tableName));
       }
     }
 
-    TupleTag<KV<ShardedKey<TableDestination>, List<String>>> multiPartitionsTag =
-        new TupleTag<KV<ShardedKey<TableDestination>, List<String>>>("multiPartitionsTag") {};
-    TupleTag<KV<ShardedKey<TableDestination>, List<String>>> singlePartitionTag =
-        new TupleTag<KV<ShardedKey<TableDestination>, List<String>>>("singlePartitionTag") {};
+    TupleTag<KV<ShardedKey<String>, List<String>>> multiPartitionsTag =
+        new TupleTag<KV<ShardedKey<String>, List<String>>>("multiPartitionsTag") {};
+    TupleTag<KV<ShardedKey<String>, List<String>>> singlePartitionTag =
+        new TupleTag<KV<ShardedKey<String>, List<String>>>("singlePartitionTag") {};
 
-    PCollection<WriteBundlesToFiles.Result> filesPCollection =
-        p.apply(Create.of(files).withType(new TypeDescriptor<WriteBundlesToFiles.Result>() {}));
-    PCollectionView<Iterable<WriteBundlesToFiles.Result>> resultsView =
+    PCollection<WriteBundlesToFiles.Result<String>> filesPCollection =
+        p.apply(Create.of(files)
+        .withCoder(WriteBundlesToFiles.ResultCoder.of(StringUtf8Coder.of())));
+    PCollectionView<Iterable<WriteBundlesToFiles.Result<String>>> resultsView =
         PCollectionViews.iterableView(
         filesPCollection,
         WindowingStrategy.globalDefault(),
-        WriteBundlesToFiles.ResultCoder.of());
+        WriteBundlesToFiles.ResultCoder.of(StringUtf8Coder.of()));
 
-    ValueProvider<String> singletonTable = null;
-    if (isSingleton) {
-      singletonTable = StaticValueProvider.of(BigQueryHelpers.toJsonString(singletonReference));
-    }
-    WritePartition writePartition =
-        new WritePartition(singletonTable,
-            "singleton", resultsView,
-            multiPartitionsTag, singlePartitionTag);
+    WritePartition<String> writePartition =
+        new WritePartition<>(isSingleton, resultsView, multiPartitionsTag, singlePartitionTag);
 
-    DoFnTester<String, KV<ShardedKey<TableDestination>, List<String>>> tester =
+    DoFnTester<String, KV<ShardedKey<String>, List<String>>> tester =
         DoFnTester.of(writePartition);
     tester.setSideInput(resultsView, GlobalWindow.INSTANCE, files);
     tester.processElement(testFolder.newFolder("BigQueryIOTest").getAbsolutePath());
 
-    List<KV<ShardedKey<TableDestination>, List<String>>> partitions;
+    List<KV<ShardedKey<String>, List<String>>> partitions;
     if (expectedNumPartitionsPerTable > 1) {
       partitions = tester.takeOutputElements(multiPartitionsTag);
     } else {
@@ -1658,10 +1788,10 @@ public class BigQueryIOTest implements Serializable {
     }
 
 
-    List<ShardedKey<TableDestination>> partitionsResult = Lists.newArrayList();
-    Map<TableDestination, List<String>> filesPerTableResult = Maps.newHashMap();
-    for (KV<ShardedKey<TableDestination>, List<String>> partition : partitions) {
-      TableDestination table = partition.getKey().getKey();
+    List<ShardedKey<String>> partitionsResult = Lists.newArrayList();
+    Map<String, List<String>> filesPerTableResult = Maps.newHashMap();
+    for (KV<ShardedKey<String>, List<String>> partition : partitions) {
+      String table = partition.getKey().getKey();
       partitionsResult.add(partition.getKey());
       List<String> tableFilesResult = filesPerTableResult.get(table);
       if (tableFilesResult == null) {
@@ -1685,6 +1815,23 @@ public class BigQueryIOTest implements Serializable {
     }
   }
 
+  static class IdentityDynamicTables extends DynamicDestinations<String, String> {
+    @Override
+    public String getDestination(ValueInSingleWindow<String> element) {
+      throw new UnsupportedOperationException("getDestination not expected in 
this test.");
+    }
+
+    @Override
+    public TableDestination getTable(String destination) {
+      return new TableDestination(destination, destination);
+    }
+
+    @Override
+    public TableSchema getSchema(String destination) {
+      throw new UnsupportedOperationException("getSchema not expected in this 
test.");
+    }
+  }
+
   @Test
   public void testWriteTables() throws Exception {
     p.enableAbandonedNodeEnforcement(false);
@@ -1703,7 +1850,7 @@ public class BigQueryIOTest implements Serializable {
 
     Path baseDir = Files.createTempDirectory(tempFolder, "testWriteTables");
 
-    List<KV<ShardedKey<TableDestination>, List<String>>> partitions = Lists.newArrayList();
+    List<KV<ShardedKey<String>, List<String>>> partitions = Lists.newArrayList();
     for (int i = 0; i < numTables; ++i) {
       String tableName = String.format("project-id:dataset-id.table%05d", i);
       TableDestination tableDestination = new TableDestination(tableName, tableName);
@@ -1722,7 +1869,7 @@ public class BigQueryIOTest implements Serializable {
           }
           filesPerPartition.add(filename);
         }
-        partitions.add(KV.of(ShardedKey.of(tableDestination, j),
+        partitions.add(KV.of(ShardedKey.of(tableDestination.getTableSpec(), j),
             filesPerPartition));
 
         List<String> expectedTables = expectedTempTables.get(tableDestination);
@@ -1737,23 +1884,30 @@ public class BigQueryIOTest implements Serializable {
       }
     }
 
-    PCollection<String> jobIdTokenCollection = p.apply("CreateJobId", 
Create.of("jobId"));
-    PCollectionView<String> jobIdTokenView =
-        jobIdTokenCollection.apply(View.<String>asSingleton());
-
-    WriteTables writeTables = new WriteTables(
-        false,
-        fakeBqServices,
-        jobIdTokenView,
-        tempFilePrefix,
-        WriteDisposition.WRITE_EMPTY,
-        CreateDisposition.CREATE_IF_NEEDED,
-        null);
-
-    DoFnTester<KV<ShardedKey<TableDestination>, List<String>>,
+    PCollectionView<String> jobIdTokenView = p
+        .apply("CreateJobId", Create.of("jobId"))
+        .apply(View.<String>asSingleton());
+
+    PCollectionView<Map<String, String>> schemaMapView =
+        p.apply("CreateEmptySchema",
+            Create.empty(new TypeDescriptor<KV<String, String>>() {}))
+            .apply(View.<String, String>asMap());
+    WriteTables<String> writeTables =
+        new WriteTables<>(
+            false,
+            fakeBqServices,
+            jobIdTokenView,
+            schemaMapView,
+            tempFilePrefix,
+            WriteDisposition.WRITE_EMPTY,
+            CreateDisposition.CREATE_IF_NEEDED,
+            new IdentityDynamicTables());
+
+    DoFnTester<KV<ShardedKey<String>, List<String>>,
         KV<TableDestination, String>> tester = DoFnTester.of(writeTables);
     tester.setSideInput(jobIdTokenView, GlobalWindow.INSTANCE, jobIdToken);
-    for (KV<ShardedKey<TableDestination>, List<String>> partition : partitions) {
+    tester.setSideInput(schemaMapView, GlobalWindow.INSTANCE, ImmutableMap.<String, String>of());
+    for (KV<ShardedKey<String>, List<String>> partition : partitions) {
       tester.processElement(partition);
     }
 
@@ -1867,9 +2021,9 @@ public class BigQueryIOTest implements Serializable {
         KvCoder.of(TableDestinationCoder.of(),
             StringUtf8Coder.of()));
 
-    PCollection<String> jobIdTokenCollection = p.apply("CreateJobId", 
Create.of("jobId"));
-    PCollectionView<String> jobIdTokenView =
-        jobIdTokenCollection.apply(View.<String>asSingleton());
+    PCollectionView<String> jobIdTokenView = p
+        .apply("CreateJobId", Create.of("jobId"))
+        .apply(View.<String>asSingleton());
 
     WriteRename writeRename = new WriteRename(
         fakeBqServices,
