[ 
https://issues.apache.org/jira/browse/NIFI-2519?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15416383#comment-15416383
 ] 

ASF GitHub Bot commented on NIFI-2519:
--------------------------------------

Github user olegz commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/827#discussion_r74361967
  
    --- Diff: 
nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/ListenSMTP.java
 ---
    @@ -166,317 +108,158 @@
                 .identifiesControllerService(SSLContextService.class)
                 .build();
     
    -    public static final PropertyDescriptor CLIENT_AUTH = new 
PropertyDescriptor.Builder()
    +    static final PropertyDescriptor CLIENT_AUTH = new 
PropertyDescriptor.Builder()
                 .name("CLIENT_AUTH")
                 .displayName("Client Auth")
                 .description("The client authentication policy to use for the 
SSL Context. Only used if an SSL Context Service is provided.")
                 .required(false)
                 .allowableValues(SSLContextService.ClientAuth.NONE.toString(), 
SSLContextService.ClientAuth.REQUIRED.toString())
                 .build();
     
    -    @Override
    -    protected Collection<ValidationResult> customValidate(final 
ValidationContext validationContext) {
    -        final List<ValidationResult> results = new ArrayList<>();
    +    static final Relationship REL_SUCCESS = new Relationship.Builder()
    +            .name("success")
    +            .description("All new messages will be routed as FlowFiles to 
this relationship")
    +            .build();
     
    -        final String clientAuth = 
validationContext.getProperty(CLIENT_AUTH).getValue();
    -        final SSLContextService sslContextService = 
validationContext.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
    +    private final static List<PropertyDescriptor> propertyDescriptors;
     
    -        if (sslContextService != null && StringUtils.isBlank(clientAuth)) {
    -            results.add(new ValidationResult.Builder()
    -                    .explanation("Client Auth must be provided when using 
TLS/SSL")
    -                    .valid(false).subject("Client Auth").build());
    -        }
    +    private final static Set<Relationship> relationships;
     
    -        return results;
    +    static {
    +        List<PropertyDescriptor> _propertyDescriptors = new ArrayList<>();
    +        _propertyDescriptors.add(SMTP_PORT);
    +        _propertyDescriptors.add(SMTP_MAXIMUM_CONNECTIONS);
    +        _propertyDescriptors.add(SMTP_TIMEOUT);
    +        _propertyDescriptors.add(SMTP_MAXIMUM_MSG_SIZE);
    +        _propertyDescriptors.add(SSL_CONTEXT_SERVICE);
    +        _propertyDescriptors.add(CLIENT_AUTH);
    +        propertyDescriptors = 
Collections.unmodifiableList(_propertyDescriptors);
     
    +        Set<Relationship> _relationships = new HashSet<>();
    +        _relationships.add(REL_SUCCESS);
    +        relationships = Collections.unmodifiableSet(_relationships);
         }
     
    +    private volatile SMTPServer smtp;
     
    -    public static final Relationship REL_SUCCESS = new 
Relationship.Builder()
    -            .name("success")
    -            .description("Extraction was successful")
    -            .build();
    +    private volatile SmtpConsumer smtpConsumer;
     
    -    private Set<Relationship> relationships;
    -    private List<PropertyDescriptor> propertyDescriptors;
    -    private volatile LinkedBlockingQueue<SmtpEvent> incomingMessages;
    +    /**
    +     *
    +     */
    +    @Override
    +    public void onTrigger(ProcessContext context, ProcessSessionFactory 
sessionFactory) throws ProcessException {
    +        ProcessSession processSession = sessionFactory.createSession();
    +        if (this.smtp == null) {
    +            this.setupSmtpIfNecessary(context, processSession);
    +        }
    +
    +        if (this.smtpConsumer.hasMessage()) {
    +            try {
    +                /*
    +                 * Will consume incoming message directly from the wire 
and into
    +                 * FlowFile/Content repository before exiting. This 
essentially
    +                 * limits any potential data loss by allowing SMTPServer 
thread
    +                 * to actually commit NiFi session if all good. However in 
the
    +                 * event of exception, such exception will be propagated 
back to
    +                 * the email sender via "undeliverable message" allowing 
such
    +                 * user to re-send the message
    +                 */
    +                this.smtpConsumer.consumeUsing((inputDataStream) -> {
    +                    FlowFile flowFile = processSession.create();
    +                    AtomicInteger size = new AtomicInteger();
    +                    flowFile = processSession.write(flowFile, new 
OutputStreamCallback() {
    +                        @Override
    +                        public void process(final OutputStream out) throws 
IOException {
    +                            size.set(IOUtils.copy(inputDataStream, out));
    +                        }
    +                    });
    +                    
processSession.getProvenanceReporter().receive(flowFile, "smtp://"
    +                            + ListenSMTP.this.smtp.getHostName() + ":" + 
ListenSMTP.this.smtp.getPort() + "/");
    +                    processSession.transfer(flowFile, REL_SUCCESS);
    +                    processSession.commit();
    +                    return size.get();
    +                });
    +            } catch (Exception e) {
    +                this.getLogger().error("Failed while listenning for 
messages.", e);
    +                processSession.rollback();
    +            }
    +        } else {
    +            context.yield();
    +        }
    +    }
     
    -    private volatile SMTPServer server;
    -    private AtomicBoolean initialized = new AtomicBoolean(false);
    -    private AtomicBoolean stopping = new AtomicBoolean(false);
    +    /**
    +     *
    +     */
    +    @OnStopped
    +    public void close() {
    +        this.getLogger().info("Stopping SMTPServer");
    +        this.smtp.stop();
    +        this.smtp = null;
    +        this.getLogger().info("SMTPServer stopped");
    +    }
     
    +    /**
    +     *
    +     */
         @Override
         public Set<Relationship> getRelationships() {
             return relationships;
         }
     
    +    /**
    +    *
    +    */
         @Override
         protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
             return propertyDescriptors;
         }
     
    -    @Override
    -    protected void init(final ProcessorInitializationContext context) {
    -        final Set<Relationship> relationships = new HashSet<>();
    -        relationships.add(REL_SUCCESS);
    -        this.relationships = Collections.unmodifiableSet(relationships);
    -
    -        final List<PropertyDescriptor> props = new ArrayList<>();
    -        props.add(SMTP_PORT);
    -        props.add(SMTP_HOSTNAME);
    -        props.add(SMTP_MAXIMUM_CONNECTIONS);
    -        props.add(SMTP_TIMEOUT);
    -        props.add(SMTP_MAXIMUM_MSG_SIZE);
    -        props.add(SMTP_MAXIMUM_INCOMING_MESSAGE_QUEUE);
    -        props.add(SSL_CONTEXT_SERVICE);
    -        props.add(CLIENT_AUTH);
    -        this.propertyDescriptors = Collections.unmodifiableList(props);
    -
    -    }
    -
    -    // Upon Schedule, reset the initialized state to false
    -    @OnScheduled
    -    public void onScheduled(ProcessContext context) {
    -        initialized.set(false);
    -        stopping.set(false);
    -    }
    -
    -    protected synchronized void initializeSMTPServer(final ProcessContext 
context) throws Exception {
    -
    -        // check if we are already running or if it is stopping
    -        if (initialized.get() && server.isRunning() || stopping.get() ) {
    -            return;
    -        }
    -
    -        incomingMessages = new 
LinkedBlockingQueue<>(context.getProperty(SMTP_MAXIMUM_INCOMING_MESSAGE_QUEUE).asInteger());
    -
    -        String clientAuth = null;
    -
    -        // If an SSLContextService was provided then create an SSLContext 
to pass down to the server
    -        SSLContext sslContext = null;
    -        final SSLContextService sslContextService = 
context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
    -        if (sslContextService != null) {
    -            clientAuth = context.getProperty(CLIENT_AUTH).getValue();
    -            sslContext = 
sslContextService.createSSLContext(SSLContextService.ClientAuth.valueOf(clientAuth));
    +    /**
    +     *
    +     */
    +    private synchronized void setupSmtpIfNecessary(ProcessContext context, 
ProcessSession processSession) {
    +        if (this.smtp == null) {
    +            SmtpConsumer consumer = new SmtpConsumer();
    +            SMTPServer smtpServer = this.createServerInstance(context, 
consumer);
    +            smtpServer.setSoftwareName("Apache NiFi");
    +            smtpServer.setPort(context.getProperty(SMTP_PORT).asInteger());
    +            
smtpServer.setMaxMessageSize(context.getProperty(SMTP_MAXIMUM_MSG_SIZE).asDataSize(DataUnit.B).intValue());
    +            
smtpServer.setConnectionTimeout(context.getProperty(SMTP_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue());
    +
    +            this.smtpConsumer = consumer;
    +            this.smtp = smtpServer;
    +            this.smtp.start();
             }
    +    }
     
    -        final SSLContext finalSslContext = sslContext;
    -
    -        SMTPMessageHandlerFactory smtpMessageHandlerFactory = new 
SMTPMessageHandlerFactory(incomingMessages, getLogger());
    -        final SMTPServer server = new 
SMTPServer(smtpMessageHandlerFactory) {
    -
    +    /**
    +     *
    +     */
    +    private SMTPServer createServerInstance(ProcessContext context, 
SmtpConsumer consumer) {
    +        SSLContextService sslContextService = 
context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
    +        SMTPServer smtpServer = sslContextService == null ? new 
SMTPServer(consumer) : new SMTPServer(consumer) {
                 @Override
                 public SSLSocket createSSLSocket(Socket socket) throws 
IOException {
                     InetSocketAddress remoteAddress = (InetSocketAddress) 
socket.getRemoteSocketAddress();
    -
    -                SSLSocketFactory socketFactory = 
finalSslContext.getSocketFactory();
    -
    -                SSLSocket s = (SSLSocket) 
(socketFactory.createSocket(socket, remoteAddress.getHostName(), 
socket.getPort(), true));
    -
    -                s.setUseClientMode(false);
    -
    -
    -                // For some reason the createSSLContext above is not 
enough to enforce
    -                // client side auth
    -                // If client auth is required...
    -                if 
(SSLContextService.ClientAuth.REQUIRED.toString().equals(context.getProperty(CLIENT_AUTH).getValue()))
 {
    -                    s.setNeedClientAuth(true);
    +                String clientAuth = 
context.getProperty(CLIENT_AUTH).getValue();
    +                SSLContext sslContext = 
sslContextService.createSSLContext(SSLContextService.ClientAuth.valueOf(clientAuth));
    +                SSLSocketFactory socketFactory = 
sslContext.getSocketFactory();
    +                SSLSocket sslSocket = (SSLSocket) 
(socketFactory.createSocket(socket, 
remoteAddress.getHostName(),socket.getPort(), true));
    +                sslSocket.setUseClientMode(false);
    +
    +                if 
(SSLContextService.ClientAuth.REQUIRED.toString().equals(clientAuth)) {
    +                    this.setRequireTLS(true);
    +                    sslSocket.setNeedClientAuth(true);
                     }
     
    -
    -                return s;
    +                return sslSocket;
                 }
             };
    -
    -        // Set some parameters to our server
    -        server.setSoftwareName("Apache NiFi");
    -
    -
    -        // Set the Server options based on properties
    -        server.setPort(context.getProperty(SMTP_PORT).asInteger());
    -        server.setHostName(context.getProperty(SMTP_HOSTNAME).getValue());
    -        
server.setMaxMessageSize(context.getProperty(SMTP_MAXIMUM_MSG_SIZE).asDataSize(DataUnit.B).intValue());
    -        
server.setMaxConnections(context.getProperty(SMTP_MAXIMUM_CONNECTIONS).asInteger());
    -        
server.setConnectionTimeout(context.getProperty(SMTP_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue());
    -
    -
    -        // Check if TLS should be enabled
             if (sslContextService != null) {
    -            server.setEnableTLS(true);
    -        } else {
    -            server.setHideTLS(true);
    --- End diff --
    
    Fair enough. Thanks for heads up.


> TestListenSMTP ValidEmail fails during parallel build
> -----------------------------------------------------
>
>                 Key: NIFI-2519
>                 URL: https://issues.apache.org/jira/browse/NIFI-2519
>             Project: Apache NiFi
>          Issue Type: Bug
>          Components: Extensions
>            Reporter: Joseph Witt
>            Assignee: Oleg Zhurakousky
>             Fix For: 1.0.0
>
>
> While running a full NiFi parallel build received the following. So there is 
> some test issue at least that is impacting build stability.
> [INFO] --- maven-compiler-plugin:3.2:testCompile (default-testCompile) @ 
> nifi-email-processors ---
> [INFO] Changes detected - recompiling the module!
> [INFO] Compiling 4 source files to 
> /home/travis/build/apache/nifi/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/target/test-classes
> [WARNING] 
> /home/travis/build/apache/nifi/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/test/java/org/apache/nifi/processors/email/TestListenSMTP.java:[122,24]
>  [deprecation] stop() in Thread has been deprecated
> [WARNING] 
> /home/travis/build/apache/nifi/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/test/java/org/apache/nifi/processors/email/TestListenSMTP.java:[186,24]
>  [deprecation] stop() in Thread has been deprecated
> [WARNING] 
> /home/travis/build/apache/nifi/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/test/java/org/apache/nifi/processors/email/TestListenSMTP.java:[307,24]
>  [deprecation] stop() in Thread has been deprecated
> [INFO] 
> [INFO] --- maven-compiler-plugin:3.2:testCompile (groovy-tests) @ 
> nifi-email-processors ---
> [INFO] Changes detected - recompiling the module!
> [INFO] Nothing to compile - all classes are up to date
> [INFO] 
> [INFO] --- maven-surefire-plugin:2.18:test (default-test) @ 
> nifi-email-processors ---
> [INFO] Surefire report directory: 
> /home/travis/build/apache/nifi/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/target/surefire-reports
> [INFO] Using configured provider 
> org.apache.maven.surefire.junit4.JUnit4Provider
> -------------------------------------------------------
>  T E S T S
> -------------------------------------------------------
> Running org.apache.nifi.processors.email.TestListenSMTP
> Tests run: 4, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 1.473 sec <<< 
> FAILURE! - in org.apache.nifi.processors.email.TestListenSMTP
> ValidEmail(org.apache.nifi.processors.email.TestListenSMTP)  Time elapsed: 
> 0.038 sec  <<< FAILURE!
> java.lang.AssertionError: Sending email failed
>       at org.junit.Assert.fail(Assert.java:88)
>       at org.junit.Assert.assertTrue(Assert.java:41)
>       at org.junit.Assert.assertFalse(Assert.java:64)
>       at 
> org.apache.nifi.processors.email.TestListenSMTP.ValidEmail(TestListenSMTP.java:188)
> Running org.apache.nifi.processors.email.TestExtractEmailAttachments
> Tests run: 4, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 0.1 sec - in 
> org.apache.nifi.processors.email.TestExtractEmailAttachments
> Running org.apache.nifi.processors.email.TestExtractEmailHeaders
> Tests run: 3, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 0.029 sec - 
> in org.apache.nifi.processors.email.TestExtractEmailHeaders
> Results :
> Failed tests: 
>   TestListenSMTP.ValidEmail:188 Sending email failed



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to