This is an automated email from the ASF dual-hosted git repository.

damondouglas pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 3b8ddda10a0 Solace Read connector: adding Basic Authentication support 
(#31541)
3b8ddda10a0 is described below

commit 3b8ddda10a01ff640ed4cf1ce746d0c19e003180
Author: Bartosz Zablocki <bzablo...@google.com>
AuthorDate: Mon Jul 1 23:24:57 2024 +0200

    Solace Read connector: adding Basic Authentication support (#31541)
    
    * Add support for BasicAuth to Solace
    
    * Address PR comments
    
    * Use `checkStateNotNull`
---
 .../broker/BasicAuthJcsmpSessionService.java       | 148 +++++++++++++++++++++
 .../BasicAuthJcsmpSessionServiceFactory.java       |  74 +++++++++++
 .../io/solace/broker/SolaceMessageReceiver.java    |  72 ++++++++++
 3 files changed, 294 insertions(+)

diff --git 
a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/BasicAuthJcsmpSessionService.java
 
b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/BasicAuthJcsmpSessionService.java
new file mode 100644
index 00000000000..7863dbd129c
--- /dev/null
+++ 
b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/BasicAuthJcsmpSessionService.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.solace.broker;
+
+import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
+
+import com.solacesystems.jcsmp.ConsumerFlowProperties;
+import com.solacesystems.jcsmp.EndpointProperties;
+import com.solacesystems.jcsmp.FlowReceiver;
+import com.solacesystems.jcsmp.InvalidPropertiesException;
+import com.solacesystems.jcsmp.JCSMPException;
+import com.solacesystems.jcsmp.JCSMPFactory;
+import com.solacesystems.jcsmp.JCSMPProperties;
+import com.solacesystems.jcsmp.JCSMPSession;
+import com.solacesystems.jcsmp.Queue;
+import java.io.IOException;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.io.solace.RetryCallableManager;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
+
+/**
+ * A class that manages a connection to a Solace broker using basic 
authentication.
+ *
+ * <p>This class provides a way to connect to a Solace broker and receive 
messages from a queue. The
+ * connection is established using basic authentication.
+ */
+public class BasicAuthJcsmpSessionService implements SessionService {
+  private final String queueName;
+  private final String host;
+  private final String username;
+  private final String password;
+  private final String vpnName;
+  @Nullable private JCSMPSession jcsmpSession;
+  private final RetryCallableManager retryCallableManager = 
RetryCallableManager.create();
+
+  /**
+   * Creates a new {@link BasicAuthJcsmpSessionService} with the given 
parameters.
+   *
+   * @param queueName The name of the queue to receive messages from.
+   * @param host The host name or IP address of the Solace broker. Format: 
Host[:Port]
+   * @param username The username to use for authentication.
+   * @param password The password to use for authentication.
+   * @param vpnName The name of the VPN to connect to.
+   */
+  public BasicAuthJcsmpSessionService(
+      String queueName, String host, String username, String password, String 
vpnName) {
+    this.queueName = queueName;
+    this.host = host;
+    this.username = username;
+    this.password = password;
+    this.vpnName = vpnName;
+  }
+
+  @Override
+  public void connect() {
+    retryCallableManager.retryCallable(this::connectSession, 
ImmutableSet.of(JCSMPException.class));
+  }
+
+  @Override
+  public void close() {
+    if (isClosed()) {
+      return;
+    }
+    retryCallableManager.retryCallable(
+        () -> {
+          checkStateNotNull(jcsmpSession).closeSession();
+          return 0;
+        },
+        ImmutableSet.of(IOException.class));
+  }
+
+  @Override
+  public MessageReceiver createReceiver() {
+    return retryCallableManager.retryCallable(
+        this::createFlowReceiver, ImmutableSet.of(JCSMPException.class));
+  }
+
+  @Override
+  public boolean isClosed() {
+    return jcsmpSession == null || jcsmpSession.isClosed();
+  }
+
+  private MessageReceiver createFlowReceiver() throws JCSMPException, 
IOException {
+    if (isClosed()) {
+      connectSession();
+    }
+
+    Queue queue = JCSMPFactory.onlyInstance().createQueue(queueName);
+
+    ConsumerFlowProperties flowProperties = new ConsumerFlowProperties();
+    flowProperties.setEndpoint(queue);
+    flowProperties.setAckMode(JCSMPProperties.SUPPORTED_MESSAGE_ACK_CLIENT);
+
+    EndpointProperties endpointProperties = new EndpointProperties();
+    
endpointProperties.setAccessType(EndpointProperties.ACCESSTYPE_NONEXCLUSIVE);
+    if (jcsmpSession != null) {
+      return new SolaceMessageReceiver(
+          createFlowReceiver(jcsmpSession, flowProperties, 
endpointProperties));
+    }
+    throw new IOException(
+        "SolaceIO.Read: Could not create a receiver from the Jcsmp session: 
session object is null.");
+  }
+
+  // The `@SuppressWarning` is needed here, because the checkerframework 
reports an error for the
+  // first argument of the `createFlow` being null, even though the 
documentation allows it:
+  // 
https://docs.solace.com/API-Developer-Online-Ref-Documentation/java/com/solacesystems/jcsmp/JCSMPSession.html#createFlow-com.solacesystems.jcsmp.XMLMessageListener-com.solacesystems.jcsmp.ConsumerFlowProperties-com.solacesystems.jcsmp.EndpointProperties-
+  @SuppressWarnings("nullness")
+  private static FlowReceiver createFlowReceiver(
+      JCSMPSession jcsmpSession,
+      ConsumerFlowProperties flowProperties,
+      EndpointProperties endpointProperties)
+      throws JCSMPException {
+    return jcsmpSession.createFlow(null, flowProperties, endpointProperties);
+  }
+
+  private int connectSession() throws JCSMPException {
+    if (jcsmpSession == null) {
+      jcsmpSession = createSessionObject();
+    }
+    jcsmpSession.connect();
+    return 0;
+  }
+
+  private JCSMPSession createSessionObject() throws InvalidPropertiesException 
{
+    JCSMPProperties properties = new JCSMPProperties();
+    properties.setProperty(JCSMPProperties.HOST, host);
+    properties.setProperty(JCSMPProperties.USERNAME, username);
+    properties.setProperty(JCSMPProperties.PASSWORD, password);
+    properties.setProperty(JCSMPProperties.VPN_NAME, vpnName);
+
+    return JCSMPFactory.onlyInstance().createSession(properties);
+  }
+}
diff --git 
a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/BasicAuthJcsmpSessionServiceFactory.java
 
b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/BasicAuthJcsmpSessionServiceFactory.java
new file mode 100644
index 00000000000..8cb4ff0af05
--- /dev/null
+++ 
b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/BasicAuthJcsmpSessionServiceFactory.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.solace.broker;
+
+import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
+
+import com.google.auto.value.AutoValue;
+
+/**
+ * A factory for creating {@link BasicAuthJcsmpSessionService} instances. 
Extends {@link
+ * SessionServiceFactory}.
+ *
+ * <p>This factory provides a way to create {@link 
BasicAuthJcsmpSessionService} instances with
+ * authenticate to Solace with Basic Authentication.
+ */
+@AutoValue
+public abstract class BasicAuthJcsmpSessionServiceFactory extends 
SessionServiceFactory {
+  public abstract String host();
+
+  public abstract String username();
+
+  public abstract String password();
+
+  public abstract String vpnName();
+
+  public static Builder builder() {
+    return new AutoValue_BasicAuthJcsmpSessionServiceFactory.Builder();
+  }
+
+  @AutoValue.Builder
+  public abstract static class Builder {
+
+    /**
+     * Set Solace host, format: Host[:Port] e.g. "12.34.56.78", or 
"[fe80::1]", or
+     * "12.34.56.78:4444".
+     */
+    public abstract Builder host(String host);
+
+    /** Set Solace username. */
+    public abstract Builder username(String username);
+    /** Set Solace password. */
+    public abstract Builder password(String password);
+
+    /** Set Solace vpn name. */
+    public abstract Builder vpnName(String vpnName);
+
+    public abstract BasicAuthJcsmpSessionServiceFactory build();
+  }
+
+  @Override
+  public SessionService create() {
+    return new BasicAuthJcsmpSessionService(
+        checkStateNotNull(queue, "SolaceIO.Read: Queue is not set.").getName(),
+        host(),
+        username(),
+        password(),
+        vpnName());
+  }
+}
diff --git 
a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SolaceMessageReceiver.java
 
b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SolaceMessageReceiver.java
new file mode 100644
index 00000000000..e5f129d3ddf
--- /dev/null
+++ 
b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SolaceMessageReceiver.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.solace.broker;
+
+import com.solacesystems.jcsmp.BytesXMLMessage;
+import com.solacesystems.jcsmp.FlowReceiver;
+import com.solacesystems.jcsmp.JCSMPException;
+import com.solacesystems.jcsmp.StaleSessionException;
+import java.io.IOException;
+import org.apache.beam.sdk.io.solace.RetryCallableManager;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SolaceMessageReceiver implements MessageReceiver {
+  private static final Logger LOG = 
LoggerFactory.getLogger(SolaceMessageReceiver.class);
+
+  public static final int DEFAULT_ADVANCE_TIMEOUT_IN_MILLIS = 100;
+  private final FlowReceiver flowReceiver;
+  private final RetryCallableManager retryCallableManager = 
RetryCallableManager.create();
+
+  public SolaceMessageReceiver(FlowReceiver flowReceiver) {
+    this.flowReceiver = flowReceiver;
+  }
+
+  @Override
+  public void start() {
+    startFlowReceiver();
+  }
+
+  private void startFlowReceiver() {
+    retryCallableManager.retryCallable(
+        () -> {
+          flowReceiver.start();
+          return 0;
+        },
+        ImmutableSet.of(JCSMPException.class));
+  }
+
+  @Override
+  public boolean isClosed() {
+    return flowReceiver == null || flowReceiver.isClosed();
+  }
+
+  @Override
+  public BytesXMLMessage receive() throws IOException {
+    try {
+      return flowReceiver.receive(DEFAULT_ADVANCE_TIMEOUT_IN_MILLIS);
+    } catch (StaleSessionException e) {
+      LOG.warn("SolaceIO: Caught StaleSessionException, restarting the 
FlowReceiver.");
+      startFlowReceiver();
+      throw new IOException(e);
+    } catch (JCSMPException e) {
+      throw new IOException(e);
+    }
+  }
+}

Reply via email to