[ https://issues.apache.org/jira/browse/NIFI-2519?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15420303#comment-15420303 ]
ASF GitHub Bot commented on NIFI-2519: -------------------------------------- Github user trixpan commented on a diff in the pull request: https://github.com/apache/nifi/pull/856#discussion_r74700141 --- Diff: nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/smtp/SmtpConsumer.java --- @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.email.smtp; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.security.cert.Certificate; +import java.security.cert.X509Certificate; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.commons.io.IOUtils; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.ProcessSessionFactory; +import org.apache.nifi.processor.exception.FlowFileAccessException; +import org.apache.nifi.processors.email.ListenSMTP; +import org.apache.nifi.stream.io.LimitingInputStream; +import org.apache.nifi.util.StopWatch; + +import org.subethamail.smtp.MessageContext; +import org.subethamail.smtp.MessageHandler; +import org.subethamail.smtp.RejectException; +import org.subethamail.smtp.TooMuchDataException; +import org.subethamail.smtp.server.SMTPServer; + +/** + * A simple consumer that provides a bridge between 'push' message distribution + * provided by {@link SMTPServer} and NiFi polling scheduler mechanism. + */ +public class SmtpConsumer implements MessageHandler { + + private String from = null; + private final List<String> recipientList = new ArrayList<>(); + private final MessageContext context; + private final ProcessSessionFactory sessionFactory; + private final int port; + private final int maxMessageSize; + private final ComponentLog log; + private final String host; + + public SmtpConsumer( + final MessageContext context, + final ProcessSessionFactory sessionFactory, + final int port, + final String host, + final ComponentLog log, + final int maxMessageSize + ) { + this.context = context; + this.sessionFactory = sessionFactory; + this.port = port; + if (host == null || host.trim().isEmpty()) { + this.host = context.getSMTPServer().getHostName(); + } else { + this.host = host; + } + this.log = log; + this.maxMessageSize = maxMessageSize; + } + + String getFrom() { + return from; + } + + List<String> getRecipients() { + return Collections.unmodifiableList(recipientList); + } + + @Override + public void data(final InputStream data) throws RejectException, TooMuchDataException, IOException { + final ProcessSession processSession = sessionFactory.createSession(); + final StopWatch watch = new StopWatch(); + watch.start(); + try { + FlowFile flowFile = processSession.create(); + final AtomicBoolean limitExceeded = new AtomicBoolean(false); + flowFile = processSession.write(flowFile, (OutputStream out) -> { + final LimitingInputStream lis = new LimitingInputStream(data, maxMessageSize); + IOUtils.copy(lis, out); + if (lis.hasReachedLimit()) { + limitExceeded.set(true); + } + }); + if (limitExceeded.get()) { + throw new TooMuchDataException("Maximum message size limit reached - client must send smaller messages"); + } + flowFile = processSession.putAllAttributes(flowFile, extractMessageAttributes()); + watch.stop(); + processSession.getProvenanceReporter().receive(flowFile, "smtp://" + host + ":" + port + "/", watch.getDuration(TimeUnit.MILLISECONDS)); + processSession.transfer(flowFile, ListenSMTP.REL_SUCCESS); + processSession.commit(); + } catch (FlowFileAccessException | IllegalStateException | RejectException | IOException ex) { + log.error("Unable to fully process input due to " + ex.getMessage(), ex); + throw ex; + } finally { + processSession.rollback(); //make sure this happens no matter what - is safe --- End diff -- @joewitt fascinating refactor. Would you mind explaining why the rollback is used and why it doesn't affect the commit above? > TestListenSMTP ValidEmail fails during parallel build > ----------------------------------------------------- > > Key: NIFI-2519 > URL: https://issues.apache.org/jira/browse/NIFI-2519 > Project: Apache NiFi > Issue Type: Bug > Components: Extensions > Reporter: Joseph Witt > Assignee: Oleg Zhurakousky > Fix For: 1.0.0 > > Attachments: > 0001-NIFI-2519-fixed-max-connections-and-max-size-enforce.patch > > > While running a full NiFi parallel build received the following. So there is > some test issue at least that is impacting build stability. > [INFO] --- maven-compiler-plugin:3.2:testCompile (default-testCompile) @ > nifi-email-processors --- > [INFO] Changes detected - recompiling the module! > [INFO] Compiling 4 source files to > /home/travis/build/apache/nifi/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/target/test-classes > [WARNING] > /home/travis/build/apache/nifi/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/test/java/org/apache/nifi/processors/email/TestListenSMTP.java:[122,24] > [deprecation] stop() in Thread has been deprecated > [WARNING] > /home/travis/build/apache/nifi/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/test/java/org/apache/nifi/processors/email/TestListenSMTP.java:[186,24] > [deprecation] stop() in Thread has been deprecated > [WARNING] > /home/travis/build/apache/nifi/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/test/java/org/apache/nifi/processors/email/TestListenSMTP.java:[307,24] > [deprecation] stop() in Thread has been deprecated > [INFO] > [INFO] --- maven-compiler-plugin:3.2:testCompile (groovy-tests) @ > nifi-email-processors --- > [INFO] Changes detected - recompiling the module! > [INFO] Nothing to compile - all classes are up to date > [INFO] > [INFO] --- maven-surefire-plugin:2.18:test (default-test) @ > nifi-email-processors --- > [INFO] Surefire report directory: > /home/travis/build/apache/nifi/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/target/surefire-reports > [INFO] Using configured provider > org.apache.maven.surefire.junit4.JUnit4Provider > ------------------------------------------------------- > T E S T S > ------------------------------------------------------- > Running org.apache.nifi.processors.email.TestListenSMTP > Tests run: 4, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 1.473 sec <<< > FAILURE! - in org.apache.nifi.processors.email.TestListenSMTP > ValidEmail(org.apache.nifi.processors.email.TestListenSMTP) Time elapsed: > 0.038 sec <<< FAILURE! > java.lang.AssertionError: Sending email failed > at org.junit.Assert.fail(Assert.java:88) > at org.junit.Assert.assertTrue(Assert.java:41) > at org.junit.Assert.assertFalse(Assert.java:64) > at > org.apache.nifi.processors.email.TestListenSMTP.ValidEmail(TestListenSMTP.java:188) > Running org.apache.nifi.processors.email.TestExtractEmailAttachments > Tests run: 4, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 0.1 sec - in > org.apache.nifi.processors.email.TestExtractEmailAttachments > Running org.apache.nifi.processors.email.TestExtractEmailHeaders > Tests run: 3, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 0.029 sec - > in org.apache.nifi.processors.email.TestExtractEmailHeaders > Results : > Failed tests: > TestListenSMTP.ValidEmail:188 Sending email failed -- This message was sent by Atlassian JIRA (v6.3.4#6332)