[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2548?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16281850#comment-16281850
 ] 

ASF GitHub Bot commented on APEXMALHAR-2548:
--------------------------------------------

tweise closed pull request #676: APEXMALHAR-2548 Using the correct websocket 
scheme when connecting to a SSL cluster
URL: https://github.com/apache/apex-malhar/pull/676
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/apps/logstream/src/main/java/com/datatorrent/apps/logstream/Application.java 
b/apps/logstream/src/main/java/com/datatorrent/apps/logstream/Application.java
index 98dfebd7b8..82c9214fa9 100644
--- 
a/apps/logstream/src/main/java/com/datatorrent/apps/logstream/Application.java
+++ 
b/apps/logstream/src/main/java/com/datatorrent/apps/logstream/Application.java
@@ -18,19 +18,24 @@
  */
 package com.datatorrent.apps.logstream;
 
-import java.net.URI;
-import java.util.*;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 
 import org.apache.apex.malhar.contrib.misc.streamquery.SelectOperator;
 import 
org.apache.apex.malhar.contrib.misc.streamquery.condition.EqualValueCondition;
-
-import org.apache.commons.lang.StringUtils;
+import org.apache.apex.malhar.lib.utils.PubSubHelper;
 import org.apache.hadoop.conf.Configuration;
 
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.Operator.InputPort;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.contrib.redis.RedisKeyValPairOutputOperator;
+import com.datatorrent.contrib.redis.RedisMapOutputOperator;
+import com.datatorrent.contrib.redis.RedisNumberSummationMapOutputOperator;
 import com.datatorrent.lib.algo.TopN;
 import com.datatorrent.lib.io.ConsoleOutputOperator;
 import com.datatorrent.lib.io.PubSubWebSocketOutputOperator;
@@ -43,13 +48,6 @@
 import com.datatorrent.lib.util.AbstractDimensionTimeBucketOperator;
 import com.datatorrent.lib.util.DimensionTimeBucketSumOperator;
 
