reuvenlax commented on code in PR #26063:
URL: https://github.com/apache/beam/pull/26063#discussion_r1170485518


##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubClient.java:
##########
@@ -357,26 +357,33 @@ public static TopicPath topicPathFromName(String projectId, String topicName) {
   public abstract static class OutgoingMessage implements Serializable {
 
     /** Underlying Message. May not have publish timestamp set. */
-    public abstract PubsubMessage message();
+    public abstract PubsubMessage getMessage();
 
     /** Timestamp for element (ms since epoch). */
-    public abstract long timestampMsSinceEpoch();
+    public abstract long getTimestampMsSinceEpoch();
 
     /**
      * If using an id attribute, the record id to associate with this record's metadata so the
      * receiver can reject duplicates. Otherwise {@literal null}.
      */
     public abstract @Nullable String recordId();
 
+    public abstract @Nullable String topic();
+
     public static OutgoingMessage of(
-        PubsubMessage message, long timestampMsSinceEpoch, @Nullable String recordId) {
-      return new AutoValue_PubsubClient_OutgoingMessage(message, timestampMsSinceEpoch, recordId);
+        PubsubMessage message,
+        long timestampMsSinceEpoch,
+        @Nullable String recordId,
+        String topic) {
+      return new AutoValue_PubsubClient_OutgoingMessage(
+          message, timestampMsSinceEpoch, recordId, topic);
     }
 
     public static OutgoingMessage of(
         org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage message,
         long timestampMsSinceEpoch,
-        @Nullable String recordId) {
+        @Nullable String recordId,
+        String topic) {

Review Comment:
   done



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java:
##########
@@ -103,6 +104,64 @@
  * reviewers mentioned <a
  * 
href="https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/OWNERS";>
  * here</a>.
+ *
+ * <h3>Example PubsubIO read usage</h3>
+ *
+ * <pre>{@code
+ * // Read from a specific topic; a subscription will be created at pipeline 
start time.
+ * PCollection<PubsubMessage> messages = 
PubsubIO.readMessages().fromTopic(topic);
+ *
+ * // Read from a subscription.
+ * PCollection<PubsubMessage> messages = 
PubsubIO.readMessages().fromSubscription(subscription);
+ *
+ * // Read messages including attributes. All PubSub attributes will be 
included in the PubsubMessage.
+ * PCollection<PubsubMessage> messages = 
PubsubIO.readMessagesWithAttributes().fromTopic(topic);
+ *
+ * // Examples of reading different types from PubSub.
+ * PCollection<String> strings = PubsubIO.readStrings().fromTopic(topic);
+ * PCollection<MyProto> protos = 
PubsubIO.readProtos(MyProto.class).fromTopic(topic);
+ * PCollection<MyType> avros = 
PubsubIO.readAvros(MyType.class).fromTopic(topic);
+ *
+ * }</pre>
+ *
+ * <h3>Example PubsubIO write usage</h3>
+ *
+ * Data can be written to a single topic or to a dynamic set of topics. In 
order to write to a
+ * single topic, the {@link PubsubIO.Write#to(String)} method can be used. For 
example:
+ *
+ * <pre>{@code
+ * avros.apply(PubsubIO.writeAvros(MyType.class).to(topic));

Review Comment:
   done



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java:
##########
@@ -103,6 +104,64 @@
  * reviewers mentioned <a
  * 
href="https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/OWNERS";>
  * here</a>.
+ *
+ * <h3>Example PubsubIO read usage</h3>
+ *
+ * <pre>{@code
+ * // Read from a specific topic; a subscription will be created at pipeline 
start time.
+ * PCollection<PubsubMessage> messages = 
PubsubIO.readMessages().fromTopic(topic);
+ *
+ * // Read from a subscription.
+ * PCollection<PubsubMessage> messages = 
PubsubIO.readMessages().fromSubscription(subscription);
+ *
+ * // Read messages including attributes. All PubSub attributes will be 
included in the PubsubMessage.
+ * PCollection<PubsubMessage> messages = 
PubsubIO.readMessagesWithAttributes().fromTopic(topic);
+ *
+ * // Examples of reading different types from PubSub.
+ * PCollection<String> strings = PubsubIO.readStrings().fromTopic(topic);
+ * PCollection<MyProto> protos = 
PubsubIO.readProtos(MyProto.class).fromTopic(topic);
+ * PCollection<MyType> avros = 
PubsubIO.readAvros(MyType.class).fromTopic(topic);
+ *
+ * }</pre>
+ *
+ * <h3>Example PubsubIO write usage</h3>
+ *
+ * Data can be written to a single topic or to a dynamic set of topics. In 
order to write to a
+ * single topic, the {@link PubsubIO.Write#to(String)} method can be used. For 
example:
+ *
+ * <pre>{@code
+ * avros.apply(PubsubIO.writeAvros(MyType.class).to(topic));
+ * }</pre>
+ *
+ * Dynamic topic destinations can be accomplished by specifying a function to 
extract the topic from

Review Comment:
   done



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java:
##########
@@ -103,6 +104,64 @@
  * reviewers mentioned <a
  * 
href="https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/OWNERS";>
  * here</a>.
+ *
+ * <h3>Example PubsubIO read usage</h3>
+ *
+ * <pre>{@code
+ * // Read from a specific topic; a subscription will be created at pipeline 
start time.
+ * PCollection<PubsubMessage> messages = 
PubsubIO.readMessages().fromTopic(topic);
+ *
+ * // Read from a subscription.
+ * PCollection<PubsubMessage> messages = 
PubsubIO.readMessages().fromSubscription(subscription);
+ *
+ * // Read messages including attributes. All PubSub attributes will be 
included in the PubsubMessage.
+ * PCollection<PubsubMessage> messages = 
PubsubIO.readMessagesWithAttributes().fromTopic(topic);
+ *
+ * // Examples of reading different types from PubSub.
+ * PCollection<String> strings = PubsubIO.readStrings().fromTopic(topic);
+ * PCollection<MyProto> protos = 
PubsubIO.readProtos(MyProto.class).fromTopic(topic);
+ * PCollection<MyType> avros = 
PubsubIO.readAvros(MyType.class).fromTopic(topic);
+ *
+ * }</pre>
+ *
+ * <h3>Example PubsubIO write usage</h3>
+ *
+ * Data can be written to a single topic or to a dynamic set of topics. In 
order to write to a
+ * single topic, the {@link PubsubIO.Write#to(String)} method can be used. For 
example:
+ *
+ * <pre>{@code
+ * avros.apply(PubsubIO.writeAvros(MyType.class).to(topic));
+ * }</pre>
+ *
+ * Dynamic topic destinations can be accomplished by specifying a function to 
extract the topic from
+ * the record. For example:
+ *
+ * <pre>{@code
+ * avros.apply(PubsubIO.writeAvros(MyType.class).
+ *      to((ValueInSingleWindow<Event> quote) -> {
+ *               String country = quote.getCountry();
+ *               return "projects/myproject/topics/events_" + country;
+ *              });
+ * }</pre>
+ *
+ * Dynamic topics can also be specified by writing {@link PubsubMessage} 
objects containing the
+ * topic. For example:
+ *
+ * <pre>{@code
+ * events.apply(MapElements.into(new TypeDescriptor<PubsubMessage>() {})
+ *                         .via(e -> new PubsubMessage(
+ *                             e.toByteString(), 
Collections.emptyMap()).withTopic(e.getCountry())))
+ * .apply(PubsubIO.writeMessagesDynamic());
+ * }</pre>
+ *
+ * <h3>Custom timestamps</h3>
+ *
+ * All messages read from PubSub have a stable publish timestamp that is 
independent of when the
+ * message is read from the PubSub topic. By default, the publish time is used 
as the timestamp for
+ * all messages read and the watermark is based on that. If there is a 
different logical timestamp
+ * to be used, that timestamp must be published in a PubSub attribute. The 
attribute is specified

Review Comment:
   done



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java:
##########
@@ -1350,23 +1385,15 @@ public void startBundle(StartBundleContext c) throws IOException {
       @ProcessElement
       public void processElement(ProcessContext c) throws IOException, SizeLimitExceededException {
         PubsubMessage message = getFormatFn().apply(c.element());
-        int messageSize = validateAndGetPubsubMessageSize(message);
-        if (messageSize > maxPublishBatchByteSize) {

Review Comment:
   sure, done



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java:
##########
@@ -242,7 +271,19 @@ private static class WriterFn extends DoFn<KV<Integer, Iterable<OutgoingMessage>
 
     /** BLOCKING Send {@code messages} as a batch to Pubsub. */
     private void publishBatch(List<OutgoingMessage> messages, int bytes) throws IOException {
-      int n = pubsubClient.publish(topic.get(), messages);
+      int n = 0;
+      if (topic != null) {
+        n = pubsubClient.publish(topic.get(), messages);
+      } else {
+        Map<TopicPath, List<OutgoingMessage>> messagesPerTopic = Maps.newHashMap();
+        for (OutgoingMessage message : messages) {
+          TopicPath topicPath = PubsubClient.topicPathFromPath(message.topic());

Review Comment:
   done



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java:
##########
@@ -495,6 +604,10 @@ static class PubsubSink extends PTransform<PCollection<byte[]>, PDone> {
       this.outer = outer;
     }
 
+    boolean isDynamic() {

Review Comment:
   done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@beam.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to