[ 
https://issues.apache.org/jira/browse/NIFI-1899?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15368238#comment-15368238
 ] 

ASF GitHub Bot commented on NIFI-1899:
--------------------------------------

Github user JPercivall commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/483#discussion_r70126772
  
    --- Diff: 
nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/ListenSMTP.java
 ---
    @@ -0,0 +1,385 @@
    +/*
    + *  Licensed to the Apache Software Foundation (ASF) under one or more
    + *  contributor license agreements.  See the NOTICE file distributed with
    + *  this work for additional information regarding copyright ownership.
    + *  The ASF licenses this file to You under the Apache License, Version 2.0
    + *  (the "License"); you may not use this file except in compliance with
    + *  the License.  You may obtain a copy of the License at
    + *
    + *  http://www.apache.org/licenses/LICENSE-2.0
    + *
    + *  Unless required by applicable law or agreed to in writing, software
    + *  distributed under the License is distributed on an "AS IS" BASIS,
    + *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or 
implied.
    + *  See the License for the specific language governing permissions and
    + *  limitations under the License.
    +*/
    +package org.apache.nifi.processors.email;
    +
    +import javax.net.ssl.SSLContext;
    +import javax.net.ssl.SSLSocket;
    +import javax.net.ssl.SSLSocketFactory;
    +import java.io.IOException;
    +import java.io.OutputStream;
    +import java.net.InetSocketAddress;
    +import java.net.Socket;
    +import java.util.ArrayList;
    +import java.util.Collection;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.concurrent.LinkedBlockingQueue;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.atomic.AtomicBoolean;
    +
    +import org.apache.nifi.annotation.lifecycle.OnStopped;
    +import org.apache.nifi.flowfile.attributes.CoreAttributes;
    +import org.apache.nifi.logging.ComponentLog;
    +import org.apache.nifi.processor.DataUnit;
    +import org.apache.nifi.processors.email.smtp.event.SmtpEvent;
    +import 