-import com.datatorrent.api.DAG;
-import com.datatorrent.api.Operator.InputPort;
-import com.datatorrent.api.StreamingApplication;
-import com.datatorrent.contrib.redis.RedisKeyValPairOutputOperator;
-import com.datatorrent.contrib.redis.RedisMapOutputOperator;
-import com.datatorrent.contrib.redis.RedisNumberSummationMapOutputOperator;
-
 /**
  * Log stream processing application based on Apex platform.<br>
  * This application consumes log data generated by running systems and services
@@ -156,14 +154,12 @@ private SYSTEM_KEYS(String value)
 
   private InputPort<Object> wsOutput(DAG dag, String operatorName)
   {
-    String daemonAddress = dag.getValue(DAG.GATEWAY_CONNECT_ADDRESS);
-    if (!StringUtils.isEmpty(daemonAddress)) {
-      URI uri = URI.create("ws://" + daemonAddress + "/pubsub");
+    if (PubSubHelper.isGatewayConfigured(dag)) {
       String appId = "appid";
       //appId = dag.attrValue(DAG.APPLICATION_ID, null); // will be used once 
UI is able to pick applications from list and listen to corresponding 
application
       String topic = "apps.logstream." + appId + "." + operatorName;
       PubSubWebSocketOutputOperator<Object> wsOut = 
dag.addOperator(operatorName, new PubSubWebSocketOutputOperator<Object>());
-      wsOut.setUri(uri);
+      wsOut.setUri(PubSubHelper.getURI(dag));
       wsOut.setTopic(topic);
       return wsOut.input;
     }
diff --git 
a/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/Application.java
 
b/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/Application.java
index 73c38ef6cb..eed9a683e7 100644
--- 
a/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/Application.java
+++ 
b/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/Application.java
@@ -20,11 +20,13 @@
 
 import java.io.Serializable;
 import java.net.URI;
+
 import org.apache.apex.examples.frauddetect.operator.HdfsStringOutputOperator;
 import org.apache.apex.examples.frauddetect.operator.MongoDBOutputOperator;
+import org.apache.apex.malhar.lib.utils.PubSubHelper;
 import org.apache.hadoop.conf.Configuration;
+
 import com.datatorrent.api.Context;
-import com.datatorrent.api.Context.DAGContext;
 import com.datatorrent.api.DAG;
 import com.datatorrent.api.StreamingApplication;
 import com.datatorrent.api.annotation.ApplicationAnnotation;
@@ -88,11 +90,7 @@ public void populateDAG(DAG dag, Configuration conf)
   {
 
     try {
-      String gatewayAddress = dag.getValue(DAGContext.GATEWAY_CONNECT_ADDRESS);
-      if (gatewayAddress == null) {
-        gatewayAddress = "localhost:9090";
-      }
-      URI duri = URI.create("ws://" + gatewayAddress + "/pubsub");
+      URI duri = PubSubHelper.getURIWithDefault(dag, "localhost:9090");
 
       PubSubWebSocketInputOperator userTxWsInput = 
getPubSubWebSocketInputOperator("userTxInput", dag, duri, 
"examples.app.frauddetect.submitTransaction");
       PubSubWebSocketOutputOperator ccUserAlertWsOutput = 
getPubSubWebSocketOutputOperator("ccUserAlertQueryOutput", dag, duri, 
"examples.app.frauddetect.fraudAlert");
diff --git 
a/examples/mobile/src/main/java/org/apache/apex/examples/mobile/Application.java
 
b/examples/mobile/src/main/java/org/apache/apex/examples/mobile/Application.java
index f719643546..dd1e1363ca 100644
--- 
a/examples/mobile/src/main/java/org/apache/apex/examples/mobile/Application.java
+++ 
b/examples/mobile/src/main/java/org/apache/apex/examples/mobile/Application.java
@@ -26,6 +26,7 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.apex.malhar.lib.utils.PubSubHelper;
 import org.apache.commons.lang.mutable.MutableLong;
 import org.apache.commons.lang3.Range;
 import org.apache.hadoop.conf.Configuration;
@@ -157,8 +158,7 @@ public void populateDAG(DAG dag, Configuration conf)
     // done generating data
     LOG.info("Finished generating seed data.");
 
-    String gatewayAddress = dag.getValue(DAG.GATEWAY_CONNECT_ADDRESS);
-    URI uri = URI.create("ws://" + gatewayAddress + "/pubsub");
+    URI uri = PubSubHelper.getURI(dag);
     PubSubWebSocketOutputOperator<Object> wsOut = 
dag.addOperator("LocationResults", new PubSubWebSocketOutputOperator<Object>());
     wsOut.setUri(uri);
     PubSubWebSocketInputOperator<Map<String, String>> wsIn = 
dag.addOperator("QueryLocation", new PubSubWebSocketInputOperator<Map<String, 
String>>());
diff --git 
a/examples/mobile/src/test/java/org/apache/apex/examples/mobile/ApplicationTest.java
 
b/examples/mobile/src/test/java/org/apache/apex/examples/mobile/ApplicationTest.java
index ce6ca41a97..b88ed57258 100644
--- 
a/examples/mobile/src/test/java/org/apache/apex/examples/mobile/ApplicationTest.java
+++ 
b/examples/mobile/src/test/java/org/apache/apex/examples/mobile/ApplicationTest.java
@@ -21,7 +21,6 @@
 import java.net.URI;
 import java.util.HashMap;
 import java.util.Map;
-
 import javax.servlet.Servlet;
 
 import org.eclipse.jetty.server.Connector;
@@ -32,11 +31,10 @@
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
+import org.apache.apex.malhar.lib.utils.PubSubHelper;
 import org.apache.hadoop.conf.Configuration;
 
 import com.datatorrent.api.LocalMode;
-
 import com.datatorrent.lib.helper.SamplePubSubWebSocketServlet;
 import com.datatorrent.lib.io.PubSubWebSocketInputOperator;
 import com.datatorrent.lib.io.PubSubWebSocketOutputOperator;
@@ -67,7 +65,7 @@ public void testGetApplication() throws Exception
     server.start();
     Connector[] connector = server.getConnectors();
     conf.set("dt.attr.GATEWAY_CONNECT_ADDRESS", "localhost:" + 
connector[0].getLocalPort());
-    URI uri = URI.create("ws://localhost:" + connector[0].getLocalPort() + 
"/pubsub");
+    URI uri = PubSubHelper.getURI("localhost:" + connector[0].getLocalPort());
 
     PubSubWebSocketOutputOperator<Object> outputOperator = new 
PubSubWebSocketOutputOperator<Object>();
     outputOperator.setUri(uri);
diff --git 
a/examples/mrmonitor/src/main/java/org/apache/apex/examples/mrmonitor/MRMonitoringApplication.java
 
b/examples/mrmonitor/src/main/java/org/apache/apex/examples/mrmonitor/MRMonitoringApplication.java
index 288da84a3e..55d98aa0bc 100644
--- 
a/examples/mrmonitor/src/main/java/org/apache/apex/examples/mrmonitor/MRMonitoringApplication.java
+++ 
b/examples/mrmonitor/src/main/java/org/apache/apex/examples/mrmonitor/MRMonitoringApplication.java
@@ -23,6 +23,7 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.apex.malhar.lib.utils.PubSubHelper;
 import org.apache.hadoop.conf.Configuration;
 
 import com.datatorrent.api.DAG;
@@ -48,10 +49,8 @@
   @Override
   public void populateDAG(DAG dag, Configuration conf)
   {
-    String daemonAddress = dag.getValue(DAG.GATEWAY_CONNECT_ADDRESS);
     MRJobStatusOperator mrJobOperator = dag.addOperator("JobMonitor", new 
MRJobStatusOperator());
-    URI uri = URI.create("ws://" + daemonAddress + "/pubsub");
-    logger.info("WebSocket with daemon at {}", daemonAddress);
+    URI uri = PubSubHelper.getURI(dag);
 
     PubSubWebSocketInputOperator wsIn = dag.addOperator("Query", new 
PubSubWebSocketInputOperator());
     wsIn.setUri(uri);
diff --git 
a/examples/twitter/src/main/java/org/apache/apex/examples/twitter/KinesisHashtagsApplication.java
 
b/examples/twitter/src/main/java/org/apache/apex/examples/twitter/KinesisHashtagsApplication.java
index be7edfbfdc..225ea25039 100644
--- 
a/examples/twitter/src/main/java/org/apache/apex/examples/twitter/KinesisHashtagsApplication.java
+++ 
b/examples/twitter/src/main/java/org/apache/apex/examples/twitter/KinesisHashtagsApplication.java
@@ -18,9 +18,9 @@
  */
 package org.apache.apex.examples.twitter;
 
