[ https://issues.apache.org/jira/browse/NIFI-2519?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15416383#comment-15416383 ]
ASF GitHub Bot commented on NIFI-2519: -------------------------------------- Github user olegz commented on a diff in the pull request: https://github.com/apache/nifi/pull/827#discussion_r74361967 --- Diff: nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/ListenSMTP.java --- @@ -166,317 +108,158 @@ .identifiesControllerService(SSLContextService.class) .build(); - public static final PropertyDescriptor CLIENT_AUTH = new PropertyDescriptor.Builder() + static final PropertyDescriptor CLIENT_AUTH = new PropertyDescriptor.Builder() .name("CLIENT_AUTH") .displayName("Client Auth") .description("The client authentication policy to use for the SSL Context. Only used if an SSL Context Service is provided.") .required(false) .allowableValues(SSLContextService.ClientAuth.NONE.toString(), SSLContextService.ClientAuth.REQUIRED.toString()) .build(); - @Override - protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) { - final List<ValidationResult> results = new ArrayList<>(); + static final Relationship REL_SUCCESS = new Relationship.Builder() + .name("success") + .description("All new messages will be routed as FlowFiles to this relationship") + .build(); - final String clientAuth = validationContext.getProperty(CLIENT_AUTH).getValue(); - final SSLContextService sslContextService = validationContext.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class); + private final static List<PropertyDescriptor> propertyDescriptors; - if (sslContextService != null && StringUtils.isBlank(clientAuth)) { - results.add(new ValidationResult.Builder() - .explanation("Client Auth must be provided when using TLS/SSL") - .valid(false).subject("Client Auth").build()); - } + private final static Set<Relationship> relationships; - return results; + static { + List<PropertyDescriptor> _propertyDescriptors = new ArrayList<>(); + _propertyDescriptors.add(SMTP_PORT); + _propertyDescriptors.add(SMTP_MAXIMUM_CONNECTIONS); + _propertyDescriptors.add(SMTP_TIMEOUT); + _propertyDescriptors.add(SMTP_MAXIMUM_MSG_SIZE); + _propertyDescriptors.add(SSL_CONTEXT_SERVICE); + _propertyDescriptors.add(CLIENT_AUTH); + propertyDescriptors = Collections.unmodifiableList(_propertyDescriptors); + Set<Relationship> _relationships = new HashSet<>(); + _relationships.add(REL_SUCCESS); + relationships = Collections.unmodifiableSet(_relationships); } + private volatile SMTPServer smtp; - public static final Relationship REL_SUCCESS = new Relationship.Builder() - .name("success") - .description("Extraction was successful") - .build(); + private volatile SmtpConsumer smtpConsumer; - private Set<Relationship> relationships; - private List<PropertyDescriptor> propertyDescriptors; - private volatile LinkedBlockingQueue<SmtpEvent> incomingMessages; + /** + * + */ + @Override + public void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory) throws ProcessException { + ProcessSession processSession = sessionFactory.createSession(); + if (this.smtp == null) { + this.setupSmtpIfNecessary(context, processSession); + } + + if (this.smtpConsumer.hasMessage()) { + try { + /* + * Will consume incoming message directly from the wire and into + * FlowFile/Content repository before exiting. This essentially + * limits any potential data loss by allowing SMTPServer thread + * to actually commit NiFi session if all good. However in the + * event of exception, such exception will be propagated back to + * the email sender via "undeliverable message" allowing such + * user to re-send the message + */ + this.smtpConsumer.consumeUsing((inputDataStream) -> { + FlowFile flowFile = processSession.create(); + AtomicInteger size = new AtomicInteger(); + flowFile = processSession.write(flowFile, new OutputStreamCallback() { + @Override + public void process(final OutputStream out) throws IOException { + size.set(IOUtils.copy(inputDataStream, out)); + } + }); + processSession.getProvenanceReporter().receive(flowFile, "smtp://" + + ListenSMTP.this.smtp.getHostName() + ":" + ListenSMTP.this.smtp.getPort() + "/"); + processSession.transfer(flowFile, REL_SUCCESS); + processSession.commit(); + return size.get(); + }); + } catch (Exception e) { + this.getLogger().error("Failed while listenning for messages.", e); + processSession.rollback(); + } + } else { + context.yield(); + } + } - private volatile SMTPServer server; - private AtomicBoolean initialized = new AtomicBoolean(false); - private AtomicBoolean stopping = new AtomicBoolean(false); + /** + * + */ + @OnStopped + public void close() { + this.getLogger().info("Stopping SMTPServer"); + this.smtp.stop(); + this.smtp = null; + this.getLogger().info("SMTPServer stopped"); + } + /** + * + */ @Override public Set<Relationship> getRelationships() { return relationships; } + /** + * + */ @Override protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { return propertyDescriptors; } - @Override - protected void init(final ProcessorInitializationContext context) { - final Set<Relationship> relationships = new HashSet<>(); - relationships.add(REL_SUCCESS); - this.relationships = Collections.unmodifiableSet(relationships); - - final List<PropertyDescriptor> props = new ArrayList<>(); - props.add(SMTP_PORT); - props.add(SMTP_HOSTNAME); - props.add(SMTP_MAXIMUM_CONNECTIONS); - props.add(SMTP_TIMEOUT); - props.add(SMTP_MAXIMUM_MSG_SIZE); - props.add(SMTP_MAXIMUM_INCOMING_MESSAGE_QUEUE); - props.add(SSL_CONTEXT_SERVICE); - props.add(CLIENT_AUTH); - this.propertyDescriptors = Collections.unmodifiableList(props); - - } - - // Upon Schedule, reset the initialized state to false - @OnScheduled - public void onScheduled(ProcessContext context) { - initialized.set(false); - stopping.set(false); - } - - protected synchronized void initializeSMTPServer(final ProcessContext context) throws Exception { - - // check if we are already running or if it is stopping - if (initialized.get() && server.isRunning() || stopping.get() ) { - return; - } - - incomingMessages = new LinkedBlockingQueue<>(context.getProperty(SMTP_MAXIMUM_INCOMING_MESSAGE_QUEUE).asInteger()); - - String clientAuth = null; - - // If an SSLContextService was provided then create an SSLContext to pass down to the server - SSLContext sslContext = null; - final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class); - if (sslContextService != null) { - clientAuth = context.getProperty(CLIENT_AUTH).getValue(); - sslContext = sslContextService.createSSLContext(SSLContextService.ClientAuth.valueOf(clientAuth)); + /** + * + */ + private synchronized void setupSmtpIfNecessary(ProcessContext context, ProcessSession processSession) { + if (this.smtp == null) { + SmtpConsumer consumer = new SmtpConsumer(); + SMTPServer smtpServer = this.createServerInstance(context, consumer); + smtpServer.setSoftwareName("Apache NiFi"); + smtpServer.setPort(context.getProperty(SMTP_PORT).asInteger()); + smtpServer.setMaxMessageSize(context.getProperty(SMTP_MAXIMUM_MSG_SIZE).asDataSize(DataUnit.B).intValue()); + smtpServer.setConnectionTimeout(context.getProperty(SMTP_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue()); + + this.smtpConsumer = consumer; + this.smtp = smtpServer; + this.smtp.start(); } + } - final SSLContext finalSslContext = sslContext; - - SMTPMessageHandlerFactory smtpMessageHandlerFactory = new SMTPMessageHandlerFactory(incomingMessages, getLogger()); - final SMTPServer server = new SMTPServer(smtpMessageHandlerFactory) { - + /** + * + */ + private SMTPServer createServerInstance(ProcessContext context, SmtpConsumer consumer) { + SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class); + SMTPServer smtpServer = sslContextService == null ? new SMTPServer(consumer) : new SMTPServer(consumer) { @Override public SSLSocket createSSLSocket(Socket socket) throws IOException { InetSocketAddress remoteAddress = (InetSocketAddress) socket.getRemoteSocketAddress(); - - SSLSocketFactory socketFactory = finalSslContext.getSocketFactory(); - - SSLSocket s = (SSLSocket) (socketFactory.createSocket(socket, remoteAddress.getHostName(), socket.getPort(), true)); - - s.setUseClientMode(false); - - - // For some reason the createSSLContext above is not enough to enforce - // client side auth - // If client auth is required... - if (SSLContextService.ClientAuth.REQUIRED.toString().equals(context.getProperty(CLIENT_AUTH).getValue())) { - s.setNeedClientAuth(true); + String clientAuth = context.getProperty(CLIENT_AUTH).getValue(); + SSLContext sslContext = sslContextService.createSSLContext(SSLContextService.ClientAuth.valueOf(clientAuth)); + SSLSocketFactory socketFactory = sslContext.getSocketFactory(); + SSLSocket sslSocket = (SSLSocket) (socketFactory.createSocket(socket, remoteAddress.getHostName(),socket.getPort(), true)); + sslSocket.setUseClientMode(false); + + if (SSLContextService.ClientAuth.REQUIRED.toString().equals(clientAuth)) { + this.setRequireTLS(true); + sslSocket.setNeedClientAuth(true); } - - return s; + return sslSocket; } }; - - // Set some parameters to our server - server.setSoftwareName("Apache NiFi"); - - - // Set the Server options based on properties - server.setPort(context.getProperty(SMTP_PORT).asInteger()); - server.setHostName(context.getProperty(SMTP_HOSTNAME).getValue()); - server.setMaxMessageSize(context.getProperty(SMTP_MAXIMUM_MSG_SIZE).asDataSize(DataUnit.B).intValue()); - server.setMaxConnections(context.getProperty(SMTP_MAXIMUM_CONNECTIONS).asInteger()); - server.setConnectionTimeout(context.getProperty(SMTP_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue()); - - - // Check if TLS should be enabled if (sslContextService != null) { - server.setEnableTLS(true); - } else { - server.setHideTLS(true); --- End diff -- Fair enough. Thanks for heads up. > TestListenSMTP ValidEmail fails during parallel build > ----------------------------------------------------- > > Key: NIFI-2519 > URL: https://issues.apache.org/jira/browse/NIFI-2519 > Project: Apache NiFi > Issue Type: Bug > Components: Extensions > Reporter: Joseph Witt > Assignee: Oleg Zhurakousky > Fix For: 1.0.0 > > > While running a full NiFi parallel build received the following. So there is > some test issue at least that is impacting build stability. > [INFO] --- maven-compiler-plugin:3.2:testCompile (default-testCompile) @ > nifi-email-processors --- > [INFO] Changes detected - recompiling the module! > [INFO] Compiling 4 source files to > /home/travis/build/apache/nifi/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/target/test-classes > [WARNING] > /home/travis/build/apache/nifi/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/test/java/org/apache/nifi/processors/email/TestListenSMTP.java:[122,24] > [deprecation] stop() in Thread has been deprecated > [WARNING] > /home/travis/build/apache/nifi/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/test/java/org/apache/nifi/processors/email/TestListenSMTP.java:[186,24] > [deprecation] stop() in Thread has been deprecated > [WARNING] > /home/travis/build/apache/nifi/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/test/java/org/apache/nifi/processors/email/TestListenSMTP.java:[307,24] > [deprecation] stop() in Thread has been deprecated > [INFO] > [INFO] --- maven-compiler-plugin:3.2:testCompile (groovy-tests) @ > nifi-email-processors --- > [INFO] Changes detected - recompiling the module! > [INFO] Nothing to compile - all classes are up to date > [INFO] > [INFO] --- maven-surefire-plugin:2.18:test (default-test) @ > nifi-email-processors --- > [INFO] Surefire report directory: > /home/travis/build/apache/nifi/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/target/surefire-reports > [INFO] Using configured provider > org.apache.maven.surefire.junit4.JUnit4Provider > ------------------------------------------------------- > T E S T S > ------------------------------------------------------- > Running org.apache.nifi.processors.email.TestListenSMTP > Tests run: 4, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 1.473 sec <<< > FAILURE! - in org.apache.nifi.processors.email.TestListenSMTP > ValidEmail(org.apache.nifi.processors.email.TestListenSMTP) Time elapsed: > 0.038 sec <<< FAILURE! > java.lang.AssertionError: Sending email failed > at org.junit.Assert.fail(Assert.java:88) > at org.junit.Assert.assertTrue(Assert.java:41) > at org.junit.Assert.assertFalse(Assert.java:64) > at > org.apache.nifi.processors.email.TestListenSMTP.ValidEmail(TestListenSMTP.java:188) > Running org.apache.nifi.processors.email.TestExtractEmailAttachments > Tests run: 4, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 0.1 sec - in > org.apache.nifi.processors.email.TestExtractEmailAttachments > Running org.apache.nifi.processors.email.TestExtractEmailHeaders > Tests run: 3, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 0.029 sec - > in org.apache.nifi.processors.email.TestExtractEmailHeaders > Results : > Failed tests: > TestListenSMTP.ValidEmail:188 Sending email failed -- This message was sent by Atlassian JIRA (v6.3.4#6332)