org.apache.nifi.processors.email.smtp.handler.SMTPMessageHandlerFactory;
    +import org.subethamail.smtp.server.SMTPServer;
    +
    +import org.apache.commons.lang3.StringUtils;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.processor.AbstractProcessor;
    +import org.apache.nifi.processor.ProcessorInitializationContext;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.util.StandardValidators;
    +
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.components.ValidationContext;
    +import org.apache.nifi.components.ValidationResult;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.processor.io.OutputStreamCallback;
    +import org.apache.nifi.ssl.SSLContextService;
    +
    +@Tags({"listen", "email", "smtp"})
    +@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
    +@CapabilityDescription("This processor implements a lightweight SMTP 
server to an arbitrary port, " +
    +        "allowing nifi to listen for incoming email. " +
    +        "" +
    +        "Note this server does not perform any email validation. If direct 
exposure to the internet is sought," +
    +        "it may be a better idea to use the combination of NiFi and an 
industrial scale MTA (e.g. Postfix)")
    +@WritesAttributes({
    +        @WritesAttribute(attribute = "mime.type", description = "The value 
used during HELO"),
    +        @WritesAttribute(attribute = "smtp.helo", description = "The value 
used during HELO"),
    +        @WritesAttribute(attribute = "smtp.certificates.*.serial", 
description = "The serial numbers for each of the " +
    +                "certificates used by an TLS peer"),
    +        @WritesAttribute(attribute = "smtp.certificates.*.principal", 
description = "The principal for each of the " +
    +                "certificates used by an TLS peer"),
    +        @WritesAttribute(attribute = "smtp.from", description = "The value 
used during MAIL FROM (i.e. envelope)"),
    +        @WritesAttribute(attribute = "smtp.to", description = "The value 
used during RCPT TO (i.e. envelope)")})
    +
    +public class ListenSMTP extends AbstractProcessor {
    +    public static final String SMTP_HELO = "smtp.helo";
    +    public static final String SMTP_FROM = "smtp.from";
    +    public static final String SMTP_TO = "smtp.to";
    +    public static final String MIME_TYPE = "message/rfc822";
    +
    +
    +    protected static final PropertyDescriptor SMTP_PORT = new 
PropertyDescriptor.Builder()
    +            .name("SMTP_PORT")
    +            .displayName("Listening Port")
    +            .description("The TCP port the ListenSMTP processor will bind 
to." +
    +                    "NOTE that on Unix derivative operating  systems this 
port must " +
    +                    "be higher than 1024 unless NiFi is running as with 
root user permissions.")
    +            .required(true)
    +            .expressionLanguageSupported(false)
    +            .addValidator(StandardValidators.PORT_VALIDATOR)
    +            .build();
    +
    +    protected static final PropertyDescriptor SMTP_HOSTNAME = new 
PropertyDescriptor.Builder()
    +            .name("SMTP_HOSTNAME")
    +            .displayName("SMTP hostname")
    +            .description("The hostname to be embedded into the banner 
displayed when an " +
    +                    "SMTP client connects to the processor TCP port .")
    +            .required(true)
    +            .expressionLanguageSupported(false)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    protected static final PropertyDescriptor SMTP_MAXIMUM_CONNECTIONS = 
new PropertyDescriptor.Builder()
    +            .name("SMTP_MAXIMUM_CONNECTIONS")
    +            .displayName("Maximum number of SMTP connection")
    +            .description("The maximum number of simultaneous SMTP 
connections.")
    +            .required(true)
    +            .expressionLanguageSupported(false)
    +            .addValidator(StandardValidators.INTEGER_VALIDATOR)
    +            .build();
    +
    +    protected static final PropertyDescriptor SMTP_TIMEOUT = new 
PropertyDescriptor.Builder()
    +            .name("SMTP_TIMEOUT")
    +            .displayName("SMTP connection timeout")
    +            .description("The maximum time to wait for an action of SMTP 
client.")
    +            .defaultValue("10 seconds")
    +            .required(true)
    +            .expressionLanguageSupported(false)
    +            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
    +            .build();
    +
    +    protected static final PropertyDescriptor SMTP_MAXIMUM_MSG_SIZE = new 
PropertyDescriptor.Builder()
    +            .name("SMTP_MAXIMUM_MSG_SIZE")
    +            .displayName("SMTP Maximum Message Size")
    +            .description("The maximum number of bytes the server will 
accept.")
    +            .required(true)
    +            .defaultValue("20MB")
    +            .expressionLanguageSupported(false)
    +            .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new 
PropertyDescriptor.Builder()
    +            .name("SSL_CONTEXT_SERVICE")
    +            .displayName("SSL Context Service")
    +            .description("The Controller Service to use in order to obtain 
an SSL Context. If this property is set, " +
    +                    "messages will be received over a secure connection.")
    +            .required(false)
    +            .identifiesControllerService(SSLContextService.class)
    +            .build();
    +
    +    public static final PropertyDescriptor CLIENT_AUTH = new 
PropertyDescriptor.Builder()
    +            .name("CLIENT_AUTH")
    +            .displayName("Client Auth")
    +            .description("The client authentication policy to use for the 
SSL Context. Only used if an SSL Context Service is provided.")
    +            .required(false)
    +            .allowableValues(SSLContextService.ClientAuth.NONE.toString(), 
SSLContextService.ClientAuth.REQUIRED.toString())
    +            .build();
    +
    +    @Override
    +    protected Collection<ValidationResult> customValidate(final 
ValidationContext validationContext) {
    +        final List<ValidationResult> results = new ArrayList<>();
    +
    +        final String clientAuth = 
validationContext.getProperty(CLIENT_AUTH).getValue();
    +        final SSLContextService sslContextService = 
validationContext.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
    +
    +        if (sslContextService != null && StringUtils.isBlank(clientAuth)) {
    +            results.add(new ValidationResult.Builder()
    +                    .explanation("Client Auth must be provided when using 
TLS/SSL")
    +                    .valid(false).subject("Client Auth").build());
    +        }
    +
    +        return results;
    +
    +    }
    +
    +
    +    public static final Relationship REL_SUCCESS = new 
Relationship.Builder()
    +            .name("success")
    +            .description("Extraction was successful")
    +            .build();
    +
    +    private Set<Relationship> relationships;
    +    private List<PropertyDescriptor> propertyDescriptors;
    +    private volatile LinkedBlockingQueue<SmtpEvent> messages;
    +
    +    private volatile SMTPServer server;
    +    private AtomicBoolean initialized = new AtomicBoolean(false);
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return relationships;
    +    }
    +
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return propertyDescriptors;
    +    }
    +
    +    @Override
    +    protected void init(final ProcessorInitializationContext context) {
    +        final Set<Relationship> relationships = new HashSet<>();
    +        relationships.add(REL_SUCCESS);
    +        this.relationships = Collections.unmodifiableSet(relationships);
    +
    +        final List<PropertyDescriptor> props = new ArrayList<>();
    +        props.add(SMTP_PORT);
    +        props.add(SMTP_HOSTNAME);
    +        props.add(SMTP_MAXIMUM_CONNECTIONS);
    +        props.add(SMTP_TIMEOUT);
    +        props.add(SMTP_MAXIMUM_MSG_SIZE);
    +        props.add(SSL_CONTEXT_SERVICE);
    +        props.add(CLIENT_AUTH);
    +        this.propertyDescriptors = Collections.unmodifiableList(props);
    +
    +    }
    +
    +    final ComponentLog logger = getLogger();
    +
    +    // Upon Schedule, reset the initialized state to false
    +    @OnScheduled
    +    public void onScheduled(ProcessContext context) {
    +        initialized.set(false);
    +    }
    +
    +    protected synchronized void initializeSMTPServer(final ProcessContext 
context) throws Exception {
    +        if (initialized.get()) {
    +            return;
    +        }
    +
    +        messages = new LinkedBlockingQueue<>(1024);
    +        String clientAuth = null;
    +
    +        new SMTPMessageHandlerFactory(messages);
    +
    +        // If an SSLContextService was provided then create an SSLContext 
to pass down to the server
    +        SSLContext sslContext = null;
    +        final SSLContextService sslContextService = 
context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
    +        if (sslContextService != null) {
    +            clientAuth = context.getProperty(CLIENT_AUTH).getValue();
    +            sslContext = 
sslContextService.createSSLContext(SSLContextService.ClientAuth.valueOf(clientAuth));
    +        }
    +
    +        final SSLContext finalSslContext = sslContext;
    +
    +        SMTPMessageHandlerFactory smtpMessageHandlerFactory = new 
SMTPMessageHandlerFactory(messages);
    +        final SMTPServer server = new 
SMTPServer(smtpMessageHandlerFactory) {
    +
    +            @Override
    +            public SSLSocket createSSLSocket(Socket socket) throws 
IOException {
    +                InetSocketAddress remoteAddress = (InetSocketAddress) 
socket.getRemoteSocketAddress();
    +
    +                SSLSocketFactory socketFactory = 
finalSslContext.getSocketFactory();
    +
    +                SSLSocket s = (SSLSocket) 
(socketFactory.createSocket(socket, remoteAddress.getHostName(), 
socket.getPort(), true));
    +
    +                s.setUseClientMode(false);
    +
    +
    +                // For some reason the createSSLContext above is not 
enough to enforce
    +                // client side auth
    +                // If client auth is required...
    +                if 
(SSLContextService.ClientAuth.REQUIRED.toString().equals(context.getProperty(CLIENT_AUTH).getValue()))
 {
    +                    s.setNeedClientAuth(true);
    +                }
    +
    +
    +                return s;
    +            }
    +        };
    +
    +
    +        // Set some parameters to our server
    +        server.setSoftwareName("Apache NiFi");
    +
    +
    +        // Set the Server options based on properties
    +        server.setPort(context.getProperty(SMTP_PORT).asInteger());
    +        server.setHostName(context.getProperty(SMTP_HOSTNAME).getValue());
    +        
server.setMaxMessageSize(context.getProperty(SMTP_MAXIMUM_MSG_SIZE).asDataSize(DataUnit.B).intValue());
    +        
server.setMaxConnections(context.getProperty(SMTP_MAXIMUM_CONNECTIONS).asInteger());
    +        
server.setConnectionTimeout(context.getProperty(SMTP_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue());
    +
    +
    +        // Check if TLS should be enabled
    +        if (sslContextService != null) {
    +            server.setEnableTLS(true);
    +        } else {
    +            server.setHideTLS(true);
    +        }
    +
    +        // Set TLS to required in case CLIENT_AUTH = required
    +        if 
(SSLContextService.ClientAuth.REQUIRED.toString().equals(context.getProperty(CLIENT_AUTH).getValue()))
 {
    +            server.setRequireTLS(true);
    +        }
    +
    +        this.server = server;
    +        server.start();
    +
    +        getLogger().info("Server started and listening on port " + 
server.getPort());
    +
    +        initialized.set(true);
    +    }
    +
    +    @OnStopped
    +    public void shutdown() throws Exception {
    --- End diff --
    
    Going along with the comment on line 330, when the processor is stopped 
there is the potential for FlowFiles to still be in the messages queue. If 
anything happens to the processor (e.g. it is removed), those messages will be 
lost with no record of ever having been received. 
    
    ConsumeMQTT [1] and ConsumeWindowsEventLog handle this by storing a 
SessionFactory and attempting to finish transferring the queued messages once 
the onTrigger calls finish.
    
    [1] 
https://github.com/apache/nifi/blob/master/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/ConsumeMQTT.java#L235


> Create ListenSMTP & ExtractEmailAttachment processors
> -----------------------------------------------------
>
>                 Key: NIFI-1899
>                 URL: https://issues.apache.org/jira/browse/NIFI-1899
>             Project: Apache NiFi
>          Issue Type: New Feature
>            Reporter: Andre
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to