exceptionfactory commented on code in PR #6225:
URL: https://github.com/apache/nifi/pull/6225#discussion_r957965217


##########
nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/AbstractMQTTProcessor.java:
##########
@@ -31,89 +31,94 @@
 import org.apache.nifi.processor.ProcessSessionFactory;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.security.util.TlsException;
 import org.apache.nifi.ssl.SSLContextService;
-import org.eclipse.paho.client.mqttv3.IMqttClient;
-import org.eclipse.paho.client.mqttv3.MqttClient;
-import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
-import org.eclipse.paho.client.mqttv3.MqttException;
-import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
 
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
-import java.util.Properties;
 import java.util.UUID;
+import java.util.concurrent.TimeUnit;
 
+import static org.apache.commons.lang3.EnumUtils.isValidEnumIgnoreCase;
+import static org.apache.commons.lang3.StringUtils.EMPTY;
 import static 
org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_CLEAN_SESSION_FALSE;
 import static 
org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_CLEAN_SESSION_TRUE;
 import static 
org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_MQTT_VERSION_310;
 import static 
org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_MQTT_VERSION_311;
+import static 
org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_MQTT_VERSION_500;
 import static 
org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_MQTT_VERSION_AUTO;
 import static 
org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_QOS_0;
 import static 
org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_QOS_1;
 import static 
org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_QOS_2;
 
 public abstract class AbstractMQTTProcessor extends 
