Github user trixpan commented on a diff in the pull request: https://github.com/apache/nifi/pull/827#discussion_r74361899 --- Diff: nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/ListenSMTP.java --- @@ -166,317 +108,158 @@ .identifiesControllerService(SSLContextService.class) .build(); - public static final PropertyDescriptor CLIENT_AUTH = new PropertyDescriptor.Builder() + static final PropertyDescriptor CLIENT_AUTH = new PropertyDescriptor.Builder() .name("CLIENT_AUTH") .displayName("Client Auth") .description("The client authentication policy to use for the SSL Context. Only used if an SSL Context Service is provided.") .required(false) .allowableValues(SSLContextService.ClientAuth.NONE.toString(), SSLContextService.ClientAuth.REQUIRED.toString()) .build(); - @Override - protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) { - final List<ValidationResult> results = new ArrayList<>(); + static final Relationship REL_SUCCESS = new Relationship.Builder() + .name("success") + .description("All new messages will be routed as FlowFiles to this relationship") + .build(); - final String clientAuth = validationContext.getProperty(CLIENT_AUTH).getValue(); - final SSLContextService sslContextService = validationContext.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class); + private final static List<PropertyDescriptor> propertyDescriptors; - if (sslContextService != null && StringUtils.isBlank(clientAuth)) { - results.add(new ValidationResult.Builder() - .explanation("Client Auth must be provided when using TLS/SSL") - .valid(false).subject("Client Auth").build()); - } + private final static Set<Relationship> relationships; - return results; + static { + List<PropertyDescriptor> _propertyDescriptors = new ArrayList<>(); + _propertyDescriptors.add(SMTP_PORT); + _propertyDescriptors.add(SMTP_MAXIMUM_CONNECTIONS); + 
_propertyDescriptors.add(SMTP_TIMEOUT); + _propertyDescriptors.add(SMTP_MAXIMUM_MSG_SIZE); + _propertyDescriptors.add(SSL_CONTEXT_SERVICE); + _propertyDescriptors.add(CLIENT_AUTH); + propertyDescriptors = Collections.unmodifiableList(_propertyDescriptors); + Set<Relationship> _relationships = new HashSet<>(); + _relationships.add(REL_SUCCESS); + relationships = Collections.unmodifiableSet(_relationships); } + private volatile SMTPServer smtp; - public static final Relationship REL_SUCCESS = new Relationship.Builder() - .name("success") - .description("Extraction was successful") - .build(); + private volatile SmtpConsumer smtpConsumer; - private Set<Relationship> relationships; - private List<PropertyDescriptor> propertyDescriptors; - private volatile LinkedBlockingQueue<SmtpEvent> incomingMessages; + /** + * + */ + @Override + public void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory) throws ProcessException { + ProcessSession processSession = sessionFactory.createSession(); + if (this.smtp == null) { + this.setupSmtpIfNecessary(context, processSession); + } + + if (this.smtpConsumer.hasMessage()) { + try { + /* + * Will consume incoming message directly from the wire and into + * FlowFile/Content repository before exiting. This essentially + * limits any potential data loss by allowing SMTPServer thread + * to actually commit NiFi session if all good. 
However in the + * event of exception, such exception will be propagated back to + * the email sender via "undeliverable message" allowing such + * user to re-send the message + */ + this.smtpConsumer.consumeUsing((inputDataStream) -> { + FlowFile flowFile = processSession.create(); + AtomicInteger size = new AtomicInteger(); + flowFile = processSession.write(flowFile, new OutputStreamCallback() { + @Override + public void process(final OutputStream out) throws IOException { + size.set(IOUtils.copy(inputDataStream, out)); + } + }); + processSession.getProvenanceReporter().receive(flowFile, "smtp://" + + ListenSMTP.this.smtp.getHostName() + ":" + ListenSMTP.this.smtp.getPort() + "/"); + processSession.transfer(flowFile, REL_SUCCESS); + processSession.commit(); + return size.get(); + }); + } catch (Exception e) { + this.getLogger().error("Failed while listenning for messages.", e); + processSession.rollback(); + } + } else { + context.yield(); + } + } - private volatile SMTPServer server; - private AtomicBoolean initialized = new AtomicBoolean(false); - private AtomicBoolean stopping = new AtomicBoolean(false); + /** + * + */ + @OnStopped + public void close() { + this.getLogger().info("Stopping SMTPServer"); + this.smtp.stop(); + this.smtp = null; + this.getLogger().info("SMTPServer stopped"); + } + /** + * + */ @Override public Set<Relationship> getRelationships() { return relationships; } + /** + * + */ @Override protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { return propertyDescriptors; } - @Override - protected void init(final ProcessorInitializationContext context) { - final Set<Relationship> relationships = new HashSet<>(); - relationships.add(REL_SUCCESS); - this.relationships = Collections.unmodifiableSet(relationships); - - final List<PropertyDescriptor> props = new ArrayList<>(); - props.add(SMTP_PORT); - props.add(SMTP_HOSTNAME); - props.add(SMTP_MAXIMUM_CONNECTIONS); - props.add(SMTP_TIMEOUT); - props.add(SMTP_MAXIMUM_MSG_SIZE); - 
props.add(SMTP_MAXIMUM_INCOMING_MESSAGE_QUEUE); - props.add(SSL_CONTEXT_SERVICE); - props.add(CLIENT_AUTH); - this.propertyDescriptors = Collections.unmodifiableList(props); - - } - - // Upon Schedule, reset the initialized state to false - @OnScheduled - public void onScheduled(ProcessContext context) { - initialized.set(false); - stopping.set(false); - } - - protected synchronized void initializeSMTPServer(final ProcessContext context) throws Exception { - - // check if we are already running or if it is stopping - if (initialized.get() && server.isRunning() || stopping.get() ) { - return; - } - - incomingMessages = new LinkedBlockingQueue<>(context.getProperty(SMTP_MAXIMUM_INCOMING_MESSAGE_QUEUE).asInteger()); - - String clientAuth = null; - - // If an SSLContextService was provided then create an SSLContext to pass down to the server - SSLContext sslContext = null; - final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class); - if (sslContextService != null) { - clientAuth = context.getProperty(CLIENT_AUTH).getValue(); - sslContext = sslContextService.createSSLContext(SSLContextService.ClientAuth.valueOf(clientAuth)); + /** + * + */ + private synchronized void setupSmtpIfNecessary(ProcessContext context, ProcessSession processSession) { + if (this.smtp == null) { + SmtpConsumer consumer = new SmtpConsumer(); + SMTPServer smtpServer = this.createServerInstance(context, consumer); + smtpServer.setSoftwareName("Apache NiFi"); + smtpServer.setPort(context.getProperty(SMTP_PORT).asInteger()); + smtpServer.setMaxMessageSize(context.getProperty(SMTP_MAXIMUM_MSG_SIZE).asDataSize(DataUnit.B).intValue()); + smtpServer.setConnectionTimeout(context.getProperty(SMTP_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue()); + + this.smtpConsumer = consumer; + this.smtp = smtpServer; + this.smtp.start(); } + } - final SSLContext finalSslContext = sslContext; - - SMTPMessageHandlerFactory 
smtpMessageHandlerFactory = new SMTPMessageHandlerFactory(incomingMessages, getLogger()); - final SMTPServer server = new SMTPServer(smtpMessageHandlerFactory) { - + /** + * + */ + private SMTPServer createServerInstance(ProcessContext context, SmtpConsumer consumer) { + SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class); + SMTPServer smtpServer = sslContextService == null ? new SMTPServer(consumer) : new SMTPServer(consumer) { @Override public SSLSocket createSSLSocket(Socket socket) throws IOException { InetSocketAddress remoteAddress = (InetSocketAddress) socket.getRemoteSocketAddress(); - - SSLSocketFactory socketFactory = finalSslContext.getSocketFactory(); - - SSLSocket s = (SSLSocket) (socketFactory.createSocket(socket, remoteAddress.getHostName(), socket.getPort(), true)); - - s.setUseClientMode(false); - - - // For some reason the createSSLContext above is not enough to enforce - // client side auth - // If client auth is required... 
- if (SSLContextService.ClientAuth.REQUIRED.toString().equals(context.getProperty(CLIENT_AUTH).getValue())) { - s.setNeedClientAuth(true); + String clientAuth = context.getProperty(CLIENT_AUTH).getValue(); + SSLContext sslContext = sslContextService.createSSLContext(SSLContextService.ClientAuth.valueOf(clientAuth)); + SSLSocketFactory socketFactory = sslContext.getSocketFactory(); + SSLSocket sslSocket = (SSLSocket) (socketFactory.createSocket(socket, remoteAddress.getHostName(),socket.getPort(), true)); + sslSocket.setUseClientMode(false); + + if (SSLContextService.ClientAuth.REQUIRED.toString().equals(clientAuth)) { + this.setRequireTLS(true); + sslSocket.setNeedClientAuth(true); } - - return s; + return sslSocket; } }; - - // Set some parameters to our server - server.setSoftwareName("Apache NiFi"); - - - // Set the Server options based on properties - server.setPort(context.getProperty(SMTP_PORT).asInteger()); - server.setHostName(context.getProperty(SMTP_HOSTNAME).getValue()); - server.setMaxMessageSize(context.getProperty(SMTP_MAXIMUM_MSG_SIZE).asDataSize(DataUnit.B).intValue()); - server.setMaxConnections(context.getProperty(SMTP_MAXIMUM_CONNECTIONS).asInteger()); - server.setConnectionTimeout(context.getProperty(SMTP_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue()); - - - // Check if TLS should be enabled if (sslContextService != null) { - server.setEnableTLS(true); - } else { - server.setHideTLS(true); --- End diff -- @olegz TLS should be hidden when it is not enabled, so as to prevent situations where the client does not like the `454 TLS not supported` message that ListenSMTP returns when TLS is disabled. This is required to prevent this: ``` $ telnet 0 2525 Trying 0.0.0.0... Connected to 0. Escape character is '^]'. 220 localhost ESMTP Apache NiFi help 214-Apache NiFi on localhost 214-Topics: 214- HELP 214- HELO 214- RCPT 214- MAIL 214- DATA 214- AUTH 214- EHLO 214- NOOP 214- RSET 214- VRFY 214- QUIT 214- STARTTLS 214-For more info use "HELP <topic>". 
214 End of HELP info STARTTLS 454 TLS not supported quit 221 Bye Connection closed by foreign host. ``` vs ``` $ time openssl s_client -starttls smtp -crlf -connect 127.0.0.1:2525 CONNECTED(00000003) didn't found starttls in server response, try anyway... 140406973187912:error:140770FC:SSL routines:SSL23_GET_SERVER_HELLO:unknown protocol:s23_clnt.c:769: --- no peer certificate available --- No client certificate CA names sent --- SSL handshake has read 119 bytes and written 282 bytes --- New, (NONE), Cipher is (NONE) Secure Renegotiation IS NOT supported Compression: NONE Expansion: NONE --- real 1m0.086s user 0m0.023s sys 0m0.003s ```
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---