This is an automated email from the ASF dual-hosted git repository.

reuvenlax pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new f549fd33abd Merge pull request #26063: #21431 Pubsub dynamic topic 
destinations
f549fd33abd is described below

commit f549fd33abdc672143ccbe3f0f66104995d30fe6
Author: Reuven Lax <re...@google.com>
AuthorDate: Fri Apr 21 21:01:11 2023 -0700

    Merge pull request #26063: #21431 Pubsub dynamic topic destinations
---
 .../core/construction/PTransformTranslation.java   |   2 +
 .../beam/runners/dataflow/DataflowRunner.java      |  26 +-
 .../dataflow/options/DataflowPipelineOptions.java  |   9 +-
 .../beam/runners/dataflow/util/PropertyNames.java  |   4 +
 .../beam/runners/dataflow/DataflowRunnerTest.java  |  74 +++++
 .../runners/dataflow/worker/PubsubDynamicSink.java | 163 ++++++++++
 .../dataflow/worker/PubsubDynamicSinkTest.java     | 163 ++++++++++
 .../beam/sdk/io/gcp/pubsub/ExternalWrite.java      |   1 +
 .../sdk/io/gcp/pubsub/PreparePubsubWriteDoFn.java  | 142 ++++++++
 .../io/gcp/pubsub/PubSubPayloadTranslation.java    |  39 ++-
 .../beam/sdk/io/gcp/pubsub/PubsubClient.java       |  19 +-
 .../beam/sdk/io/gcp/pubsub/PubsubGrpcClient.java   |   4 +-
 .../apache/beam/sdk/io/gcp/pubsub/PubsubIO.java    | 357 ++++++++++++---------
 .../beam/sdk/io/gcp/pubsub/PubsubJsonClient.java   |  13 +-
 .../beam/sdk/io/gcp/pubsub/PubsubMessage.java      |  27 +-
 .../io/gcp/pubsub/PubsubMessageWithTopicCoder.java |  72 +++++
 .../beam/sdk/io/gcp/pubsub/PubsubTestClient.java   |  27 +-
 .../sdk/io/gcp/pubsub/PubsubUnboundedSink.java     | 194 ++++++++---
 .../io/gcp/pubsub/PreparePubsubWriteDoFnTest.java  | 135 ++++++++
 .../pubsub/PubSubWritePayloadTranslationTest.java  |  41 +++
 .../sdk/io/gcp/pubsub/PubsubGrpcClientTest.java    |   3 +-
 .../sdk/io/gcp/pubsub/PubsubIOExternalTest.java    |   4 +-
 .../beam/sdk/io/gcp/pubsub/PubsubIOTest.java       | 183 +++++------
 .../sdk/io/gcp/pubsub/PubsubJsonClientTest.java    |   9 +-
 .../sdk/io/gcp/pubsub/PubsubTestClientTest.java    |   9 +-
 .../sdk/io/gcp/pubsub/PubsubUnboundedSinkTest.java |  82 ++++-
 26 files changed, 1472 insertions(+), 330 deletions(-)

diff --git 
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java
 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java