AbstractSessionFactoryProcessor {
 
-    public static int DISCONNECT_TIMEOUT = 5000;
+    private static final String DEFAULT_SESSION_EXPIRY_INTERVAL = "24 hrs";
 
     protected ComponentLog logger;
-    protected IMqttClient mqttClient;
-    protected volatile String broker;
-    protected volatile String brokerUri;
-    protected volatile String clientID;
-    protected MqttConnectOptions connOpts;
-    protected MemoryPersistence persistence = new MemoryPersistence();
 
-    public ProcessSessionFactory processSessionFactory;
+    protected MqttClientProperties clientProperties;
 
-    public static final Validator QOS_VALIDATOR = new Validator() {
+    protected MqttClientFactory mqttClientFactory = new MqttClientFactory();
+    protected MqttClient mqttClient;
 
-        @Override
-        public ValidationResult validate(String subject, String input, 
ValidationContext context) {
-            Integer inputInt = Integer.parseInt(input);
-            if (inputInt < 0 || inputInt > 2) {
-                return new 
ValidationResult.Builder().subject(subject).valid(false).explanation("QoS must 
be an integer between 0 and 2").build();
-            }
-            return new 
ValidationResult.Builder().subject(subject).valid(true).build();
+    public ProcessSessionFactory processSessionFactory;
+
+    public static final Validator QOS_VALIDATOR = (subject, input, context) -> 
{
+        Integer inputInt = Integer.parseInt(input);
+        if (inputInt < 0 || inputInt > 2) {
+            return new 
ValidationResult.Builder().subject(subject).valid(false).explanation("QoS must 
be an integer between 0 and 2.").build();
         }
+        return new 
ValidationResult.Builder().subject(subject).valid(true).build();
     };
 
-    public static final Validator BROKER_VALIDATOR = new Validator() {
-
-        @Override
-        public ValidationResult validate(String subject, String input, 
ValidationContext context) {
-            try{
-                URI brokerURI = new URI(input);
-                if (!"".equals(brokerURI.getPath())) {
-                    return new 
ValidationResult.Builder().subject(subject).valid(false).explanation("the 
broker URI cannot have a path. It currently is:" + brokerURI.getPath()).build();
-                }
-                if (!("tcp".equals(brokerURI.getScheme()) || 
"ssl".equals(brokerURI.getScheme()) || "ws".equals(brokerURI.getScheme()) || 
"wss".equals(brokerURI.getScheme()))) {
-                    return new 
ValidationResult.Builder().subject(subject).valid(false).explanation("only the 
'tcp', 'ssl', 'ws' and 'wss' schemes are supported.").build();
-                }
-            } catch (URISyntaxException e) {
-                return new 
ValidationResult.Builder().subject(subject).valid(false).explanation("it is not 
valid URI syntax.").build();
+    public static final Validator BROKER_VALIDATOR = (subject, input, context) 
-> {
+        try {
+            URI brokerURI = new URI(input);
+            if (!EMPTY.equals(brokerURI.getPath())) {
+                return new 
ValidationResult.Builder().subject(subject).valid(false).explanation("the 
broker URI cannot have a path. It currently is: " + 
brokerURI.getPath()).build();
             }
-            return new 
ValidationResult.Builder().subject(subject).valid(true).build();
+            if (!isValidEnumIgnoreCase(MqttProtocolScheme.class, 
brokerURI.getScheme())) {
+                return new 
ValidationResult.Builder().subject(subject).valid(false)
+                        .explanation("scheme is invalid. Supported schemes 
are: " + getSupportedSchemeList()).build();
+            }
+        } catch (URISyntaxException e) {
+            return new 
ValidationResult.Builder().subject(subject).valid(false).explanation("it is not 
valid URI syntax.").build();
         }
+        return new 
ValidationResult.Builder().subject(subject).valid(true).build();
     };
 
-    public static final Validator RETAIN_VALIDATOR = new Validator() {
-
-        @Override
-        public ValidationResult validate(String subject, String input, 
ValidationContext context) {
-            if("true".equalsIgnoreCase(input) || 
"false".equalsIgnoreCase(input)){
-                return new 
ValidationResult.Builder().subject(subject).valid(true).build();
-            } else{
-                return 
StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.BOOLEAN,
 false)
-                        .validate(subject, input, context);
-            }
+    private static String getSupportedSchemeList() {
+        return String.join(", ", 
Arrays.stream(MqttProtocolScheme.values()).map(value -> 
value.name().toLowerCase()).toArray(String[]::new));

Review Comment:
   This can be modified to use Collectors.joining():
   ```suggestion
           return Arrays.stream(MqttProtocolScheme.values())
               .map(value -> value.name().toLowerCase())
               .collect(Collectors.joining(", ");
   ```



##########
nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/AbstractMQTTProcessor.java:
##########
@@ -384,16 +331,60 @@ public final void onTrigger(final ProcessContext context, 
final ProcessSessionFa
             onTrigger(context, session);
             session.commitAsync();
         } catch (final Throwable t) {
-            getLogger().error("{} failed to process due to {}; rolling back 
session", new Object[]{this, t});
+            getLogger().error("{} failed to process due to {}; rolling back 
session", this, t);
             session.rollback(true);
             throw t;
         }
     }
 
     public abstract void onTrigger(final ProcessContext context, final 
ProcessSession session) throws ProcessException;
 
-    protected boolean isConnected(){
+    protected boolean isConnected() {
         return (mqttClient != null && mqttClient.isConnected());
     }
 
+    protected MqttClientProperties getMqttClientProperties(final 
ProcessContext context) {
+        final MqttClientProperties clientProperties = new 
MqttClientProperties();
+
+        try {
+            clientProperties.setBrokerUri(new 
URI(context.getProperty(PROP_BROKER_URI).evaluateAttributeExpressions().getValue()));
+        } catch (URISyntaxException e) {
+            throw new IllegalArgumentException("Invalid Broker URI", e);
+        }
+
+        String clientId = 
context.getProperty(PROP_CLIENTID).evaluateAttributeExpressions().getValue();
+        if (clientId == null) {
+            clientId = UUID.randomUUID().toString();
+        }
+        clientProperties.setClientId(clientId);
+
+        
clientProperties.setMqttVersion(MqttVersion.fromVersionCode(context.getProperty(PROP_MQTT_VERSION).asInteger()));
+
+        
clientProperties.setCleanSession(context.getProperty(PROP_CLEAN_SESSION).asBoolean());
+        
clientProperties.setSessionExpiryInterval(context.getProperty(PROP_SESSION_EXPIRY_INTERVAL).asTimePeriod(TimeUnit.SECONDS));
+
+        
clientProperties.setKeepAliveInterval(context.getProperty(PROP_KEEP_ALIVE_INTERVAL).asInteger());
+        
clientProperties.setConnectionTimeout(context.getProperty(PROP_CONN_TIMEOUT).asInteger());
+
+        final PropertyValue sslProp = 
context.getProperty(PROP_SSL_CONTEXT_SERVICE);
+        if (sslProp.isSet()) {
+            final SSLContextService sslContextService = (SSLContextService) 
sslProp.asControllerService();

Review Comment:
   This should be changed to pass the reference class to 
`asControllerService()`:
   ```suggestion
               final SSLContextService sslContextService = 
sslProp.asControllerService(SSLContextService.class);
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to