This is an automated email from the ASF dual-hosted git repository. joewitt pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/nifi.git
commit ff4c187f860e332640bd1a10d09fd435d286e3f1 Author: exceptionfactory <exceptionfact...@apache.org> AuthorDate: Mon Feb 19 20:18:09 2024 -0600 NIFI-12820 Upgraded Email Processors to Jakarta Mail 2 This closes #8428. - Upgraded from Java Mail 1.4.7 to Jakarta Mail API 2.1.2 - Upgraded Spring Integration from 5.5.20 to 6.2.1 - Upgraded SubEtha SMTP from 3.1.7 to 7.0.1 - Upgraded Greenmail from 1.6.15 to 2.0.1 - Removed usage of Commons Lang3 - Removed usage of Commons IO Signed-off-by: Joseph Witt <joew...@apache.org> --- .../nifi-email-processors/pom.xml | 102 ++++++------ .../processors/email/AbstractEmailProcessor.java | 51 ++---- .../apache/nifi/processors/email/ConsumeIMAP.java | 18 +- .../apache/nifi/processors/email/ConsumePOP3.java | 21 +-- .../processors/email/ExtractEmailAttachments.java | 183 ++++++++++----------- .../nifi/processors/email/ExtractEmailHeaders.java | 168 +++++++++---------- .../processors/email/ExtractTNEFAttachments.java | 133 +++++---------- .../apache/nifi/processors/email/ListenSMTP.java | 142 +++++----------- .../nifi/processors/email/smtp/SmtpConsumer.java | 39 ++--- .../nifi/processors/email/GenerateAttachment.java | 131 +++++++-------- ...TestConsumeEmail.java => TestConsumeEmail.java} | 36 +--- .../email/TestExtractEmailAttachments.java | 33 ++-- .../processors/email/TestExtractEmailHeaders.java | 72 ++------ .../nifi/processors/email/TestListenSMTP.java | 12 +- pom.xml | 1 - 15 files changed, 443 insertions(+), 699 deletions(-) diff --git a/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/pom.xml b/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/pom.xml index ddb94f1b2b..c6a4930dbb 100644 --- a/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/pom.xml +++ b/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/pom.xml @@ -25,7 +25,7 @@ <artifactId>nifi-email-processors</artifactId> <packaging>jar</packaging> <properties> - <spring.integration.version>5.5.20</spring.integration.version> + <spring.integration.version>6.2.1</spring.integration.version> <poi.version>5.2.5</poi.version> </properties> @@ -37,103 +37,95 @@ <dependency> <groupId>org.apache.nifi</groupId> <artifactId>nifi-utils</artifactId> - <version>2.0.0-SNAPSHOT</version> </dependency> <dependency> <groupId>org.apache.nifi</groupId> - <artifactId>nifi-security-cert</artifactId> - <version>2.0.0-SNAPSHOT</version> + <artifactId>nifi-ssl-context-service-api</artifactId> </dependency> <dependency> <groupId>org.apache.nifi</groupId> <artifactId>nifi-oauth2-provider-api</artifactId> - <version>2.0.0-SNAPSHOT</version> - </dependency> - <dependency> - <groupId>javax.mail</groupId> - <artifactId>mail</artifactId> - <version>1.4.7</version> - </dependency> - <dependency> - <groupId>org.apache.commons</groupId> - <artifactId>commons-email</artifactId> - <version>1.6.0</version> - <exclusions> - <exclusion> - <groupId>com.sun.mail</groupId> - <artifactId>javax.mail</artifactId> - </exclusion> - </exclusions> - </dependency> - <dependency> - <groupId>commons-io</groupId> - <artifactId>commons-io</artifactId> </dependency> <dependency> - <groupId>org.apache.commons</groupId> - <artifactId>commons-lang3</artifactId> + <groupId>jakarta.mail</groupId> + <artifactId>jakarta.mail-api</artifactId> + <version>2.1.2</version> </dependency> <dependency> - <groupId>com.sun.mail</groupId> - <artifactId>javax.mail</artifactId> - <version>1.6.2</version> + <groupId>org.eclipse.angus</groupId> + <artifactId>angus-mail</artifactId> + <version>2.0.2</version> </dependency> <dependency> - <groupId>org.subethamail</groupId> + <groupId>com.github.davidmoten</groupId> <artifactId>subethasmtp</artifactId> - <version>3.1.7</version> - <exclusions> - <exclusion> - <groupId>com.google.code.findbugs</groupId> - <artifactId>jsr305</artifactId> - </exclusion> - <exclusion> - <groupId>javax.mail</groupId> - <artifactId>mail</artifactId> - </exclusion> - </exclusions> - </dependency> - <dependency> - <groupId>com.github.stephenc.findbugs</groupId> - <artifactId>findbugs-annotations</artifactId> - <version>1.3.9-1</version> - </dependency> - <dependency> - <groupId>org.apache.nifi</groupId> - <artifactId>nifi-ssl-context-service-api</artifactId> + <version>7.0.1</version> </dependency> <dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-mail</artifactId> <version>${spring.integration.version}</version> <exclusions> + <!-- Exclude Spring libraries not required for IMAP and POP3 --> <exclusion> <groupId>org.springframework.retry</groupId> <artifactId>spring-retry</artifactId> </exclusion> + <exclusion> + <groupId>org.springframework</groupId> + <artifactId>spring-aop</artifactId> + </exclusion> + <exclusion> + <groupId>org.springframework</groupId> + <artifactId>spring-tx</artifactId> + </exclusion> + <exclusion> + <groupId>org.springframework</groupId> + <artifactId>spring-context-support</artifactId> + </exclusion> + <exclusion> + <groupId>io.projectreactor</groupId> + <artifactId>reactor-core</artifactId> + </exclusion> + <exclusion> + <groupId>io.micrometer</groupId> + <artifactId>micrometer-observation</artifactId> + </exclusion> </exclusions> </dependency> + <!-- poi-scratchpad required for TNEF parsing --> <dependency> <groupId>org.apache.poi</groupId> <artifactId>poi-scratchpad</artifactId> <version>${poi.version}</version> + <exclusions> + <exclusion> + <groupId>org.apache.commons</groupId> + <artifactId>commons-math3</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.commons</groupId> + <artifactId>commons-collections4</artifactId> + </exclusion> + <exclusion> + <groupId>com.zaxxer</groupId> + <artifactId>SparseBitSet</artifactId> + </exclusion> + </exclusions> </dependency> <dependency> <groupId>org.apache.nifi</groupId> <artifactId>nifi-mock</artifactId> - <version>2.0.0-SNAPSHOT</version> - <scope>test</scope> </dependency> <dependency> <groupId>org.apache.nifi</groupId> <artifactId>nifi-security-utils</artifactId> - <version>2.0.0-SNAPSHOT</version> <scope>test</scope> </dependency> <dependency> <groupId>com.icegreen</groupId> <artifactId>greenmail</artifactId> - <version>1.6.15</version> + <version>2.0.1</version> <scope>test</scope> </dependency> </dependencies> diff --git a/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/AbstractEmailProcessor.java b/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/AbstractEmailProcessor.java index 31a013abff..593fbcbfe7 100644 --- a/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/AbstractEmailProcessor.java +++ b/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/AbstractEmailProcessor.java @@ -33,14 +33,13 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.support.StaticListableBeanFactory; import org.springframework.integration.mail.AbstractMailReceiver; -import org.springframework.util.Assert; -import javax.mail.Address; -import javax.mail.Message; -import javax.mail.MessagingException; +import jakarta.mail.Address; +import jakarta.mail.Message; +import jakarta.mail.MessagingException; import java.io.IOException; -import java.io.UnsupportedEncodingException; import java.net.URLEncoder; +import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Arrays; import java.util.HashSet; @@ -224,17 +223,11 @@ abstract class AbstractEmailProcessor<T extends AbstractMailReceiver> extends Ab } } - /** - * - */ @Override public Set<Relationship> getRelationships() { return SHARED_RELATIONSHIPS; } - /** - * - */ @Override public void onTrigger(ProcessContext context, ProcessSession processSession) throws ProcessException { this.initializeIfNecessary(context, processSession); @@ -245,9 +238,6 @@ abstract class AbstractEmailProcessor<T extends AbstractMailReceiver> extends Ab } } - /** - * - */ @Override protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) { return new PropertyDescriptor.Builder() @@ -278,26 +268,17 @@ abstract class AbstractEmailProcessor<T extends AbstractMailReceiver> extends Ab String port = processContext.getProperty(PORT).evaluateAttributeExpressions().getValue(); String user = processContext.getProperty(USER).evaluateAttributeExpressions().getValue(); - String password = oauth2AccessTokenProviderOptional.map(oauth2AccessTokenProvider -> { - String accessToken = oauth2AccessTokenProvider.getAccessDetails().getAccessToken(); - - return accessToken; - }).orElse(processContext.getProperty(PASSWORD).evaluateAttributeExpressions().getValue()); + String password = oauth2AccessTokenProviderOptional.map(oauth2AccessTokenProvider -> + oauth2AccessTokenProvider.getAccessDetails().getAccessToken() + ).orElse(processContext.getProperty(PASSWORD).evaluateAttributeExpressions().getValue()); String folder = processContext.getProperty(FOLDER).evaluateAttributeExpressions().getValue(); StringBuilder urlBuilder = new StringBuilder(); - try { - urlBuilder.append(URLEncoder.encode(user, "UTF-8")); - } catch (UnsupportedEncodingException e) { - throw new ProcessException(e); - } + urlBuilder.append(URLEncoder.encode(user, StandardCharsets.UTF_8)); + urlBuilder.append(":"); - try { - urlBuilder.append(URLEncoder.encode(password, "UTF-8")); - } catch (UnsupportedEncodingException e) { - throw new ProcessException(e); - } + urlBuilder.append(URLEncoder.encode(password, StandardCharsets.UTF_8)); urlBuilder.append("@"); urlBuilder.append(host); urlBuilder.append(":"); @@ -306,15 +287,15 @@ abstract class AbstractEmailProcessor<T extends AbstractMailReceiver> extends Ab urlBuilder.append(folder); String protocol = this.getProtocol(processContext); - String finalUrl = protocol + "://" + urlBuilder.toString(); + String finalUrl = protocol + "://" + urlBuilder; // build display-safe URL int passwordStartIndex = urlBuilder.indexOf(":") + 1; int passwordEndIndex = urlBuilder.indexOf("@"); urlBuilder.replace(passwordStartIndex, passwordEndIndex, "[password]"); - this.displayUrl = protocol + "://" + urlBuilder.toString(); + this.displayUrl = protocol + "://" + urlBuilder; if (this.logger.isInfoEnabled()) { - this.logger.info("Connecting to Email server at the following URL: " + this.displayUrl); + this.logger.info("Connecting to Email server at the following URL: {}", this.displayUrl); } return finalUrl; @@ -388,7 +369,6 @@ abstract class AbstractEmailProcessor<T extends AbstractMailReceiver> extends Ab if (messages != null) { for (Object message : messages) { - Assert.isTrue(message instanceof Message, "Message is not an instance of javax.mail.Message"); this.messageQueue.offer((Message) message); } } @@ -446,10 +426,7 @@ abstract class AbstractEmailProcessor<T extends AbstractMailReceiver> extends Ab } /** - * Will flush the remaining messages when this processor is stopped. The - * flushed messages are disposed via - * {@link #disposeMessage(Message, ProcessContext, ProcessSession)} - * operation + * Will flush the remaining messages when this processor is stopped. */ private void flushRemainingMessages(ProcessContext processContext) { Message emailMessage; diff --git a/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/ConsumeIMAP.java b/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/ConsumeIMAP.java index c19ef4a102..611fe92309 100644 --- a/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/ConsumeIMAP.java +++ b/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/ConsumeIMAP.java @@ -55,16 +55,12 @@ public class ConsumeIMAP extends AbstractEmailProcessor<ImapMailReceiver> { static final List<PropertyDescriptor> DESCRIPTORS; static { - List<PropertyDescriptor> _descriptors = new ArrayList<>(); - _descriptors.addAll(SHARED_DESCRIPTORS); - _descriptors.add(SHOULD_MARK_READ); - _descriptors.add(USE_SSL); - DESCRIPTORS = Collections.unmodifiableList(_descriptors); + List<PropertyDescriptor> descriptors = new ArrayList<>(SHARED_DESCRIPTORS); + descriptors.add(SHOULD_MARK_READ); + descriptors.add(USE_SSL); + DESCRIPTORS = Collections.unmodifiableList(descriptors); } - /** - * - */ @Override protected ImapMailReceiver buildMessageReceiver(ProcessContext processContext) { ImapMailReceiver receiver = new ImapMailReceiver(this.buildUrl(processContext)); @@ -74,17 +70,11 @@ public class ConsumeIMAP extends AbstractEmailProcessor<ImapMailReceiver> { return receiver; } - /** - * - */ @Override protected String getProtocol(ProcessContext processContext) { return processContext.getProperty(USE_SSL).asBoolean() ? "imaps" : "imap"; } - /** - * - */ @Override protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { return DESCRIPTORS; diff --git a/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/ConsumePOP3.java b/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/ConsumePOP3.java index 7211325e84..83a97ceda6 100644 --- a/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/ConsumePOP3.java +++ b/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/ConsumePOP3.java @@ -16,8 +16,6 @@ */ package org.apache.nifi.processors.email; -import java.util.ArrayList; -import java.util.Collections; import java.util.List; import org.apache.nifi.annotation.behavior.InputRequirement; @@ -34,25 +32,11 @@ import org.springframework.integration.mail.Pop3MailReceiver; @Tags({ "Email", "POP3", "Get", "Ingest", "Ingress", "Message", "Consume" }) public class ConsumePOP3 extends AbstractEmailProcessor<Pop3MailReceiver> { - static final List<PropertyDescriptor> DESCRIPTORS; - - static { - List<PropertyDescriptor> _descriptors = new ArrayList<>(); - _descriptors.addAll(SHARED_DESCRIPTORS); - DESCRIPTORS = Collections.unmodifiableList(_descriptors); - } - - /** - * - */ @Override protected String getProtocol(ProcessContext processContext) { return "pop3"; } - /** - * - */ @Override protected Pop3MailReceiver buildMessageReceiver(ProcessContext context) { final Pop3MailReceiver receiver = new Pop3MailReceiver(this.buildUrl(context)); @@ -60,11 +44,8 @@ public class ConsumePOP3 extends AbstractEmailProcessor<Pop3MailReceiver> { return receiver; } - /** - * - */ @Override protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { - return DESCRIPTORS; + return SHARED_DESCRIPTORS; } } diff --git a/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/ExtractEmailAttachments.java b/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/ExtractEmailAttachments.java index 3b90ab1fb4..ad49205021 100644 --- a/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/ExtractEmailAttachments.java +++ b/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/ExtractEmailAttachments.java @@ -19,23 +19,22 @@ package org.apache.nifi.processors.email; import java.io.BufferedInputStream; import java.io.IOException; import java.io.InputStream; -import java.io.OutputStream; import java.util.ArrayList; -import java.util.Collections; import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Properties; import java.util.Set; -import javax.activation.DataSource; -import javax.mail.Address; -import javax.mail.MessagingException; -import javax.mail.Session; -import javax.mail.internet.MimeMessage; -import org.apache.commons.io.IOUtils; -import org.apache.commons.lang3.StringUtils; -import org.apache.commons.mail.util.MimeMessageParser; +import jakarta.activation.DataSource; +import jakarta.mail.Address; +import jakarta.mail.BodyPart; +import jakarta.mail.MessagingException; +import jakarta.mail.Multipart; +import jakarta.mail.Session; +import jakarta.mail.internet.MimeBodyPart; +import jakarta.mail.internet.MimeMessage; +import jakarta.mail.internet.MimePart; + import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; import org.apache.nifi.annotation.behavior.SideEffectFree; @@ -44,19 +43,15 @@ import org.apache.nifi.annotation.behavior.WritesAttribute; import org.apache.nifi.annotation.behavior.WritesAttributes; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.Tags; -import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.flowfile.attributes.CoreAttributes; import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.processor.AbstractProcessor; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; -import org.apache.nifi.processor.ProcessorInitializationContext; import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.exception.FlowFileHandlingException; -import org.apache.nifi.processor.io.InputStreamCallback; -import org.apache.nifi.processor.io.OutputStreamCallback; - +import org.apache.nifi.stream.io.StreamUtils; @SupportsBatching @SideEffectFree @@ -83,24 +78,12 @@ public class ExtractEmailAttachments extends AbstractProcessor { .build(); public static final Relationship REL_FAILURE = new Relationship.Builder() .name("failure") - .description("Flowfiles that could not be parsed") + .description("FlowFiles that could not be parsed") .build(); - private Set<Relationship> relationships; - private List<PropertyDescriptor> descriptors; + private static final String ATTACHMENT_DISPOSITION = "attachment"; - @Override - protected void init(final ProcessorInitializationContext context) { - final Set<Relationship> relationships = new HashSet<>(); - relationships.add(REL_ATTACHMENTS); - relationships.add(REL_ORIGINAL); - relationships.add(REL_FAILURE); - this.relationships = Collections.unmodifiableSet(relationships); - - final List<PropertyDescriptor> descriptors = new ArrayList<>(); - - this.descriptors = Collections.unmodifiableList(descriptors); - } + private static final Set<Relationship> relationships = Set.of(REL_ATTACHMENTS, REL_ORIGINAL, REL_FAILURE); @Override public void onTrigger(final ProcessContext context, final ProcessSession session) { @@ -115,65 +98,61 @@ public class ExtractEmailAttachments extends AbstractProcessor { final String requireStrictAddresses = "false"; - session.read(originalFlowFile, new InputStreamCallback() { - @Override - public void process(final InputStream rawIn) throws IOException { - try (final InputStream in = new BufferedInputStream(rawIn)) { - Properties props = new Properties(); - props.put("mail.mime.address.strict", requireStrictAddresses); - Session mailSession = Session.getInstance(props); - MimeMessage originalMessage = new MimeMessage(mailSession, in); - MimeMessageParser parser = new MimeMessageParser(originalMessage).parse(); - // RFC-2822 determines that a message must have a "From:" header - // if a message lacks the field, it is flagged as invalid - Address[] from = originalMessage.getFrom(); - if (from == null) { - throw new MessagingException("Message failed RFC-2822 validation: No Sender"); + session.read(originalFlowFile, rawIn -> { + try (final InputStream in = new BufferedInputStream(rawIn)) { + Properties props = new Properties(); + props.put("mail.mime.address.strict", requireStrictAddresses); + Session mailSession = Session.getInstance(props); + MimeMessage originalMessage = new MimeMessage(mailSession, in); + + // RFC-2822 determines that a message must have a "From:" header + // if a message lacks the field, it is flagged as invalid + Address[] from = originalMessage.getFrom(); + if (from == null) { + throw new MessagingException("Message failed RFC-2822 validation: No Sender"); + } + originalFlowFilesList.add(originalFlowFile); + + final String originalFlowFileName = originalFlowFile.getAttribute(CoreAttributes.FILENAME.key()); + try { + final List<DataSource> attachments = new ArrayList<>(); + parseAttachments(attachments, originalMessage, 0); + + for (final DataSource data : attachments) { + FlowFile split = session.create(originalFlowFile); + final Map<String, String> attributes = new HashMap<>(); + final String name = data.getName(); + if (name != null && !name.isBlank()) { + attributes.put(CoreAttributes.FILENAME.key(), name); } - originalFlowFilesList.add(originalFlowFile); - if (parser.hasAttachments()) { - final String originalFlowFileName = originalFlowFile.getAttribute(CoreAttributes.FILENAME.key()); - try { - for (final DataSource data : parser.getAttachmentList()) { - FlowFile split = session.create(originalFlowFile); - final Map<String, String> attributes = new HashMap<>(); - if (StringUtils.isNotBlank(data.getName())) { - attributes.put(CoreAttributes.FILENAME.key(), data.getName()); - } - if (StringUtils.isNotBlank(data.getContentType())) { - attributes.put(CoreAttributes.MIME_TYPE.key(), data.getContentType()); - } - String parentUuid = originalFlowFile.getAttribute(CoreAttributes.UUID.key()); - attributes.put(ATTACHMENT_ORIGINAL_UUID, parentUuid); - attributes.put(ATTACHMENT_ORIGINAL_FILENAME, originalFlowFileName); - split = session.append(split, new OutputStreamCallback() { - @Override - public void process(OutputStream out) throws IOException { - IOUtils.copy(data.getInputStream(), out); - } - }); - split = session.putAllAttributes(split, attributes); - attachmentsList.add(split); - } - } catch (FlowFileHandlingException e) { - // Something went wrong - // Removing splits that may have been created - session.remove(attachmentsList); - // Removing the original flow from its list - originalFlowFilesList.remove(originalFlowFile); - logger.error("Flowfile {} triggered error {} while processing message removing generated FlowFiles from sessions", new Object[]{originalFlowFile, e}); - invalidFlowFilesList.add(originalFlowFile); - } + final String contentType = data.getContentType(); + if (contentType != null && !contentType.isBlank()) { + attributes.put(CoreAttributes.MIME_TYPE.key(), contentType); } - } catch (Exception e) { - // Another error hit... - // Removing the original flow from its list - originalFlowFilesList.remove(originalFlowFile); - logger.error("Could not parse the flowfile {} as an email, treating as failure", new Object[]{originalFlowFile, e}); - // Message is invalid or triggered an error during parsing - invalidFlowFilesList.add(originalFlowFile); + String parentUuid = originalFlowFile.getAttribute(CoreAttributes.UUID.key()); + attributes.put(ATTACHMENT_ORIGINAL_UUID, parentUuid); + attributes.put(ATTACHMENT_ORIGINAL_FILENAME, originalFlowFileName); + split = session.append(split, out -> StreamUtils.copy(data.getInputStream(), out)); + split = session.putAllAttributes(split, attributes); + attachmentsList.add(split); } + } catch (FlowFileHandlingException e) { + // Something went wrong + // Removing splits that may have been created + session.remove(attachmentsList); + // Removing the original flow from its list + originalFlowFilesList.remove(originalFlowFile); + logger.error("Flowfile {} triggered error {} while processing message removing generated FlowFiles from sessions", originalFlowFile, e); + invalidFlowFilesList.add(originalFlowFile); } + } catch (Exception e) { + // Another error hit... + // Removing the original flow from its list + originalFlowFilesList.remove(originalFlowFile); + logger.error("Could not parse the flowfile {} as an email, treating as failure", originalFlowFile, e); + // Message is invalid or triggered an error during parsing + invalidFlowFilesList.add(originalFlowFile); + } }); session.transfer(attachmentsList, REL_ATTACHMENTS); @@ -184,21 +163,33 @@ public class ExtractEmailAttachments extends AbstractProcessor { session.transfer(originalFlowFilesList, REL_ORIGINAL); if (attachmentsList.size() > 10) { - logger.info("Split {} into {} files", new Object[]{originalFlowFile, attachmentsList.size()}); - } else if (attachmentsList.size() > 1){ - logger.info("Split {} into {} files: {}", new Object[]{originalFlowFile, attachmentsList.size(), attachmentsList}); + logger.info("Split {} into {} files", originalFlowFile, attachmentsList.size()); + } else if (attachmentsList.size() > 1) { + logger.info("Split {} into {} files: {}", originalFlowFile, attachmentsList.size(), attachmentsList); } - } + } @Override public Set<Relationship> getRelationships() { - return this.relationships; + return relationships; } - @Override - public final List<PropertyDescriptor> getSupportedPropertyDescriptors() { - return descriptors; + private void parseAttachments(final List<DataSource> attachments, final MimePart parentPart, final int depth) throws MessagingException, IOException { + final String disposition = parentPart.getDisposition(); + + final Object parentContent = parentPart.getContent(); + if (parentContent instanceof Multipart multipart) { + final int count = multipart.getCount(); + final int partDepth = depth + 1; + for (int i = 0; i < count; i++) { + final BodyPart bodyPart = multipart.getBodyPart(i); + if (bodyPart instanceof MimeBodyPart mimeBodyPart) { + parseAttachments(attachments, mimeBodyPart, partDepth); + } + } + } else if (ATTACHMENT_DISPOSITION.equalsIgnoreCase(disposition) || depth > 0) { + final DataSource dataSource = parentPart.getDataHandler().getDataSource(); + attachments.add(dataSource); + } } - - } diff --git a/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/ExtractEmailHeaders.java b/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/ExtractEmailHeaders.java index ba090f60a1..09b2738ecc 100644 --- a/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/ExtractEmailHeaders.java +++ b/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/ExtractEmailHeaders.java @@ -16,29 +16,29 @@ */ package org.apache.nifi.processors.email; - import java.io.BufferedInputStream; import java.io.IOException; import java.io.InputStream; import java.util.ArrayList; import java.util.Arrays; -import java.util.Collections; import java.util.Enumeration; import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Properties; import java.util.Set; -import javax.mail.Address; -import javax.mail.Header; -import javax.mail.Message; -import javax.mail.MessagingException; -import javax.mail.Session; -import javax.mail.internet.MimeMessage; -import org.apache.commons.lang3.ArrayUtils; -import org.apache.commons.lang3.StringUtils; -import org.apache.commons.mail.util.MimeMessageParser; +import java.util.concurrent.atomic.AtomicInteger; +import jakarta.mail.Address; +import jakarta.mail.BodyPart; +import jakarta.mail.Header; +import jakarta.mail.Message; +import jakarta.mail.MessagingException; +import jakarta.mail.Multipart; +import jakarta.mail.Session; +import jakarta.mail.internet.MimeBodyPart; +import jakarta.mail.internet.MimeMessage; +import jakarta.mail.internet.MimePart; + import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; import org.apache.nifi.annotation.behavior.SideEffectFree; @@ -55,9 +55,7 @@ import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.processor.AbstractProcessor; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; -import org.apache.nifi.processor.ProcessorInitializationContext; import org.apache.nifi.processor.Relationship; -import org.apache.nifi.processor.io.InputStreamCallback; import org.apache.nifi.processor.util.StandardValidators; @SupportsBatching @@ -127,22 +125,11 @@ public class ExtractEmailHeaders extends AbstractProcessor { .description("Flowfiles that could not be parsed as a RFC-2822 compliant message") .build(); - private Set<Relationship> relationships; - private List<PropertyDescriptor> descriptors; + private static final String ATTACHMENT_DISPOSITION = "attachment"; - @Override - protected void init(final ProcessorInitializationContext context) { - final Set<Relationship> relationships = new HashSet<>(); - relationships.add(REL_SUCCESS); - relationships.add(REL_FAILURE); - this.relationships = Collections.unmodifiableSet(relationships); + private static final Set<Relationship> relationships = Set.of(REL_SUCCESS, REL_FAILURE); - final List<PropertyDescriptor> descriptors = new ArrayList<>(); - - descriptors.add(CAPTURED_HEADERS); - descriptors.add(STRICT_PARSING); - this.descriptors = Collections.unmodifiableList(descriptors); - } + private static final List<PropertyDescriptor> descriptors = List.of(CAPTURED_HEADERS, STRICT_PARSING); @Override public void onTrigger(final ProcessContext context, final ProcessSession session) { @@ -160,68 +147,64 @@ public class ExtractEmailHeaders extends AbstractProcessor { final List<String> capturedHeadersList = Arrays.asList(context.getProperty(CAPTURED_HEADERS).getValue().toLowerCase().split(":")); final Map<String, String> attributes = new HashMap<>(); - session.read(originalFlowFile, new InputStreamCallback() { - @Override - public void process(final InputStream rawIn) throws IOException { - try (final InputStream in = new BufferedInputStream(rawIn)) { - Properties props = new Properties(); - props.put("mail.mime.address.strict", requireStrictAddresses); - Session mailSession = Session.getInstance(props); - MimeMessage originalMessage = new MimeMessage(mailSession, in); - MimeMessageParser parser = new MimeMessageParser(originalMessage).parse(); - // RFC-2822 determines that a message must have a "From:" header - // if a message lacks the field, it is flagged as invalid - Address[] from = originalMessage.getFrom(); - if (from == null) { - throw new MessagingException("Message failed RFC-2822 validation: No Sender"); - } - if (capturedHeadersList.size() > 0){ - Enumeration headers = originalMessage.getAllHeaders(); - while (headers.hasMoreElements()) { - Header header = (Header) headers.nextElement(); - if (StringUtils.isNotEmpty(header.getValue()) - && capturedHeadersList.contains(header.getName().toLowerCase())) { - attributes.put("email.headers." + header.getName().toLowerCase(), header.getValue()); - } + session.read(originalFlowFile, rawIn -> { + try (final InputStream in = new BufferedInputStream(rawIn)) { + Properties props = new Properties(); + props.put("mail.mime.address.strict", requireStrictAddresses); + Session mailSession = Session.getInstance(props); + MimeMessage originalMessage = new MimeMessage(mailSession, in); + // RFC-2822 determines that a message must have a "From:" header + // if a message lacks the field, it is flagged as invalid + Address[] from = originalMessage.getFrom(); + if (from == null) { + throw new MessagingException("Message failed RFC-2822 validation: No Sender"); + } + if (!capturedHeadersList.isEmpty()) { + Enumeration headers = originalMessage.getAllHeaders(); + while (headers.hasMoreElements()) { + final Header header = (Header) headers.nextElement(); + final String headerValue = header.getValue(); + if (headerValue != null && !headerValue.isBlank() + && capturedHeadersList.contains(header.getName().toLowerCase())) { + attributes.put("email.headers." + header.getName().toLowerCase(), header.getValue()); } } + } - putAddressListInAttributes(attributes, EMAIL_HEADER_TO, originalMessage.getRecipients(Message.RecipientType.TO)); - putAddressListInAttributes(attributes, EMAIL_HEADER_CC, originalMessage.getRecipients(Message.RecipientType.CC)); - putAddressListInAttributes(attributes, EMAIL_HEADER_BCC, originalMessage.getRecipients(Message.RecipientType.BCC)); - putAddressListInAttributes(attributes, EMAIL_HEADER_FROM, originalMessage.getFrom()); // RFC-2822 specifies "From" as mailbox-list + putAddressListInAttributes(attributes, EMAIL_HEADER_TO, originalMessage.getRecipients(Message.RecipientType.TO)); + putAddressListInAttributes(attributes, EMAIL_HEADER_CC, originalMessage.getRecipients(Message.RecipientType.CC)); + putAddressListInAttributes(attributes, EMAIL_HEADER_BCC, originalMessage.getRecipients(Message.RecipientType.BCC)); + putAddressListInAttributes(attributes, EMAIL_HEADER_FROM, originalMessage.getFrom()); // RFC-2822 specifies "From" as mailbox-list - if (StringUtils.isNotEmpty(originalMessage.getMessageID())) { - attributes.put(EMAIL_HEADER_MESSAGE_ID, originalMessage.getMessageID()); - } - if (originalMessage.getReceivedDate() != null) { - attributes.put(EMAIL_HEADER_RECV_DATE, originalMessage.getReceivedDate().toString()); - } - if (originalMessage.getSentDate() != null) { - attributes.put(EMAIL_HEADER_SENT_DATE, originalMessage.getSentDate().toString()); - } - if (StringUtils.isNotEmpty(originalMessage.getSubject())) { - attributes.put(EMAIL_HEADER_SUBJECT, originalMessage.getSubject()); - } - // Zeroes EMAIL_ATTACHMENT_COUNT - attributes.put(EMAIL_ATTACHMENT_COUNT, "0"); - // But insert correct value if attachments are present - if (parser.hasAttachments()) { - attributes.put(EMAIL_ATTACHMENT_COUNT, String.valueOf(parser.getAttachmentList().size())); - } - - } catch (Exception e) { - // Message is invalid or triggered an error during parsing - attributes.clear(); - logger.error("Could not parse the flowfile {} as an email, treating as failure", new Object[]{originalFlowFile, e}); - invalidFlowFilesList.add(originalFlowFile); + final String messageId = originalMessage.getMessageID(); + if (messageId != null && !messageId.isEmpty()) { + attributes.put(EMAIL_HEADER_MESSAGE_ID, originalMessage.getMessageID()); + } + if (originalMessage.getReceivedDate() != null) { + attributes.put(EMAIL_HEADER_RECV_DATE, originalMessage.getReceivedDate().toString()); + } + if (originalMessage.getSentDate() != null) { + attributes.put(EMAIL_HEADER_SENT_DATE, originalMessage.getSentDate().toString()); } + final String subject = originalMessage.getSubject(); + if (subject != null && !subject.isEmpty()) { + attributes.put(EMAIL_HEADER_SUBJECT, subject); + } + + final AtomicInteger attachmentsCounter = new AtomicInteger(); + countAttachments(attachmentsCounter, originalMessage, 0); + attributes.put(EMAIL_ATTACHMENT_COUNT, Integer.toString(attachmentsCounter.get())); + } catch (Exception e) { + // Message is invalid or triggered an error during parsing + attributes.clear(); + logger.error("Could not parse the flowfile {} as an email, treating as failure", originalFlowFile, e); + invalidFlowFilesList.add(originalFlowFile); } }); - if (attributes.size() > 0) { + if (!attributes.isEmpty()) { FlowFile updatedFlowFile = session.putAllAttributes(originalFlowFile, attributes); - logger.info("Extracted {} headers into {} file", new Object[]{attributes.size(), updatedFlowFile}); + logger.info("Extracted {} headers into {} file", attributes.size(), updatedFlowFile); processedFlowFilesList.add(updatedFlowFile); } @@ -232,7 +215,7 @@ public class ExtractEmailHeaders extends AbstractProcessor { @Override public Set<Relationship> getRelationships() { - return this.relationships; + return relationships; } @Override @@ -245,9 +228,28 @@ public class ExtractEmailHeaders extends AbstractProcessor { final String attributePrefix, Address[] addresses) { if (addresses != null) { - for (int count = 0; count < ArrayUtils.getLength(addresses); count++) { + for (int count = 0; count < addresses.length; count++) { attributes.put(attributePrefix + "." + count, addresses[count].toString()); } } } + + private void countAttachments(final AtomicInteger counter, final MimePart parentPart, final int depth) throws MessagingException, IOException { + final String disposition = parentPart.getDisposition(); + + final Object parentContent = parentPart.getContent(); + if (parentContent instanceof Multipart multipart) { + final int count = multipart.getCount(); + + final int partDepth = depth + 1; + for (int i = 0; i < count; i++) { + final BodyPart bodyPart = multipart.getBodyPart(i); + if (bodyPart instanceof MimeBodyPart mimeBodyPart) { + countAttachments(counter, mimeBodyPart, partDepth); + } + } + } else if (ATTACHMENT_DISPOSITION.equalsIgnoreCase(disposition) || depth > 0) { + counter.getAndIncrement(); + } + } } diff --git a/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/ExtractTNEFAttachments.java b/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/ExtractTNEFAttachments.java index d200058abc..3a24acf33d 100644 --- a/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/ExtractTNEFAttachments.java +++ b/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/ExtractTNEFAttachments.java @@ -17,18 +17,12 @@ package org.apache.nifi.processors.email; import java.io.BufferedInputStream; -import java.io.IOException; import java.io.InputStream; -import java.io.OutputStream; import java.util.ArrayList; -import java.util.Collections; import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.Properties; import java.util.Set; -import org.apache.commons.lang3.StringUtils; import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; import org.apache.nifi.annotation.behavior.SideEffectFree; @@ -37,7 +31,6 @@ import org.apache.nifi.annotation.behavior.WritesAttribute; import org.apache.nifi.annotation.behavior.WritesAttributes; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.Tags; -import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.flowfile.attributes.CoreAttributes; import org.apache.nifi.logging.ComponentLog; @@ -46,12 +39,9 @@ import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.exception.FlowFileHandlingException; -import org.apache.nifi.processor.io.InputStreamCallback; -import org.apache.nifi.processor.io.OutputStreamCallback; import org.apache.poi.hmef.Attachment; import org.apache.poi.hmef.HMEFMessage; - @SupportsBatching @SideEffectFree @Tags({"split", "email"}) @@ -61,7 +51,6 @@ import org.apache.poi.hmef.HMEFMessage; @WritesAttribute(attribute = "filename ", description = "The filename of the attachment"), @WritesAttribute(attribute = "email.tnef.attachment.parent.filename ", description = "The filename of the parent FlowFile"), @WritesAttribute(attribute = "email.tnef.attachment.parent.uuid", description = "The UUID of the original FlowFile.")}) - public class ExtractTNEFAttachments extends AbstractProcessor { public static final String ATTACHMENT_ORIGINAL_FILENAME = "email.tnef.attachment.parent.filename"; public static final String ATTACHMENT_ORIGINAL_UUID = "email.tnef.attachment.parent.uuid"; @@ -79,20 +68,7 @@ public class ExtractTNEFAttachments extends AbstractProcessor { .description("Each individual flowfile that could not be parsed will be routed to the failure relationship") .build(); - private final static Set<Relationship> RELATIONSHIPS; - private final static List<PropertyDescriptor> DESCRIPTORS; - - - static { - final Set<Relationship> _relationships = new HashSet<>(); - _relationships.add(REL_ATTACHMENTS); - _relationships.add(REL_ORIGINAL); - _relationships.add(REL_FAILURE); - RELATIONSHIPS = Collections.unmodifiableSet(_relationships); - - final List<PropertyDescriptor> _descriptors = new ArrayList<>(); - DESCRIPTORS = Collections.unmodifiableList(_descriptors); - } + private final static Set<Relationship> RELATIONSHIPS = Set.of(REL_ATTACHMENTS, REL_ORIGINAL, REL_FAILURE); @Override public void onTrigger(final ProcessContext context, final ProcessSession session) { @@ -105,66 +81,50 @@ public class ExtractTNEFAttachments extends AbstractProcessor { final List<FlowFile> invalidFlowFilesList = new ArrayList<>(); final List<FlowFile> originalFlowFilesList = new ArrayList<>(); - session.read(originalFlowFile, new InputStreamCallback() { - @Override - public void process(final InputStream rawIn) throws IOException { - try (final InputStream in = new BufferedInputStream(rawIn)) { - Properties props = new Properties(); - - HMEFMessage hmefMessage = null; - - // This will trigger an exception in case content is not a TNEF. - hmefMessage = new HMEFMessage(in); - - // Add otiginal flowfile (may revert later on in case of errors) // - originalFlowFilesList.add(originalFlowFile); - - if (hmefMessage != null) { - // Attachments isn empty, proceeding. - if (!hmefMessage.getAttachments().isEmpty()) { - final String originalFlowFileName = originalFlowFile.getAttribute(CoreAttributes.FILENAME.key()); - try { - for (final Attachment attachment : hmefMessage.getAttachments()) { - FlowFile split = session.create(originalFlowFile); - final Map<String, String> attributes = new HashMap<>(); - if (StringUtils.isNotBlank(attachment.getLongFilename())) { - attributes.put(CoreAttributes.FILENAME.key(), attachment.getFilename()); - } - - String parentUuid = originalFlowFile.getAttribute(CoreAttributes.UUID.key()); - attributes.put(ATTACHMENT_ORIGINAL_UUID, parentUuid); - attributes.put(ATTACHMENT_ORIGINAL_FILENAME, originalFlowFileName); + session.read(originalFlowFile, rawIn -> { + try (final InputStream in = new BufferedInputStream(rawIn)) { + // This will trigger an exception in case content is not a TNEF. + final HMEFMessage hmefMessage = new HMEFMessage(in); + + // Add original FlowFile (may revert later on in case of errors) // + originalFlowFilesList.add(originalFlowFile); + + if (!hmefMessage.getAttachments().isEmpty()) { + final String originalFlowFileName = originalFlowFile.getAttribute(CoreAttributes.FILENAME.key()); + try { + for (final Attachment attachment : hmefMessage.getAttachments()) { + FlowFile split = session.create(originalFlowFile); + final Map<String, String> attributes = new HashMap<>(); + final String attachmentFilename = attachment.getFilename(); + if (attachmentFilename != null && !attachmentFilename.isBlank()) { + attributes.put(CoreAttributes.FILENAME.key(), attachmentFilename); + } - // TODO: Extract Mime Type (HMEF doesn't seem to be able to get this info. + String parentUuid = originalFlowFile.getAttribute(CoreAttributes.UUID.key()); + attributes.put(ATTACHMENT_ORIGINAL_UUID, parentUuid); + attributes.put(ATTACHMENT_ORIGINAL_FILENAME, originalFlowFileName); - split = session.append(split, new OutputStreamCallback() { - @Override - public void process(OutputStream out) throws IOException { - out.write(attachment.getContents()); - } - }); - split = session.putAllAttributes(split, attributes); - attachmentsList.add(split); - } - } catch (FlowFileHandlingException e) { - // Something went wrong - // Removing splits that may have been created - session.remove(attachmentsList); - // Removing the original flow from its list - originalFlowFilesList.remove(originalFlowFile); - logger.error("Flowfile {} triggered error {} while processing message removing generated FlowFiles from sessions", new Object[]{originalFlowFile, e}); - invalidFlowFilesList.add(originalFlowFile); - } + split = session.append(split, out -> out.write(attachment.getContents())); + split = session.putAllAttributes(split, attributes); + attachmentsList.add(split); } + } catch (FlowFileHandlingException e) { + // Something went wrong + // Removing splits that may have been created + session.remove(attachmentsList); + // Removing the original flow from its list + originalFlowFilesList.remove(originalFlowFile); + logger.error("Flowfile {} triggered error {} while processing message removing generated FlowFiles from sessions", originalFlowFile, e); + invalidFlowFilesList.add(originalFlowFile); } - } catch (Exception e) { - // Another error hit... - // Removing the original flow from its list - originalFlowFilesList.remove(originalFlowFile); - logger.error("Could not parse the flowfile {} as an email, treating as failure", new Object[]{originalFlowFile, e}); - // Message is invalid or triggered an error during parsing - invalidFlowFilesList.add(originalFlowFile); } + } catch (Exception e) { + // Another error hit... + // Removing the original flow from its list + originalFlowFilesList.remove(originalFlowFile); + logger.error("Could not parse {} as an email, treating as failure", originalFlowFile, e); + // Message is invalid or triggered an error during parsing + invalidFlowFilesList.add(originalFlowFile); } }); @@ -176,13 +136,13 @@ public class ExtractTNEFAttachments extends AbstractProcessor { session.transfer(originalFlowFilesList, REL_ORIGINAL); // check if attachments have been extracted - if (attachmentsList.size() != 0) { + if (!attachmentsList.isEmpty()) { if (attachmentsList.size() > 10) { // If more than 10, summarise log - logger.info("Split {} into {} files", new Object[]{originalFlowFile, attachmentsList.size()}); + logger.info("Split {} into {} files", originalFlowFile, attachmentsList.size()); } else { // Otherwise be more verbose and list each individual split - logger.info("Split {} into {} files: {}", new Object[]{originalFlowFile, attachmentsList.size(), attachmentsList}); + logger.info("Split {} into {} files: {}", originalFlowFile, attachmentsList.size(), attachmentsList); } } } @@ -191,12 +151,5 @@ public class ExtractTNEFAttachments extends AbstractProcessor { public Set<Relationship> getRelationships() { return RELATIONSHIPS; } - - @Override - public final List<PropertyDescriptor> getSupportedPropertyDescriptors() { - return DESCRIPTORS; - } - - } diff --git a/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/ListenSMTP.java b/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/ListenSMTP.java index 54bf91bbd9..47ab46b8e6 100644 --- a/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/ListenSMTP.java +++ b/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/ListenSMTP.java @@ -24,8 +24,6 @@ import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.lifecycle.OnStopped; import org.apache.nifi.components.PropertyDescriptor; -import org.apache.nifi.components.ValidationContext; -import org.apache.nifi.components.ValidationResult; import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.processor.AbstractSessionFactoryProcessor; @@ -37,24 +35,13 @@ import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.processors.email.smtp.SmtpConsumer; import org.apache.nifi.security.util.ClientAuth; -import org.apache.nifi.security.util.TlsConfiguration; import org.apache.nifi.ssl.RestrictedSSLContextService; import org.apache.nifi.ssl.SSLContextService; -import org.springframework.util.StringUtils; import org.subethamail.smtp.MessageContext; import org.subethamail.smtp.MessageHandlerFactory; import org.subethamail.smtp.server.SMTPServer; import javax.net.ssl.SSLContext; -import javax.net.ssl.SSLSocket; -import javax.net.ssl.SSLSocketFactory; -import java.io.IOException; -import java.net.InetSocketAddress; -import java.net.Socket; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.HashSet; import java.util.List; import java.util.Set; import java.util.concurrent.TimeUnit; @@ -134,8 +121,9 @@ public class ListenSMTP extends AbstractSessionFactoryProcessor { .name("CLIENT_AUTH") .displayName("Client Auth") .description("The client authentication policy to use for the SSL Context. Only used if an SSL Context Service is provided.") - .required(false) + .required(true) .allowableValues(ClientAuth.NONE.name(), ClientAuth.REQUIRED.name()) + .dependsOn(SSL_CONTEXT_SERVICE) .build(); protected static final PropertyDescriptor SMTP_HOSTNAME = new PropertyDescriptor.Builder() @@ -152,25 +140,17 @@ public class ListenSMTP extends AbstractSessionFactoryProcessor { .description("All new messages will be routed as FlowFiles to this relationship") .build(); - private final static List<PropertyDescriptor> PROPERTY_DESCRIPTORS; - - private final static Set<Relationship> RELATIONSHIPS; + private final static List<PropertyDescriptor> PROPERTY_DESCRIPTORS = List.of( + SMTP_PORT, + SMTP_MAXIMUM_CONNECTIONS, + SMTP_TIMEOUT, + SMTP_MAXIMUM_MSG_SIZE, + SSL_CONTEXT_SERVICE, + CLIENT_AUTH, + SMTP_HOSTNAME + ); - static { - List<PropertyDescriptor> _propertyDescriptors = new ArrayList<>(); - _propertyDescriptors.add(SMTP_PORT); - _propertyDescriptors.add(SMTP_MAXIMUM_CONNECTIONS); - _propertyDescriptors.add(SMTP_TIMEOUT); - _propertyDescriptors.add(SMTP_MAXIMUM_MSG_SIZE); - _propertyDescriptors.add(SSL_CONTEXT_SERVICE); - _propertyDescriptors.add(CLIENT_AUTH); - _propertyDescriptors.add(SMTP_HOSTNAME); - PROPERTY_DESCRIPTORS = Collections.unmodifiableList(_propertyDescriptors); - - Set<Relationship> _relationships = new HashSet<>(); - _relationships.add(REL_SUCCESS); - RELATIONSHIPS = Collections.unmodifiableSet(_relationships); - } + private final static Set<Relationship> RELATIONSHIPS = Set.of(REL_SUCCESS); private volatile SMTPServer smtp; @@ -180,28 +160,27 @@ public class ListenSMTP extends AbstractSessionFactoryProcessor { try { final SMTPServer server = prepareServer(context, sessionFactory); server.start(); - getLogger().debug("Started SMTP Server on port " + server.getPort()); + getLogger().info("Started SMTP Server on {}", server.getPortAllocated()); smtp = server; - } catch (final Exception ex) {//have to catch exception due to awkward exception handling in subethasmtp + } catch (final Exception e) {//have to catch exception due to awkward exception handling in subethasmtp smtp = null; - getLogger().error("Unable to start SMTP server due to " + ex.getMessage(), ex); + getLogger().error("Unable to start SMTP server", e); } } context.yield();//nothing really to do here since threading managed by smtp server sessions } public int getListeningPort() { - return smtp == null ? 0 : smtp.getPort(); + return smtp == null ? 0 : smtp.getPortAllocated(); } @OnStopped public void stop() { try { smtp.stop(); - getLogger().debug("Stopped SMTP server on port " + smtp.getPort()); - }catch (Exception ex){ - getLogger().error("Error stopping SMTP server: " + ex.getMessage()); - }finally { + } catch (final Exception e) { + getLogger().error("Failed to stop SMTP Server", e); + } finally { smtp = null; } } @@ -211,78 +190,43 @@ public class ListenSMTP extends AbstractSessionFactoryProcessor { return RELATIONSHIPS; } - @Override - protected Collection<ValidationResult> customValidate(ValidationContext validationContext) { - List<ValidationResult> results = new ArrayList<>(); - - String clientAuth = validationContext.getProperty(CLIENT_AUTH).getValue(); - SSLContextService sslContextService = validationContext.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class); - - if (sslContextService != null && !StringUtils.hasText(clientAuth)) { - results.add(new ValidationResult.Builder() - .subject(CLIENT_AUTH.getDisplayName()) - .explanation(CLIENT_AUTH.getDisplayName() + " must be provided when using " + SSL_CONTEXT_SERVICE.getDisplayName()) - .valid(false) - .build()); - } else if (sslContextService == null && StringUtils.hasText(clientAuth)) { - results.add(new ValidationResult.Builder() - .subject(SSL_CONTEXT_SERVICE.getDisplayName()) - .explanation(SSL_CONTEXT_SERVICE.getDisplayName() + " must be provided when selecting " + CLIENT_AUTH.getDisplayName()) - .valid(false) - .build()); - } - return results; - } - @Override protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { return PROPERTY_DESCRIPTORS; } private SMTPServer prepareServer(final ProcessContext context, final ProcessSessionFactory sessionFactory) { + final SMTPServer.Builder smtpServerBuilder = new SMTPServer.Builder(); + final int port = context.getProperty(SMTP_PORT).asInteger(); final String host = context.getProperty(SMTP_HOSTNAME).getValue(); final ComponentLog log = getLogger(); final int maxMessageSize = context.getProperty(SMTP_MAXIMUM_MSG_SIZE).asDataSize(DataUnit.B).intValue(); - //create message handler factory - final MessageHandlerFactory messageHandlerFactory = (final MessageContext mc) -> { - return new SmtpConsumer(mc, sessionFactory, port, host, log, maxMessageSize); - }; - //create smtp server + final MessageHandlerFactory messageHandlerFactory = (final MessageContext mc) -> new SmtpConsumer(mc, sessionFactory, port, host, log, maxMessageSize); + + smtpServerBuilder.messageHandlerFactory(messageHandlerFactory); + smtpServerBuilder.port(port); + smtpServerBuilder.softwareName("Apache NiFi SMTP"); + smtpServerBuilder.maxConnections(context.getProperty(SMTP_MAXIMUM_CONNECTIONS).asInteger()); + smtpServerBuilder.maxMessageSize(maxMessageSize); + smtpServerBuilder.connectionTimeout(context.getProperty(SMTP_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue(), TimeUnit.MILLISECONDS); + if (context.getProperty(SMTP_HOSTNAME).isSet()) { + smtpServerBuilder.hostName(context.getProperty(SMTP_HOSTNAME).getValue()); + } + final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class); - final SMTPServer smtpServer = sslContextService == null ? new SMTPServer(messageHandlerFactory) : new SMTPServer(messageHandlerFactory) { - @Override - public SSLSocket createSSLSocket(Socket socket) throws IOException { - InetSocketAddress remoteAddress = (InetSocketAddress) socket.getRemoteSocketAddress(); - final String clientAuth = context.getProperty(CLIENT_AUTH).getValue(); - final SSLContext sslContext = sslContextService.createContext(); - final SSLSocketFactory socketFactory = sslContext.getSocketFactory(); - final SSLSocket sslSocket = (SSLSocket) socketFactory.createSocket(socket, remoteAddress.getHostName(), socket.getPort(), true); - final TlsConfiguration tlsConfiguration = sslContextService.createTlsConfiguration(); - sslSocket.setEnabledProtocols(tlsConfiguration.getEnabledProtocols()); + if (sslContextService == null) { + smtpServerBuilder.hideTLS(); + } else { + smtpServerBuilder.enableTLS(); - sslSocket.setUseClientMode(false); + final String clientAuth = context.getProperty(CLIENT_AUTH).getValue(); + final boolean requireClientCertificate = ClientAuth.REQUIRED.getType().equalsIgnoreCase(clientAuth); - if (ClientAuth.REQUIRED.getType().equals(clientAuth)) { - this.setRequireTLS(true); - sslSocket.setNeedClientAuth(true); - } - return sslSocket; - } - }; - if (sslContextService != null) { - smtpServer.setEnableTLS(true); - } else { - smtpServer.setHideTLS(true); + final SSLContext sslContext = sslContextService.createContext(); + smtpServerBuilder.startTlsSocketFactory(sslContext, requireClientCertificate); } - smtpServer.setSoftwareName("Apache NiFi SMTP"); - smtpServer.setPort(port); - smtpServer.setMaxConnections(context.getProperty(SMTP_MAXIMUM_CONNECTIONS).asInteger()); - smtpServer.setMaxMessageSize(maxMessageSize); - smtpServer.setConnectionTimeout(context.getProperty(SMTP_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue()); - if (context.getProperty(SMTP_HOSTNAME).isSet()) { - smtpServer.setHostName(context.getProperty(SMTP_HOSTNAME).getValue()); - } - return smtpServer; + + return smtpServerBuilder.build(); } } diff --git a/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/smtp/SmtpConsumer.java b/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/smtp/SmtpConsumer.java index d025352920..7d0aa76f3a 100644 --- a/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/smtp/SmtpConsumer.java +++ b/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/smtp/SmtpConsumer.java @@ -24,13 +24,12 @@ import java.net.SocketAddress; import java.security.cert.Certificate; import java.security.cert.X509Certificate; import java.util.ArrayList; -import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import org.apache.commons.io.IOUtils; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.flowfile.attributes.CoreAttributes; import org.apache.nifi.logging.ComponentLog; @@ -38,15 +37,16 @@ import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.ProcessSessionFactory; import org.apache.nifi.processor.exception.FlowFileAccessException; import org.apache.nifi.processors.email.ListenSMTP; -import org.apache.nifi.security.cert.StandardPrincipalFormatter; import org.apache.nifi.stream.io.LimitingInputStream; +import org.apache.nifi.stream.io.StreamUtils; import org.apache.nifi.util.StopWatch; import org.subethamail.smtp.MessageContext; import org.subethamail.smtp.MessageHandler; -import org.subethamail.smtp.RejectException; import org.subethamail.smtp.TooMuchDataException; import org.subethamail.smtp.server.SMTPServer; +import javax.security.auth.x500.X500Principal; + /** * A simple consumer that provides a bridge between 'push' message distribution * provided by {@link SMTPServer} and NiFi polling scheduler mechanism. @@ -82,16 +82,8 @@ public class SmtpConsumer implements MessageHandler { this.maxMessageSize = maxMessageSize; } - String getFrom() { - return from; - } - - List<String> getRecipients() { - return Collections.unmodifiableList(recipientList); - } - @Override - public void data(final InputStream data) throws RejectException, TooMuchDataException, IOException { + public String data(final InputStream data) throws IOException { final ProcessSession processSession = sessionFactory.createSession(); final StopWatch watch = new StopWatch(); watch.start(); @@ -100,7 +92,7 @@ public class SmtpConsumer implements MessageHandler { final AtomicBoolean limitExceeded = new AtomicBoolean(false); flowFile = processSession.write(flowFile, (OutputStream out) -> { final LimitingInputStream lis = new LimitingInputStream(data, maxMessageSize); - IOUtils.copy(lis, out); + StreamUtils.copy(lis, out); if (lis.hasReachedLimit()) { limitExceeded.set(true); } @@ -113,19 +105,21 @@ public class SmtpConsumer implements MessageHandler { processSession.getProvenanceReporter().receive(flowFile, "smtp://" + host + ":" + port + "/", watch.getDuration(TimeUnit.MILLISECONDS)); processSession.transfer(flowFile, ListenSMTP.REL_SUCCESS); processSession.commitAsync(); - } catch (FlowFileAccessException | IllegalStateException | RejectException | IOException ex) { - log.error("Unable to fully process input due to " + ex.getMessage(), ex); - throw ex; + } catch (final FlowFileAccessException | IllegalStateException | IOException e) { + log.error("SMTP data processing failed", e); + throw e; } + + return null; } @Override - public void from(final String from) throws RejectException { + public void from(final String from) { this.from = from; } @Override - public void recipient(final String recipient) throws RejectException { + public void recipient(final String recipient) { if (recipient != null && recipient.length() < 100 && recipientList.size() < 100) { recipientList.add(recipient); } @@ -142,7 +136,7 @@ public class SmtpConsumer implements MessageHandler { for (int i = 0; i < tlsPeerCertificates.length; i++) { if (tlsPeerCertificates[i] instanceof final X509Certificate x509Cert) { attributes.put("smtp.certificate." + i + ".serial", x509Cert.getSerialNumber().toString()); - attributes.put("smtp.certificate." + i + ".subjectName", StandardPrincipalFormatter.getInstance().getSubject(x509Cert)); + attributes.put("smtp.certificate." + i + ".subjectName", x509Cert.getSubjectX500Principal().getName(X500Principal.RFC1779)); } } } @@ -156,7 +150,9 @@ public class SmtpConsumer implements MessageHandler { attributes.put("smtp.src", strAddress); } - attributes.put("smtp.helo", context.getHelo()); + final Optional<String> helo = context.getHelo(); + helo.ifPresent(s -> attributes.put("smtp.helo", s)); + attributes.put("smtp.from", from); for (int i = 0; i < recipientList.size(); i++) { attributes.put("smtp.recipient." + i, recipientList.get(i)); @@ -164,5 +160,4 @@ public class SmtpConsumer implements MessageHandler { attributes.put(CoreAttributes.MIME_TYPE.key(), "message/rfc822"); return attributes; } - } diff --git a/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/test/java/org/apache/nifi/processors/email/GenerateAttachment.java b/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/test/java/org/apache/nifi/processors/email/GenerateAttachment.java index 6bfbc7872d..0ea1838fe8 100644 --- a/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/test/java/org/apache/nifi/processors/email/GenerateAttachment.java +++ b/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/test/java/org/apache/nifi/processors/email/GenerateAttachment.java @@ -17,15 +17,7 @@ package org.apache.nifi.processors.email; -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import javax.mail.MessagingException; -import javax.mail.internet.MimeMessage; -import org.apache.commons.mail.Email; -import org.apache.commons.mail.EmailAttachment; -import org.apache.commons.mail.EmailException; -import org.apache.commons.mail.MultiPartEmail; -import org.apache.commons.mail.SimpleEmail; +import java.nio.charset.StandardCharsets; public class GenerateAttachment { String from; @@ -34,6 +26,10 @@ public class GenerateAttachment { String message; String hostName; + private static final String NEWLINE = "\n"; + + private static final String BOUNDARY = "5A7C0449-336B-4F73-81EF-F176E4DF44B2"; + public GenerateAttachment(String from, String to, String subject, String message, String hostName) { this.from = from; this.to = to; @@ -42,73 +38,72 @@ public class GenerateAttachment { this.hostName = hostName; } - public byte[] SimpleEmail() { - MimeMessage mimeMessage = SimpleEmailMimeMessage(); - ByteArrayOutputStream output = new ByteArrayOutputStream(); - try { - mimeMessage.writeTo(output); - } catch (IOException e) { - e.printStackTrace(); - } catch (MessagingException e) { - e.printStackTrace(); - } - - return output.toByteArray(); + public byte[] simpleMessage() { + return simpleMessage(null); } - public MimeMessage SimpleEmailMimeMessage() { - Email email = new SimpleEmail(); - try { - email.setFrom(from); - email.addTo(to); - email.setSubject(subject); - email.setMsg(message); - email.setHostName(hostName); - email.buildMimeMessage(); - } catch (EmailException e) { - e.printStackTrace(); + public byte[] simpleMessage(final String recipient) { + final StringBuilder builder = new StringBuilder(); + + builder.append("MIME-Version: 1.0"); + builder.append(NEWLINE); + builder.append("Content-Type: text/plain; charset=utf-8"); + builder.append(NEWLINE); + builder.append("From: "); + builder.append(from); + builder.append(NEWLINE); + + if (recipient != null) { + builder.append("To: "); + builder.append(recipient); + builder.append(NEWLINE); } - return email.getMimeMessage(); + builder.append("Subject: "); + builder.append(subject); + builder.append(NEWLINE); + builder.append(NEWLINE); + builder.append(message); + + return builder.toString().getBytes(StandardCharsets.UTF_8); } + public byte[] withAttachments(int amount) { + final StringBuilder builder = new StringBuilder(); - public byte[] WithAttachments(int amount) { - MultiPartEmail email = new MultiPartEmail(); - try { - - email.setFrom(from); - email.addTo(to); - email.setSubject(subject); - email.setMsg(message); - email.setHostName(hostName); - - int x = 1; - while (x <= amount) { - // Create an attachment with the pom.xml being used to compile (yay!!!) - EmailAttachment attachment = new EmailAttachment(); - attachment.setPath("pom.xml"); - attachment.setDisposition(EmailAttachment.ATTACHMENT); - attachment.setDescription("pom.xml"); - attachment.setName("pom.xml"+String.valueOf(x)); - // attach - email.attach(attachment); - x++; - } - email.buildMimeMessage(); - } catch (EmailException e) { - e.printStackTrace(); - } - ByteArrayOutputStream output = new ByteArrayOutputStream(); - MimeMessage mimeMessage = email.getMimeMessage(); - try { - mimeMessage.writeTo(output); - } catch (IOException e) { - e.printStackTrace(); - } catch (MessagingException e) { - e.printStackTrace(); + builder.append("MIME-Version: 1.0"); + builder.append(NEWLINE); + + builder.append("Content-Type: multipart/mixed; boundary=\""); + builder.append(BOUNDARY); + builder.append("\""); + builder.append(NEWLINE); + + builder.append("From: "); + builder.append(from); + builder.append(NEWLINE); + builder.append("To: "); + builder.append(to); + builder.append(NEWLINE); + builder.append("Subject: "); + builder.append(subject); + builder.append(NEWLINE); + builder.append(NEWLINE); + + for (int i = 0; i < amount; i++) { + builder.append("--"); + builder.append(BOUNDARY); + builder.append(NEWLINE); + builder.append("Content-Type: text/plain; charset=utf-8"); + builder.append(NEWLINE); + builder.append("Content-Disposition: attachment; filename=\"pom.xml-%d\"".formatted(i)); + builder.append(NEWLINE); + builder.append(NEWLINE); + builder.append("Attachment"); + builder.append(i); + builder.append(NEWLINE); } - return output.toByteArray(); + return builder.toString().getBytes(StandardCharsets.UTF_8); } } diff --git a/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/test/java/org/apache/nifi/processors/email/ITestConsumeEmail.java b/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/test/java/org/apache/nifi/processors/email/TestConsumeEmail.java similarity index 82% rename from nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/test/java/org/apache/nifi/processors/email/ITestConsumeEmail.java rename to nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/test/java/org/apache/nifi/processors/email/TestConsumeEmail.java index 75c6c39098..dfb61074f3 100644 --- a/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/test/java/org/apache/nifi/processors/email/ITestConsumeEmail.java +++ b/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/test/java/org/apache/nifi/processors/email/TestConsumeEmail.java @@ -28,27 +28,24 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.springframework.integration.mail.AbstractMailReceiver; -import javax.mail.Message; -import javax.mail.MessagingException; -import javax.mail.Session; -import javax.mail.internet.InternetAddress; -import javax.mail.internet.MimeMessage; -import java.lang.reflect.Field; +import jakarta.mail.Message; +import jakarta.mail.MessagingException; +import jakarta.mail.Session; +import jakarta.mail.internet.InternetAddress; +import jakarta.mail.internet.MimeMessage; import java.util.List; import java.util.Properties; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; - -public class ITestConsumeEmail { +public class TestConsumeEmail { private GreenMail mockIMAP4Server; private GreenMail mockPOP3Server; private GreenMailUser imapUser; private GreenMailUser popUser; - // Setup mock imap server @BeforeEach public void setUp() { mockIMAP4Server = new GreenMail(ServerSetupTest.IMAP); @@ -77,7 +74,6 @@ public class ITestConsumeEmail { user.deliver(message); } - // Start the testing units @Test public void testConsumeIMAP4() throws Exception { @@ -157,24 +153,4 @@ public class ITestConsumeEmail { assertEquals("pop3", consume.getProtocol(runner.getProcessContext())); } - - @Test - public void validateUrl() throws Exception { - Field displayUrlField = AbstractEmailProcessor.class.getDeclaredField("displayUrl"); - displayUrlField.setAccessible(true); - - AbstractEmailProcessor<? extends AbstractMailReceiver> consume = new ConsumeIMAP(); - TestRunner runner = TestRunners.newTestRunner(consume); - runner.setProperty(ConsumeIMAP.HOST, "foo.bar.com"); - runner.setProperty(ConsumeIMAP.PORT, "1234"); - runner.setProperty(ConsumeIMAP.USER, "jon"); - runner.setProperty(ConsumeIMAP.PASSWORD, "qhgwjgehr"); - runner.setProperty(ConsumeIMAP.FOLDER, "MYBOX"); - runner.setProperty(ConsumeIMAP.USE_SSL, "false"); - - assertEquals("imap://jon:qhgwjg...@foo.bar.com:1234/MYBOX", consume.buildUrl(runner.getProcessContext())); - assertEquals("imap://jon:[password]@foo.bar.com:1234/MYBOX", displayUrlField.get(consume)); - } - - } diff --git a/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/test/java/org/apache/nifi/processors/email/TestExtractEmailAttachments.java b/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/test/java/org/apache/nifi/processors/email/TestExtractEmailAttachments.java index ecaf548ae3..719d5d64a4 100644 --- a/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/test/java/org/apache/nifi/processors/email/TestExtractEmailAttachments.java +++ b/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/test/java/org/apache/nifi/processors/email/TestExtractEmailAttachments.java @@ -25,13 +25,10 @@ import org.junit.jupiter.api.Test; import java.util.ArrayList; import java.util.Arrays; import java.util.List; -import java.util.Random; import static org.junit.jupiter.api.Assertions.assertTrue; - public class TestExtractEmailAttachments { - // Setups the fields to be used... String from = "Alice <al...@nifi.apache.org>"; String to = "b...@nifi.apache.org"; String subject = "Just a test email"; @@ -40,13 +37,11 @@ public class TestExtractEmailAttachments { GenerateAttachment attachmentGenerator = new GenerateAttachment(from, to, subject, message, hostName); - @Test - public void testValidEmailWithAttachments() throws Exception { + public void testValidEmailWithAttachments() { final TestRunner runner = TestRunners.newTestRunner(new ExtractEmailAttachments()); - // Create the message dynamically - byte [] withAttachment = attachmentGenerator.WithAttachments(1); + byte [] withAttachment = attachmentGenerator.withAttachments(1); runner.enqueue(withAttachment); runner.run(); @@ -56,17 +51,15 @@ public class TestExtractEmailAttachments { runner.assertTransferCount(ExtractEmailAttachments.REL_ATTACHMENTS, 1); // Have a look at the attachments... final List<MockFlowFile> splits = runner.getFlowFilesForRelationship(ExtractEmailAttachments.REL_ATTACHMENTS); - splits.get(0).assertAttributeEquals("filename", "pom.xml1"); + splits.get(0).assertAttributeEquals("filename", "pom.xml-0"); } @Test - public void testValidEmailWithMultipleAttachments() throws Exception { - Random rnd = new Random() ; + public void testValidEmailWithMultipleAttachments() { final TestRunner runner = TestRunners.newTestRunner(new ExtractEmailAttachments()); - // Create the message dynamically - int amount = rnd.nextInt(10) + 1; - byte [] withAttachment = attachmentGenerator.WithAttachments(amount); + int amount = 3; + byte [] withAttachment = attachmentGenerator.withAttachments(amount); runner.enqueue(withAttachment); runner.run(); @@ -74,23 +67,22 @@ public class TestExtractEmailAttachments { runner.assertTransferCount(ExtractEmailAttachments.REL_ORIGINAL, 1); runner.assertTransferCount(ExtractEmailAttachments.REL_FAILURE, 0); runner.assertTransferCount(ExtractEmailAttachments.REL_ATTACHMENTS, amount); - // Have a look at the attachments... + final List<MockFlowFile> splits = runner.getFlowFilesForRelationship(ExtractEmailAttachments.REL_ATTACHMENTS); List<String> filenames = new ArrayList<>(); for (int a = 0 ; a < amount ; a++ ) { - filenames.add(splits.get(a).getAttribute("filename").toString()); + filenames.add(splits.get(a).getAttribute("filename")); } - assertTrue(filenames.containsAll(Arrays.asList("pom.xml1", "pom.xml" + amount))); + assertTrue(filenames.containsAll(Arrays.asList("pom.xml-0", "pom.xml-1", "pom.xml-2"))); } @Test - public void testValidEmailWithoutAttachments() throws Exception { + public void testValidEmailWithoutAttachments() { final TestRunner runner = TestRunners.newTestRunner(new ExtractEmailAttachments()); - // Create the message dynamically - byte [] simpleEmail = attachmentGenerator.SimpleEmail(); + byte [] simpleEmail = attachmentGenerator.simpleMessage(); runner.enqueue(simpleEmail); runner.run(); @@ -98,11 +90,10 @@ public class TestExtractEmailAttachments { runner.assertTransferCount(ExtractEmailAttachments.REL_ORIGINAL, 1); runner.assertTransferCount(ExtractEmailAttachments.REL_FAILURE, 0); runner.assertTransferCount(ExtractEmailAttachments.REL_ATTACHMENTS, 0); - } @Test - public void testInvalidEmail() throws Exception { + public void testInvalidEmail() { final TestRunner runner = TestRunners.newTestRunner(new ExtractEmailAttachments()); runner.enqueue("test test test chocolate".getBytes()); runner.run(); diff --git a/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/test/java/org/apache/nifi/processors/email/TestExtractEmailHeaders.java b/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/test/java/org/apache/nifi/processors/email/TestExtractEmailHeaders.java index 774e714b59..37abd95444 100644 --- a/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/test/java/org/apache/nifi/processors/email/TestExtractEmailHeaders.java +++ b/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/test/java/org/apache/nifi/processors/email/TestExtractEmailHeaders.java @@ -22,15 +22,9 @@ import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; import org.junit.jupiter.api.Test; -import javax.mail.MessagingException; -import javax.mail.internet.MimeMessage; -import java.io.ByteArrayOutputStream; -import java.io.IOException; import java.util.List; public class TestExtractEmailHeaders { - - // Setup the fields to be used... String from = "Alice <al...@nifi.apache.org>"; String to = "b...@nifi.apache.org"; String subject = "Just a test email"; @@ -40,11 +34,10 @@ public class TestExtractEmailHeaders { GenerateAttachment attachmentGenerator = new GenerateAttachment(from, to, subject, message, hostName); @Test - public void testValidEmailWithAttachments() throws Exception { + public void testValidEmailWithAttachments() { final TestRunner runner = TestRunners.newTestRunner(new ExtractEmailHeaders()); - // Create the message dynamically - byte [] withAttachment = attachmentGenerator.WithAttachments(1); + byte [] withAttachment = attachmentGenerator.withAttachments(1); runner.enqueue(withAttachment); runner.run(); @@ -61,12 +54,11 @@ public class TestExtractEmailHeaders { } @Test - public void testValidEmailWithoutAttachments() throws Exception { + public void testValidEmailWithoutAttachments() { final TestRunner runner = TestRunners.newTestRunner(new ExtractEmailHeaders()); runner.setProperty(ExtractEmailHeaders.CAPTURED_HEADERS, "MIME-Version"); - // Create the message dynamically - byte [] simpleEmail = attachmentGenerator.SimpleEmail(); + byte [] simpleEmail = attachmentGenerator.simpleMessage(to); runner.enqueue(simpleEmail); runner.run(); @@ -89,24 +81,12 @@ public class TestExtractEmailHeaders { * TO, CC, BCC. */ @Test - public void testValidEmailWithNoRecipients() throws Exception { + public void testValidEmailWithNoRecipients() { final TestRunner runner = TestRunners.newTestRunner(new ExtractEmailHeaders()); runner.setProperty(ExtractEmailHeaders.CAPTURED_HEADERS, "MIME-Version"); - MimeMessage simpleEmailMimeMessage = attachmentGenerator.SimpleEmailMimeMessage(); - - simpleEmailMimeMessage.removeHeader("To"); - simpleEmailMimeMessage.removeHeader("Cc"); - simpleEmailMimeMessage.removeHeader("Bcc"); - - ByteArrayOutputStream messageBytes = new ByteArrayOutputStream(); - try { - simpleEmailMimeMessage.writeTo(messageBytes); - } catch (IOException | MessagingException e) { - e.printStackTrace(); - } - - runner.enqueue(messageBytes.toByteArray()); + final byte[] message = attachmentGenerator.simpleMessage(); + runner.enqueue(message); runner.run(); runner.assertTransferCount(ExtractEmailHeaders.REL_SUCCESS, 1); @@ -128,32 +108,21 @@ public class TestExtractEmailHeaders { * addresses. */ @Test - public void testNonStrictParsingPassesForInvalidAddresses() throws Exception { + public void testNonStrictParsingPassesForInvalidAddresses() { final TestRunner runner = TestRunners.newTestRunner(new ExtractEmailHeaders()); runner.setProperty(ExtractEmailHeaders.STRICT_PARSING, "false"); - MimeMessage simpleEmailMimeMessage = attachmentGenerator.SimpleEmailMimeMessage(); - - simpleEmailMimeMessage.setHeader("From", "<bad_email>"); - simpleEmailMimeMessage.setHeader("To", "<>, Joe, \"\" <>"); - - ByteArrayOutputStream messageBytes = new ByteArrayOutputStream(); - try { - simpleEmailMimeMessage.writeTo(messageBytes); - } catch (IOException | MessagingException e) { - e.printStackTrace(); - } + final byte[] message = attachmentGenerator.simpleMessage("<>, Joe, \"\" <>"); - runner.enqueue(messageBytes.toByteArray()); + runner.enqueue(message); runner.run(); runner.assertTransferCount(ExtractEmailHeaders.REL_SUCCESS, 1); runner.assertTransferCount(ExtractEmailHeaders.REL_FAILURE, 0); - runner.assertQueueEmpty(); final List<MockFlowFile> splits = runner.getFlowFilesForRelationship(ExtractEmailHeaders.REL_SUCCESS); - splits.get(0).assertAttributeEquals("email.headers.from.0", "bad_email"); + splits.get(0).assertAttributeEquals("email.headers.to.0", ""); splits.get(0).assertAttributeEquals("email.headers.to.1", "Joe"); splits.get(0).assertAttributeEquals("email.headers.to.2", ""); @@ -166,23 +135,13 @@ public class TestExtractEmailHeaders { * addresses. */ @Test - public void testStrictParsingFailsForInvalidAddresses() throws Exception { + public void testStrictParsingFailsForInvalidAddresses() { final TestRunner runner = TestRunners.newTestRunner(new ExtractEmailHeaders()); runner.setProperty(ExtractEmailHeaders.STRICT_PARSING, "true"); - MimeMessage simpleEmailMimeMessage = attachmentGenerator.SimpleEmailMimeMessage(); - - simpleEmailMimeMessage.setHeader("From", "<bad_email>"); - simpleEmailMimeMessage.setHeader("To", "<>, Joe, <invalid>"); + final byte[] message = attachmentGenerator.simpleMessage("<>, Joe, \"\" <>"); - ByteArrayOutputStream messageBytes = new ByteArrayOutputStream(); - try { - simpleEmailMimeMessage.writeTo(messageBytes); - } catch (IOException | MessagingException e) { - e.printStackTrace(); - } - - runner.enqueue(messageBytes.toByteArray()); + runner.enqueue(message); runner.run(); runner.assertTransferCount(ExtractEmailHeaders.REL_SUCCESS, 0); @@ -190,7 +149,7 @@ public class TestExtractEmailHeaders { } @Test - public void testInvalidEmail() throws Exception { + public void testInvalidEmail() { final TestRunner runner = TestRunners.newTestRunner(new ExtractEmailHeaders()); runner.enqueue("test test test chocolate".getBytes()); runner.run(); @@ -198,5 +157,4 @@ public class TestExtractEmailHeaders { runner.assertTransferCount(ExtractEmailHeaders.REL_SUCCESS, 0); runner.assertTransferCount(ExtractEmailHeaders.REL_FAILURE, 1); } - } diff --git a/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/test/java/org/apache/nifi/processors/email/TestListenSMTP.java b/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/test/java/org/apache/nifi/processors/email/TestListenSMTP.java index 7a1aa21630..82e124811d 100644 --- a/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/test/java/org/apache/nifi/processors/email/TestListenSMTP.java +++ b/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/test/java/org/apache/nifi/processors/email/TestListenSMTP.java @@ -30,12 +30,12 @@ import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; -import javax.mail.Message; -import javax.mail.MessagingException; -import javax.mail.Session; -import javax.mail.Transport; -import javax.mail.internet.InternetAddress; -import javax.mail.internet.MimeMessage; +import jakarta.mail.Message; +import jakarta.mail.MessagingException; +import jakarta.mail.Session; +import jakarta.mail.Transport; +import jakarta.mail.internet.InternetAddress; +import jakarta.mail.internet.MimeMessage; import javax.net.ssl.SSLContext; import java.net.Socket; import java.security.GeneralSecurityException; diff --git a/pom.xml b/pom.xml index b7c029597c..bee1d2ff27 100644 --- a/pom.xml +++ b/pom.xml @@ -1192,7 +1192,6 @@ !AzureGraphUserGroupProviderIT, !GremlinClientServiceYamlSettingsAndBytecodeIT, !GremlinClientServiceControllerSettingsIT, - !ITestConsumeEmail#validateUrl, !PrometheusReportingTaskIT#testNullLabel, !SnowflakeConnectionPoolIT, !SnowflakePipeIT,