reuvenlax commented on code in PR #26063: URL: https://github.com/apache/beam/pull/26063#discussion_r1170485518
########## sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubClient.java: ########## @@ -357,26 +357,33 @@ public static TopicPath topicPathFromName(String projectId, String topicName) { public abstract static class OutgoingMessage implements Serializable { /** Underlying Message. May not have publish timestamp set. */ - public abstract PubsubMessage message(); + public abstract PubsubMessage getMessage(); /** Timestamp for element (ms since epoch). */ - public abstract long timestampMsSinceEpoch(); + public abstract long getTimestampMsSinceEpoch(); /** * If using an id attribute, the record id to associate with this record's metadata so the * receiver can reject duplicates. Otherwise {@literal null}. */ public abstract @Nullable String recordId(); + public abstract @Nullable String topic(); + public static OutgoingMessage of( - PubsubMessage message, long timestampMsSinceEpoch, @Nullable String recordId) { - return new AutoValue_PubsubClient_OutgoingMessage(message, timestampMsSinceEpoch, recordId); + PubsubMessage message, + long timestampMsSinceEpoch, + @Nullable String recordId, + String topic) { + return new AutoValue_PubsubClient_OutgoingMessage( + message, timestampMsSinceEpoch, recordId, topic); } public static OutgoingMessage of( org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage message, long timestampMsSinceEpoch, - @Nullable String recordId) { + @Nullable String recordId, + String topic) { Review Comment: done ########## sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java: ########## @@ -103,6 +104,64 @@ * reviewers mentioned <a * href="https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/OWNERS"> * here</a>. + * + * <h3>Example PubsubIO read usage</h3> + * + * <pre>{@code + * // Read from a specific topic; a subscription will be created at pipeline start time. 
+ * PCollection<PubsubMessage> messages = PubsubIO.readMessages().fromTopic(topic); + * + * // Read from a subscription. + * PCollection<PubsubMessage> messages = PubsubIO.readMessages().fromSubscription(subscription); + * + * // Read messages including attributes. All PubSub attributes will be included in the PubsubMessage. + * PCollection<PubsubMessage> messages = PubsubIO.readMessagesWithAttributes().fromTopic(topic); + * + * // Examples of reading different types from PubSub. + * PCollection<String> strings = PubsubIO.readStrings().fromTopic(topic); + * PCollection<MyProto> protos = PubsubIO.readProtos(MyProto.class).fromTopic(topic); + * PCollection<MyType> avros = PubsubIO.readAvros(MyType.class).fromTopic(topic); + * + * }</pre> + * + * <h3>Example PubsubIO write usage</h3> + * + * Data can be written to a single topic or to a dynamic set of topics. In order to write to a + * single topic, the {@link PubsubIO.Write#to(String)} method can be used. For example: + * + * <pre>{@code + * avros.apply(PubsubIO.writeAvros(MyType.class).to(topic)); Review Comment: done ########## sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java: ########## @@ -103,6 +104,64 @@ * reviewers mentioned <a * href="https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/OWNERS"> * here</a>. + * + * <h3>Example PubsubIO read usage</h3> + * + * <pre>{@code + * // Read from a specific topic; a subscription will be created at pipeline start time. + * PCollection<PubsubMessage> messages = PubsubIO.readMessages().fromTopic(topic); + * + * // Read from a subscription. + * PCollection<PubsubMessage> messages = PubsubIO.readMessages().fromSubscription(subscription); + * + * // Read messages including attributes. All PubSub attributes will be included in the PubsubMessage. + * PCollection<PubsubMessage> messages = PubsubIO.readMessagesWithAttributes().fromTopic(topic); + * + * // Examples of reading different types from PubSub. 
+ * PCollection<String> strings = PubsubIO.readStrings().fromTopic(topic); + * PCollection<MyProto> protos = PubsubIO.readProtos(MyProto.class).fromTopic(topic); + * PCollection<MyType> avros = PubsubIO.readAvros(MyType.class).fromTopic(topic); + * + * }</pre> + * + * <h3>Example PubsubIO write usage</h3> + * + * Data can be written to a single topic or to a dynamic set of topics. In order to write to a + * single topic, the {@link PubsubIO.Write#to(String)} method can be used. For example: + * + * <pre>{@code + * avros.apply(PubsubIO.writeAvros(MyType.class).to(topic)); + * }</pre> + * + * Dynamic topic destinations can be accomplished by specifying a function to extract the topic from Review Comment: done ########## sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java: ########## @@ -103,6 +104,64 @@ * reviewers mentioned <a * href="https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/OWNERS"> * here</a>. + * + * <h3>Example PubsubIO read usage</h3> + * + * <pre>{@code + * // Read from a specific topic; a subscription will be created at pipeline start time. + * PCollection<PubsubMessage> messages = PubsubIO.readMessages().fromTopic(topic); + * + * // Read from a subscription. + * PCollection<PubsubMessage> messages = PubsubIO.readMessages().fromSubscription(subscription); + * + * // Read messages including attributes. All PubSub attributes will be included in the PubsubMessage. + * PCollection<PubsubMessage> messages = PubsubIO.readMessagesWithAttributes().fromTopic(topic); + * + * // Examples of reading different types from PubSub. 
+ * PCollection<String> strings = PubsubIO.readStrings().fromTopic(topic); + * PCollection<MyProto> protos = PubsubIO.readProtos(MyProto.class).fromTopic(topic); + * PCollection<MyType> avros = PubsubIO.readAvros(MyType.class).fromTopic(topic); + * + * }</pre> + * + * <h3>Example PubsubIO write usage</h3> + * + * Data can be written to a single topic or to a dynamic set of topics. In order to write to a + * single topic, the {@link PubsubIO.Write#to(String)} method can be used. For example: + * + * <pre>{@code + * avros.apply(PubsubIO.writeAvros(MyType.class).to(topic)); + * }</pre> + * + * Dynamic topic destinations can be accomplished by specifying a function to extract the topic from + * the record. For example: + * + * <pre>{@code + * avros.apply(PubsubIO.writeAvros(MyType.class). + * to((ValueInSingleWindow<Event> quote) -> { + * String country = quote.getCountry(); + * return "projects/myproject/topics/events_" + country; + * }); + * }</pre> + * + * Dynamic topics can also be specified by writing {@link PubsubMessage} objects containing the + * topic. For example: + * + * <pre>{@code + * events.apply(MapElements.into(new TypeDescriptor<PubsubMessage>() {}) + * .via(e -> new PubsubMessage( + * e.toByteString(), Collections.emptyMap()).withTopic(e.getCountry()))) + * .apply(PubsubIO.writeMessagesDynamic()); + * }</pre> + * + * <h3>Custom timestamps</h3> + * + * All messages read from PubSub have a stable publish timestamp that is independent of when the + * message is read from the PubSub topic. By default, the publish time is used as the timestamp for + * all messages read and the watermark is based on that. If there is a different logical timestamp + * to be used, that timestamp must be published in a PubSub attribute. 
The attribute is specified Review Comment: done ########## sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java: ########## @@ -1350,23 +1385,15 @@ public void startBundle(StartBundleContext c) throws IOException { @ProcessElement public void processElement(ProcessContext c) throws IOException, SizeLimitExceededException { PubsubMessage message = getFormatFn().apply(c.element()); - int messageSize = validateAndGetPubsubMessageSize(message); - if (messageSize > maxPublishBatchByteSize) { Review Comment: sure, done ########## sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java: ########## @@ -242,7 +271,19 @@ private static class WriterFn extends DoFn<KV<Integer, Iterable<OutgoingMessage> /** BLOCKING Send {@code messages} as a batch to Pubsub. */ private void publishBatch(List<OutgoingMessage> messages, int bytes) throws IOException { - int n = pubsubClient.publish(topic.get(), messages); + int n = 0; + if (topic != null) { + n = pubsubClient.publish(topic.get(), messages); + } else { + Map<TopicPath, List<OutgoingMessage>> messagesPerTopic = Maps.newHashMap(); + for (OutgoingMessage message : messages) { + TopicPath topicPath = PubsubClient.topicPathFromPath(message.topic()); Review Comment: done ########## sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java: ########## @@ -495,6 +604,10 @@ static class PubsubSink extends PTransform<PCollection<byte[]>, PDone> { this.outer = outer; } + boolean isDynamic() { Review Comment: done -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscribe@beam.apache.org For queries about this service, please contact Infrastructure at: users@infra.apache.org