-import java.net.URI;
-import org.apache.commons.lang.StringUtils;
+import org.apache.apex.malhar.lib.utils.PubSubHelper;
 import org.apache.hadoop.conf.Configuration;
+
 import com.datatorrent.api.DAG;
 import com.datatorrent.api.DAG.Locality;
 import com.datatorrent.api.Operator.InputPort;
@@ -174,13 +174,11 @@
 
   private InputPort<Object> consoleOutput(DAG dag, String operatorName)
   {
-    String gatewayAddress = dag.getValue(DAG.GATEWAY_CONNECT_ADDRESS);
-    if (!StringUtils.isEmpty(gatewayAddress)) {
-      URI uri = URI.create("ws://" + gatewayAddress + "/pubsub");
+    if (PubSubHelper.isGatewayConfigured(dag)) {
       String topic = "examples.twitter." + operatorName;
       //LOG.info("WebSocket with gateway at: {}", gatewayAddress);
       PubSubWebSocketOutputOperator<Object> wsOut = 
dag.addOperator(operatorName, new PubSubWebSocketOutputOperator<Object>());
-      wsOut.setUri(uri);
+      wsOut.setUri(PubSubHelper.getURI(dag));
       wsOut.setTopic(topic);
       return wsOut.input;
     }
diff --git 
a/examples/twitter/src/main/java/org/apache/apex/examples/twitter/TwitterTopCounterApplication.java
 
b/examples/twitter/src/main/java/org/apache/apex/examples/twitter/TwitterTopCounterApplication.java
index ee43383874..77384a81a3 100644
--- 
a/examples/twitter/src/main/java/org/apache/apex/examples/twitter/TwitterTopCounterApplication.java
+++ 
b/examples/twitter/src/main/java/org/apache/apex/examples/twitter/TwitterTopCounterApplication.java
@@ -22,11 +22,10 @@
 import java.util.List;
 import java.util.Map;
 
-import org.apache.commons.lang.StringUtils;
+import org.apache.apex.malhar.lib.utils.PubSubHelper;
 import org.apache.hadoop.conf.Configuration;
 
 import com.google.common.collect.Maps;
-
 import com.datatorrent.api.Context;
 import com.datatorrent.api.DAG;
 import com.datatorrent.api.DAG.Locality;
@@ -34,7 +33,6 @@
 import com.datatorrent.api.Operator.OutputPort;
 import com.datatorrent.api.StreamingApplication;
 import com.datatorrent.api.annotation.ApplicationAnnotation;
-
 import com.datatorrent.contrib.twitter.TwitterSampleInput;
 import com.datatorrent.lib.algo.UniqueCounter;
 import com.datatorrent.lib.appdata.schemas.SchemaUtils;
@@ -189,9 +187,8 @@ public void populateDAG(DAG dag, Configuration conf)
 
   public static void consoleOutput(DAG dag, String operatorName, 
OutputPort<List<Map<String, Object>>> topCount, String schemaFile, String alias)
   {
-    String gatewayAddress = dag.getValue(DAG.GATEWAY_CONNECT_ADDRESS);
-    if (!StringUtils.isEmpty(gatewayAddress)) {
-      URI uri = URI.create("ws://" + gatewayAddress + "/pubsub");
+    if (PubSubHelper.isGatewayConfigured(dag)) {
+      URI uri = PubSubHelper.getURI(dag);
 
       AppDataSnapshotServerMap snapshotServer = 
dag.addOperator("SnapshotServer", new AppDataSnapshotServerMap());
 
diff --git 
a/examples/wordcount/src/main/java/org/apache/apex/examples/wordcount/ApplicationWithQuerySupport.java
 
b/examples/wordcount/src/main/java/org/apache/apex/examples/wordcount/ApplicationWithQuerySupport.java
index 699469b6de..440b30abe8 100644
--- 
a/examples/wordcount/src/main/java/org/apache/apex/examples/wordcount/ApplicationWithQuerySupport.java
+++ 
b/examples/wordcount/src/main/java/org/apache/apex/examples/wordcount/ApplicationWithQuerySupport.java
@@ -22,15 +22,13 @@
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
-import org.apache.commons.lang.StringUtils;
+import org.apache.apex.malhar.lib.utils.PubSubHelper;
 import org.apache.hadoop.conf.Configuration;
 
 import com.datatorrent.api.DAG;
 import com.datatorrent.api.Operator;
 import com.datatorrent.api.StreamingApplication;
 import com.datatorrent.api.annotation.ApplicationAnnotation;
-
 import com.datatorrent.lib.appdata.schemas.SchemaUtils;
 import com.datatorrent.lib.appdata.snapshot.AppDataSnapshotServerMap;
 import com.datatorrent.lib.io.ConsoleOutputOperator;
@@ -84,10 +82,8 @@ public void populateDAG(DAG dag, Configuration conf)
     dag.addStream("windowWordCounts", windowWordCount.output, 
fileWordCount.input);
     dag.addStream("fileWordCounts", fileWordCount.fileOutput, wcWriter.input);
 
-    String gatewayAddress = dag.getValue(DAG.GATEWAY_CONNECT_ADDRESS);
-
-    if (!StringUtils.isEmpty(gatewayAddress)) {        // add query support
-      URI uri = URI.create("ws://" + gatewayAddress + "/pubsub");
+    if (PubSubHelper.isGatewayConfigured(dag)) {        // add query support
+      URI uri = PubSubHelper.getURI(dag);
 
       AppDataSnapshotServerMap snapshotServerFile
           = dag.addOperator("snapshotServerFile", new 
AppDataSnapshotServerMap());
diff --git 
a/library/src/main/java/com/datatorrent/lib/io/PubSubWebSocketAppDataQuery.java 
b/library/src/main/java/com/datatorrent/lib/io/PubSubWebSocketAppDataQuery.java
index 3f2029efbe..9b1e0cf322 100644
--- 
a/library/src/main/java/com/datatorrent/lib/io/PubSubWebSocketAppDataQuery.java
+++ 
b/library/src/main/java/com/datatorrent/lib/io/PubSubWebSocketAppDataQuery.java
@@ -19,14 +19,13 @@
 package com.datatorrent.lib.io;
 
 import java.net.URI;
-import java.net.URISyntaxException;
-
 import javax.validation.constraints.Min;
 
 import org.codehaus.jettison.json.JSONException;
 import org.codehaus.jettison.json.JSONObject;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.apache.apex.malhar.lib.utils.PubSubHelper;
 
 import com.datatorrent.api.Context.OperatorContext;
 import com.datatorrent.api.DAG;
@@ -116,10 +115,8 @@ public static URI uriHelper(OperatorContext context, URI 
uri)
       }
 
       try {
-        uri = new URI("ws://"
-                      + context.getValue(DAG.GATEWAY_CONNECT_ADDRESS)
-                      + "/pubsub");
-      } catch (URISyntaxException ex) {
+        uri = PubSubHelper.getURI(context);
+      } catch (Exception ex) {
         throw new RuntimeException(ex);
       }
     }
diff --git 
a/library/src/main/java/com/datatorrent/lib/io/WidgetOutputOperator.java 
b/library/src/main/java/com/datatorrent/lib/io/WidgetOutputOperator.java
index b027b582dd..a9030d42ed 100644
--- a/library/src/main/java/com/datatorrent/lib/io/WidgetOutputOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/io/WidgetOutputOperator.java
@@ -20,17 +20,15 @@
 
 import java.io.IOException;
 import java.lang.reflect.Array;
-import java.net.URI;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Map.Entry;
 
-import org.apache.commons.lang.StringUtils;
+import org.apache.apex.malhar.lib.utils.PubSubHelper;
 import org.apache.commons.lang3.tuple.MutablePair;
 import org.apache.commons.lang3.tuple.Pair;
 
 import com.google.common.collect.Maps;
-
 import com.datatorrent.api.Context.OperatorContext;
 import com.datatorrent.api.DAG;
 import com.datatorrent.api.DefaultInputPort;
@@ -129,9 +127,8 @@ public String convertMapToMessage(Pair<String, Object> t) 
throws IOException
   @Override
   public void setup(OperatorContext context)
   {
-    String gatewayAddress = context.getValue(DAG.GATEWAY_CONNECT_ADDRESS);
-    if (!StringUtils.isEmpty(gatewayAddress)) {
-      wsoo.setUri(URI.create("ws://" + gatewayAddress + "/pubsub"));
+    if (PubSubHelper.isGatewayConfigured(context)) {
+      wsoo.setUri(PubSubHelper.getURI(context));
       wsoo.setup(context);
     } else {
       isWebSocketConnected = false;
diff --git 
a/library/src/main/java/org/apache/apex/malhar/lib/utils/PubSubHelper.java 
b/library/src/main/java/org/apache/apex/malhar/lib/utils/PubSubHelper.java
new file mode 100644
index 0000000000..51eaeee7ea
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/utils/PubSubHelper.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.utils;
+
+import java.net.URI;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.datatorrent.api.Context;
+
+@InterfaceStability.Evolving
+public class PubSubHelper
+{
+  private static final Logger logger = 
LoggerFactory.getLogger(PubSubHelper.class);
+
+  public static boolean isGatewayConfigured(Context context)
+  {
+    return context.getValue(Context.DAGContext.GATEWAY_CONNECT_ADDRESS) != 
null;
+  }
+
+  public static URI getURI(Context context)
+  {
+    return getURIWithDefault(context, null);
+  }
+
+  public static URI getURIWithDefault(Context context, String defaultAddress)
+  {
+    String address = 
context.getValue(Context.DAGContext.GATEWAY_CONNECT_ADDRESS);
+    if (address == null) {
+      address = defaultAddress;
+    }
+    return getURI(address, 
context.getValue(Context.DAGContext.GATEWAY_USE_SSL));
+  }
+
+  public static URI getURI(String address)
+  {
+    return getURI(address, false);
+  }
+
+  public static URI getURI(String address, boolean useSSL)
+  {
+    if (address == null) {
+      throw new NullPointerException("No address specified");
+    }
+    String uri = (useSSL ? "wss://" : "ws://") + address + "/pubsub";
+    logger.debug("PubSub uri {}", uri);
+    return URI.create(uri);
+  }
+}
diff --git 
a/library/src/test/java/com/datatorrent/lib/io/PubSubWebSocketAppDataOperatorTest.java
 
b/library/src/test/java/com/datatorrent/lib/io/PubSubWebSocketAppDataOperatorTest.java
index 7801619630..fc49bea38e 100644
--- 
a/library/src/test/java/com/datatorrent/lib/io/PubSubWebSocketAppDataOperatorTest.java
+++ 
b/library/src/test/java/com/datatorrent/lib/io/PubSubWebSocketAppDataOperatorTest.java
@@ -20,25 +20,25 @@
 
 import java.lang.reflect.Method;
 import java.net.URI;
-import java.net.URISyntaxException;
 
 import org.junit.Assert;
 import org.junit.Test;
+import org.apache.apex.malhar.lib.utils.PubSubHelper;
 
 import com.datatorrent.common.experimental.AppData;
 
 public abstract class PubSubWebSocketAppDataOperatorTest
 {
   public static final String GATEWAY_CONNECT_ADDRESS_STRING = "my.gateway.com";
-  public static final String URI_ADDRESS_STRING = "ws://localhost:6666/pubsub";
+  public static final String URI_ADDRESS_STRING = "localhost:6666";
   public static final URI GATEWAY_CONNECT_ADDRESS;
   public static final URI URI_ADDRESS;
 
   static {
     try {
-      GATEWAY_CONNECT_ADDRESS = new URI("ws://" + 
GATEWAY_CONNECT_ADDRESS_STRING + "/pubsub");
-      URI_ADDRESS = new URI(URI_ADDRESS_STRING);
-    } catch (URISyntaxException ex) {
+      GATEWAY_CONNECT_ADDRESS = 
PubSubHelper.getURI(GATEWAY_CONNECT_ADDRESS_STRING);
+      URI_ADDRESS = PubSubHelper.getURI(URI_ADDRESS_STRING);
+    } catch (Exception ex) {
       throw new RuntimeException(ex);
     }
   }
diff --git 
a/library/src/test/java/com/datatorrent/lib/io/PubSubWebSocketOperatorTest.java 
b/library/src/test/java/com/datatorrent/lib/io/PubSubWebSocketOperatorTest.java
index e165649b62..7f1e4dd09c 100644
--- 
a/library/src/test/java/com/datatorrent/lib/io/PubSubWebSocketOperatorTest.java
+++ 
b/library/src/test/java/com/datatorrent/lib/io/PubSubWebSocketOperatorTest.java
@@ -30,6 +30,8 @@
 import org.junit.Assert;
 import org.junit.Test;
 
+import org.apache.apex.malhar.lib.utils.PubSubHelper;
+
 import com.datatorrent.lib.helper.SamplePubSubWebSocketServlet;
 import com.datatorrent.lib.testbench.CollectorTestSink;
 
@@ -51,7 +53,7 @@ public void testPubSubWebSocket() throws Exception
     contextHandler.addServlet(sh, "/*");
     server.start();
     Connector[] connector = server.getConnectors();
-    URI uri = URI.create("ws://localhost:" + connector[0].getLocalPort() + 
"/pubsub");
+    URI uri = PubSubHelper.getURI("localhost:" + connector[0].getLocalPort());
 
     PubSubWebSocketOutputOperator<Object> outputOperator = new 
PubSubWebSocketOutputOperator<Object>();
     outputOperator.setUri(uri);


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Pubsub operators do not use the correct URL to connect to web socket server 
> in SSL mode
> ---------------------------------------------------------------------------------------
>
>                 Key: APEXMALHAR-2548
>                 URL: https://issues.apache.org/jira/browse/APEXMALHAR-2548
>             Project: Apache Apex Malhar
>          Issue Type: Bug
>            Reporter: Pramod Immaneni
>            Assignee: Pramod Immaneni
>
> In SSL mode, pub sub operators need to use wss protocol instead of ws. Today 
> they use ws.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Reply via email to