[ 
https://issues.apache.org/jira/browse/NIFI-2519?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15420303#comment-15420303
 ] 

ASF GitHub Bot commented on NIFI-2519:
--------------------------------------

Github user trixpan commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/856#discussion_r74700141
  
    --- Diff: 
nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/smtp/SmtpConsumer.java
 ---
    @@ -0,0 +1,161 @@
    +/*
    + *  Licensed to the Apache Software Foundation (ASF) under one or more
    + *  contributor license agreements.  See the NOTICE file distributed with
    + *  this work for additional information regarding copyright ownership.
    + *  The ASF licenses this file to You under the Apache License, Version 2.0
    + *  (the "License"); you may not use this file except in compliance with
    + *  the License.  You may obtain a copy of the License at
    + *
    + *  http://www.apache.org/licenses/LICENSE-2.0
    + *
    + *  Unless required by applicable law or agreed to in writing, software
    + *  distributed under the License is distributed on an "AS IS" BASIS,
    + *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or 
implied.
    + *  See the License for the specific language governing permissions and
    + *  limitations under the License.
    + */
    +package org.apache.nifi.processors.email.smtp;
    +
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.io.OutputStream;
    +import java.security.cert.Certificate;
    +import java.security.cert.X509Certificate;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.atomic.AtomicBoolean;
    +import org.apache.commons.io.IOUtils;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.flowfile.attributes.CoreAttributes;
    +import org.apache.nifi.logging.ComponentLog;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.ProcessSessionFactory;
    +import org.apache.nifi.processor.exception.FlowFileAccessException;
    +import org.apache.nifi.processors.email.ListenSMTP;
    +import org.apache.nifi.stream.io.LimitingInputStream;
    +import org.apache.nifi.util.StopWatch;
    +
    +import org.subethamail.smtp.MessageContext;
    +import org.subethamail.smtp.MessageHandler;
    +import org.subethamail.smtp.RejectException;
    +import org.subethamail.smtp.TooMuchDataException;
    +import org.subethamail.smtp.server.SMTPServer;
    +
    +/**
    + * A simple consumer that provides a bridge between 'push' message 
distribution
    + * provided by {@link SMTPServer} and NiFi polling scheduler mechanism.
    + */
    +public class SmtpConsumer implements MessageHandler {
    +
    +    private String from = null;
    +    private final List<String> recipientList = new ArrayList<>();
    +    private final MessageContext context;
    +    private final ProcessSessionFactory sessionFactory;
    +    private final int port;
    +    private final int maxMessageSize;
    +    private final ComponentLog log;
    +    private final String host;
    +
    +    public SmtpConsumer(
    +            final MessageContext context,
    +            final ProcessSessionFactory sessionFactory,
    +            final int port,
    +            final String host,
    +            final ComponentLog log,
    +            final int maxMessageSize
    +    ) {
    +        this.context = context;
    +        this.sessionFactory = sessionFactory;
    +        this.port = port;
    +        if (host == null || host.trim().isEmpty()) {
    +            this.host = context.getSMTPServer().getHostName();
    +        } else {
    +            this.host = host;
    +        }
    +        this.log = log;
    +        this.maxMessageSize = maxMessageSize;
    +    }
    +
    +    String getFrom() {
    +        return from;
    +    }
    +
    +    List<String> getRecipients() {
    +        return Collections.unmodifiableList(recipientList);
    +    }
    +
    +    @Override
    +    public void data(final InputStream data) throws RejectException, 
TooMuchDataException, IOException {
    +        final ProcessSession processSession = 
sessionFactory.createSession();
    +        final StopWatch watch = new StopWatch();
    +        watch.start();
    +        try {
    +            FlowFile flowFile = processSession.create();
    +            final AtomicBoolean limitExceeded = new AtomicBoolean(false);
    +            flowFile = processSession.write(flowFile, (OutputStream out) 
-> {
    +                final LimitingInputStream lis = new 
LimitingInputStream(data, maxMessageSize);
    +                IOUtils.copy(lis, out);
    +                if (lis.hasReachedLimit()) {
    +                    limitExceeded.set(true);
    +                }
    +            });
    +            if (limitExceeded.get()) {
    +                throw new TooMuchDataException("Maximum message size limit 
reached - client must send smaller messages");
    +            }
    +            flowFile = processSession.putAllAttributes(flowFile, 
extractMessageAttributes());
    +            watch.stop();
    +            processSession.getProvenanceReporter().receive(flowFile, 
"smtp://" + host + ":" + port + "/", watch.getDuration(TimeUnit.MILLISECONDS));
    +            processSession.transfer(flowFile, ListenSMTP.REL_SUCCESS);
    +            processSession.commit();
    +        } catch (FlowFileAccessException | IllegalStateException | 
RejectException | IOException ex) {
    +            log.error("Unable to fully process input due to " + 
ex.getMessage(), ex);
    +            throw ex;
    +        } finally {
    +            processSession.rollback(); //make sure this happens no matter 
what - is safe
    --- End diff --
    
    @joewitt fascinating refactor. 
    
    Would you mind explaining why the rollback in the finally block is used 
and why it doesn't undo the commit above?


> TestListenSMTP ValidEmail fails during parallel build
> -----------------------------------------------------
>
>                 Key: NIFI-2519
>                 URL: https://issues.apache.org/jira/browse/NIFI-2519
>             Project: Apache NiFi
>          Issue Type: Bug
>          Components: Extensions
>            Reporter: Joseph Witt
>            Assignee: Oleg Zhurakousky
>             Fix For: 1.0.0
>
>         Attachments: 
> 0001-NIFI-2519-fixed-max-connections-and-max-size-enforce.patch
>
>
> While running a full NiFi parallel build, I received the following failure. 
> So there is at least one test issue that is impacting build stability.
> [INFO] --- maven-compiler-plugin:3.2:testCompile (default-testCompile) @ 
> nifi-email-processors ---
> [INFO] Changes detected - recompiling the module!
> [INFO] Compiling 4 source files to 
> /home/travis/build/apache/nifi/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/target/test-classes
> [WARNING] 
> /home/travis/build/apache/nifi/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/test/java/org/apache/nifi/processors/email/TestListenSMTP.java:[122,24]
>  [deprecation] stop() in Thread has been deprecated
> [WARNING] 
> /home/travis/build/apache/nifi/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/test/java/org/apache/nifi/processors/email/TestListenSMTP.java:[186,24]
>  [deprecation] stop() in Thread has been deprecated
> [WARNING] 
> /home/travis/build/apache/nifi/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/test/java/org/apache/nifi/processors/email/TestListenSMTP.java:[307,24]
>  [deprecation] stop() in Thread has been deprecated
> [INFO] 
> [INFO] --- maven-compiler-plugin:3.2:testCompile (groovy-tests) @ 
> nifi-email-processors ---
> [INFO] Changes detected - recompiling the module!
> [INFO] Nothing to compile - all classes are up to date
> [INFO] 
> [INFO] --- maven-surefire-plugin:2.18:test (default-test) @ 
> nifi-email-processors ---
> [INFO] Surefire report directory: 
> /home/travis/build/apache/nifi/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/target/surefire-reports
> [INFO] Using configured provider 
> org.apache.maven.surefire.junit4.JUnit4Provider
> -------------------------------------------------------
>  T E S T S
> -------------------------------------------------------
> Running org.apache.nifi.processors.email.TestListenSMTP
> Tests run: 4, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 1.473 sec <<< 
> FAILURE! - in org.apache.nifi.processors.email.TestListenSMTP
> ValidEmail(org.apache.nifi.processors.email.TestListenSMTP)  Time elapsed: 
> 0.038 sec  <<< FAILURE!
> java.lang.AssertionError: Sending email failed
>       at org.junit.Assert.fail(Assert.java:88)
>       at org.junit.Assert.assertTrue(Assert.java:41)
>       at org.junit.Assert.assertFalse(Assert.java:64)
>       at 
> org.apache.nifi.processors.email.TestListenSMTP.ValidEmail(TestListenSMTP.java:188)
> Running org.apache.nifi.processors.email.TestExtractEmailAttachments
> Tests run: 4, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 0.1 sec - in 
> org.apache.nifi.processors.email.TestExtractEmailAttachments
> Running org.apache.nifi.processors.email.TestExtractEmailHeaders
> Tests run: 3, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 0.029 sec - 
> in org.apache.nifi.processors.email.TestExtractEmailHeaders
> Results :
> Failed tests: 
>   TestListenSMTP.ValidEmail:188 Sending email failed



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to