index e701ae60bb5..485350715c9 100644
--- 
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java
+++ 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java
@@ -107,6 +107,8 @@ public class PTransformTranslation {
   public static final String PUBSUB_READ = "beam:transform:pubsub_read:v1";
   public static final String PUBSUB_WRITE = "beam:transform:pubsub_write:v1";
 
+  public static final String PUBSUB_WRITE_DYNAMIC = 
"beam:transform:pubsub_write:v2";
+
   // CombineComponents
   public static final String COMBINE_PER_KEY_PRECOMBINE_TRANSFORM_URN =
       "beam:transform:combine_per_key_precombine:v1";
diff --git 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index dfa3d37a400..2c24df1852b 100644
--- 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -1797,8 +1797,7 @@ public class DataflowRunner extends 
PipelineRunner<DataflowPipelineJob> {
    * Suppress application of {@link PubsubUnboundedSink#expand} in streaming 
mode so that we can
    * instead defer to Windmill's implementation.
    */
-  private static class StreamingPubsubIOWrite
-      extends PTransform<PCollection<PubsubMessage>, PDone> {
+  static class StreamingPubsubIOWrite extends 
PTransform<PCollection<PubsubMessage>, PDone> {
 
     private final PubsubUnboundedSink transform;
 
@@ -1850,13 +1849,24 @@ public class DataflowRunner extends 
PipelineRunner<DataflowPipelineJob> {
         StepTranslationContext stepContext,
         PCollection input) {
       stepContext.addInput(PropertyNames.FORMAT, "pubsub");
-      if (overriddenTransform.getTopicProvider().isAccessible()) {
-        stepContext.addInput(
-            PropertyNames.PUBSUB_TOPIC, 
overriddenTransform.getTopic().getFullPath());
+      if (overriddenTransform.getTopicProvider() != null) {
+        if (overriddenTransform.getTopicProvider().isAccessible()) {
+          stepContext.addInput(
+              PropertyNames.PUBSUB_TOPIC, 
overriddenTransform.getTopic().getFullPath());
+        } else {
+          stepContext.addInput(
+              PropertyNames.PUBSUB_TOPIC_OVERRIDE,
+              ((NestedValueProvider) 
overriddenTransform.getTopicProvider()).propertyName());
+        }
       } else {
-        stepContext.addInput(
-            PropertyNames.PUBSUB_TOPIC_OVERRIDE,
-            ((NestedValueProvider) 
overriddenTransform.getTopicProvider()).propertyName());
+        DataflowPipelineOptions options =
+            input.getPipeline().getOptions().as(DataflowPipelineOptions.class);
+        if (options.getEnableDynamicPubsubDestinations()) {
+          stepContext.addInput(PropertyNames.PUBSUB_DYNAMIC_DESTINATIONS, 
true);
+        } else {
+          throw new RuntimeException(
+              "Dynamic Pubsub destinations not yet supported. Topic must be 
set.");
+        }
       }
       if (overriddenTransform.getTimestampAttribute() != null) {
         stepContext.addInput(
diff --git 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java
 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java
index c716cfe82ec..3f4c4c975f5 100644
--- 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java
+++ 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java
@@ -153,7 +153,14 @@ public interface DataflowPipelineOptions
   @Description("The customized dataflow worker jar")
   String getDataflowWorkerJar();
 
-  void setDataflowWorkerJar(String dataflowWorkerJar);
+  void setDataflowWorkerJar(String dataflowWorkerJafr);
+
+  // Disable this support for now until the Dataflow backend fully supports 
this option.
+  @Description("Whether to allow dynamic pubsub destinations. Temporary 
option: will be removed.")
+  @Default.Boolean(false)
+  Boolean getEnableDynamicPubsubDestinations();
+
+  void setEnableDynamicPubsubDestinations(Boolean enable);
 
   /** Set of available Flexible Resource Scheduling goals. */
   enum FlexResourceSchedulingGoal {
diff --git 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PropertyNames.java
 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PropertyNames.java
index f1a8993a3be..80441e8c921 100644
--- 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PropertyNames.java
+++ 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PropertyNames.java
@@ -46,11 +46,15 @@ public class PropertyNames {
   public static final String PARALLEL_INPUT = "parallel_input";
   public static final String PUBSUB_ID_ATTRIBUTE = "pubsub_id_label";
   public static final String PUBSUB_SERIALIZED_ATTRIBUTES_FN = 
"pubsub_serialized_attributes_fn";
+
   public static final String PUBSUB_SUBSCRIPTION = "pubsub_subscription";
   public static final String PUBSUB_SUBSCRIPTION_OVERRIDE = 
"pubsub_subscription_runtime_override";
   public static final String PUBSUB_TIMESTAMP_ATTRIBUTE = 
"pubsub_timestamp_label";
   public static final String PUBSUB_TOPIC = "pubsub_topic";
   public static final String PUBSUB_TOPIC_OVERRIDE = 
"pubsub_topic_runtime_override";
+
+  public static final String PUBSUB_DYNAMIC_DESTINATIONS = 
"pubsub_with_dynamic_destinations";
+
   public static final String SCALAR_FIELD_NAME = "value";
   public static final String SERIALIZED_FN = "serialized_fn";
   public static final String SERIALIZED_TEST_STREAM = "serialized_test_stream";
diff --git 
a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
 
b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
index 7f9c4a0c5b9..c534af38a8d 100644
--- 
a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
+++ 
b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
@@ -76,6 +76,7 @@ import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.Serializable;
 import java.nio.channels.FileChannel;
+import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
 import java.nio.file.StandardOpenOption;
 import java.util.ArrayList;
@@ -120,6 +121,8 @@ import org.apache.beam.sdk.io.TextIO;
 import org.apache.beam.sdk.io.WriteFiles;
 import org.apache.beam.sdk.io.WriteFilesResult;
 import org.apache.beam.sdk.io.fs.ResourceId;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
 import org.apache.beam.sdk.options.ExperimentalOptions;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptions.CheckEnabled;
@@ -163,7 +166,10 @@ import org.apache.beam.sdk.values.WindowingStrategy;
 import 
org.apache.beam.vendor.grpc.v1p54p0.com.google.protobuf.InvalidProtocolBufferException;
 import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
 import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
+import org.checkerframework.checker.initialization.qual.Initialized;
+import org.checkerframework.checker.nullness.qual.NonNull;
 import org.checkerframework.checker.nullness.qual.Nullable;
+import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
 import org.hamcrest.Description;
 import org.hamcrest.Matchers;
 import org.hamcrest.TypeSafeMatcher;
@@ -2341,6 +2347,74 @@ public class DataflowRunnerTest implements Serializable {
     verifyGroupIntoBatchesOverrideBytes(p, true, true);
   }
 
+  @Test
+  public void testPubsubSinkOverride() throws IOException {
+    PipelineOptions options = buildPipelineOptions();
+    List<String> experiments =
+        new ArrayList<>(
+            ImmutableList.of(
+                GcpOptions.STREAMING_ENGINE_EXPERIMENT,
+                GcpOptions.WINDMILL_SERVICE_EXPERIMENT,
+                "use_runner_v2"));
+    DataflowPipelineOptions dataflowOptions = 
options.as(DataflowPipelineOptions.class);
+    dataflowOptions.setExperiments(experiments);
+    dataflowOptions.setStreaming(true);
+    Pipeline p = Pipeline.create(options);
+
+    List<PubsubMessage> testValues =
+        Arrays.asList(
+            new PubsubMessage("foo".getBytes(StandardCharsets.UTF_8), 
Collections.emptyMap()));
+    PCollection<PubsubMessage> input =
+        p.apply("CreateValuesBytes", Create.of(testValues))
+            .setIsBoundedInternal(PCollection.IsBounded.UNBOUNDED);
+    input.apply(PubsubIO.writeMessages().to("projects/project/topics/topic"));
+    p.run();
+
+    AtomicBoolean sawPubsubOverride = new AtomicBoolean(false);
+    p.traverseTopologically(
+        new PipelineVisitor.Defaults() {
+          @Override
+          public void visitPrimitiveTransform(@UnknownKeyFor @NonNull 
@Initialized Node node) {
+            if (node.getTransform() instanceof 
DataflowRunner.StreamingPubsubIOWrite) {
+              sawPubsubOverride.set(true);
+            }
+          }
+        });
+    assertTrue(sawPubsubOverride.get());
+  }
+
+  @Test
+  public void testPubinkDynamicOverride() throws IOException {
+    PipelineOptions options = buildPipelineOptions();
+    DataflowPipelineOptions dataflowOptions = 
options.as(DataflowPipelineOptions.class);
+    dataflowOptions.setStreaming(true);
+    dataflowOptions.setEnableDynamicPubsubDestinations(true);
+    Pipeline p = Pipeline.create(options);
+
+    List<PubsubMessage> testValues =
+        Arrays.asList(
+            new PubsubMessage("foo".getBytes(StandardCharsets.UTF_8), 
Collections.emptyMap())
+                .withTopic(""));
+    PCollection<PubsubMessage> input =
+        p.apply("CreateValuesBytes", Create.of(testValues))
+            .setIsBoundedInternal(PCollection.IsBounded.UNBOUNDED);
+    input.apply(PubsubIO.writeMessagesDynamic());
+    p.run();
+
+    AtomicBoolean sawPubsubOverride = new AtomicBoolean(false);
+    p.traverseTopologically(
+        new PipelineVisitor.Defaults() {
+
+          @Override
+          public void visitPrimitiveTransform(@UnknownKeyFor @NonNull 
@Initialized Node node) {
+            if (node.getTransform() instanceof 
DataflowRunner.StreamingPubsubIOWrite) {
+              sawPubsubOverride.set(true);
+            }
+          }
+        });
+    assertTrue(sawPubsubOverride.get());
+  }
+
   static class TestExpansionServiceClientFactory implements 
ExpansionServiceClientFactory {
     ExpansionApi.ExpansionResponse response;
 
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/PubsubDynamicSink.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/PubsubDynamicSink.java
new file mode 100644
index 00000000000..70a74c3f156
--- /dev/null
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/PubsubDynamicSink.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker;
+
+import static org.apache.beam.runners.dataflow.util.Structs.getString;
+import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull;
+
+import com.google.auto.service.AutoService;
+import java.io.IOException;
+import java.util.Map;
+import org.apache.beam.runners.dataflow.util.CloudObject;
+import org.apache.beam.runners.dataflow.util.PropertyNames;
+import org.apache.beam.runners.dataflow.worker.util.common.worker.Sink;
+import org.apache.beam.runners.dataflow.worker.windmill.Pubsub;
+import org.apache.beam.runners.dataflow.worker.windmill.Windmill;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.util.ByteStringOutputStream;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.vendor.grpc.v1p54p0.com.google.protobuf.ByteString;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+@SuppressWarnings({
+  "rawtypes", // TODO(https://github.com/apache/beam/issues/20447)
+})
+public class PubsubDynamicSink extends Sink<WindowedValue<PubsubMessage>> {
+  private final String timestampLabel;
+  private final String idLabel;
+  private final StreamingModeExecutionContext context;
+
+  PubsubDynamicSink(String timestampLabel, String idLabel, 
StreamingModeExecutionContext context) {
+    this.timestampLabel = timestampLabel;
+    this.idLabel = idLabel;
+    this.context = context;
+  }
+
+  /** A {@link SinkFactory.Registrar} for pubsub sinks. */
+  @AutoService(SinkFactory.Registrar.class)
+  public static class Registrar implements SinkFactory.Registrar {
+
+    @Override
+    public Map<String, SinkFactory> factories() {
+      PubsubDynamicSink.Factory factory = new Factory();
+      return ImmutableMap.of(
+          "PubsubDynamicSink",
+          factory,
+          "org.apache.beam.runners.dataflow.worker.PubsubDynamicSink",
+          factory);
+    }
+  }
+
+  static class Factory implements SinkFactory {
+    @Override
+    public PubsubDynamicSink create(
+        CloudObject spec,
+        Coder<?> coder,
+        @Nullable PipelineOptions options,
+        @Nullable DataflowExecutionContext executionContext,
+        DataflowOperationContext operationContext)
+        throws Exception {
+      String timestampLabel = getString(spec, 
PropertyNames.PUBSUB_TIMESTAMP_ATTRIBUTE, "");
+      String idLabel = getString(spec, PropertyNames.PUBSUB_ID_ATTRIBUTE, "");
+
+      return new PubsubDynamicSink(
+          timestampLabel,
+          idLabel,
+          (StreamingModeExecutionContext) 
checkArgumentNotNull(executionContext));
+    }
+  }
+
+  @Override
+  public Sink.SinkWriter<WindowedValue<PubsubMessage>> writer() {
+    return new PubsubDynamicSink.PubsubWriter();
+  }
+
+  class PubsubWriter implements Sink.SinkWriter<WindowedValue<PubsubMessage>> {
+    private final Map<String, Windmill.PubSubMessageBundle.Builder> 
outputBuilders;
+    private final ByteStringOutputStream stream; // Kept across adds for 
buffer reuse.
+
+    PubsubWriter() {
+      outputBuilders = Maps.newHashMap();
+      stream = new ByteStringOutputStream();
+    }
+
+    public ByteString getDataFromMessage(PubsubMessage formatted, 
ByteStringOutputStream stream)
+        throws IOException {
+      Pubsub.PubsubMessage.Builder pubsubMessageBuilder =
+          
Pubsub.PubsubMessage.newBuilder().setData(ByteString.copyFrom(formatted.getPayload()));
+      Map<String, String> attributeMap = formatted.getAttributeMap();
+      if (attributeMap != null) {
+        pubsubMessageBuilder.putAllAttributes(attributeMap);
+      }
+      pubsubMessageBuilder.build().writeTo(stream);
+      return stream.toByteStringAndReset();
+    }
+
+    public void close(Windmill.PubSubMessageBundle.Builder outputBuilder) 
throws IOException {
+      context.getOutputBuilder().addPubsubMessages(outputBuilder);
+      outputBuilder.clear();
+    }
+
+    @Override
+    public long add(WindowedValue<PubsubMessage> data) throws IOException {
+      String dataTopic =
+          checkArgumentNotNull(
+              data.getValue().getTopic(), "No topic set for message when using 
dynamic topics.");
+      Preconditions.checkArgument(
+          !dataTopic.isEmpty(), "No topic set for message when using dynamic 
topics.");
+      ByteString byteString = getDataFromMessage(data.getValue(), stream);
+      Windmill.PubSubMessageBundle.Builder builder =
+          outputBuilders.computeIfAbsent(
+              dataTopic,
+              topic ->
+                  context
+                      .getOutputBuilder()
+                      .addPubsubMessagesBuilder()
+                      .setTopic(topic)
+                      .setTimestampLabel(timestampLabel)
+                      .setIdLabel(idLabel)
+                      .setWithAttributes(true));
+      builder.addMessages(
+          Windmill.Message.newBuilder()
+              .setData(byteString)
+              
.setTimestamp(WindmillTimeUtils.harnessToWindmillTimestamp(data.getTimestamp()))
+              .build());
+      return byteString.size();
+    }
+
+    @Override
+    public void close() throws IOException {
+      outputBuilders.clear();
+    }
+
+    @Override
+    public void abort() throws IOException {
+      close();
+    }
+  }
+
+  @Override
+  public boolean supportsRestart() {
+    return true;
+  }
+}
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/PubsubDynamicSinkTest.java
 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/PubsubDynamicSinkTest.java
new file mode 100644
index 00000000000..71d6e649427
--- /dev/null
+++ 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/PubsubDynamicSinkTest.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.when;
+
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.runners.dataflow.util.CloudObject;
+import org.apache.beam.runners.dataflow.util.PropertyNames;
+import org.apache.beam.runners.dataflow.worker.util.common.worker.Sink;
+import org.apache.beam.runners.dataflow.worker.windmill.Pubsub;
+import org.apache.beam.runners.dataflow.worker.windmill.Windmill;
+import org.apache.beam.sdk.coders.VoidCoder;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
+import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.vendor.grpc.v1p54p0.com.google.protobuf.ByteString;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
+import org.joda.time.Instant;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+/** Unit tests for {@link PubsubSink}. */
+@RunWith(JUnit4.class)
+public class PubsubDynamicSinkTest {
+  @Mock StreamingModeExecutionContext mockContext;
+
+  @Before
+  public void setUp() throws Exception {
+    MockitoAnnotations.initMocks(this);
+  }
+
+  @Test
+  public void testWriteDynamicDestinations() throws Exception {
+    Windmill.WorkItemCommitRequest.Builder outputBuilder =
+        Windmill.WorkItemCommitRequest.newBuilder()
+            .setKey(ByteString.copyFromUtf8("key"))
+            .setWorkToken(0);
+
+    when(mockContext.getOutputBuilder()).thenReturn(outputBuilder);
+
+    Map<String, Object> spec = new HashMap<>();
+    spec.put(PropertyNames.OBJECT_TYPE_NAME, "PubsubDynamicSink");
+    spec.put(PropertyNames.PUBSUB_TIMESTAMP_ATTRIBUTE, "ts");
+    spec.put(PropertyNames.PUBSUB_ID_ATTRIBUTE, "id");
+
+    CloudObject cloudSinkSpec = CloudObject.fromSpec(spec);
+    PubsubDynamicSink sink =
+        (PubsubDynamicSink)
+            SinkRegistry.defaultRegistry()
+                .create(
+                    cloudSinkSpec,
+                    WindowedValue.getFullCoder(VoidCoder.of(), 
IntervalWindow.getCoder()),
+                    null,
+                    mockContext,
+                    null)
+                .getUnderlyingSink();
+
+    Sink.SinkWriter<WindowedValue<PubsubMessage>> writer = sink.writer();
+
+    List<Windmill.Message> expectedMessages1 = Lists.newArrayList();
+    List<Windmill.Message> expectedMessages2 = Lists.newArrayList();
+    List<Windmill.Message> expectedMessages3 = Lists.newArrayList();
+
+    for (int i = 0; i < 10; ++i) {
+      int baseTimestamp = i * 10;
+      byte[] payload1 = String.format("value_%d_%d", i, 
1).getBytes(StandardCharsets.UTF_8);
+      byte[] payload2 = String.format("value_%d_%d", i, 
2).getBytes(StandardCharsets.UTF_8);
+      byte[] payload3 = String.format("value_%d_%d", i, 
3).getBytes(StandardCharsets.UTF_8);
+
+      expectedMessages1.add(
+          Windmill.Message.newBuilder()
+              .setTimestamp(baseTimestamp * 1000)
+              .setData(
+                  Pubsub.PubsubMessage.newBuilder()
+                      .setData(ByteString.copyFrom(payload1))
+                      .build()
+                      .toByteString())
+              .build());
+      expectedMessages2.add(
+          Windmill.Message.newBuilder()
+              .setTimestamp((baseTimestamp + 1) * 1000)
+              .setData(
+                  Pubsub.PubsubMessage.newBuilder()
+                      .setData(ByteString.copyFrom(payload2))
+                      .build()
+                      .toByteString())
+              .build());
+      expectedMessages3.add(
+          Windmill.Message.newBuilder()
+              .setTimestamp((baseTimestamp + 2) * 1000)
+              .setData(
+                  Pubsub.PubsubMessage.newBuilder()
+                      .setData(ByteString.copyFrom(payload3))
+                      .build()
+                      .toByteString())
+              .build());
+      writer.add(
+          WindowedValue.timestampedValueInGlobalWindow(
+              new PubsubMessage(payload1, null).withTopic("topic1"), new 
Instant(baseTimestamp)));
+      writer.add(
+          WindowedValue.timestampedValueInGlobalWindow(
+              new PubsubMessage(payload2, null).withTopic("topic2"),
+              new Instant(baseTimestamp + 1)));
+      writer.add(
+          WindowedValue.timestampedValueInGlobalWindow(
+              new PubsubMessage(payload3, null).withTopic("topic3"),
+              new Instant(baseTimestamp + 2)));
+    }
+    writer.close();
+
+    Windmill.WorkItemCommitRequest expectedCommit =
+        Windmill.WorkItemCommitRequest.newBuilder()
+            .setKey(ByteString.copyFromUtf8("key"))
+            .setWorkToken(0)
+            .addPubsubMessages(
+                Windmill.PubSubMessageBundle.newBuilder()
+                    .setTopic("topic1")
+                    .setTimestampLabel("ts")
+                    .setIdLabel("id")
+                    .setWithAttributes(true)
+                    .addAllMessages(expectedMessages1))
+            .addPubsubMessages(
+                Windmill.PubSubMessageBundle.newBuilder()
+                    .setTopic("topic2")
+                    .setTimestampLabel("ts")
+                    .setIdLabel("id")
+                    .setWithAttributes(true)
+                    .addAllMessages(expectedMessages2))
+            .addPubsubMessages(
+                Windmill.PubSubMessageBundle.newBuilder()
+                    .setTopic("topic3")
+                    .setTimestampLabel("ts")
+                    .setIdLabel("id")
+                    .setWithAttributes(true)
+                    .addAllMessages(expectedMessages3))
+            .build();
+    assertEquals(expectedCommit, outputBuilder.build());
+  }
+}
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/ExternalWrite.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/ExternalWrite.java
index 196403dacd0..a1e883c8c02 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/ExternalWrite.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/ExternalWrite.java
@@ -85,6 +85,7 @@ public final class ExternalWrite implements 
ExternalTransformRegistrar {
       if (config.timestampAttribute != null) {
         writeBuilder.setTimestampAttribute(config.timestampAttribute);
       }
+      writeBuilder.setDynamicDestinations(false);
       return writeBuilder.build();
     }
   }
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PreparePubsubWriteDoFn.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PreparePubsubWriteDoFn.java
new file mode 100644
index 00000000000..c082b2007aa
--- /dev/null
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PreparePubsubWriteDoFn.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.pubsub;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Map;
+import javax.naming.SizeLimitExceededException;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.values.ValueInSingleWindow;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.Instant;
+
+public class PreparePubsubWriteDoFn<InputT> extends DoFn<InputT, 
PubsubMessage> {
+  // See https://cloud.google.com/pubsub/quotas#resource_limits.
+  private static final int PUBSUB_MESSAGE_DATA_MAX_BYTES = 10 << 20;
+  private static final int PUBSUB_MESSAGE_MAX_ATTRIBUTES = 100;
+  private static final int PUBSUB_MESSAGE_ATTRIBUTE_MAX_KEY_BYTES = 256;
+  private static final int PUBSUB_MESSAGE_ATTRIBUTE_MAX_VALUE_BYTES = 1024;
+  // The amount of bytes that each attribute entry adds up to the request
+  private static final int PUBSUB_MESSAGE_ATTRIBUTE_ENCODE_ADDITIONAL_BYTES = 
6;
+  private int maxPublishBatchSize;
+
+  private SerializableFunction<InputT, PubsubMessage> formatFunction;
+  @Nullable SerializableFunction<ValueInSingleWindow<InputT>, 
PubsubIO.PubsubTopic> topicFunction;
+
+  static int validatePubsubMessageSize(PubsubMessage message, int 
maxPublishBatchSize)
+      throws SizeLimitExceededException {
+    int payloadSize = message.getPayload().length;
+    if (payloadSize > PUBSUB_MESSAGE_DATA_MAX_BYTES) {
+      throw new SizeLimitExceededException(
+          "Pubsub message data field of length "
+              + payloadSize
+              + " exceeds maximum of "
+              + PUBSUB_MESSAGE_DATA_MAX_BYTES
+              + " bytes. See 
https://cloud.google.com/pubsub/quotas#resource_limits";);
+    }
+    int totalSize = payloadSize;
+
+    @Nullable Map<String, String> attributes = message.getAttributeMap();
+    if (attributes != null) {
+      if (attributes.size() > PUBSUB_MESSAGE_MAX_ATTRIBUTES) {
+        throw new SizeLimitExceededException(
+            "Pubsub message contains "
+                + attributes.size()
+                + " attributes which exceeds the maximum of "
+                + PUBSUB_MESSAGE_MAX_ATTRIBUTES
+                + ". See 
https://cloud.google.com/pubsub/quotas#resource_limits";);
+      }
+
+      // Consider attribute encoding overhead, so it doesn't go over the 
request limits
+      totalSize += attributes.size() * 
PUBSUB_MESSAGE_ATTRIBUTE_ENCODE_ADDITIONAL_BYTES;
+
+      for (Map.Entry<String, String> attribute : attributes.entrySet()) {
+        String key = attribute.getKey();
+        int keySize = key.getBytes(StandardCharsets.UTF_8).length;
+        if (keySize > PUBSUB_MESSAGE_ATTRIBUTE_MAX_KEY_BYTES) {
+          throw new SizeLimitExceededException(
+              "Pubsub message attribute key '"
+                  + key
+                  + "' exceeds the maximum of "
+                  + PUBSUB_MESSAGE_ATTRIBUTE_MAX_KEY_BYTES
+                  + " bytes. See 
https://cloud.google.com/pubsub/quotas#resource_limits";);
+        }
+        totalSize += keySize;
+
+        String value = attribute.getValue();
+        int valueSize = value.getBytes(StandardCharsets.UTF_8).length;
+        if (valueSize > PUBSUB_MESSAGE_ATTRIBUTE_MAX_VALUE_BYTES) {
+          throw new SizeLimitExceededException(
+              "Pubsub message attribute value for key '"
+                  + key
+                  + "' starting with '"
+                  + value.substring(0, Math.min(256, value.length()))
+                  + "' exceeds the maximum of "
+                  + PUBSUB_MESSAGE_ATTRIBUTE_MAX_VALUE_BYTES
+                  + " bytes. See 
https://cloud.google.com/pubsub/quotas#resource_limits";);
+        }
+        totalSize += valueSize;
+      }
+    }
+
+    if (totalSize > maxPublishBatchSize) {
+      throw new SizeLimitExceededException(
+          "Pubsub message of length "
+              + totalSize
+              + " exceeds maximum of "
+              + maxPublishBatchSize
+              + " bytes, when considering the payload and attributes. "
+              + "See https://cloud.google.com/pubsub/quotas#resource_limits";);
+    }
+    return totalSize;
+  }
+
+  PreparePubsubWriteDoFn(
+      SerializableFunction<InputT, PubsubMessage> formatFunction,
+      @Nullable
+          SerializableFunction<ValueInSingleWindow<InputT>, 
PubsubIO.PubsubTopic> topicFunction,
+      int maxPublishBatchSize) {
+    this.formatFunction = formatFunction;
+    this.topicFunction = topicFunction;
+    this.maxPublishBatchSize = maxPublishBatchSize;
+  }
+
+  @ProcessElement
+  public void process(
+      @Element InputT element,
+      @Timestamp Instant ts,
+      BoundedWindow window,
+      PaneInfo paneInfo,
+      OutputReceiver<PubsubMessage> o) {
+    PubsubMessage message = formatFunction.apply(element);
+    if (topicFunction != null) {
+      message =
+          message.withTopic(
+              topicFunction.apply(ValueInSingleWindow.of(element, ts, window, 
paneInfo)).asPath());
+    }
+    try {
+      validatePubsubMessageSize(message, maxPublishBatchSize);
+    } catch (SizeLimitExceededException e) {
+      throw new IllegalArgumentException(e);
+    }
+    o.output(message);
+  }
+}
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubSubPayloadTranslation.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubSubPayloadTranslation.java
index 7e27ac2ce77..3115da55cef 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubSubPayloadTranslation.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubSubPayloadTranslation.java
@@ -37,6 +37,8 @@ import org.apache.beam.sdk.options.ValueProvider;
 import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
 import org.apache.beam.sdk.runners.AppliedPTransform;
 import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.util.Preconditions;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
 
 @SuppressWarnings({
   "nullness" // TODO(https://github.com/apache/beam/issues/20497)
@@ -103,7 +105,6 @@ public class PubSubPayloadTranslation {
 
   static class PubSubWritePayloadTranslator
       implements TransformPayloadTranslator<PubsubUnboundedSink.PubsubSink> {
-
     @Override
     public String getUrn(PubsubUnboundedSink.PubsubSink transform) {
       return PTransformTranslation.PUBSUB_WRITE;
@@ -114,7 +115,8 @@ public class PubSubPayloadTranslation {
         AppliedPTransform<?, ?, PubsubUnboundedSink.PubsubSink> transform,
         SdkComponents components) {
       PubSubWritePayload.Builder payloadBuilder = 
PubSubWritePayload.newBuilder();
-      ValueProvider<TopicPath> topicProvider = 
transform.getTransform().outer.getTopicProvider();
+      ValueProvider<TopicPath> topicProvider =
+          
Preconditions.checkStateNotNull(transform.getTransform().outer.getTopicProvider());
       if (topicProvider.isAccessible()) {
         payloadBuilder.setTopic(topicProvider.get().getFullPath());
       } else {
@@ -135,6 +137,32 @@ public class PubSubPayloadTranslation {
     }
   }
 
+  static class PubSubDynamicWritePayloadTranslator
+      implements 
TransformPayloadTranslator<PubsubUnboundedSink.PubsubDynamicSink> {
+    @Override
+    public String getUrn(PubsubUnboundedSink.PubsubDynamicSink transform) {
+      return PTransformTranslation.PUBSUB_WRITE_DYNAMIC;
+    }
+
+    @Override
+    public RunnerApi.FunctionSpec translate(
+        AppliedPTransform<?, ?, PubsubUnboundedSink.PubsubDynamicSink> 
transform,
+        SdkComponents components) {
+      PubSubWritePayload.Builder payloadBuilder = 
PubSubWritePayload.newBuilder();
+      if (transform.getTransform().outer.getTimestampAttribute() != null) {
+        payloadBuilder.setTimestampAttribute(
+            transform.getTransform().outer.getTimestampAttribute());
+      }
+      if (transform.getTransform().outer.getIdAttribute() != null) {
+        
payloadBuilder.setIdAttribute(transform.getTransform().outer.getIdAttribute());
+      }
+      return FunctionSpec.newBuilder()
+          .setUrn(getUrn(transform.getTransform()))
+          .setPayload(payloadBuilder.build().toByteString())
+          .build();
+    }
+  }
+
   @AutoService(TransformPayloadTranslatorRegistrar.class)
   public static class WriteRegistrar implements 
TransformPayloadTranslatorRegistrar {
 
@@ -142,8 +170,11 @@ public class PubSubPayloadTranslation {
     @SuppressWarnings("rawtypes")
     public Map<? extends Class<? extends PTransform>, ? extends 
TransformPayloadTranslator>
         getTransformPayloadTranslators() {
-      return Collections.singletonMap(
-          PubsubUnboundedSink.PubsubSink.class, new 
PubSubWritePayloadTranslator());
+      return ImmutableMap.of(
+          PubsubUnboundedSink.PubsubSink.class,
+          new PubSubWritePayloadTranslator(),
+          PubsubUnboundedSink.PubsubDynamicSink.class,
+          new PubSubDynamicWritePayloadTranslator());
     }
   }
 
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubClient.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubClient.java
index f075daf2c22..06d7c344a08 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubClient.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubClient.java
@@ -357,10 +357,10 @@ public abstract class PubsubClient implements Closeable {
   public abstract static class OutgoingMessage implements Serializable {
 
     /** Underlying Message. May not have publish timestamp set. */
-    public abstract PubsubMessage message();
+    public abstract PubsubMessage getMessage();
 
     /** Timestamp for element (ms since epoch). */
-    public abstract long timestampMsSinceEpoch();
+    public abstract long getTimestampMsSinceEpoch();
 
     /**
      * If using an id attribute, the record id to associate with this record's 
metadata so the
@@ -368,15 +368,22 @@ public abstract class PubsubClient implements Closeable {
      */
     public abstract @Nullable String recordId();
 
+    public abstract @Nullable String topic();
+
     public static OutgoingMessage of(
-        PubsubMessage message, long timestampMsSinceEpoch, @Nullable String 
recordId) {
-      return new AutoValue_PubsubClient_OutgoingMessage(message, 
timestampMsSinceEpoch, recordId);
+        PubsubMessage message,
+        long timestampMsSinceEpoch,
+        @Nullable String recordId,
+        @Nullable String topic) {
+      return new AutoValue_PubsubClient_OutgoingMessage(
+          message, timestampMsSinceEpoch, recordId, topic);
     }
 
     public static OutgoingMessage of(
         org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage message,
         long timestampMsSinceEpoch,
-        @Nullable String recordId) {
+        @Nullable String recordId,
+        @Nullable String topic) {
       PubsubMessage.Builder builder =
           
PubsubMessage.newBuilder().setData(ByteString.copyFrom(message.getPayload()));
       if (message.getAttributeMap() != null) {
@@ -385,7 +392,7 @@ public abstract class PubsubClient implements Closeable {
       if (message.getOrderingKey() != null) {
         builder.setOrderingKey(message.getOrderingKey());
       }
-      return of(builder.build(), timestampMsSinceEpoch, recordId);
+      return of(builder.build(), timestampMsSinceEpoch, recordId, topic);
     }
   }
 
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubGrpcClient.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubGrpcClient.java
index 60c096f72f8..7d8ca3f7517 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubGrpcClient.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubGrpcClient.java
@@ -230,11 +230,11 @@ public class PubsubGrpcClient extends PubsubClient {
     PublishRequest.Builder request = 
PublishRequest.newBuilder().setTopic(topic.getPath());
     for (OutgoingMessage outgoingMessage : outgoingMessages) {
       PubsubMessage.Builder message =
-          
outgoingMessage.message().toBuilder().clearMessageId().clearPublishTime();
+          
outgoingMessage.getMessage().toBuilder().clearMessageId().clearPublishTime();
 
       if (timestampAttribute != null) {
         message.putAttributes(
-            timestampAttribute, 
String.valueOf(outgoingMessage.timestampMsSinceEpoch()));
+            timestampAttribute, 
String.valueOf(outgoingMessage.getTimestampMsSinceEpoch()));
       }
 
       if (idAttribute != null && 
!Strings.isNullOrEmpty(outgoingMessage.recordId())) {
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
index 7bac875e237..7dde9c98649 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
@@ -29,9 +29,9 @@ import com.google.protobuf.Message;
 import java.io.IOException;
 import java.io.Serializable;
 import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 import javax.naming.SizeLimitExceededException;
@@ -67,6 +67,7 @@ import org.apache.beam.sdk.transforms.WithFailures.Result;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
 import org.apache.beam.sdk.util.CoderUtils;
+import org.apache.beam.sdk.util.Preconditions;
 import org.apache.beam.sdk.values.EncodableThrowable;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PBegin;
@@ -74,9 +75,12 @@ import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PDone;
 import org.apache.beam.sdk.values.Row;
 import org.apache.beam.sdk.values.TypeDescriptor;
+import org.apache.beam.sdk.values.ValueInSingleWindow;
 import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
 import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects;
 import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps;
 import org.checkerframework.checker.nullness.qual.Nullable;
 import org.joda.time.Instant;
 import org.slf4j.Logger;
@@ -103,6 +107,65 @@ import org.slf4j.LoggerFactory;
  * reviewers mentioned <a
  * 
href="https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/OWNERS";>
  * here</a>.
+ *
+ * <h3>Example PubsubIO read usage</h3>
+ *
+ * <pre>{@code
+ * // Read from a specific topic; a subscription will be created at pipeline 
start time.
+ * PCollection<PubsubMessage> messages = 
PubsubIO.readMessages().fromTopic(topic);
+ *
+ * // Read from a subscription.
+ * PCollection<PubsubMessage> messages = 
PubsubIO.readMessages().fromSubscription(subscription);
+ *
+ * // Read messages including attributes. All PubSub attributes will be 
included in the PubsubMessage.
+ * PCollection<PubsubMessage> messages = 
PubsubIO.readMessagesWithAttributes().fromTopic(topic);
+ *
+ * // Examples of reading different types from PubSub.
+ * PCollection<String> strings = PubsubIO.readStrings().fromTopic(topic);
+ * PCollection<MyProto> protos = 
PubsubIO.readProtos(MyProto.class).fromTopic(topic);
+ * PCollection<MyType> avros = 
PubsubIO.readAvros(MyType.class).fromTopic(topic);
+ *
+ * }</pre>
+ *
+ * <h3>Example PubsubIO write usage</h3>
+ *
+ * Data can be written to a single topic or to a dynamic set of topics. In 
order to write to a
+ * single topic, the {@link PubsubIO.Write#to(String)} method can be used. For 
example:
+ *
+ * <pre>{@code
+ * avros.apply(PubsubIO.writeAvros(MyType.class).to(topic));
+ * protos.apply(PubsubIO.writeProtos(MyProto.class).to(topic));
+ * strings.apply(PubsubIO.writeStrings().to(topic));
+ * }</pre>
+ *
+ * Dynamic topic destinations can be accomplished by specifying a function to 
extract the topic from
+ * the record using the {@link PubsubIO.Write#to(SerializableFunction)} 
method. For example:
+ *
+ * <pre>{@code
+ * avros.apply(PubsubIO.writeAvros(MyType.class).
+ *      to((ValueInSingleWindow<Event> quote) -> {
+ *               String country = quote.getCountry();
+ *               return "projects/myproject/topics/events_" + country;
+ *              });
+ * }</pre>
+ *
+ * Dynamic topics can also be specified by writing {@link PubsubMessage} 
objects containing the
+ * topic and writing using the {@link PubsubIO#writeMessagesDynamic()} method. 
For example:
+ *
+ * <pre>{@code
+ * events.apply(MapElements.into(new TypeDescriptor<PubsubMessage>() {})
+ *                         .via(e -> new PubsubMessage(
+ *                             e.toByteString(), 
Collections.emptyMap()).withTopic(e.getCountry())))
+ * .apply(PubsubIO.writeMessagesDynamic());
+ * }</pre>
+ *
+ * <h3>Custom timestamps</h3>
+ *
+ * All messages read from PubSub have a stable publish timestamp that is 
independent of when the
+ * message is read from the PubSub topic. By default, the publish time is used 
as the timestamp for
+ * all messages read and the watermark is based on that. If there is a 
different logical timestamp
+ * to be used, that timestamp must be published in a PubSub attribute and 
specified using {@link
+ * PubsubIO.Read#withTimestampAttribute}. See the Javadoc for that method for 
the timestamp format.
  */
 @SuppressWarnings({
   "nullness" // TODO(https://github.com/apache/beam/issues/20497)
@@ -134,19 +197,11 @@ public class PubsubIO {
 
   private static final Pattern PUBSUB_NAME_REGEXP = 
Pattern.compile("[a-zA-Z][-._~%+a-zA-Z0-9]+");
 
+  static final int PUBSUB_MESSAGE_MAX_TOTAL_SIZE = 10 << 20;
+
   private static final int PUBSUB_NAME_MIN_LENGTH = 3;
   private static final int PUBSUB_NAME_MAX_LENGTH = 255;
 
-  // See https://cloud.google.com/pubsub/quotas#resource_limits.
-  private static final int PUBSUB_MESSAGE_MAX_TOTAL_SIZE = 10 << 20;
-  private static final int PUBSUB_MESSAGE_DATA_MAX_BYTES = 10 << 20;
-  private static final int PUBSUB_MESSAGE_MAX_ATTRIBUTES = 100;
-  private static final int PUBSUB_MESSAGE_ATTRIBUTE_MAX_KEY_BYTES = 256;
-  private static final int PUBSUB_MESSAGE_ATTRIBUTE_MAX_VALUE_BYTES = 1024;
-
-  // The amount of bytes that each attribute entry adds up to the request
-  private static final int PUBSUB_MESSAGE_ATTRIBUTE_ENCODE_ADDITIONAL_BYTES = 
6;
-
   private static final String SUBSCRIPTION_RANDOM_TEST_PREFIX = "_random/";
   private static final String SUBSCRIPTION_STARTING_SIGNAL = 
"_starting_signal/";
   private static final String TOPIC_DEV_NULL_TEST_NAME = "/topics/dev/null";
@@ -182,76 +237,6 @@ public class PubsubIO {
     }
   }
 
-  @VisibleForTesting
-  static int validateAndGetPubsubMessageSize(PubsubMessage message)
-      throws SizeLimitExceededException {
-    int payloadSize = message.getPayload().length;
-    if (payloadSize > PUBSUB_MESSAGE_DATA_MAX_BYTES) {
-      throw new SizeLimitExceededException(
-          "Pubsub message data field of length "
-              + payloadSize
-              + " exceeds maximum of "
-              + PUBSUB_MESSAGE_DATA_MAX_BYTES
-              + " bytes. See 
https://cloud.google.com/pubsub/quotas#resource_limits";);
-    }
-    int totalSize = payloadSize;
-
-    @Nullable Map<String, String> attributes = message.getAttributeMap();
-    if (attributes != null) {
-      if (attributes.size() > PUBSUB_MESSAGE_MAX_ATTRIBUTES) {
-        throw new SizeLimitExceededException(
-            "Pubsub message contains "
-                + attributes.size()
-                + " attributes which exceeds the maximum of "
-                + PUBSUB_MESSAGE_MAX_ATTRIBUTES
-                + ". See 
https://cloud.google.com/pubsub/quotas#resource_limits";);
-      }
-
-      // Consider attribute encoding overhead, so it doesn't go over the 
request limits
-      totalSize += attributes.size() * 
PUBSUB_MESSAGE_ATTRIBUTE_ENCODE_ADDITIONAL_BYTES;
-
-      for (Map.Entry<String, String> attribute : attributes.entrySet()) {
-        String key = attribute.getKey();
-        int keySize = key.getBytes(StandardCharsets.UTF_8).length;
-        if (keySize > PUBSUB_MESSAGE_ATTRIBUTE_MAX_KEY_BYTES) {
-          throw new SizeLimitExceededException(
-              "Pubsub message attribute key '"
-                  + key
-                  + "' exceeds the maximum of "
-                  + PUBSUB_MESSAGE_ATTRIBUTE_MAX_KEY_BYTES
-                  + " bytes. See 
https://cloud.google.com/pubsub/quotas#resource_limits";);
-        }
-        totalSize += keySize;
-
-        String value = attribute.getValue();
-        int valueSize = value.getBytes(StandardCharsets.UTF_8).length;
-        if (valueSize > PUBSUB_MESSAGE_ATTRIBUTE_MAX_VALUE_BYTES) {
-          throw new SizeLimitExceededException(
-              "Pubsub message attribute value for key '"
-                  + key
-                  + "' starting with '"
-                  + value.substring(0, Math.min(256, value.length()))
-                  + "' exceeds the maximum of "
-                  + PUBSUB_MESSAGE_ATTRIBUTE_MAX_VALUE_BYTES
-                  + " bytes. See 
https://cloud.google.com/pubsub/quotas#resource_limits";);
-        }
-        totalSize += valueSize;
-      }
-    }
-
-    if (totalSize > PUBSUB_MESSAGE_MAX_TOTAL_SIZE) {
-      throw new SizeLimitExceededException(
-          "Pubsub message of length "
-              + totalSize
-              + " exceeds maximum of "
-              + PUBSUB_MESSAGE_MAX_TOTAL_SIZE
-              + " bytes, when considering the payload and attributes. "
-              + "See https://cloud.google.com/pubsub/quotas#resource_limits";);
-    }
-
-    return totalSize;
-  }
-
   /** Populate common {@link DisplayData} between Pubsub source and sink. */
   private static void populateCommonDisplayData(
       DisplayData.Builder builder,
@@ -402,6 +387,22 @@ public class PubsubIO {
 
   /** Class representing a Cloud Pub/Sub Topic. */
   public static class PubsubTopic implements Serializable {
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (!(o instanceof PubsubTopic)) {
+        return false;
+      }
+      PubsubTopic that = (PubsubTopic) o;
+      return type == that.type && project.equals(that.project) && 
topic.equals(that.topic);
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(type, project, topic);
+    }
 
     private enum Type {
       NORMAL,
@@ -709,7 +710,25 @@ public class PubsubIO {
 
   /** Returns A {@link PTransform} that writes to a Google Cloud Pub/Sub 
stream. */
   public static Write<PubsubMessage> writeMessages() {
-    return Write.newBuilder().build();
+    return Write.newBuilder()
+        .setTopicProvider(null)
+        .setTopicFunction(null)
+        .setDynamicDestinations(false)
+        .build();
+  }
+
+  /**
+   * Enables dynamic destination topics. The {@link PubsubMessage} elements 
are each expected to
+   * contain a destination topic, which can be set using {@link 
PubsubMessage#withTopic}. If {@link
+   * Write#to} is called, that will be used instead to generate the topic and 
the value returned by
+   * {@link PubsubMessage#getTopic} will be ignored.
+   */
+  public static Write<PubsubMessage> writeMessagesDynamic() {
+    return Write.newBuilder()
+        .setTopicProvider(null)
+        .setTopicFunction(null)
+        .setDynamicDestinations(true)
+        .build();
   }
 
   /**
@@ -720,6 +739,7 @@ public class PubsubIO {
     return Write.newBuilder(
             (String string) ->
                 new PubsubMessage(string.getBytes(StandardCharsets.UTF_8), 
ImmutableMap.of()))
+        .setDynamicDestinations(false)
         .build();
   }
 
@@ -729,7 +749,9 @@ public class PubsubIO {
    */
   public static <T extends Message> Write<T> writeProtos(Class<T> 
messageClass) {
     // TODO: Like in readProtos(), stop using ProtoCoder and instead format 
the payload directly.
-    return 
Write.newBuilder(formatPayloadUsingCoder(ProtoCoder.of(messageClass))).build();
+    return 
Write.newBuilder(formatPayloadUsingCoder(ProtoCoder.of(messageClass)))
+        .setDynamicDestinations(false)
+        .build();
   }
 
   /**
@@ -738,7 +760,9 @@ public class PubsubIO {
    */
   public static <T> Write<T> writeAvros(Class<T> clazz) {
     // TODO: Like in readAvros(), stop using AvroCoder and instead format the 
payload directly.
-    return 
Write.newBuilder(formatPayloadUsingCoder(AvroCoder.of(clazz))).build();
+    return Write.newBuilder(formatPayloadUsingCoder(AvroCoder.of(clazz)))
+        .setDynamicDestinations(false)
+        .build();
   }
 
   /** Implementation of read methods. */
@@ -1128,6 +1152,10 @@ public class PubsubIO {
 
     abstract @Nullable ValueProvider<PubsubTopic> getTopicProvider();
 
+    abstract @Nullable SerializableFunction<ValueInSingleWindow<T>, 
PubsubTopic> getTopicFunction();
+
+    abstract boolean getDynamicDestinations();
+
     abstract PubsubClient.PubsubClientFactory getPubsubClientFactory();
 
     /** the batch size for bulk submissions to pubsub. */
@@ -1164,6 +1192,11 @@ public class PubsubIO {
     abstract static class Builder<T> {
       abstract Builder<T> setTopicProvider(ValueProvider<PubsubTopic> 
topicProvider);
 
+      abstract Builder<T> setTopicFunction(
+          SerializableFunction<ValueInSingleWindow<T>, PubsubTopic> 
topicFunction);
+
+      abstract Builder<T> setDynamicDestinations(boolean dynamicDestinations);
+
       abstract Builder<T> 
setPubsubClientFactory(PubsubClient.PubsubClientFactory factory);
 
       abstract Builder<T> setMaxBatchSize(Integer batchSize);
@@ -1195,6 +1228,21 @@ public class PubsubIO {
     public Write<T> to(ValueProvider<String> topic) {
       return toBuilder()
           .setTopicProvider(NestedValueProvider.of(topic, 
PubsubTopic::fromPath))
+          .setTopicFunction(null)
+          .setDynamicDestinations(false)
+          .build();
+    }
+
+    /**
+     * Provides a function to dynamically specify the target topic per 
message. Not compatible with
+     * any of the other to methods. If {@link #to} is called again specifying 
a topic, then this
+     * topicFunction will be ignored.
+     */
+    public Write<T> to(SerializableFunction<ValueInSingleWindow<T>, String> 
topicFunction) {
+      return toBuilder()
+          .setTopicProvider(null)
+          .setTopicFunction(v -> PubsubTopic.fromPath(topicFunction.apply(v)))
+          .setDynamicDestinations(true)
           .build();
     }
 
@@ -1263,13 +1311,34 @@ public class PubsubIO {
 
     @Override
     public PDone expand(PCollection<T> input) {
-      if (getTopicProvider() == null) {
-        throw new IllegalStateException("need to set the topic of a 
PubsubIO.Write transform");
+      if (getTopicProvider() == null && !getDynamicDestinations()) {
+        throw new IllegalStateException(
+            "need to set the topic of a PubsubIO.Write transform if not using "
+                + "dynamic topic destinations.");
       }
 
+      SerializableFunction<ValueInSingleWindow<T>, PubsubIO.PubsubTopic> 
topicFunction =
+          getTopicFunction();
+      if (topicFunction == null && getTopicProvider() != null) {
+        topicFunction = v -> getTopicProvider().get();
+      }
+      int maxMessageSize = PUBSUB_MESSAGE_MAX_TOTAL_SIZE;
+      if (input.isBounded() == PCollection.IsBounded.BOUNDED) {
+        maxMessageSize =
+            Math.min(
+                maxMessageSize,
+                MoreObjects.firstNonNull(
+                    getMaxBatchBytesSize(), 
MAX_PUBLISH_BATCH_BYTE_SIZE_DEFAULT));
+      }
+      PCollection<PubsubMessage> pubsubMessages =
+          input
+              .apply(
+                  ParDo.of(
+                      new PreparePubsubWriteDoFn<>(getFormatFn(), 
topicFunction, maxMessageSize)))
+              .setCoder(new PubsubMessageWithTopicCoder());
       switch (input.isBounded()) {
         case BOUNDED:
-          input.apply(
+          pubsubMessages.apply(
               ParDo.of(
                   new PubsubBoundedWriter(
                       MoreObjects.firstNonNull(getMaxBatchSize(), 
MAX_PUBLISH_BATCH_SIZE),
@@ -1277,31 +1346,20 @@ public class PubsubIO {
                           getMaxBatchBytesSize(), 
MAX_PUBLISH_BATCH_BYTE_SIZE_DEFAULT))));
           return PDone.in(input.getPipeline());
         case UNBOUNDED:
-          return input
-              .apply(
-                  MapElements.into(new TypeDescriptor<PubsubMessage>() {})
-                      .via(
-                          elem -> {
-                            PubsubMessage message = getFormatFn().apply(elem);
-                            try {
-                              validateAndGetPubsubMessageSize(message);
-                            } catch (SizeLimitExceededException e) {
-                              throw new IllegalArgumentException(e);
-                            }
-                            return message;
-                          }))
-              .apply(
-                  new PubsubUnboundedSink(
-                      getPubsubClientFactory(),
-                      NestedValueProvider.of(getTopicProvider(), new 
TopicPathTranslator()),
-                      getTimestampAttribute(),
-                      getIdAttribute(),
-                      100 /* numShards */,
-                      MoreObjects.firstNonNull(
-                          getMaxBatchSize(), 
PubsubUnboundedSink.DEFAULT_PUBLISH_BATCH_SIZE),
-                      MoreObjects.firstNonNull(
-                          getMaxBatchBytesSize(), 
PubsubUnboundedSink.DEFAULT_PUBLISH_BATCH_BYTES),
-                      getPubsubRootUrl()));
+          return pubsubMessages.apply(
+              new PubsubUnboundedSink(
+                  getPubsubClientFactory(),
+                  getTopicProvider() != null
+                      ? NestedValueProvider.of(getTopicProvider(), new 
TopicPathTranslator())
+                      : null,
+                  getTimestampAttribute(),
+                  getIdAttribute(),
+                  100 /* numShards */,
+                  MoreObjects.firstNonNull(
+                      getMaxBatchSize(), 
PubsubUnboundedSink.DEFAULT_PUBLISH_BATCH_SIZE),
+                  MoreObjects.firstNonNull(
+                      getMaxBatchBytesSize(), 
PubsubUnboundedSink.DEFAULT_PUBLISH_BATCH_BYTES),
+                  getPubsubRootUrl()));
       }
       throw new RuntimeException(); // cases are exhaustive.
     }
@@ -1318,10 +1376,20 @@ public class PubsubIO {
      *
      * <p>Public so can be suppressed by runners.
      */
-    public class PubsubBoundedWriter extends DoFn<T, Void> {
-      private transient List<OutgoingMessage> output;
+    public class PubsubBoundedWriter extends DoFn<PubsubMessage, Void> {
+      private class OutgoingData {
+        List<OutgoingMessage> messages;
+        long bytes;
+
+        OutgoingData() {
+          this.messages = Lists.newArrayList();
+          this.bytes = 0;
+        }
+      }
+
+      private transient Map<PubsubTopic, OutgoingData> output;
+
       private transient PubsubClient pubsubClient;
-      private transient int currentOutputBytes;
 
       private int maxPublishBatchByteSize;
       private int maxPublishBatchSize;
@@ -1337,8 +1405,7 @@ public class PubsubIO {
 
       @StartBundle
       public void startBundle(StartBundleContext c) throws IOException {
-        this.output = new ArrayList<>();
-        this.currentOutputBytes = 0;
+        this.output = Maps.newHashMap();
 
         // NOTE: idAttribute is ignored.
         this.pubsubClient =
@@ -1348,25 +1415,31 @@ public class PubsubIO {
       }
 
       @ProcessElement
-      public void processElement(ProcessContext c) throws IOException, 
SizeLimitExceededException {
-        PubsubMessage message = getFormatFn().apply(c.element());
-        int messageSize = validateAndGetPubsubMessageSize(message);
-        if (messageSize > maxPublishBatchByteSize) {
-          String msg =
-              String.format(
-                  "Pub/Sub message size (%d) exceeded maximum batch size (%d)",
-                  messageSize, maxPublishBatchByteSize);
-          throw new SizeLimitExceededException(msg);
+      public void processElement(@Element PubsubMessage message, @Timestamp 
Instant timestamp)
+          throws IOException, SizeLimitExceededException {
+        // Validate again here just as a sanity check.
+        PreparePubsubWriteDoFn.validatePubsubMessageSize(message, 
maxPublishBatchSize);
+        byte[] payload = message.getPayload();
+        int messageSize = payload.length;
+
+        PubsubTopic pubsubTopic;
+        if (getTopicProvider() != null) {
+          pubsubTopic = getTopicProvider().get();
+        } else {
+          pubsubTopic =
+              
PubsubTopic.fromPath(Preconditions.checkArgumentNotNull(message.getTopic()));
         }
-
         // Checking before adding the message stops us from violating max 
batch size or bytes
-        if (output.size() >= maxPublishBatchSize
-            || (!output.isEmpty()
-                && (currentOutputBytes + messageSize) >= 
maxPublishBatchByteSize)) {
-          publish();
+        OutgoingData currentTopicOutput =
+            output.computeIfAbsent(pubsubTopic, t -> new OutgoingData());
+        if (currentTopicOutput.messages.size() >= maxPublishBatchSize
+            || (!currentTopicOutput.messages.isEmpty()
+                && (currentTopicOutput.bytes + messageSize) >= 
maxPublishBatchByteSize)) {
+          publish(pubsubTopic, currentTopicOutput.messages);
+          currentTopicOutput.messages.clear();
+          currentTopicOutput.bytes = 0;
         }
 
-        byte[] payload = message.getPayload();
         Map<String, String> attributes = message.getAttributeMap();
         String orderingKey = message.getOrderingKey();
 
@@ -1380,29 +1453,27 @@ public class PubsubIO {
         }
 
         // NOTE: The record id is always null.
-        output.add(OutgoingMessage.of(msgBuilder.build(), 
c.timestamp().getMillis(), null));
-        currentOutputBytes += messageSize;
+        currentTopicOutput.messages.add(
+            OutgoingMessage.of(
+                msgBuilder.build(), timestamp.getMillis(), null, 
message.getTopic()));
+        currentTopicOutput.bytes += messageSize;
       }
 
       @FinishBundle
       public void finishBundle() throws IOException {
-        if (!output.isEmpty()) {
-          publish();
+        for (Map.Entry<PubsubTopic, OutgoingData> entry : output.entrySet()) {
+          publish(entry.getKey(), entry.getValue().messages);
         }
         output = null;
-        currentOutputBytes = 0;
         pubsubClient.close();
         pubsubClient = null;
       }
 
-      private void publish() throws IOException {
-        PubsubTopic topic = getTopicProvider().get();
+      private void publish(PubsubTopic topic, List<OutgoingMessage> messages) 
throws IOException {
         int n =
             pubsubClient.publish(
-                PubsubClient.topicPathFromName(topic.project, topic.topic), 
output);
-        checkState(n == output.size());
-        output.clear();
-        currentOutputBytes = 0;
+                PubsubClient.topicPathFromName(topic.project, topic.topic), 
messages);
+        checkState(n == messages.size());
       }
 
       @Override
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClient.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClient.java
index ab6b6533343..140931a9f05 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClient.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClient.java
@@ -141,10 +141,10 @@ public class PubsubJsonClient extends PubsubClient {
     List<PubsubMessage> pubsubMessages = new 
ArrayList<>(outgoingMessages.size());
     for (OutgoingMessage outgoingMessage : outgoingMessages) {
       PubsubMessage pubsubMessage =
-          new 
PubsubMessage().encodeData(outgoingMessage.message().getData().toByteArray());
+          new 
PubsubMessage().encodeData(outgoingMessage.getMessage().getData().toByteArray());
       pubsubMessage.setAttributes(getMessageAttributes(outgoingMessage));
-      if (!outgoingMessage.message().getOrderingKey().isEmpty()) {
-        
pubsubMessage.setOrderingKey(outgoingMessage.message().getOrderingKey());
+      if (!outgoingMessage.getMessage().getOrderingKey().isEmpty()) {
+        
pubsubMessage.setOrderingKey(outgoingMessage.getMessage().getOrderingKey());
       }
 
       // N.B. publishTime and messageId are intentionally not set on the 
message that is published
@@ -158,13 +158,14 @@ public class PubsubJsonClient extends PubsubClient {
 
   private Map<String, String> getMessageAttributes(OutgoingMessage 
outgoingMessage) {
     Map<String, String> attributes = null;
-    if (outgoingMessage.message().getAttributesMap() == null) {
+    if (outgoingMessage.getMessage().getAttributesMap() == null) {
       attributes = new TreeMap<>();
     } else {
-      attributes = new TreeMap<>(outgoingMessage.message().getAttributesMap());
+      attributes = new 
TreeMap<>(outgoingMessage.getMessage().getAttributesMap());
     }
     if (timestampAttribute != null) {
-      attributes.put(timestampAttribute, 
String.valueOf(outgoingMessage.timestampMsSinceEpoch()));
+      attributes.put(
+          timestampAttribute, 
String.valueOf(outgoingMessage.getTimestampMsSinceEpoch()));
     }
     if (idAttribute != null && 
!Strings.isNullOrEmpty(outgoingMessage.recordId())) {
       attributes.put(idAttribute, outgoingMessage.recordId());
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessage.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessage.java
index 549daf92657..3649c6c6d46 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessage.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessage.java
@@ -33,6 +33,8 @@ import org.checkerframework.checker.nullness.qual.Nullable;
 public class PubsubMessage {
   @AutoValue
   abstract static class Impl {
+    abstract @Nullable String getTopic();
+
     @SuppressWarnings("mutable")
     abstract byte[] getPayload();
 
@@ -43,11 +45,12 @@ public class PubsubMessage {
     abstract @Nullable String getOrderingKey();
 
     static Impl create(
+        @Nullable String topic,
         byte[] payload,
         @Nullable Map<String, String> attributes,
         @Nullable String messageId,
         @Nullable String orderingKey) {
-      return new AutoValue_PubsubMessage_Impl(payload, attributes, messageId, 
orderingKey);
+      return new AutoValue_PubsubMessage_Impl(topic, payload, attributes, 
messageId, orderingKey);
     }
   }
 
@@ -59,7 +62,7 @@ public class PubsubMessage {
 
   public PubsubMessage(
       byte[] payload, @Nullable Map<String, String> attributes, @Nullable 
String messageId) {
-    impl = Impl.create(payload, attributes, messageId, null);
+    impl = Impl.create(null, payload, attributes, messageId, null);
   }
 
   public PubsubMessage(
@@ -67,7 +70,25 @@ public class PubsubMessage {
       @Nullable Map<String, String> attributes,
       @Nullable String messageId,
       @Nullable String orderingKey) {
-    impl = Impl.create(payload, attributes, messageId, orderingKey);
+    impl = Impl.create(null, payload, attributes, messageId, orderingKey);
+  }
+
+  private PubsubMessage(Impl impl) {
+    this.impl = impl;
+  }
+
+  public PubsubMessage withTopic(String topic) {
+    return new PubsubMessage(
+        Impl.create(
+            topic,
+            impl.getPayload(),
+            impl.getAttributeMap(),
+            impl.getMessageId(),
+            impl.getOrderingKey()));
+  }
+
+  public @Nullable String getTopic() {
+    return impl.getTopic();
   }
 
   /** Returns the main PubSub message. */
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageWithTopicCoder.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageWithTopicCoder.java
new file mode 100644
index 00000000000..d10b9a2f106
--- /dev/null
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageWithTopicCoder.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.pubsub;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Map;
+import org.apache.beam.sdk.coders.ByteArrayCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CustomCoder;
+import org.apache.beam.sdk.coders.MapCoder;
+import org.apache.beam.sdk.coders.NullableCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+/** A coder for PubsubMessage including the topic from the PubSub server. */
+public class PubsubMessageWithTopicCoder extends CustomCoder<PubsubMessage> {
+  // A message's payload cannot be null
+  private static final Coder<byte[]> PAYLOAD_CODER = ByteArrayCoder.of();
+  private static final Coder<@Nullable Map<String, String>> ATTRIBUTES_CODER =
+      NullableCoder.of(MapCoder.of(StringUtf8Coder.of(), 
StringUtf8Coder.of()));
+  private static final Coder<@Nullable String> MESSAGE_ID_CODER =
+      NullableCoder.of(StringUtf8Coder.of());
+
+  private static final Coder<@Nullable String> TOPIC_CODER = 
NullableCoder.of(StringUtf8Coder.of());
+
+  public static Coder<PubsubMessage> of(TypeDescriptor<PubsubMessage> ignored) 
{
+    return of();
+  }
+
+  public static PubsubMessageWithAttributesAndMessageIdCoder of() {
+    return new PubsubMessageWithAttributesAndMessageIdCoder();
+  }
+
+  @Override
+  public void encode(PubsubMessage value, OutputStream outStream) throws 
IOException {
+    PAYLOAD_CODER.encode(value.getPayload(), outStream);
+    ATTRIBUTES_CODER.encode(value.getAttributeMap(), outStream);
+    MESSAGE_ID_CODER.encode(value.getMessageId(), outStream);
+    TOPIC_CODER.encode(value.getTopic(), outStream);
+  }
+
+  @Override
+  public PubsubMessage decode(InputStream inStream) throws IOException {
+    byte[] payload = PAYLOAD_CODER.decode(inStream);
+    @Nullable Map<String, String> attributes = 
ATTRIBUTES_CODER.decode(inStream);
+    @Nullable String messageId = MESSAGE_ID_CODER.decode(inStream);
+    @Nullable String topic = TOPIC_CODER.decode(inStream);
+    PubsubMessage pubsubMessage = new PubsubMessage(payload, attributes, 
messageId);
+    if (topic != null) {
+      pubsubMessage = pubsubMessage.withTopic(topic);
+    }
+    return pubsubMessage;
+  }
+}
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubTestClient.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubTestClient.java
index 575957c6072..9c41ff8b2f2 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubTestClient.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubTestClient.java
@@ -57,6 +57,8 @@ public class PubsubTestClient extends PubsubClient implements 
Serializable {
     /** True if has been primed for a test but not yet validated. */
     boolean isActive;
 
+    boolean isPublish;
+
     /** Publish mode only: Only publish calls for this topic are allowed. */
     @Nullable TopicPath expectedTopic;
 
@@ -111,7 +113,7 @@ public class PubsubTestClient extends PubsubClient 
implements Serializable {
    * factory must be closed when the test is complete, at which point final 
validation will occur.
    */
   public static PubsubTestClientFactory createFactoryForPublish(
-      final TopicPath expectedTopic,
+      final @Nullable TopicPath expectedTopic,
       final Iterable<OutgoingMessage> expectedOutgoingMessages,
       final Iterable<OutgoingMessage> failingOutgoingMessages) {
     activate(
@@ -315,9 +317,10 @@ public class PubsubTestClient extends PubsubClient 
implements Serializable {
 
   /** Handles setting {@code STATE} values for a publishing client. */
   private static void setPublishState(
-      final TopicPath expectedTopic,
+      final @Nullable TopicPath expectedTopic,
       final Iterable<OutgoingMessage> expectedOutgoingMessages,
       final Iterable<OutgoingMessage> failingOutgoingMessages) {
+    STATE.isPublish = true;
     STATE.expectedTopic = expectedTopic;
     STATE.remainingExpectedOutgoingMessages = 
Sets.newHashSet(expectedOutgoingMessages);
     STATE.remainingFailingOutgoingMessages = 
Sets.newHashSet(failingOutgoingMessages);
@@ -422,7 +425,7 @@ public class PubsubTestClient extends PubsubClient 
implements Serializable {
   /** Return true if in publish mode. */
   private boolean inPublishMode() {
     checkState(STATE.isActive, "No test is active");
-    return STATE.expectedTopic != null;
+    return STATE.isPublish;
   }
 
   /**
@@ -452,12 +455,20 @@ public class PubsubTestClient extends PubsubClient 
implements Serializable {
   public int publish(TopicPath topic, List<OutgoingMessage> outgoingMessages) 
throws IOException {
     synchronized (STATE) {
       checkState(inPublishMode(), "Can only publish in publish mode");
-      checkState(
-          topic.equals(STATE.expectedTopic),
-          "Topic %s does not match expected %s",
-          topic,
-          STATE.expectedTopic);
+      boolean isDynamic = STATE.expectedTopic == null;
+      if (!isDynamic) {
+        checkState(
+            topic.equals(STATE.expectedTopic),
+            "Topic %s does not match expected %s",
+            topic,
+            STATE.expectedTopic);
+      }
       for (OutgoingMessage outgoingMessage : outgoingMessages) {
+        if (isDynamic) {
+          checkState(outgoingMessage.topic().equals(topic.getPath()));
+        } else {
+          checkState(outgoingMessage.topic() == null);
+        }
         if (STATE.remainingFailingOutgoingMessages.remove(outgoingMessage)) {
           throw new RuntimeException("Simulating failure for " + 
outgoingMessage);
         }
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java
index cc3009c7313..b53aef87bc3 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java
@@ -19,6 +19,7 @@ package org.apache.beam.sdk.io.gcp.pubsub;
 
 import static 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;
 
+import com.google.protobuf.InvalidProtocolBufferException;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
@@ -46,8 +47,14 @@ import org.apache.beam.sdk.options.ValueProvider;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.MapValues;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableBiFunction;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.SerializableFunctions;
+import org.apache.beam.sdk.transforms.Values;
+import org.apache.beam.sdk.transforms.WithKeys;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.windowing.AfterFirst;
 import org.apache.beam.sdk.transforms.windowing.AfterPane;
@@ -59,10 +66,13 @@ import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PDone;
 import org.apache.beam.sdk.values.TypeDescriptor;
+import org.apache.beam.sdk.values.TypeDescriptors;
 import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.hash.Hashing;
 import org.checkerframework.checker.nullness.qual.Nullable;
 import org.joda.time.Duration;
+import org.joda.time.Instant;
 
 /**
  * A PTransform which streams messages to Pubsub.
@@ -94,17 +104,20 @@ public class PubsubUnboundedSink extends 
PTransform<PCollection<PubsubMessage>,
   /** Default longest delay between receiving a message and pushing it to 
Pubsub. */
   private static final Duration DEFAULT_MAX_LATENCY = 
Duration.standardSeconds(2);
 
+  /** Coder for conveying outgoing messages between internal stages. */
   /** Coder for conveying outgoing messages between internal stages. */
   private static class OutgoingMessageCoder extends 
AtomicCoder<OutgoingMessage> {
     private static final NullableCoder<String> RECORD_ID_CODER =
         NullableCoder.of(StringUtf8Coder.of());
+    private static final NullableCoder<String> TOPIC_CODER = 
NullableCoder.of(StringUtf8Coder.of());
 
     @Override
     public void encode(OutgoingMessage value, OutputStream outStream)
         throws CoderException, IOException {
-      
ProtoCoder.of(com.google.pubsub.v1.PubsubMessage.class).encode(value.message(), 
outStream);
-      BigEndianLongCoder.of().encode(value.timestampMsSinceEpoch(), outStream);
+      
ProtoCoder.of(com.google.pubsub.v1.PubsubMessage.class).encode(value.getMessage(),
 outStream);
+      BigEndianLongCoder.of().encode(value.getTimestampMsSinceEpoch(), 
outStream);
       RECORD_ID_CODER.encode(value.recordId(), outStream);
+      TOPIC_CODER.encode(value.topic(), outStream);
     }
 
     @Override
@@ -113,7 +126,8 @@ public class PubsubUnboundedSink extends 
PTransform<PCollection<PubsubMessage>,
           
ProtoCoder.of(com.google.pubsub.v1.PubsubMessage.class).decode(inStream);
       long timestampMsSinceEpoch = BigEndianLongCoder.of().decode(inStream);
       @Nullable String recordId = RECORD_ID_CODER.decode(inStream);
-      return OutgoingMessage.of(message, timestampMsSinceEpoch, recordId);
+      @Nullable String topic = TOPIC_CODER.decode(inStream);
+      return OutgoingMessage.of(message, timestampMsSinceEpoch, recordId, 
topic);
     }
   }
 
@@ -139,24 +153,38 @@ public class PubsubUnboundedSink extends 
PTransform<PCollection<PubsubMessage>,
   // 
================================================================================
 
   /** Convert elements to messages and shard them. */
-  private static class ShardFn extends DoFn<byte[], KV<Integer, 
OutgoingMessage>> {
+  private static class ShardFn<T, K> extends DoFn<T, KV<K, OutgoingMessage>> {
     private final Counter elementCounter = Metrics.counter(ShardFn.class, 
"elements");
     private final int numShards;
     private final RecordIdMethod recordIdMethod;
 
-    ShardFn(int numShards, RecordIdMethod recordIdMethod) {
+    private final SerializableFunction<T, com.google.pubsub.v1.PubsubMessage> 
toProto;
+    private final SerializableFunction<T, @Nullable String> dynamicTopicFn;
+
+    private final SerializableBiFunction<Integer, @Nullable String, K> 
keyFunction;
+
+    ShardFn(
+        int numShards,
+        RecordIdMethod recordIdMethod,
+        SerializableFunction<T, com.google.pubsub.v1.PubsubMessage> toProto,
+        SerializableFunction<T, @Nullable String> dynamicTopicFn,
+        SerializableBiFunction<Integer, @Nullable String, K> keyFunction) {
       this.numShards = numShards;
       this.recordIdMethod = recordIdMethod;
+      this.toProto = toProto;
+      this.dynamicTopicFn = dynamicTopicFn;
+      this.keyFunction = keyFunction;
     }
 
     @ProcessElement
-    public void processElement(ProcessContext c) throws Exception {
+    public void processElement(
+        @Element T element, @Timestamp Instant timestamp, OutputReceiver<KV<K, 
OutgoingMessage>> o)
+        throws Exception {
+      com.google.pubsub.v1.PubsubMessage message = toProto.apply(element);
       elementCounter.inc();
-      com.google.pubsub.v1.PubsubMessage message =
-          com.google.pubsub.v1.PubsubMessage.parseFrom(c.element());
       byte[] elementBytes = message.getData().toByteArray();
 
-      long timestampMsSinceEpoch = c.timestamp().getMillis();
+      long timestampMsSinceEpoch = timestamp.getMillis();
       @Nullable String recordId = null;
       switch (recordIdMethod) {
         case NONE:
@@ -172,10 +200,10 @@ public class PubsubUnboundedSink extends 
PTransform<PCollection<PubsubMessage>,
           recordId = UUID.randomUUID().toString();
           break;
       }
-      c.output(
-          KV.of(
-              ThreadLocalRandom.current().nextInt(numShards),
-              OutgoingMessage.of(message, timestampMsSinceEpoch, recordId)));
+
+      @Nullable String topic = dynamicTopicFn.apply(element);
+      K key = 
keyFunction.apply(ThreadLocalRandom.current().nextInt(numShards), topic);
+      o.output(KV.of(key, OutgoingMessage.of(message, timestampMsSinceEpoch, 
recordId, topic)));
     }
 
     @Override
@@ -190,9 +218,9 @@ public class PubsubUnboundedSink extends 
PTransform<PCollection<PubsubMessage>,
   // 
================================================================================
 
   /** Publish messages to Pubsub in batches. */
-  private static class WriterFn extends DoFn<KV<Integer, 
Iterable<OutgoingMessage>>, Void> {
+  private static class WriterFn extends DoFn<Iterable<OutgoingMessage>, Void> {
     private final PubsubClientFactory pubsubFactory;
-    private final ValueProvider<TopicPath> topic;
+    private final @Nullable ValueProvider<TopicPath> topic;
     private final String timestampAttribute;
     private final String idAttribute;
     private final int publishBatchSize;
@@ -209,7 +237,7 @@ public class PubsubUnboundedSink extends 
PTransform<PCollection<PubsubMessage>,
 
     WriterFn(
         PubsubClientFactory pubsubFactory,
-        ValueProvider<TopicPath> topic,
+        @Nullable ValueProvider<TopicPath> topic,
         String timestampAttribute,
         String idAttribute,
         int publishBatchSize,
@@ -225,7 +253,7 @@ public class PubsubUnboundedSink extends 
PTransform<PCollection<PubsubMessage>,
 
     WriterFn(
         PubsubClientFactory pubsubFactory,
-        ValueProvider<TopicPath> topic,
+        @Nullable ValueProvider<TopicPath> topic,
         String timestampAttribute,
         String idAttribute,
         int publishBatchSize,
@@ -242,7 +270,18 @@ public class PubsubUnboundedSink extends 
PTransform<PCollection<PubsubMessage>,
 
     /** BLOCKING Send {@code messages} as a batch to Pubsub. */
     private void publishBatch(List<OutgoingMessage> messages, int bytes) 
throws IOException {
-      int n = pubsubClient.publish(topic.get(), messages);
+      Preconditions.checkState(!messages.isEmpty());
+      TopicPath topicPath;
+      if (topic != null) {
+        topicPath = topic.get();
+      } else {
+        // This is the dynamic topic destinations case. Since we first group 
by topic, we can assume
+        // that all messages in the batch have the same topic.
+        topicPath =
+            PubsubClient.topicPathFromPath(
+                
org.apache.beam.sdk.util.Preconditions.checkStateNotNull(messages.get(0).topic()));
+      }
+      int n = pubsubClient.publish(topicPath, messages);
       checkState(
           n == messages.size(),
           "Attempted to publish %s messages but %s were successful",
@@ -256,6 +295,7 @@ public class PubsubUnboundedSink extends 
PTransform<PCollection<PubsubMessage>,
     @StartBundle
     public void startBundle(StartBundleContext c) throws Exception {
       checkState(pubsubClient == null, "startBundle invoked without prior 
finishBundle");
+      // TODO: Do we really need to recreate the client on every bundle?
       pubsubClient =
           pubsubFactory.newClient(
               timestampAttribute,
@@ -268,9 +308,9 @@ public class PubsubUnboundedSink extends 
PTransform<PCollection<PubsubMessage>,
     public void processElement(ProcessContext c) throws Exception {
       List<OutgoingMessage> pubsubMessages = new ArrayList<>(publishBatchSize);
       int bytes = 0;
-      for (OutgoingMessage message : c.element().getValue()) {
+      for (OutgoingMessage message : c.element()) {
         if (!pubsubMessages.isEmpty()
-            && bytes + message.message().getData().size() > publishBatchBytes) 
{
+            && bytes + message.getMessage().getData().size() > 
publishBatchBytes) {
           // Break large (in bytes) batches into smaller.
           // (We've already broken by batch size using the trigger below, 
though that may
           // run slightly over the actual PUBLISH_BATCH_SIZE. We'll consider 
that ok since
@@ -281,7 +321,7 @@ public class PubsubUnboundedSink extends 
PTransform<PCollection<PubsubMessage>,
           bytes = 0;
         }
         pubsubMessages.add(message);
-        bytes += message.message().getData().size();
+        bytes += message.getMessage().getData().size();
       }
       if (!pubsubMessages.isEmpty()) {
         // BLOCKS until published.
@@ -298,7 +338,7 @@ public class PubsubUnboundedSink extends 
PTransform<PCollection<PubsubMessage>,
     @Override
     public void populateDisplayData(DisplayData.Builder builder) {
       super.populateDisplayData(builder);
-      builder.add(DisplayData.item("topic", topic));
+      builder.addIfNotNull(DisplayData.item("topic", topic));
       builder.add(DisplayData.item("transport", pubsubFactory.getKind()));
       builder.addIfNotNull(DisplayData.item("timestampAttribute", 
timestampAttribute));
       builder.addIfNotNull(DisplayData.item("idAttribute", idAttribute));
@@ -312,8 +352,11 @@ public class PubsubUnboundedSink extends 
PTransform<PCollection<PubsubMessage>,
   /** Which factory to use for creating Pubsub transport. */
   private final PubsubClientFactory pubsubFactory;
 
-  /** Pubsub topic to publish to. */
-  private final ValueProvider<TopicPath> topic;
+  /**
+   * Pubsub topic to publish to. If null, that indicates that the 
PubsubMessage instead contains the
+   * topic.
+   */
+  private final @Nullable ValueProvider<TopicPath> topic;
 
   /**
    * Pubsub metadata field holding timestamp of each element, or {@literal 
null} if should use
@@ -355,7 +398,7 @@ public class PubsubUnboundedSink extends 
PTransform<PCollection<PubsubMessage>,
   @VisibleForTesting
   PubsubUnboundedSink(
       PubsubClientFactory pubsubFactory,
-      ValueProvider<TopicPath> topic,
+      @Nullable ValueProvider<TopicPath> topic,
       String timestampAttribute,
       String idAttribute,
       int numShards,
@@ -458,12 +501,12 @@ public class PubsubUnboundedSink extends 
PTransform<PCollection<PubsubMessage>,
         pubsubRootUrl);
   }
   /** Get the topic being written to. */
-  public TopicPath getTopic() {
-    return topic.get();
+  public @Nullable TopicPath getTopic() {
+    return topic != null ? topic.get() : null;
   }
 
   /** Get the {@link ValueProvider} for the topic being written to. */
-  public ValueProvider<TopicPath> getTopicProvider() {
+  public @Nullable ValueProvider<TopicPath> getTopicProvider() {
     return topic;
   }
 
@@ -479,13 +522,79 @@ public class PubsubUnboundedSink extends 
PTransform<PCollection<PubsubMessage>,
 
   @Override
   public PDone expand(PCollection<PubsubMessage> input) {
-    return input
-        .apply(
-            "Output Serialized PubsubMessage Proto",
-            MapElements.into(new TypeDescriptor<byte[]>() {})
-                .via(new PubsubMessages.ParsePayloadAsPubsubMessageProto()))
-        .setCoder(ByteArrayCoder.of())
-        .apply(new PubsubSink(this));
+    if (topic != null) {
+      return input
+          .apply(
+              "Output Serialized PubsubMessage Proto",
+              MapElements.into(new TypeDescriptor<byte[]>() {})
+                  .via(new PubsubMessages.ParsePayloadAsPubsubMessageProto()))
+          .setCoder(ByteArrayCoder.of())
+          .apply(new PubsubSink(this));
+    } else {
+      // dynamic destinations.
+      return input
+          .apply(
+              "WithDynamicTopics",
+              
WithKeys.of(PubsubMessage::getTopic).withKeyType(TypeDescriptors.strings()))
+          .apply(
+              MapValues.into(new TypeDescriptor<byte[]>() {})
+                  .via(new PubsubMessages.ParsePayloadAsPubsubMessageProto()))
+          .setCoder(KvCoder.of(StringUtf8Coder.of(), ByteArrayCoder.of()))
+          .apply(new PubsubDynamicSink(this));
+    }
+  }
+
+  static class PubsubDynamicSink extends PTransform<PCollection<KV<String, 
byte[]>>, PDone> {
+    public final PubsubUnboundedSink outer;
+
+    PubsubDynamicSink(PubsubUnboundedSink outer) {
+      this.outer = outer;
+    }
+
+    @Override
+    public PDone expand(PCollection<KV<String, byte[]>> input) {
+      input
+          .apply(
+              "PubsubUnboundedSink.Window",
+              Window.<KV<String, byte[]>>into(new GlobalWindows())
+                  .triggering(
+                      Repeatedly.forever(
+                          AfterFirst.of(
+                              
AfterPane.elementCountAtLeast(outer.publishBatchSize),
+                              AfterProcessingTime.pastFirstElementInPane()
+                                  .plusDelayOf(outer.maxLatency))))
+                  .discardingFiredPanes())
+          .apply(
+              "PubsubUnboundedSink.ShardDynamicDestinations",
+              ParDo.of(
+                  new ShardFn<KV<String, byte[]>, KV<Integer, String>>(
+                      outer.numShards,
+                      outer.recordIdMethod,
+                      kv -> {
+                        try {
+                          return 
com.google.pubsub.v1.PubsubMessage.parseFrom(kv.getValue());
+                        } catch (InvalidProtocolBufferException e) {
+                          throw new RuntimeException(e);
+                        }
+                      },
+                      KV::getKey,
+                      KV::of)))
+          .setCoder(KvCoder.of(KvCoder.of(VarIntCoder.of(), 
StringUtf8Coder.of()), CODER))
+          .apply(GroupByKey.create())
+          .apply(Values.create())
+          .apply(
+              "PubsubUnboundedSink.Writer",
+              ParDo.of(
+                  new WriterFn(
+                      outer.pubsubFactory,
+                      outer.topic,
+                      outer.timestampAttribute,
+                      outer.idAttribute,
+                      outer.publishBatchSize,
+                      outer.publishBatchBytes,
+                      outer.pubsubRootUrl)));
+      return PDone.in(input.getPipeline());
+    }
   }
 
   static class PubsubSink extends PTransform<PCollection<byte[]>, PDone> {
@@ -510,9 +619,22 @@ public class PubsubUnboundedSink extends 
PTransform<PCollection<PubsubMessage>,
                   .discardingFiredPanes())
           .apply(
               "PubsubUnboundedSink.Shard",
-              ParDo.of(new ShardFn(outer.numShards, outer.recordIdMethod)))
+              ParDo.of(
+                  new ShardFn<>(
+                      outer.numShards,
+                      outer.recordIdMethod,
+                      m -> {
+                        try {
+                          return 
com.google.pubsub.v1.PubsubMessage.parseFrom(m);
+                        } catch (InvalidProtocolBufferException e) {
+                          throw new RuntimeException(e);
+                        }
+                      },
+                      SerializableFunctions.constant(null),
+                      (s, t) -> s)))
           .setCoder(KvCoder.of(VarIntCoder.of(), CODER))
           .apply(GroupByKey.create())
+          .apply(Values.create())
           .apply(
               "PubsubUnboundedSink.Writer",
               ParDo.of(
diff --git 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PreparePubsubWriteDoFnTest.java
 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PreparePubsubWriteDoFnTest.java
new file mode 100644
index 00000000000..ae0cbb82727
--- /dev/null
+++ 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PreparePubsubWriteDoFnTest.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.pubsub;
+
+import static 
org.apache.beam.sdk.io.gcp.pubsub.PubsubIO.PUBSUB_MESSAGE_MAX_TOTAL_SIZE;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThrows;
+
+import java.io.Serializable;
+import java.nio.charset.StandardCharsets;
+import java.util.Map;
+import javax.naming.SizeLimitExceededException;
+import 
org.apache.beam.repackaged.core.org.apache.commons.lang3.RandomStringUtils;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+public class PreparePubsubWriteDoFnTest implements Serializable {
+  @Test
+  public void testValidatePubsubMessageSizeOnlyPayload() throws 
SizeLimitExceededException {
+    byte[] data = new byte[1024];
+    PubsubMessage message = new PubsubMessage(data, null);
+
+    int messageSize =
+        PreparePubsubWriteDoFn.validatePubsubMessageSize(message, 
PUBSUB_MESSAGE_MAX_TOTAL_SIZE);
+
+    assertEquals(data.length, messageSize);
+  }
+
+  @Test
+  public void testValidatePubsubMessageSizePayloadAndAttributes()
+      throws SizeLimitExceededException {
+    byte[] data = new byte[1024];
+    String attributeKey = "key";
+    String attributeValue = "value";
+    Map<String, String> attributes = ImmutableMap.of(attributeKey, 
attributeValue);
+    PubsubMessage message = new PubsubMessage(data, attributes);
+
+    int messageSize =
+        PreparePubsubWriteDoFn.validatePubsubMessageSize(message, 
PUBSUB_MESSAGE_MAX_TOTAL_SIZE);
+
+    assertEquals(
+        data.length
+            + 6 // PUBSUB_MESSAGE_ATTRIBUTE_ENCODE_ADDITIONAL_BYTES
+            + attributeKey.getBytes(StandardCharsets.UTF_8).length
+            + attributeValue.getBytes(StandardCharsets.UTF_8).length,
+        messageSize);
+  }
+
+  @Test
+  public void testValidatePubsubMessageSizePayloadTooLarge() {
+    byte[] data = new byte[(10 << 20) + 1];
+    PubsubMessage message = new PubsubMessage(data, null);
+
+    assertThrows(
+        SizeLimitExceededException.class,
+        () ->
+            PreparePubsubWriteDoFn.validatePubsubMessageSize(
+                message, PUBSUB_MESSAGE_MAX_TOTAL_SIZE));
+  }
+
+  @Test
+  public void testValidatePubsubMessageSizePayloadPlusAttributesTooLarge() {
+    byte[] data = new byte[(10 << 20)];
+    String attributeKey = "key";
+    String attributeValue = "value";
+    Map<String, String> attributes = ImmutableMap.of(attributeKey, 
attributeValue);
+    PubsubMessage message = new PubsubMessage(data, attributes);
+
+    assertThrows(
+        SizeLimitExceededException.class,
+        () ->
+            PreparePubsubWriteDoFn.validatePubsubMessageSize(
+                message, PUBSUB_MESSAGE_MAX_TOTAL_SIZE));
+  }
+
+  @Test
+  public void testValidatePubsubMessageSizeAttributeKeyTooLarge() {
+    byte[] data = new byte[1024];
+    String attributeKey = RandomStringUtils.randomAscii(257);
+    String attributeValue = "value";
+    Map<String, String> attributes = ImmutableMap.of(attributeKey, 
attributeValue);
+    PubsubMessage message = new PubsubMessage(data, attributes);
+
+    assertThrows(
+        SizeLimitExceededException.class,
+        () ->
+            PreparePubsubWriteDoFn.validatePubsubMessageSize(
+                message, PUBSUB_MESSAGE_MAX_TOTAL_SIZE));
+  }
+
+  @Test
+  public void testValidatePubsubMessageSizeAttributeValueTooLarge() {
+    byte[] data = new byte[1024];
+    String attributeKey = "key";
+    String attributeValue = RandomStringUtils.randomAscii(1025);
+    Map<String, String> attributes = ImmutableMap.of(attributeKey, 
attributeValue);
+    PubsubMessage message = new PubsubMessage(data, attributes);
+
+    assertThrows(
+        SizeLimitExceededException.class,
+        () ->
+            PreparePubsubWriteDoFn.validatePubsubMessageSize(
+                message, PUBSUB_MESSAGE_MAX_TOTAL_SIZE));
+  }
+
+  @Test
+  public void testValidatePubsubMessagePayloadTooLarge() {
+    byte[] data = new byte[(10 << 20) + 1];
+    PubsubMessage message = new PubsubMessage(data, null);
+
+    assertThrows(
+        SizeLimitExceededException.class,
+        () ->
+            PreparePubsubWriteDoFn.validatePubsubMessageSize(
+                message, PUBSUB_MESSAGE_MAX_TOTAL_SIZE));
+  }
+}
diff --git 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubSubWritePayloadTranslationTest.java
 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubSubWritePayloadTranslationTest.java
index 75f484d9d29..02e424bc1c8 100644
--- 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubSubWritePayloadTranslationTest.java
+++ 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubSubWritePayloadTranslationTest.java
@@ -35,6 +35,7 @@ import org.apache.beam.sdk.runners.AppliedPTransform;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.resourcehints.ResourceHints;
+import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PDone;
 import org.apache.beam.sdk.values.PValues;
@@ -52,6 +53,8 @@ public class PubSubWritePayloadTranslationTest {
   private static final TopicPath TOPIC = 
PubsubClient.topicPathFromName("testProject", "testTopic");
   private final PubSubPayloadTranslation.PubSubWritePayloadTranslator 
sinkTranslator =
       new PubSubWritePayloadTranslator();
+  private final PubSubPayloadTranslation.PubSubDynamicWritePayloadTranslator 
dynamicSinkTranslator =
+      new PubSubPayloadTranslation.PubSubDynamicWritePayloadTranslator();
 
   @Rule public TestPipeline pipeline = 
TestPipeline.create().enableAbandonedNodeEnforcement(false);
 
@@ -92,6 +95,44 @@ public class PubSubWritePayloadTranslationTest {
     assertEquals(ID_ATTRIBUTE, payload.getIdAttribute());
   }
 
+  @Test
+  public void testTranslateDynamicSink() throws Exception {
+    PubsubUnboundedSink pubsubUnboundedSink =
+        new PubsubUnboundedSink(
+            null,
+            StaticValueProvider.of(TOPIC),
+            TIMESTAMP_ATTRIBUTE,
+            ID_ATTRIBUTE,
+            0,
+            0,
+            0,
+            Duration.ZERO,
+            null,
+            null);
+    PubsubUnboundedSink.PubsubDynamicSink pubsubSink =
+        new PubsubUnboundedSink.PubsubDynamicSink(pubsubUnboundedSink);
+    PCollection<KV<String, byte[]>> input = 
pipeline.apply(Create.of(KV.of("foo", new byte[0])));
+    PDone output = input.apply(pubsubSink);
+    AppliedPTransform<?, ?, PubsubUnboundedSink.PubsubDynamicSink> 
appliedPTransform =
+        AppliedPTransform.of(
+            "sink",
+            PValues.expandInput(input),
+            PValues.expandOutput(output),
+            pubsubSink,
+            ResourceHints.create(),
+            pipeline);
+    SdkComponents components = SdkComponents.create();
+    
components.registerEnvironment(Environments.createDockerEnvironment("java"));
+    RunnerApi.FunctionSpec spec = 
dynamicSinkTranslator.translate(appliedPTransform, components);
+
+    assertEquals(PTransformTranslation.PUBSUB_WRITE_DYNAMIC, spec.getUrn());
+    PubSubWritePayload payload = 
PubSubWritePayload.parseFrom(spec.getPayload());
+    assertEquals("", payload.getTopic());
+    assertTrue(payload.getTopicRuntimeOverridden().isEmpty());
+    assertEquals(TIMESTAMP_ATTRIBUTE, payload.getTimestampAttribute());
+    assertEquals(ID_ATTRIBUTE, payload.getIdAttribute());
+  }
+
   @Test
   public void testTranslateSinkWithTopicOverridden() throws Exception {
     ValueProvider<TopicPath> runtimeProvider = pipeline.newProvider(TOPIC);
diff --git 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubGrpcClientTest.java
 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubGrpcClientTest.java
index ce70f4f4079..022608c87d8 100644
--- 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubGrpcClientTest.java
+++ 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubGrpcClientTest.java
@@ -269,7 +269,8 @@ public class PubsubGrpcClientTest {
                   .putAllAttributes(ATTRIBUTES)
                   .build(),
               MESSAGE_TIME_MS,
-              RECORD_ID);
+              RECORD_ID,
+              null);
       int n = client.publish(TOPIC, ImmutableList.of(actualMessage));
       assertEquals(1, n);
       assertEquals(expectedRequest, 
Iterables.getOnlyElement(requestsReceived));
diff --git 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOExternalTest.java
 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOExternalTest.java
index 89ac47244ea..620f5b22806 100644
--- 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOExternalTest.java
+++ 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOExternalTest.java
@@ -150,7 +150,7 @@ public class PubsubIOExternalTest {
     RunnerApi.PTransform transform = result.getTransform();
     assertThat(
         transform.getSubtransformsList(),
-        Matchers.hasItem(MatchesPattern.matchesPattern(".*MapElements.*")));
+        
Matchers.hasItem(MatchesPattern.matchesPattern(".*PreparePubsubWrite.*")));
     assertThat(
         transform.getSubtransformsList(),
         
Matchers.hasItem(MatchesPattern.matchesPattern(".*PubsubUnboundedSink.*")));
@@ -167,7 +167,7 @@ public class PubsubIOExternalTest {
 
     // 
test_namespacetest/PubsubUnboundedSink/PubsubSink/PubsubUnboundedSink.Writer
     RunnerApi.PTransform writeComposite3 =
-        
result.getComponents().getTransformsOrThrow(writeComposite2.getSubtransforms(3));
+        
result.getComponents().getTransformsOrThrow(writeComposite2.getSubtransforms(4));
 
     // 
test_namespacetest/PubsubUnboundedSink/PubsubSink/PubsubUnboundedSink.Writer/ParMultiDo(Writer)
     RunnerApi.PTransform writeParDo =
diff --git 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java
 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java
index c7b3ce764a7..9c9b1abfc3e 100644
--- 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java
+++ 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java
@@ -27,7 +27,6 @@ import static org.hamcrest.Matchers.nullValue;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertThrows;
 
 import com.google.api.client.util.Clock;
 import com.google.protobuf.ByteString;
@@ -36,14 +35,13 @@ import com.google.protobuf.InvalidProtocolBufferException;
 import java.io.IOException;
 import java.io.Serializable;
 import java.nio.charset.StandardCharsets;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
 import java.util.UUID;
 import java.util.stream.Collectors;
-import javax.naming.SizeLimitExceededException;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.reflect.AvroSchema;
@@ -67,22 +65,23 @@ import org.apache.beam.sdk.options.ValueProvider;
 import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.MapElements;
 import org.apache.beam.sdk.transforms.SimpleFunction;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.display.DisplayDataEvaluator;
 import org.apache.beam.sdk.util.CoderUtils;
 import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TimestampedValue;
 import org.apache.beam.sdk.values.TypeDescriptor;
 import org.apache.beam.sdk.values.TypeDescriptors;
 import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects;
 import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
-import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
-import org.apache.commons.lang3.RandomStringUtils;
 import org.checkerframework.checker.nullness.qual.Nullable;
 import org.joda.time.DateTime;
 import org.joda.time.DateTimeZone;
+import org.joda.time.Instant;
 import org.junit.After;
 import org.junit.Rule;
 import org.junit.Test;
@@ -361,7 +360,7 @@ public class PubsubIOTest {
   private static final TopicPath TOPIC =
       PubsubClient.topicPathFromName("test-project", "testTopic");
   private static final Clock CLOCK = (Clock & Serializable) () -> 673L;
-  transient TestPipeline readPipeline;
+  transient TestPipeline pipeline;
 
   private static final String SCHEMA_STRING =
       "{\"namespace\": \"example.avro\",\n"
@@ -391,8 +390,8 @@ public class PubsubIOTest {
                 public void evaluate() throws Throwable {
                   options = TestPipeline.testingPipelineOptions();
                   options.as(PubsubOptions.class).setProject("test-project");
-                  readPipeline = TestPipeline.fromOptions(options);
-                  readPipeline.apply(base, description).evaluate();
+                  pipeline = TestPipeline.fromOptions(options);
+                  pipeline.apply(base, description).evaluate();
                 }
               };
           return withPipeline;
@@ -457,13 +456,14 @@ public class PubsubIOTest {
                     .putAttributes("pubsubMessageId", "<null>")
                     .build(),
                 1234L,
+                null,
                 null));
     clientFactory =
         PubsubTestClient.createFactoryForPullAndPublish(
             SUBSCRIPTION, TOPIC, CLOCK, 60, expectedReads, expectedWrites, 
ImmutableList.of());
 
     PCollection<String> read =
-        readPipeline.apply(
+        pipeline.apply(
             PubsubIO.readStrings()
                 .fromSubscription(SUBSCRIPTION.getPath())
                 .withDeadLetterTopic(TOPIC.getPath())
@@ -478,7 +478,7 @@ public class PubsubIOTest {
                         TypeDescriptors.strings())));
 
     PAssert.that(read).empty();
-    readPipeline.run();
+    pipeline.run();
   }
 
   @Test
@@ -491,13 +491,13 @@ public class PubsubIOTest {
             Primitive.newBuilder().setPrimitiveString("Hello, 
World!").build());
     setupTestClient(inputs, coder);
     PCollection<Primitive> read =
-        readPipeline.apply(
+        pipeline.apply(
             PubsubIO.readProtos(Primitive.class)
                 .fromSubscription(SUBSCRIPTION.getPath())
                 .withClock(CLOCK)
                 .withClientFactory(clientFactory));
     PAssert.that(read).containsInAnyOrder(inputs);
-    readPipeline.run();
+    pipeline.run();
   }
 
   @Test
@@ -513,7 +513,7 @@ public class PubsubIOTest {
     ProtoDomain domain = ProtoDomain.buildFrom(Primitive.getDescriptor());
     String name = Primitive.getDescriptor().getFullName();
     PCollection<Primitive> read =
-        readPipeline
+        pipeline
             .apply(
                 PubsubIO.readProtoDynamicMessages(domain, name)
                     .fromSubscription(SUBSCRIPTION.getPath())
@@ -535,7 +535,7 @@ public class PubsubIOTest {
                         }));
 
     PAssert.that(read).containsInAnyOrder(inputs);
-    readPipeline.run();
+    pipeline.run();
   }
 
   @Test
@@ -549,7 +549,7 @@ public class PubsubIOTest {
     setupTestClient(inputs, coder);
 
     PCollection<Primitive> read =
-        readPipeline
+        pipeline
             .apply(
                 PubsubIO.readProtoDynamicMessages(Primitive.getDescriptor())
                     .fromSubscription(SUBSCRIPTION.getPath())
@@ -568,7 +568,7 @@ public class PubsubIOTest {
                         }));
 
     PAssert.that(read).containsInAnyOrder(inputs);
-    readPipeline.run();
+    pipeline.run();
   }
 
   @Test
@@ -581,13 +581,13 @@ public class PubsubIOTest {
             new AvroGeneratedUser("Ted", null, "white"));
     setupTestClient(inputs, coder);
     PCollection<GenericRecord> read =
-        readPipeline.apply(
+        pipeline.apply(
             PubsubIO.readAvroGenericRecords(SCHEMA)
                 .fromSubscription(SUBSCRIPTION.getPath())
                 .withClock(CLOCK)
                 .withClientFactory(clientFactory));
     PAssert.that(read).containsInAnyOrder(inputs);
-    readPipeline.run();
+    pipeline.run();
   }
 
   @Test
@@ -601,13 +601,13 @@ public class PubsubIOTest {
                 2, "bar", new DateTime().withDate(1986, 10, 
1).withZone(DateTimeZone.UTC)));
     setupTestClient(inputs, coder);
     PCollection<GenericClass> read =
-        readPipeline.apply(
+        pipeline.apply(
             PubsubIO.readAvrosWithBeamSchema(GenericClass.class)
                 .fromSubscription(SUBSCRIPTION.getPath())
                 .withClock(CLOCK)
                 .withClientFactory(clientFactory));
     PAssert.that(read).containsInAnyOrder(inputs);
-    readPipeline.run();
+    pipeline.run();
   }
 
   @Test
@@ -620,13 +620,13 @@ public class PubsubIOTest {
             new AvroGeneratedUser("Ted", null, "white"));
     setupTestClient(inputs, coder);
     PCollection<AvroGeneratedUser> read =
-        readPipeline.apply(
+        pipeline.apply(
             PubsubIO.readAvrosWithBeamSchema(AvroGeneratedUser.class)
                 .fromSubscription(SUBSCRIPTION.getPath())
                 .withClock(CLOCK)
                 .withClientFactory(clientFactory));
     PAssert.that(read).containsInAnyOrder(inputs);
-    readPipeline.run();
+    pipeline.run();
   }
 
   @Test
@@ -663,7 +663,7 @@ public class PubsubIOTest {
     setupTestClient(inputs, coder);
 
     PCollection<String> read =
-        readPipeline.apply(
+        pipeline.apply(
             PubsubIO.readMessagesWithCoderAndParseFn(
                     StringUtf8Coder.of(), new StringPayloadParseFn())
                 .fromSubscription(SUBSCRIPTION.getPath())
@@ -672,89 +672,74 @@ public class PubsubIOTest {
 
     List<String> outputs = ImmutableList.of("foo", "bar");
     PAssert.that(read).containsInAnyOrder(outputs);
-    readPipeline.run();
+    pipeline.run();
   }
 
   @Test
-  public void testValidatePubsubMessageSizeOnlyPayload() throws 
SizeLimitExceededException {
-    byte[] data = new byte[1024];
-    PubsubMessage message = new PubsubMessage(data, null);
-
-    int messageSize = PubsubIO.validateAndGetPubsubMessageSize(message);
-
-    assertEquals(data.length, messageSize);
-  }
-
-  @Test
-  public void testValidatePubsubMessageSizePayloadAndAttributes()
-      throws SizeLimitExceededException {
-    byte[] data = new byte[1024];
-    String attributeKey = "key";
-    String attributeValue = "value";
-    Map<String, String> attributes = ImmutableMap.of(attributeKey, 
attributeValue);
-    PubsubMessage message = new PubsubMessage(data, attributes);
-
-    int messageSize = PubsubIO.validateAndGetPubsubMessageSize(message);
-
-    assertEquals(
-        data.length
-            + 6 // PUBSUB_MESSAGE_ATTRIBUTE_ENCODE_ADDITIONAL_BYTES
-            + attributeKey.getBytes(StandardCharsets.UTF_8).length
-            + attributeValue.getBytes(StandardCharsets.UTF_8).length,
-        messageSize);
-  }
-
-  @Test
-  public void testValidatePubsubMessageSizePayloadTooLarge() {
-    byte[] data = new byte[(10 << 20) + 1];
-    PubsubMessage message = new PubsubMessage(data, null);
-
-    assertThrows(
-        SizeLimitExceededException.class, () -> 
PubsubIO.validateAndGetPubsubMessageSize(message));
-  }
-
-  @Test
-  public void testValidatePubsubMessageSizePayloadPlusAttributesTooLarge() {
-    byte[] data = new byte[(10 << 20)];
-    String attributeKey = "key";
-    String attributeValue = "value";
-    Map<String, String> attributes = ImmutableMap.of(attributeKey, 
attributeValue);
-    PubsubMessage message = new PubsubMessage(data, attributes);
-
-    assertThrows(
-        SizeLimitExceededException.class, () -> 
PubsubIO.validateAndGetPubsubMessageSize(message));
+  public void testDynamicTopicsBounded() throws IOException {
+    testDynamicTopics(true);
   }
 
   @Test
-  public void testValidatePubsubMessageSizeAttributeKeyTooLarge() {
-    byte[] data = new byte[1024];
-    String attributeKey = RandomStringUtils.randomAscii(257);
-    String attributeValue = "value";
-    Map<String, String> attributes = ImmutableMap.of(attributeKey, 
attributeValue);
-    PubsubMessage message = new PubsubMessage(data, attributes);
-
-    assertThrows(
-        SizeLimitExceededException.class, () -> 
PubsubIO.validateAndGetPubsubMessageSize(message));
+  public void testDynamicTopicsUnbounded() throws IOException {
+    testDynamicTopics(false);
   }
 
-  @Test
-  public void testValidatePubsubMessageSizeAttributeValueTooLarge() {
-    byte[] data = new byte[1024];
-    String attributeKey = "key";
-    String attributeValue = RandomStringUtils.randomAscii(1025);
-    Map<String, String> attributes = ImmutableMap.of(attributeKey, 
attributeValue);
-    PubsubMessage message = new PubsubMessage(data, attributes);
-
-    assertThrows(
-        SizeLimitExceededException.class, () -> 
PubsubIO.validateAndGetPubsubMessageSize(message));
-  }
-
-  @Test
-  public void testValidatePubsubMessagePayloadTooLarge() {
-    byte[] data = new byte[(10 << 20) + 1];
-    PubsubMessage message = new PubsubMessage(data, null);
-
-    assertThrows(
-        SizeLimitExceededException.class, () -> 
PubsubIO.validateAndGetPubsubMessageSize(message));
+  public void testDynamicTopics(boolean isBounded) throws IOException {
+    List<OutgoingMessage> expectedOutgoing =
+        ImmutableList.of(
+            OutgoingMessage.of(
+                com.google.pubsub.v1.PubsubMessage.newBuilder()
+                    .setData(ByteString.copyFromUtf8("0"))
+                    .build(),
+                0,
+                null,
+                "projects/project/topics/topic1"),
+            OutgoingMessage.of(
+                com.google.pubsub.v1.PubsubMessage.newBuilder()
+                    .setData(ByteString.copyFromUtf8("1"))
+                    .build(),
+                1,
+                null,
+                "projects/project/topics/topic1"),
+            OutgoingMessage.of(
+                com.google.pubsub.v1.PubsubMessage.newBuilder()
+                    .setData(ByteString.copyFromUtf8("2"))
+                    .build(),
+                2,
+                null,
+                "projects/project/topics/topic2"),
+            OutgoingMessage.of(
+                com.google.pubsub.v1.PubsubMessage.newBuilder()
+                    .setData(ByteString.copyFromUtf8("3"))
+                    .build(),
+                3,
+                null,
+                "projects/project/topics/topic2"));
+
+    try (PubsubTestClientFactory factory =
+        PubsubTestClient.createFactoryForPublish(null, expectedOutgoing, 
ImmutableList.of())) {
+      List<TimestampedValue<PubsubMessage>> pubsubMessages =
+          expectedOutgoing.stream()
+              .map(
+                  o ->
+                      TimestampedValue.of(
+                          new PubsubMessage(
+                                  o.getMessage().getData().toByteArray(),
+                                  Collections.emptyMap(),
+                                  o.recordId())
+                              .withTopic(o.topic()),
+                          Instant.ofEpochMilli(o.getTimestampMsSinceEpoch())))
+              .collect(Collectors.toList());
+
+      PCollection<PubsubMessage> messages =
+          pipeline.apply(
+              Create.timestamped(pubsubMessages).withCoder(new 
PubsubMessageWithTopicCoder()));
+      if (!isBounded) {
+        messages = 
messages.setIsBoundedInternal(PCollection.IsBounded.UNBOUNDED);
+      }
+      
messages.apply(PubsubIO.writeMessagesDynamic().withClientFactory(factory));
+      pipeline.run();
+    }
   }
 }
diff --git 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClientTest.java
 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClientTest.java
index e815df25896..5de8f68aee8 100644
--- 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClientTest.java
+++ 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClientTest.java
@@ -219,7 +219,8 @@ public class PubsubJsonClientTest {
                 .setOrderingKey(ORDERING_KEY)
                 .build(),
             MESSAGE_TIME,
-            RECORD_ID);
+            RECORD_ID,
+            null);
     int n = client.publish(TOPIC, ImmutableList.of(actualMessage));
     assertEquals(1, n);
   }
@@ -247,7 +248,8 @@ public class PubsubJsonClientTest {
                 .setData(ByteString.copyFromUtf8(DATA))
                 .build(),
             MESSAGE_TIME,
-            RECORD_ID);
+            RECORD_ID,
+            null);
     int n = client.publish(TOPIC, ImmutableList.of(actualMessage));
     assertEquals(1, n);
   }
@@ -278,7 +280,8 @@ public class PubsubJsonClientTest {
                 .putAllAttributes(attrs)
                 .build(),
             MESSAGE_TIME,
-            RECORD_ID);
+            RECORD_ID,
+            null);
     int n = client.publish(TOPIC, ImmutableList.of(actualMessage));
     assertEquals(1, n);
   }
diff --git 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubTestClientTest.java
 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubTestClientTest.java
index b0746392d99..aeeca762c56 100644
--- 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubTestClientTest.java
+++ 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubTestClientTest.java
@@ -116,7 +116,8 @@ public class PubsubTestClientTest {
         OutgoingMessage.of(
             
PubsubMessage.newBuilder().setData(ByteString.copyFromUtf8(DATA)).build(),
             MESSAGE_TIME,
-            MESSAGE_ID);
+            MESSAGE_ID,
+            null);
     try (PubsubTestClientFactory factory =
         PubsubTestClient.createFactoryForPublish(
             TOPIC, Sets.newHashSet(expectedOutgoingMessage), 
ImmutableList.of())) {
@@ -134,7 +135,8 @@ public class PubsubTestClientTest {
         
PubsubMessage.newBuilder().setData(ByteString.copyFromUtf8(DATA)).build();
     IncomingMessage expectedIncomingMessage =
         IncomingMessage.of(message, MESSAGE_TIME, REQ_TIME, ACK_ID, 
MESSAGE_ID);
-    OutgoingMessage expectedOutgoingMessage = OutgoingMessage.of(message, 
MESSAGE_TIME, MESSAGE_ID);
+    OutgoingMessage expectedOutgoingMessage =
+        OutgoingMessage.of(message, MESSAGE_TIME, MESSAGE_ID, null);
 
     try (PubsubTestClientFactory factory =
         PubsubTestClient.createFactoryForPullAndPublish(
@@ -164,7 +166,8 @@ public class PubsubTestClientTest {
             OutgoingMessage.of(
                 incomingMessage.message(),
                 incomingMessage.timestampMsSinceEpoch(),
-                incomingMessage.recordId());
+                incomingMessage.recordId(),
+                null);
         client.publish(TOPIC, ImmutableList.of(actualOutgoingMessage));
       }
     }
diff --git 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSinkTest.java
 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSinkTest.java
index 418f65551e1..f79732e7427 100644
--- 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSinkTest.java
+++ 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSinkTest.java
@@ -24,6 +24,7 @@ import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
 import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.OutgoingMessage;
 import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.TopicPath;
 import 
org.apache.beam.sdk.io.gcp.pubsub.PubsubTestClient.PubsubTestClientFactory;
@@ -34,6 +35,7 @@ import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.TimestampedValue;
 import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
 import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.hash.Hashing;
@@ -89,7 +91,8 @@ public class PubsubUnboundedSinkTest implements Serializable {
                 .setData(ByteString.copyFromUtf8(DATA))
                 .build(),
             TIMESTAMP,
-            getRecordId(DATA));
+            getRecordId(DATA),
+            null);
     CoderProperties.coderDecodeEncodeEqual(PubsubUnboundedSink.CODER, message);
     CoderProperties.coderSerializable(PubsubUnboundedSink.CODER);
   }
@@ -104,7 +107,8 @@ public class PubsubUnboundedSinkTest implements 
Serializable {
                     .putAllAttributes(ATTRIBUTES)
                     .build(),
                 TIMESTAMP,
-                getRecordId(DATA)));
+                getRecordId(DATA),
+                null));
     int batchSize = 1;
     int batchBytes = 1;
     try (PubsubTestClientFactory factory =
@@ -137,7 +141,8 @@ public class PubsubUnboundedSinkTest implements 
Serializable {
                     .setData(ByteString.copyFromUtf8(DATA))
                     .build(),
                 TIMESTAMP,
-                getRecordId(DATA)));
+                getRecordId(DATA),
+                null));
     try (PubsubTestClientFactory factory =
         PubsubTestClient.createFactoryForPublish(TOPIC, outgoing, 
ImmutableList.of())) {
       PubsubUnboundedSink sink =
@@ -161,6 +166,71 @@ public class PubsubUnboundedSinkTest implements 
Serializable {
     // message does not match the expected publish message.
   }
 
+  @Test
+  public void testDynamicTopics() throws IOException {
+    List<OutgoingMessage> outgoing =
+        ImmutableList.of(
+            OutgoingMessage.of(
+                com.google.pubsub.v1.PubsubMessage.newBuilder()
+                    .setData(ByteString.copyFromUtf8(DATA + "0"))
+                    .build(),
+                TIMESTAMP,
+                getRecordId(DATA + "0"),
+                "topic1"),
+            OutgoingMessage.of(
+                com.google.pubsub.v1.PubsubMessage.newBuilder()
+                    .setData(ByteString.copyFromUtf8(DATA + "1"))
+                    .build(),
+                TIMESTAMP + 1,
+                getRecordId(DATA + "1"),
+                "topic1"),
+            OutgoingMessage.of(
+                com.google.pubsub.v1.PubsubMessage.newBuilder()
+                    .setData(ByteString.copyFromUtf8(DATA + "2"))
+                    .build(),
+                TIMESTAMP + 2,
+                getRecordId(DATA + "2"),
+                "topic2"),
+            OutgoingMessage.of(
+                com.google.pubsub.v1.PubsubMessage.newBuilder()
+                    .setData(ByteString.copyFromUtf8(DATA + "3"))
+                    .build(),
+                TIMESTAMP + 3,
+                getRecordId(DATA + "3"),
+                "topic2"));
+    try (PubsubTestClientFactory factory =
+        PubsubTestClient.createFactoryForPublish(null, outgoing, 
ImmutableList.of())) {
+      PubsubUnboundedSink sink =
+          new PubsubUnboundedSink(
+              factory,
+              null,
+              TIMESTAMP_ATTRIBUTE,
+              ID_ATTRIBUTE,
+              NUM_SHARDS,
+              1 /* batchSize */,
+              1 /* batchBytes */,
+              Duration.standardSeconds(2),
+              RecordIdMethod.DETERMINISTIC,
+              null);
+
+      List<TimestampedValue<PubsubMessage>> pubsubMessages =
+          outgoing.stream()
+              .map(
+                  o ->
+                      TimestampedValue.of(
+                          new 
PubsubMessage(o.getMessage().getData().toByteArray(), null)
+                              .withTopic(o.topic()),
+                          Instant.ofEpochMilli(o.getTimestampMsSinceEpoch())))
+              .collect(Collectors.toList());
+
+      p.apply(Create.timestamped(pubsubMessages).withCoder(new 
PubsubMessageWithTopicCoder()))
+          .apply(sink);
+      p.run();
+    }
+    // The PubsubTestClientFactory will assert fail on close if the actual 
published
+    // message does not match the expected publish message.
+  }
+
   @Test
   public void sendMoreThanOneBatchByNumMessages() throws IOException {
     List<OutgoingMessage> outgoing = new ArrayList<>();
@@ -175,7 +245,8 @@ public class PubsubUnboundedSinkTest implements 
Serializable {
                   .setData(ByteString.copyFromUtf8(str))
                   .build(),
               TIMESTAMP,
-              getRecordId(str)));
+              getRecordId(str),
+              null));
       data.add(str);
     }
     try (PubsubTestClientFactory factory =
@@ -218,7 +289,8 @@ public class PubsubUnboundedSinkTest implements 
Serializable {
                   .setData(ByteString.copyFromUtf8(str))
                   .build(),
               TIMESTAMP,
-              getRecordId(str)));
+              getRecordId(str),
+              null));
       data.add(str);
       n += str.length();
     }

Reply via email to