[BEAM-716] Use AutoValue in JmsIO
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/caf1c720 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/caf1c720 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/caf1c720 Branch: refs/heads/gearpump-runner Commit: caf1c720f66de4d502f79b6c11c64b49c53329b0 Parents: 1c9bf8d Author: Jean-Baptiste Onofré <jbono...@apache.org> Authored: Sun Dec 11 07:43:41 2016 +0100 Committer: Jean-Baptiste Onofré <jbono...@apache.org> Committed: Mon Dec 19 07:24:00 2016 +0100 ---------------------------------------------------------------------- sdks/java/io/jms/pom.xml | 7 + .../java/org/apache/beam/sdk/io/jms/JmsIO.java | 321 +++++++++++++------ 2 files changed, 228 insertions(+), 100 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/caf1c720/sdks/java/io/jms/pom.xml ---------------------------------------------------------------------- diff --git a/sdks/java/io/jms/pom.xml b/sdks/java/io/jms/pom.xml index bca0152..b88254e 100644 --- a/sdks/java/io/jms/pom.xml +++ b/sdks/java/io/jms/pom.xml @@ -81,6 +81,13 @@ <artifactId>jsr305</artifactId> </dependency> + <!-- compile dependencies --> + <dependency> + <groupId>com.google.auto.value</groupId> + <artifactId>auto-value</artifactId> + <scope>provided</scope> + </dependency> + <!-- test dependencies --> <dependency> <groupId>org.apache.activemq</groupId> http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/caf1c720/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java index 24fa67d..76dee67 100644 --- a/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java +++ 
b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java @@ -20,6 +20,7 @@ package org.apache.beam.sdk.io.jms; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; +import com.google.auto.value.AutoValue; import com.google.common.annotations.VisibleForTesting; import java.io.IOException; import java.util.ArrayList; @@ -101,37 +102,148 @@ public class JmsIO { private static final Logger LOG = LoggerFactory.getLogger(JmsIO.class); public static Read read() { - return new Read(null, null, null, Long.MAX_VALUE, null); + return new AutoValue_JmsIO_Read.Builder().setMaxNumRecords(Long.MAX_VALUE).build(); } public static Write write() { - return new Write(null, null, null); + return new AutoValue_JmsIO_Write.Builder().build(); } /** * A {@link PTransform} to read from a JMS destination. See {@link JmsIO} for more * information on usage and configuration. */ - public static class Read extends PTransform<PBegin, PCollection<JmsRecord>> { + @AutoValue + public abstract static class Read extends PTransform<PBegin, PCollection<JmsRecord>> { + /** + * NB: According to http://docs.oracle.com/javaee/1.4/api/javax/jms/ConnectionFactory.html + * "It is expected that JMS providers will provide the tools an administrator needs to create + * and configure administered objects in a JNDI namespace. JMS provider implementations of + * administered objects should be both javax.jndi.Referenceable and java.io.Serializable so + * that they can be stored in all JNDI naming contexts. In addition, it is recommended that + * these implementations follow the JavaBeansTM design patterns." + * + * <p>So, a {@link ConnectionFactory} implementation is serializable. 
+ */ + @Nullable abstract ConnectionFactory getConnectionFactory(); + @Nullable abstract String getQueue(); + @Nullable abstract String getTopic(); + abstract long getMaxNumRecords(); + @Nullable abstract Duration getMaxReadTime(); + + abstract Builder builder(); + + @AutoValue.Builder + abstract static class Builder { + abstract Builder setConnectionFactory(ConnectionFactory connectionFactory); + abstract Builder setQueue(String queue); + abstract Builder setTopic(String topic); + abstract Builder setMaxNumRecords(long maxNumRecords); + abstract Builder setMaxReadTime(Duration maxReadTime); + abstract Read build(); + } + + /** + * <p>Specify the JMS connection factory to connect to the JMS broker. + * + * <p>For instance: + * + * <pre> + * {@code + * pipeline.apply(JmsIO.read().withConnectionFactory(myConnectionFactory) + * } + * </pre> + * + * @param connectionFactory The JMS {@link ConnectionFactory}. + * @return The corresponding {@link JmsIO.Read}. + */ public Read withConnectionFactory(ConnectionFactory connectionFactory) { - return new Read(connectionFactory, queue, topic, maxNumRecords, maxReadTime); + return builder().setConnectionFactory(connectionFactory).build(); } + /** + * <p>Specify the JMS queue destination name where to read messages from. The + * {@link JmsIO.Read} acts as a consumer on the queue. + * + * <p>This method is exclusive with {@link JmsIO.Read#withTopic(String)}. The user has to + * specify a destination: queue or topic. + * + * <p>For instance: + * + * <pre> + * {@code + * pipeline.apply(JmsIO.read().withQueue("my-queue") + * } + * </pre> + * + * @param queue The JMS queue name where to read messages from. + * @return The corresponding {@link JmsIO.Read}. + */ public Read withQueue(String queue) { - return new Read(connectionFactory, queue, topic, maxNumRecords, maxReadTime); + return builder().setQueue(queue).build(); } + /** + * <p>Specify the JMS topic destination name where to receive messages from. 
The + * {@link JmsIO.Read} acts as a subscriber on the topic. + * + * <p>This method is exclusive with {@link JmsIO.Read#withQueue(String)}. The user has to + * specify a destination: queue or topic. + * + * <p>For instance: + * + * <pre> + * {@code + * pipeline.apply(JmsIO.read().withTopic("my-topic") + * } + * </pre> + * + * @param topic The JMS topic name. + * @return The corresponding {@link JmsIO.Read}. + */ public Read withTopic(String topic) { - return new Read(connectionFactory, queue, topic, maxNumRecords, maxReadTime); + return builder().setTopic(topic).build(); } + /** + * <p>Define the max number of records that the source will read. Using a max number of records + * different from {@code Long.MAX_VALUE} means the source will be {@code Bounded}, and will + * stop once the max number of records read is reached. + * + * <p>For instance: + * + * <pre> + * {@code + * pipeline.apply(JmsIO.read().withMaxNumRecords(1000) + * } + * </pre> + * + * @param maxNumRecords The max number of records to read from the JMS destination. + * @return The corresponding {@link JmsIO.Read}. + */ public Read withMaxNumRecords(long maxNumRecords) { - return new Read(connectionFactory, queue, topic, maxNumRecords, maxReadTime); + return builder().setMaxNumRecords(maxNumRecords).build(); } + /** + * <p>Define the max read time that the source will read. Using a non null max read time + * duration means the source will be {@code Bounded}, and will stop once the max read time is + * reached. + * + * <p>For instance: + * + * <pre> + * {@code + * pipeline.apply(JmsIO.read().withMaxReadTime(Duration.minutes(10)) + * } + * </pre> + * + * @param maxReadTime The max read time duration. + * @return The corresponding {@link JmsIO.Read}. 
+ */ public Read withMaxReadTime(Duration maxReadTime) { - return new Read(connectionFactory, queue, topic, maxNumRecords, maxReadTime); + return builder().setMaxReadTime(maxReadTime).build(); } @Override @@ -141,10 +253,10 @@ public class JmsIO { PTransform<PBegin, PCollection<JmsRecord>> transform = unbounded; - if (maxNumRecords != Long.MAX_VALUE) { - transform = unbounded.withMaxNumRecords(maxNumRecords); - } else if (maxReadTime != null) { - transform = unbounded.withMaxReadTime(maxReadTime); + if (getMaxNumRecords() != Long.MAX_VALUE) { + transform = unbounded.withMaxNumRecords(getMaxNumRecords()); + } else if (getMaxReadTime() != null) { + transform = unbounded.withMaxReadTime(getMaxReadTime()); } return input.getPipeline().apply(transform); @@ -152,65 +264,29 @@ public class JmsIO { @Override public void validate(PBegin input) { - checkNotNull(connectionFactory, "ConnectionFactory not specified"); - checkArgument((queue != null || topic != null), "Either queue or topic not specified"); + checkNotNull(getConnectionFactory(), "ConnectionFactory not specified"); + checkArgument((getQueue() != null || getTopic() != null), "Either queue or topic not " + + "specified"); } @Override public void populateDisplayData(DisplayData.Builder builder) { super.populateDisplayData(builder); - - builder.addIfNotNull(DisplayData.item("queue", queue)); - builder.addIfNotNull(DisplayData.item("topic", topic)); + builder.addIfNotNull(DisplayData.item("queue", getQueue())); + builder.addIfNotNull(DisplayData.item("topic", getTopic())); } /////////////////////////////////////////////////////////////////////////////////////// /** - * NB: According to http://docs.oracle.com/javaee/1.4/api/javax/jms/ConnectionFactory.html - * "It is expected that JMS providers will provide the tools an administrator needs to create - * and configure administered objects in a JNDI namespace. 
JMS provider implementations of - * administered objects should be both javax.jndi.Referenceable and java.io.Serializable so - * that they can be stored in all JNDI naming contexts. In addition, it is recommended that - * these implementations follow the JavaBeansTM design patterns." - * - * <p>So, a {@link ConnectionFactory} implementation is serializable. - */ - protected ConnectionFactory connectionFactory; - @Nullable - protected String queue; - @Nullable - protected String topic; - protected long maxNumRecords; - protected Duration maxReadTime; - - private Read( - ConnectionFactory connectionFactory, - String queue, - String topic, - long maxNumRecords, - Duration maxReadTime) { - super("JmsIO.Read"); - - this.connectionFactory = connectionFactory; - this.queue = queue; - this.topic = topic; - this.maxNumRecords = maxNumRecords; - this.maxReadTime = maxReadTime; - } - - /** * Creates an {@link UnboundedSource UnboundedSource<JmsRecord, ?>} with the configuration * in {@link Read}. Primary use case is unit tests, should not be used in an * application. 
*/ @VisibleForTesting UnboundedSource<JmsRecord, JmsCheckpointMark> createSource() { - return new UnboundedJmsSource( - connectionFactory, - queue, - topic); + return new UnboundedJmsSource(this); } } @@ -219,17 +295,10 @@ public class JmsIO { private static class UnboundedJmsSource extends UnboundedSource<JmsRecord, JmsCheckpointMark> { - private final ConnectionFactory connectionFactory; - private final String queue; - private final String topic; + private final Read spec; - public UnboundedJmsSource( - ConnectionFactory connectionFactory, - String queue, - String topic) { - this.connectionFactory = connectionFactory; - this.queue = queue; - this.topic = topic; + public UnboundedJmsSource(Read spec) { + this.spec = spec; } @Override @@ -237,7 +306,7 @@ public class JmsIO { int desiredNumSplits, PipelineOptions options) throws Exception { List<UnboundedJmsSource> sources = new ArrayList<>(); for (int i = 0; i < desiredNumSplits; i++) { - sources.add(new UnboundedJmsSource(connectionFactory, queue, topic)); + sources.add(new UnboundedJmsSource(spec)); } return sources; } @@ -250,8 +319,7 @@ public class JmsIO { @Override public void validate() { - checkNotNull(connectionFactory, "ConnectionFactory is not defined"); - checkArgument((queue != null || topic != null), "Either queue or topic is not defined"); + spec.validate(null); } @Override @@ -291,15 +359,17 @@ public class JmsIO { @Override public boolean start() throws IOException { - ConnectionFactory connectionFactory = source.connectionFactory; + ConnectionFactory connectionFactory = source.spec.getConnectionFactory(); try { this.connection = connectionFactory.createConnection(); this.connection.start(); this.session = this.connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - if (source.topic != null) { - this.consumer = this.session.createConsumer(this.session.createTopic(source.topic)); + if (source.spec.getTopic() != null) { + this.consumer = + 
this.session.createConsumer(this.session.createTopic(source.spec.getTopic())); } else { - this.consumer = this.session.createConsumer(this.session.createQueue(source.queue)); + this.consumer = + this.session.createConsumer(this.session.createQueue(source.spec.getQueue())); } return advance(); @@ -409,70 +479,122 @@ public class JmsIO { * A {@link PTransform} to write to a JMS queue. See {@link JmsIO} for * more information on usage and configuration. */ - public static class Write extends PTransform<PCollection<String>, PDone> { + @AutoValue + public abstract static class Write extends PTransform<PCollection<String>, PDone> { - protected ConnectionFactory connectionFactory; - protected String queue; - protected String topic; + @Nullable abstract ConnectionFactory getConnectionFactory(); + @Nullable abstract String getQueue(); + @Nullable abstract String getTopic(); + abstract Builder builder(); + + @AutoValue.Builder + abstract static class Builder { + abstract Builder setConnectionFactory(ConnectionFactory connectionFactory); + abstract Builder setQueue(String queue); + abstract Builder setTopic(String topic); + abstract Write build(); + } + + /** + * <p>Specify the JMS connection factory to connect to the JMS broker. + * + * <p>For instance: + * + * <pre> + * {@code + * .apply(JmsIO.write().withConnectionFactory(myConnectionFactory) + * } + * </pre> + * + * @param connectionFactory The JMS {@link ConnectionFactory}. + * @return The corresponding {@link JmsIO.Write}. + */ public Write withConnectionFactory(ConnectionFactory connectionFactory) { - return new Write(connectionFactory, queue, topic); + return builder().setConnectionFactory(connectionFactory).build(); } + /** + * <p>Specify the JMS queue destination name where to send messages to. The + * {@link JmsIO.Write} acts as a producer on the queue. + * + * <p>This method is exclusive with {@link JmsIO.Write#withTopic(String)}. The user has to + * specify a destination: queue or topic. 
+ * + * <p>For instance: + * + * <pre> + * {@code + * .apply(JmsIO.write().withQueue("my-queue") + * } + * </pre> + * + * @param queue The JMS queue name where to send messages to. + * @return The corresponding {@link JmsIO.Write}. + */ public Write withQueue(String queue) { - return new Write(connectionFactory, queue, topic); + return builder().setQueue(queue).build(); } + /** + * <p>Specify the JMS topic destination name where to send messages to. The + * {@link JmsIO.Write} acts as a publisher on the topic. + * + * <p>This method is exclusive with {@link JmsIO.Write#withQueue(String)}. The user has to + * specify a destination: queue or topic. + * + * <p>For instance: + * + * <pre> + * {@code + * .apply(JmsIO.write().withTopic("my-topic") + * } + * </pre> + * + * @param topic The JMS topic name. + * @return The corresponding {@link JmsIO.Write}. + */ public Write withTopic(String topic) { - return new Write(connectionFactory, queue, topic); - } - - private Write(ConnectionFactory connectionFactory, String queue, String topic) { - this.connectionFactory = connectionFactory; - this.queue = queue; - this.topic = topic; + return builder().setTopic(topic).build(); } @Override public PDone expand(PCollection<String> input) { - input.apply(ParDo.of(new JmsWriter(connectionFactory, queue, topic))); + input.apply(ParDo.of(new WriterFn(this))); return PDone.in(input.getPipeline()); } @Override public void validate(PCollection<String> input) { - checkNotNull(connectionFactory, "ConnectionFactory is not defined"); - checkArgument((queue != null || topic != null), "Either queue or topic is required"); + checkNotNull(getConnectionFactory(), "ConnectionFactory is not defined"); + checkArgument((getQueue() != null || getTopic() != null), "Either queue or topic is " + + "required"); } - private static class JmsWriter extends DoFn<String, Void> { + private static class WriterFn extends DoFn<String, Void> { - private ConnectionFactory connectionFactory; - private String queue; - 
private String topic; + private Write spec; private Connection connection; private Session session; private MessageProducer producer; - public JmsWriter(ConnectionFactory connectionFactory, String queue, String topic) { - this.connectionFactory = connectionFactory; - this.queue = queue; - this.topic = topic; + public WriterFn(Write spec) { + this.spec = spec; } @StartBundle public void startBundle(Context c) throws Exception { if (producer == null) { - this.connection = connectionFactory.createConnection(); + this.connection = spec.getConnectionFactory().createConnection(); this.connection.start(); // false means we don't use JMS transaction. this.session = this.connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Destination destination; - if (queue != null) { - destination = session.createQueue(queue); + if (spec.getQueue() != null) { + destination = session.createQueue(spec.getQueue()); } else { - destination = session.createTopic(topic); + destination = session.createTopic(spec.getTopic()); } this.producer = this.session.createProducer(destination); } @@ -481,7 +603,6 @@ public class JmsIO { @ProcessElement public void processElement(ProcessContext ctx) throws Exception { String value = ctx.element(); - try { TextMessage message = session.createTextMessage(value); producer.send(message);