bzablocki commented on code in PR #31476:
URL: https://github.com/apache/beam/pull/31476#discussion_r1638454863


##########
sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/SolaceIO.java:
##########
@@ -0,0 +1,557 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.solace;
+
+import static 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull;
+import static 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState;
+
+import com.google.auto.value.AutoValue;
+import com.solacesystems.jcsmp.BytesXMLMessage;
+import com.solacesystems.jcsmp.Destination;
+import com.solacesystems.jcsmp.JCSMPFactory;
+import com.solacesystems.jcsmp.Queue;
+import com.solacesystems.jcsmp.Topic;
+import java.io.IOException;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.coders.CannotProvideCoderException;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.io.solace.broker.SempClientFactory;
+import org.apache.beam.sdk.io.solace.broker.SessionService;
+import org.apache.beam.sdk.io.solace.broker.SessionServiceFactory;
+import org.apache.beam.sdk.io.solace.data.Solace;
+import org.apache.beam.sdk.io.solace.data.Solace.SolaceRecordMapper;
+import org.apache.beam.sdk.io.solace.data.SolaceRecordCoder;
+import org.apache.beam.sdk.io.solace.read.UnboundedSolaceSource;
+import org.apache.beam.sdk.schemas.NoSuchSchemaException;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A {@link PTransform} to read and write from/to <a 
href="https://solace.com/";>Solace</a> event
+ * broker.
+ *
+ * <p>Note: this API is beta and subject to change.
+ *
+ * <h2>Reading from Solace</h2>
+ *
+ * To read from Solace, use the {@link SolaceIO#read()} or {@link 
SolaceIO#read(TypeDescriptor,
+ * SerializableFunction, SerializableFunction)}.
+ *
+ * <h3>No-argument {@link SolaceIO#read()} top-level method</h3>
+ *
+ * <p>This method returns a PCollection of {@link Solace.Record} objects. It 
uses a default mapper
+ * ({@link SolaceRecordMapper#map(BytesXMLMessage)}) to map from the received 
{@link
+ * BytesXMLMessage} from Solace, to the {@link Solace.Record} objects.
+ *
+ * <p>By default, it also uses a {@link BytesXMLMessage#getSenderTimestamp()} 
for watermark
+ * estimation. This {@link SerializableFunction} can be overridden with {@link
+ * Read#withTimestampFn(SerializableFunction)} method.
+ *
+ * <p>When using this method, the Coders are inferred automatically.
+ *
+ * <h3>Advanced {@link SolaceIO#read(TypeDescriptor, SerializableFunction, 
SerializableFunction)}
+ * top-level method</h3>
+ *
+ * <p>With this method, the user can:
+ *
+ * <ul>
+ *   <li>specify a custom output type for the PTransform (for example their 
own class consisting
+ *       only of the relevant fields, optimized for their use-case), and
+ *   <li>create a custom mapping between {@link BytesXMLMessage} and their 
output type and
+ *   <li>specify what field to use for watermark estimation from their mapped 
field (for example, in
+ *       this method the user can use a field which is encoded in the payload 
as a timestamp, which
+ *       cannot be done with the {@link SolaceIO#read()} method.
+ * </ul>
+ *
+ * <h3>Reading from a queue ({@link Read#from(Solace.Queue)}} or a topic 
({@link
+ * Read#from(Solace.Topic)})</h3>
+ *
+ * <p>Regardless of the top-level read method choice, the user can specify 
whether to read from a
+ * Queue - {@link Read#from(Solace.Queue)}, or a Topic {@link 
Read#from(Solace.Topic)}.
+ *
+ * <p>Note: when a user specifies to read from a Topic, the connector will 
create a matching Queue
+ * and a Subscription. The user must ensure that the SEMP API is reachable 
from the driver program
+ * and must provide credentials that have `write` permission to the <a
+ * href="https://docs.solace.com/Admin/SEMP/Using-SEMP.htm";>SEMP Config 
API</a>. The created Queue
+ * will be non-exclusive. The Queue will not be deleted when the pipeline is 
terminated.
+ *
+ * <p>Note: If the user specifies to read from a Queue, <a
+ * 
href="https://beam.apache.org/documentation/programming-guide/#overview";>the 
driver program</a>
+ * will execute a call to the SEMP API to check if the Queue is `exclusive` or 
`non-exclusive`. The
+ * user must ensure that the SEMP API is reachable from the driver program and 
provide credentials
+ * with `read` permission to the {@link 
Read#withSempClientFactory(SempClientFactory)}.
+ *
+ * <h3>Usage example</h3>
+ *
+ * <h4>The no-arg {@link SolaceIO#read()} method</h4>
+ *
+ * <p>The minimal example - reading from an existing Queue, using the no-arg 
{@link SolaceIO#read()}
+ * method, with all the default configuration options.
+ *
+ * <pre>{@code
+ * PCollection<Solace.Record> events =
+ *   pipeline.apply(
+ *     SolaceIO.read()
+ *         .from(Queue.fromName("your-queue-name"))
+ *         .withSempClientFactory(
+ *                 BasicAuthSempClientFactory.builder()
+ *                         .host("your-host-name-with-protocol") // e.g. 
"http://12.34.56.78:8080";
+ *                         .username("semp-username")
+ *                         .password("semp-password")
+ *                         .vpnName("vpn-name")
+ *                         .build())
+ *         .withSessionServiceFactory(
+ *                 BasicAuthJcsmpSessionServiceFactory.builder()
+ *                         .host("your-host-name")
+ *                               // e.g. "12.34.56.78", or "[fe80::1]", or 
"12.34.56.78:4444"
+ *                         .username("username")
+ *                         .password("password")
+ *                         .vpnName("vpn-name")
+ *                         .build()));
+ * }</pre>
+ *
+ * <h4>The advanced {@link SolaceIO#read(TypeDescriptor, SerializableFunction,
+ * SerializableFunction)} method</h4>
+ *
+ * <p>When using this method you can specify a custom output PCollection type 
and a custom timestamp
+ * function.
+ *
+ * <pre>{@code
+ * @DefaultSchema(JavaBeanSchema.class)
+ * public static class SimpleRecord {
+ *    public String payload;
+ *    public String messageId;
+ *    public Instant timestamp;
+ *
+ *    public SimpleRecord() {}
+ *
+ *    public SimpleRecord(String payload, String messageId, Instant timestamp) 
{
+ *        this.payload = payload;
+ *        this.messageId = messageId;
+ *        this.timestamp = timestamp;
+ *    }
+ * }
+ *
+ * private static SimpleRecord toSimpleRecord(BytesXMLMessage record) {
+ *    if (record == null) {
+ *        return null;
+ *    }
+ *    return new SimpleRecord(
+ *            new String(record.getBytes(), StandardCharsets.UTF_8),
+ *            record.getApplicationMessageId(),
+ *            record.getSenderTimestamp() != null
+ *                    ? Instant.ofEpochMilli(record.getSenderTimestamp())
+ *                    : Instant.now());
+ * }
+ *
+ * PCollection<SimpleRecord> events =
+ *  pipeline.apply(
+ *      SolaceIO.read(
+ *                      TypeDescriptor.of(SimpleRecord.class),
+ *                      record -> toSimpleRecord(record),
+ *                      record -> record.timestamp)
+ *              .from(Topic.fromName("your-topic-name"))
+ *              .withSempClientFactory(...)
+ *              .withSessionServiceFactory(...);
+ *
+ *
+ * }</pre>
+ *
+ * <h3>Authentication</h3>
+ *
+ * <p>When reading from Solace, the user must use {@link
+ * Read#withSessionServiceFactory(SessionServiceFactory)} to create a JCSMP 
session and {@link
+ * Read#withSempClientFactory(SempClientFactory)} to authenticate to the SEMP 
API.
+ *
+ * <p>See {@link Read#withSessionServiceFactory(SessionServiceFactory)} for 
session authentication.
+ * The connector provides implementation of the {@link SessionServiceFactory} 
using the Basic
+ * Authentication: {@link 
org.apache.beam.sdk.io.solace.broker.BasicAuthJcsmpSessionService}.
+ *
+ * <p>For the authentication to the SEMP API ({@link 
Read#withSempClientFactory(SempClientFactory)})
+ * the connector provides {@link 
org.apache.beam.sdk.io.solace.broker.BasicAuthSempClientFactory} to
+ * authenticate using the Basic Authentication.
+ */
+@Internal
+public class SolaceIO {
+
+  public static final SerializableFunction<Solace.Record, Instant> 
SENDER_TIMESTAMP_FUNCTION =
+      (record) -> {
+        Long senderTimestamp = record != null ? record.getSenderTimestamp() : 
null;
+        if (senderTimestamp != null) {
+          return Instant.ofEpochMilli(senderTimestamp);
+        } else {
+          return Instant.now();
+        }
+      };
+  private static final boolean DEFAULT_DEDUPLICATE_RECORDS = false;
+
+  /** Get a {@link Topic} object from the topic name. */
+  static Topic topicFromName(String topicName) {
+    return JCSMPFactory.onlyInstance().createTopic(topicName);
+  }
+
+  /** Get a {@link Queue} object from the queue name. */
+  static Queue queueFromName(String queueName) {
+    return JCSMPFactory.onlyInstance().createQueue(queueName);
+  }
+
+  /**
+   * Convert to a JCSMP destination from a schema-enabled {@link
+   * org.apache.beam.sdk.io.solace.data.Solace.Destination}.
+   *
+   * <p>This method returns a {@link Destination}, which may be either a 
{@link Topic} or a {@link
+   * Queue}
+   */
+  public static Destination convertToJcsmpDestination(Solace.Destination 
destination) {
+    if (destination.getType().equals(Solace.DestinationType.TOPIC)) {
+      return topicFromName(checkNotNull(destination.getName()));
+    } else if (destination.getType().equals(Solace.DestinationType.QUEUE)) {
+      return queueFromName(checkNotNull(destination.getName()));
+    } else {
+      throw new IllegalArgumentException(
+          "SolaceIO.Write: Unknown destination type: " + 
destination.getType());
+    }
+  }
+
+  /**
+   * Create a {@link Read} transform, to read from Solace. The ingested 
records will be mapped to
+   * the {@link Solace.Record} objects.
+   */
+  public static Read<Solace.Record> read() {
+    return Read.<Solace.Record>builder()
+        .setTypeDescriptor(TypeDescriptor.of(Solace.Record.class))
+        .setParseFn(SolaceRecordMapper::map)
+        .setTimestampFn(SENDER_TIMESTAMP_FUNCTION)
+        .build();
+  }
+  /**
+   * Create a {@link Read} transform, to read from Solace. Specify a {@link 
SerializableFunction} to
+   * map incoming {@link BytesXMLMessage} records, to the object of your 
choice. You also need to
+   * specify a {@link TypeDescriptor} for your class and the timestamp 
function which returns an
+   * {@link Instant} from the record.
+   *
+   * <p>The type descriptor will be used to infer a coder from CoderRegistry 
or Schema Registry. You
+   * can initialize a new TypeDescriptor in the following manner:
+   *
+   * <pre>{@code
+   * TypeDescriptor<T> typeDescriptor = 
TypeDescriptor.of(YourOutputType.class);
+   * }</pre>
+   */
+  public static <T> Read<T> read(
+      TypeDescriptor<T> typeDescriptor,
+      SerializableFunction<@Nullable BytesXMLMessage, @Nullable T> parseFn,
+      SerializableFunction<T, Instant> timestampFn) {
+    checkState(typeDescriptor != null, "SolaceIO.Read: typeDescriptor must not 
be null");
+    checkState(parseFn != null, "SolaceIO.Read: parseFn must not be null");
+    checkState(timestampFn != null, "SolaceIO.Read: timestampFn must not be 
null");
+    return Read.<T>builder()
+        .setTypeDescriptor(typeDescriptor)
+        .setParseFn(parseFn)
+        .setTimestampFn(timestampFn)
+        .build();
+  }
+
+  @AutoValue
+  public abstract static class Read<T> extends PTransform<PBegin, 
PCollection<T>> {
+    private static final Logger LOG = LoggerFactory.getLogger(Read.class);
+
+    /** Set the queue name to read from. Use this or the `from(Topic)` method. 
*/
+    public Read<T> from(Solace.Queue queue) {
+      return toBuilder().setQueue(queueFromName(queue.getName())).build();
+    }
+
+    /** Set the topic name to read from. Use this or the `from(Queue)` method. 
*/
+    public Read<T> from(Solace.Topic topic) {
+      return toBuilder().setTopic(topicFromName(topic.getName())).build();
+    }
+
+    /**
+     * The timestamp function, used for estimating the watermark, mapping the 
record T to an {@link
+     * Instant}
+     *
+     * <p>Optional when using the no-arg {@link SolaceIO#read()} method. 
Defaults to {@link
+     * SolaceIO#SENDER_TIMESTAMP_FUNCTION}. When using the {@link 
SolaceIO#read(TypeDescriptor,
+     * SerializableFunction, SerializableFunction)} method, the function 
mapping from T to {@link
+     * Instant} has to be passed as an argument.
+     */
+    public Read<T> withTimestampFn(SerializableFunction<T, Instant> 
timestampFn) {
+      checkState(
+          timestampFn != null,
+          "SolaceIO.Read: timestamp function must be set or use the"
+              + " `Read.readSolaceRecords()` method");
+      return toBuilder().setTimestampFn(timestampFn).build();
+    }
+
+    /**
+     * Optional. Sets the maximum number of connections to the broker. The 
actual number of sessions
+     * is determined by this and the number set by the runner. If not set, the 
number of sessions is
+     * determined by the runner. The number of connections created follows 
this logic:
+     * `numberOfConnections = min(maxNumConnections, desiredNumberOfSplits)`, 
where the
+     * `desiredNumberOfSplits` is set by the runner.
+     */
+    public Read<T> withMaxNumConnections(Integer maxNumConnections) {
+      return toBuilder().setMaxNumConnections(maxNumConnections).build();
+    }
+
+    /**
+     * Optional, default: false. Set to deduplicate messages based on the 
{@link
+     * BytesXMLMessage#getApplicationMessageId()} of the incoming {@link 
BytesXMLMessage}. If the
+     * field is null, then the {@link 
BytesXMLMessage#getReplicationGroupMessageId()} will be used,
+     * which is always set by Solace.
+     */
+    public Read<T> withDeduplicateRecords(boolean deduplicateRecords) {
+      return toBuilder().setDeduplicateRecords(deduplicateRecords).build();
+    }
+
+    /**
+     * Set a factory that creates a {@link 
org.apache.beam.sdk.io.solace.broker.SempClientFactory}.
+     *
+     * <p>The factory `create()` method is invoked in each instance of an 
{@link
+     * org.apache.beam.sdk.io.solace.read.UnboundedSolaceReader}. Created 
{@link
+     * org.apache.beam.sdk.io.solace.broker.SempClient} has to communicate 
with broker management
+     * API. It must support operations such as:
+     *
+     * <ul>
+     *   <li>query for outstanding backlog bytes in a Queue,
+     *   <li>query for metadata such as access-type of a Queue,
+     *   <li>requesting creation of new Queues.
+     * </ul>
+     *
+     * <p>An existing implementation of the SempClientFactory includes {@link
+     * org.apache.beam.sdk.io.solace.broker.BasicAuthSempClientFactory} which 
implements connection
+     * to the SEMP with the Basic Authentication method.
+     *
+     * <p>To use it, specify the credentials with the builder methods.
+     *
+     * <p>The format of the host is `[Protocol://]Host[:Port]`
+     *
+     * <pre>{@code
+     * .withSempClientFactory(
+     *         BasicAuthSempClientFactory.builder()
+     *               .host("your-host-name-with-protocol") // e.g. 
"http://12.34.56.78:8080";
+     *               .username("username")
+     *               .password("password")
+     *               .vpnName("vpn-name")
+     *               .build())
+     * }</pre>
+     */
+    public Read<T> withSempClientFactory(SempClientFactory sempClientFactory) {
+      checkState(sempClientFactory != null, "SolaceIO.Read: sempClientFactory 
must not be null.");
+      return toBuilder().setSempClientFactory(sempClientFactory).build();
+    }
+
+    /**
+     * Set a factory that creates a {@link SessionService}.
+     *
+     * <p>The factory `create()` method is invoked in each instance of an 
{@link
+     * org.apache.beam.sdk.io.solace.read.UnboundedSolaceReader}. Created 
{@link SessionService} has
+     * to be able to:
+     *
+     * <ul>
+     *   <li>initialize a connection with the broker,
+     *   <li>check liveliness of the connection,
+     *   <li>close the connection,
+     *   <li>create a {@link 
org.apache.beam.sdk.io.solace.broker.MessageReceiver}.
+     * </ul>
+     *
+     * <p>An existing implementation of the SempClientFactory includes {@link
+     * org.apache.beam.sdk.io.solace.broker.BasicAuthJcsmpSessionService} 
which implements the Basic
+     * Authentication to Solace. *
+     *
+     * <p>To use it, specify the credentials with the builder methods. *
+     *
+     * <p>The host is the IPv4 or IPv6 or host name of the appliance. IPv5 
addresses must be encoded
+     * in brackets ([]). For example, "12.34.56.78", or "[fe80::1]". If 
connecting to a non-default
+     * port, it can be specified here using the "Host:Port" format. For 
example, "12.34.56.78:4444",
+     * or "[fe80::1]:4444".
+     *
+     * <pre>{@code
+     * BasicAuthJcsmpSessionServiceFactory.builder()
+     *     .host("your-host-name")
+     *           // e.g. "12.34.56.78", or "[fe80::1]", or "12.34.56.78:4444"
+     *     .username("semp-username")
+     *     .password("semp-password")
+     *     .vpnName("vpn-name")
+     *     .build()));
+     * }</pre>
+     */
+    public Read<T> withSessionServiceFactory(SessionServiceFactory 
sessionServiceFactory) {
+      checkState(
+          sessionServiceFactory != null, "SolaceIO.Read: sessionServiceFactory 
must not be null.");
+      return 
toBuilder().setSessionServiceFactory(sessionServiceFactory).build();
+    }
+
+    abstract @Nullable Queue getQueue();
+
+    abstract @Nullable Topic getTopic();
+
+    abstract @Nullable SerializableFunction<T, Instant> getTimestampFn();
+
+    abstract @Nullable Integer getMaxNumConnections();
+
+    abstract boolean getDeduplicateRecords();
+
+    abstract SerializableFunction<@Nullable BytesXMLMessage, @Nullable T> 
getParseFn();
+
+    abstract @Nullable SempClientFactory getSempClientFactory();
+
+    abstract @Nullable SessionServiceFactory getSessionServiceFactory();
+
+    abstract TypeDescriptor<T> getTypeDescriptor();
+
+    public static <T> Builder<T> builder() {
+      Builder<T> builder = new 
org.apache.beam.sdk.io.solace.AutoValue_SolaceIO_Read.Builder<T>();
+      builder.setDeduplicateRecords(DEFAULT_DEDUPLICATE_RECORDS);
+      return builder;
+    }
+
+    abstract Builder<T> toBuilder();
+
+    @AutoValue.Builder
+    public abstract static class Builder<T> {
+
+      abstract Builder<T> setQueue(Queue queue);
+
+      abstract Builder<T> setTopic(Topic topic);
+
+      abstract Builder<T> setTimestampFn(SerializableFunction<T, Instant> 
timestampFn);
+
+      abstract Builder<T> setMaxNumConnections(Integer maxNumConnections);
+
+      abstract Builder<T> setDeduplicateRecords(boolean deduplicateRecords);
+
+      abstract Builder<T> setParseFn(
+          SerializableFunction<@Nullable BytesXMLMessage, @Nullable T> 
parseFn);
+
+      abstract Builder<T> setSempClientFactory(SempClientFactory 
brokerServiceFactory);
+
+      abstract Builder<T> setSessionServiceFactory(SessionServiceFactory 
sessionServiceFactory);
+
+      abstract Builder<T> setTypeDescriptor(TypeDescriptor<T> typeDescriptor);
+
+      abstract Read<T> build();
+    }
+
+    @Override
+    public PCollection<T> expand(PBegin input) {
+      checkState(
+          (getQueue() == null ^ getTopic() == null),
+          "SolaceIO.Read: One of the Solace {Queue, Topic} must be set.");
+
+      SempClientFactory sempClientFactory =
+          checkNotNull(getSempClientFactory(), "SolaceIO: sempClientFactory is 
null.");
+      String jobName = input.getPipeline().getOptions().getJobName();
+      Queue queueFromOptions = getQueue();
+      Queue initializedQueue =
+          queueFromOptions != null
+              ? queueFromOptions
+              : initializeQueueForTopic(jobName, sempClientFactory);
+
+      SessionServiceFactory sessionServiceFactory =
+          checkNotNull(getSessionServiceFactory(), "SolaceIO: 
sessionServiceFactory is null.");
+      sessionServiceFactory.setQueue(initializedQueue);

Review Comment:
   Thanks for the suggestion!
   Here's what I tried, using the `getSempClientFactory()` as an example. 
   I moved the check to the `validate()` method
   ```
   public void validate(@Nullable PipelineOptions options) {
         checkNotNull(getSempClientFactory(), "SolaceIO: sempClientFactory is 
null.");
       }
   ```
   but I still need to do the extra validation in the expand() method, to 
satisfy the checkerframework checks.
   ```
   SempClientFactory sempClientFactory = checkNotNull(getSempClientFactory());
   ```
   Would that be ok, to duplicate the `checkNotNull` method call for each 
`@Nullable` method called in the `expand()`? 
   Otherwise, I'd need to annotate `initializeQueueForTopic()` method arguments 
as `@Nullable`, 'dragging' this annotation unnecessarily downstream.
   Unfortunately, I can't make the `getSempClientFactory()` return a NonNull 
value, it has to be Nullable, as I can't default to any sensible value.
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@beam.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to