This is an automated email from the ASF dual-hosted git repository.

turcsanyi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 335365874a NIFI-11535: Transfer ConnectWebsocket connection configuration FlowFile to relationships
335365874a is described below

commit 335365874a249dfb3c1044cf39bfbc584085d83a
Author: Lehel <lehe...@hotmail.com>
AuthorDate: Mon May 15 15:52:12 2023 +0200

    NIFI-11535: Transfer ConnectWebsocket connection configuration FlowFile to relationships
    
    Also moved dto and util packages under org.apache.nifi.websocket.jetty
    
    This closes #7246.
    
    Signed-off-by: Peter Turcsanyi <turcsa...@apache.org>
---
 .../AbstractWebSocketGatewayProcessor.java         | 19 +++++++-
 .../processors/websocket/ConnectWebSocket.java     |  2 +
 .../processors/websocket/TestConnectWebSocket.java | 52 ++++++++++++++++++----
 .../nifi/websocket/jetty/JettyWebSocketClient.java |  4 +-
 .../nifi/websocket/jetty}/dto/SessionInfo.java     |  2 +-
 .../websocket/jetty}/util/HeaderMapExtractor.java  |  2 +-
 .../websocket/util/HeaderMapExtractorTest.java     |  2 +-
 7 files changed, 67 insertions(+), 16 deletions(-)

