Converts PubsubIO.Write to AutoValue
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/df6ef969 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/df6ef969 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/df6ef969 Branch: refs/heads/master Commit: df6ef969d6df5c42d091cc00997b0ed7680315fb Parents: 9e81548 Author: Eugene Kirpichov <[email protected]> Authored: Thu Apr 20 17:34:11 2017 -0700 Committer: Eugene Kirpichov <[email protected]> Committed: Sat Apr 29 13:15:48 2017 -0700 ---------------------------------------------------------------------- .../apache/beam/sdk/io/gcp/pubsub/PubsubIO.java | 166 +++++++------------ 1 file changed, 61 insertions(+), 105 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/df6ef969/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java index 69a5bd6..5702af1 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java @@ -461,8 +461,9 @@ public class PubsubIO { return new AutoValue_PubsubIO_Read.Builder<T>().build(); } + /** Returns A {@link PTransform} that writes to a Google Cloud Pub/Sub stream. */ public static <T> Write<T> write() { - return new Write<>(); + return new AutoValue_PubsubIO_Write.Builder<T>().build(); } /** Implementation of {@link #read}. 
*/ @@ -696,43 +697,47 @@ public class PubsubIO { private PubsubIO() {} - /** - * A {@link PTransform} that writes an unbounded {@link PCollection} of {@link String Strings} - * to a Cloud Pub/Sub stream. - */ - public static class Write<T> extends PTransform<PCollection<T>, PDone> { - - /** The Cloud Pub/Sub topic to publish to. */ + /** Implementation of {@link #write}. */ + @AutoValue + public abstract static class Write<T> extends PTransform<PCollection<T>, PDone> { @Nullable - private final ValueProvider<PubsubTopic> topic; + abstract ValueProvider<PubsubTopic> getTopicProvider(); + /** The name of the message attribute to publish message timestamps in. */ @Nullable - private final String timestampLabel; + abstract String getTimestampLabel(); + /** The name of the message attribute to publish unique message IDs in. */ @Nullable - private final String idLabel; + abstract String getIdLabel(); + /** The input type Coder. */ - private final Coder<T> coder; + @Nullable + abstract Coder<T> getCoder(); + /** The format function for input PubsubMessage objects. 
*/ - SimpleFunction<T, PubsubMessage> formatFn; + @Nullable + abstract SimpleFunction<T, PubsubMessage> getFormatFn(); - private Write() { - this(null, null, null, null, null, null); - } + abstract Builder<T> toBuilder(); - private Write( - String name, ValueProvider<PubsubTopic> topic, String timestampLabel, - String idLabel, Coder<T> coder, SimpleFunction<T, PubsubMessage> formatFn) { - super(name); - this.topic = topic; - this.timestampLabel = timestampLabel; - this.idLabel = idLabel; - this.coder = coder; - this.formatFn = formatFn; + @AutoValue.Builder + abstract static class Builder<T> { + abstract Builder<T> setTopicProvider(ValueProvider<PubsubTopic> topicProvider); + + abstract Builder<T> setTimestampLabel(String timestampLabel); + + abstract Builder<T> setIdLabel(String idLabel); + + abstract Builder<T> setCoder(Coder<T> coder); + + abstract Builder<T> setFormatFn(SimpleFunction<T, PubsubMessage> formatFn); + + abstract Write<T> build(); } /** - * Creates a transform that publishes to the specified topic. + * Publishes to the specified topic. * * <p>See {@link PubsubIO.PubsubTopic#fromPath(String)} for more details on the format of the * {@code topic} string. @@ -745,14 +750,15 @@ public class PubsubIO { * Like {@code topic()} but with a {@link ValueProvider}. */ public Write<T> topic(ValueProvider<String> topic) { - return new Write<>(name, NestedValueProvider.of(topic, new TopicTranslator()), - timestampLabel, idLabel, coder, formatFn); + return toBuilder() + .setTopicProvider(NestedValueProvider.of(topic, new TopicTranslator())) + .build(); } /** - * Creates a transform that writes to Pub/Sub, adds each record's timestamp to the published - * messages in an attribute with the specified name. The value of the attribute will be a number - * representing the number of milliseconds since the Unix epoch. 
For example, if using the Joda + * Writes to Pub/Sub and adds each record's timestamp to the published messages in an attribute + * with the specified name. The value of the attribute will be a number representing the number + * of milliseconds since the Unix epoch. For example, if using the Joda * time classes, {@link Instant#Instant(long)} can be used to parse this value. * * <p>If the output from this sink is being read by another Beam pipeline, then @@ -760,32 +766,27 @@ public class PubsubIO { * these timestamps from the appropriate attribute. */ public Write<T> timestampLabel(String timestampLabel) { - return new Write<>(name, topic, timestampLabel, idLabel, coder, formatFn); + return toBuilder().setTimestampLabel(timestampLabel).build(); } /** - * Creates a transform that writes to Pub/Sub, adding each record's unique identifier to the - * published messages in an attribute with the specified name. The value of the attribute is an - * opaque string. + * Writes to Pub/Sub, adding each record's unique identifier to the published messages in an + * attribute with the specified name. The value of the attribute is an opaque string. * * <p>If the output from this sink is being read by another Beam pipeline, then * {@link PubsubIO.Read#withIdLabel(String)} can be used to ensure that the other source reads * these unique identifiers from the appropriate attribute. */ public Write<T> idLabel(String idLabel) { - return new Write<>(name, topic, timestampLabel, idLabel, coder, formatFn); + return toBuilder().setIdLabel(idLabel).build(); } /** - * Returns a new transform that's like this one - * but that uses the given {@link Coder} to encode each of - * the elements of the input {@link PCollection} into an - * output record. - * - * <p>Does not modify this object. + * Uses the given {@link Coder} to encode each of the elements of the input {@link PCollection} + * into an output record. 
*/ public Write<T> withCoder(Coder<T> coder) { - return new Write<>(name, topic, timestampLabel, idLabel, coder, formatFn); + return toBuilder().setCoder(coder).build(); } /** @@ -794,12 +795,12 @@ public class PubsubIO { * to separately set the PubSub message's payload and attributes. */ public Write<T> withAttributes(SimpleFunction<T, PubsubMessage> formatFn) { - return new Write<T>(name, topic, timestampLabel, idLabel, coder, formatFn); + return toBuilder().setFormatFn(formatFn).build(); } @Override public PDone expand(PCollection<T> input) { - if (topic == null) { + if (getTopicProvider() == null) { throw new IllegalStateException("need to set the topic of a PubsubIO.Write transform"); } switch (input.isBounded()) { @@ -809,11 +810,11 @@ public class PubsubIO { case UNBOUNDED: return input.apply(new PubsubUnboundedSink<T>( FACTORY, - NestedValueProvider.of(topic, new TopicPathTranslator()), - coder, - timestampLabel, - idLabel, - formatFn, + NestedValueProvider.of(getTopicProvider(), new TopicPathTranslator()), + getCoder(), + getTimestampLabel(), + getIdLabel(), + getFormatFn(), 100 /* numShards */)); } throw new RuntimeException(); // cases are exhaustive. @@ -822,7 +823,7 @@ public class PubsubIO { @Override public void populateDisplayData(DisplayData.Builder builder) { super.populateDisplayData(builder); - populateCommonDisplayData(builder, timestampLabel, idLabel, topic); + populateCommonDisplayData(builder, getTimestampLabel(), getIdLabel(), getTopicProvider()); } @Override @@ -831,54 +832,6 @@ public class PubsubIO { } /** - * Returns the PubSub topic being written to. - */ - @Nullable - public PubsubTopic getTopic() { - return (topic == null) ? null : topic.get(); - } - - /** - * Returns the {@link ValueProvider} for the topic being written to. - */ - @Nullable - public ValueProvider<PubsubTopic> getTopicProvider() { - return topic; - } - - /** - * Returns the timestamp label. 
- */ - @Nullable - public String getTimestampLabel() { - return timestampLabel; - } - - /** - * Returns the id label. - */ - @Nullable - public String getIdLabel() { - return idLabel; - } - - /** - * Returns the output coder. - */ - @Nullable - public Coder<T> getCoder() { - return coder; - } - - /** - * Returns the formatting function used if publishing attributes. - */ - @Nullable - public SimpleFunction<T, PubsubMessage> getFormatFn() { - return formatFn; - } - - /** * Writer to Pubsub which batches messages from bounded collections. * * <p>Public so can be suppressed by runners. @@ -894,7 +847,7 @@ public class PubsubIO { this.output = new ArrayList<>(); // NOTE: idLabel is ignored. this.pubsubClient = - FACTORY.newClient(timestampLabel, null, + FACTORY.newClient(getTimestampLabel(), null, c.getPipelineOptions().as(PubsubOptions.class)); } @@ -902,8 +855,8 @@ public class PubsubIO { public void processElement(ProcessContext c) throws IOException { byte[] payload = null; Map<String, String> attributes = null; - if (formatFn != null) { - PubsubMessage message = formatFn.apply(c.element()); + if (getFormatFn() != null) { + PubsubMessage message = getFormatFn().apply(c.element()); payload = message.getMessage(); attributes = message.getAttributeMap(); } else { @@ -930,9 +883,12 @@ public class PubsubIO { } private void publish() throws IOException { - int n = pubsubClient.publish( - PubsubClient.topicPathFromName(getTopic().project, getTopic().topic), - output); + PubsubTopic topic = getTopicProvider().get(); + int n = + pubsubClient.publish( + PubsubClient.topicPathFromName( + topic.project, topic.topic), + output); checkState(n == output.size()); output.clear(); }