diff --git a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-processors/src/main/java/org/apache/nifi/processors/websocket/AbstractWebSocketGatewayProcessor.java b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-processors/src/main/java/org/apache/nifi/processors/websocket/AbstractWebSocketGatewayProcessor.java
index 9824c4a6d2..25ea1da832 100644
--- a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-processors/src/main/java/org/apache/nifi/processors/websocket/AbstractWebSocketGatewayProcessor.java
+++ b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-processors/src/main/java/org/apache/nifi/processors/websocket/AbstractWebSocketGatewayProcessor.java
@@ -77,6 +77,18 @@ public abstract class AbstractWebSocketGatewayProcessor extends AbstractSessionF
             .description("The WebSocket binary message output")
             .build();
 
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("FlowFile holding connection configuration attributes (like URL or HTTP headers) in case of successful connection")
+            .autoTerminateDefault(true)
+            .build();
+
+    public static final Relationship REL_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("FlowFile holding connection configuration attributes (like URL or HTTP headers) in case of connection failure")
+            .autoTerminateDefault(true)
+            .build();
+
     static Set<Relationship> getAbstractRelationships() {
         final Set<Relationship> relationships = new HashSet<>();
         relationships.add(REL_CONNECTED);
@@ -130,8 +142,11 @@ public abstract class AbstractWebSocketGatewayProcessor extends AbstractSessionF
                 final FlowFile flowFile = session.get();
                 try {
                     webSocketClientService.connect(endpointId, flowFile.getAttributes());
-                } finally {
-                    session.remove(flowFile);
+                    session.transfer(flowFile, REL_SUCCESS);
+                    session.commitAsync();
+                } catch (Exception e) {
+                    getLogger().error("Websocket connection failure", e);
+                    session.transfer(flowFile, REL_FAILURE);
                     session.commitAsync();
                 }
             } else {
diff --git a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-processors/src/main/java/org/apache/nifi/processors/websocket/ConnectWebSocket.java b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-processors/src/main/java/org/apache/nifi/processors/websocket/ConnectWebSocket.java
index be53854d7b..7e2e142014 100644
--- a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-processors/src/main/java/org/apache/nifi/processors/websocket/ConnectWebSocket.java
+++ b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-processors/src/main/java/org/apache/nifi/processors/websocket/ConnectWebSocket.java
@@ -89,6 +89,8 @@ public class ConnectWebSocket extends AbstractWebSocketGatewayProcessor {
         descriptors = Collections.unmodifiableList(innerDescriptorsList);
 
         final Set<Relationship> innerRelationshipsSet = getAbstractRelationships();
+        innerRelationshipsSet.add(REL_SUCCESS);
+        innerRelationshipsSet.add(REL_FAILURE);
         relationships = Collections.unmodifiableSet(innerRelationshipsSet);
     }
 
diff --git a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-processors/src/test/java/org/apache/nifi/processors/websocket/TestConnectWebSocket.java b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-processors/src/test/java/org/apache/nifi/processors/websocket/TestConnectWebSocket.java
index 01c7b8079b..5b6d644749 100644
--- a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-processors/src/test/java/org/apache/nifi/processors/websocket/TestConnectWebSocket.java
+++ b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-processors/src/test/java/org/apache/nifi/processors/websocket/TestConnectWebSocket.java
@@ -136,17 +136,9 @@ class TestConnectWebSocket extends TestListenWebSocket {
         final String serviceId = "ws-service";
         final String endpointId = "client-1";
 
-        Map<String, String> attributes = new HashMap<>();
-        attributes.put("dynamicUrlPart", "test");
-        MockFlowFile flowFile = new MockFlowFile(1L);
-        flowFile.putAttributes(attributes);
+        MockFlowFile flowFile = getFlowFile();
         runner.enqueue(flowFile);
 
-        attributes.put("dynamicUrlPart", "test2");
-        MockFlowFile flowFileWithWrongUrl = new MockFlowFile(2L);
-        flowFileWithWrongUrl.putAttributes(attributes);
-        runner.enqueue(flowFileWithWrongUrl);
-
         JettyWebSocketClient service = new JettyWebSocketClient();
 
 
@@ -162,10 +154,44 @@ class TestConnectWebSocket extends TestListenWebSocket {
         final List<MockFlowFile> flowFilesForRelationship = runner.getFlowFilesForRelationship(ConnectWebSocket.REL_CONNECTED);
         assertEquals(1, flowFilesForRelationship.size());
 
+        final List<MockFlowFile> flowFilesForSuccess = runner.getFlowFilesForRelationship(ConnectWebSocket.REL_SUCCESS);
+        assertEquals(1, flowFilesForSuccess.size());
+
         runner.stop();
         webSocketListener.stop();
     }
 
+    @Test
+    void testDynamicUrlsParsedFromFlowFileButNotAbleToConnect() throws InitializationException {
+        final TestRunner runner = TestRunners.newTestRunner(ConnectWebSocket.class);
+
+        final String serviceId = "ws-service";
+        final String endpointId = "client-1";
+
+        MockFlowFile flowFile = getFlowFile();
+        runner.enqueue(flowFile);
+
+        JettyWebSocketClient service = new JettyWebSocketClient();
+
+
+        runner.addControllerService(serviceId, service);
+        runner.setProperty(service, JettyWebSocketClient.WS_URI, "ws://localhost/${dynamicUrlPart}");
+        runner.enableControllerService(service);
+
+        runner.setProperty(ConnectWebSocket.PROP_WEBSOCKET_CLIENT_SERVICE, serviceId);
+        runner.setProperty(ConnectWebSocket.PROP_WEBSOCKET_CLIENT_ID, endpointId);
+
+        runner.run(1, false);
+
+        final List<MockFlowFile> flowFilesForRelationship = runner.getFlowFilesForRelationship(ConnectWebSocket.REL_CONNECTED);
+        assertEquals(0, flowFilesForRelationship.size());
+
+        final List<MockFlowFile> flowFilesForSuccess = runner.getFlowFilesForRelationship(ConnectWebSocket.REL_FAILURE);
+        assertEquals(1, flowFilesForSuccess.size());
+
+        runner.stop();
+    }
+
     private TestRunner getListenWebSocket(final int port) throws InitializationException {
         final TestRunner runner = TestRunners.newTestRunner(ListenWebSocket.class);
 
@@ -180,4 +206,12 @@ class TestConnectWebSocket extends TestListenWebSocket {
 
         return runner;
     }
+
+    private MockFlowFile getFlowFile() {
+        Map<String, String> attributes = new HashMap<>();
+        attributes.put("dynamicUrlPart", "test");
+        MockFlowFile flowFile = new MockFlowFile(1L);
+        flowFile.putAttributes(attributes);
+        return flowFile;
+    }
 }
diff --git a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/main/java/org/apache/nifi/websocket/jetty/JettyWebSocketClient.java b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/main/java/org/apache/nifi/websocket/jetty/JettyWebSocketClient.java
index 83fa91090b..712b2c1ce6 100644
--- a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/main/java/org/apache/nifi/websocket/jetty/JettyWebSocketClient.java
+++ b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/main/java/org/apache/nifi/websocket/jetty/JettyWebSocketClient.java
@@ -16,7 +16,7 @@
  */
 package org.apache.nifi.websocket.jetty;
 
-import dto.SessionInfo;
+import org.apache.nifi.websocket.jetty.dto.SessionInfo;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.annotation.lifecycle.OnDisabled;
@@ -42,7 +42,7 @@ import org.eclipse.jetty.util.ssl.SslContextFactory;
 import org.eclipse.jetty.websocket.api.Session;
 import org.eclipse.jetty.websocket.client.ClientUpgradeRequest;
 import org.eclipse.jetty.websocket.client.WebSocketClient;
-import util.HeaderMapExtractor;
+import org.apache.nifi.websocket.jetty.util.HeaderMapExtractor;
 
 import java.io.IOException;
 import java.net.URI;
diff --git a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/main/java/dto/SessionInfo.java b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/main/java/org/apache/nifi/websocket/jetty/dto/SessionInfo.java
similarity index 94%
rename from nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/main/java/dto/SessionInfo.java
rename to nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/main/java/org/apache/nifi/websocket/jetty/dto/SessionInfo.java
index 7caa2f5ff9..11f379c9e0 100644
--- a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/main/java/dto/SessionInfo.java
+++ b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/main/java/org/apache/nifi/websocket/jetty/dto/SessionInfo.java
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package dto;
+package org.apache.nifi.websocket.jetty.dto;
 
 import java.util.Map;
 
diff --git a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/main/java/util/HeaderMapExtractor.java b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/main/java/org/apache/nifi/websocket/jetty/util/HeaderMapExtractor.java
similarity index 95%
rename from nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/main/java/util/HeaderMapExtractor.java
rename to nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/main/java/org/apache/nifi/websocket/jetty/util/HeaderMapExtractor.java
index f29db9f416..45c09e57ce 100644
--- a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/main/java/util/HeaderMapExtractor.java
+++ b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/main/java/org/apache/nifi/websocket/jetty/util/HeaderMapExtractor.java
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package util;
+package org.apache.nifi.websocket.jetty.util;
 
 import org.apache.nifi.util.StringUtils;
 
diff --git a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/java/org/apache/nifi/websocket/util/HeaderMapExtractorTest.java b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/java/org/apache/nifi/websocket/util/HeaderMapExtractorTest.java
index 28d57e5021..211d22fdb1 100644
--- a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/java/org/apache/nifi/websocket/util/HeaderMapExtractorTest.java
+++ b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/java/org/apache/nifi/websocket/util/HeaderMapExtractorTest.java
@@ -17,7 +17,7 @@
 package org.apache.nifi.websocket.util;
 
 import org.junit.jupiter.api.Test;
-import util.HeaderMapExtractor;
+import org.apache.nifi.websocket.jetty.util.HeaderMapExtractor;
 
 import java.util.Arrays;
 import java.util.Collections;

Reply via email to