Github user trixpan commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/483#discussion_r70182626
  
    --- Diff: 
nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/smtp/handler/SMTPMessageHandlerFactory.java
 ---
    @@ -0,0 +1,147 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.nifi.processors.email.smtp.handler;
    +
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.security.cert.X509Certificate;
    +import java.util.concurrent.LinkedBlockingQueue;
    +import java.util.concurrent.TimeUnit;
    +
    +import org.apache.nifi.logging.ComponentLog;
    +import org.apache.nifi.stream.io.ByteArrayOutputStream;
    +import org.apache.nifi.util.StopWatch;
    +import org.subethamail.smtp.DropConnectionException;
    +import org.subethamail.smtp.MessageContext;
    +import org.subethamail.smtp.MessageHandler;
    +import org.subethamail.smtp.MessageHandlerFactory;
    +import org.subethamail.smtp.RejectException;
    +import org.subethamail.smtp.TooMuchDataException;
    +import org.subethamail.smtp.server.SMTPServer;
    +
    +import org.apache.nifi.processors.email.smtp.event.SmtpEvent;
    +
    +
    +public class SMTPMessageHandlerFactory implements MessageHandlerFactory {
    +    final LinkedBlockingQueue<SmtpEvent> incomingMessages;
    +    final ComponentLog logger;
    +
    +    public SMTPMessageHandlerFactory(LinkedBlockingQueue<SmtpEvent> 
incomingMessages, ComponentLog logger) {
    +        this.incomingMessages = incomingMessages;
    +        this.logger = logger;
    +    }
    +
    +    @Override
    +    public MessageHandler create(MessageContext messageContext) {
    +        return new Handler(messageContext, incomingMessages, logger);
    +    }
    +
    +    class Handler implements MessageHandler {
    +        final MessageContext messageContext;
    +        String from;
    +        String recipient;
    +        byte [] messageBody;
    +
    +
    +        public Handler(MessageContext messageContext, 
LinkedBlockingQueue<SmtpEvent> incomingMessages, ComponentLog logger){
    +            this.messageContext = messageContext;
    +        }
    +
    +        @Override
    +        public void from(String from) throws RejectException {
    +            // TODO: possibly whitelist senders?
    +            this.from = from;
    +        }
    +
    +        @Override
    +        public void recipient(String recipient) throws RejectException {
    +            // TODO: possibly whitelist receivers?
    +            this.recipient = recipient;
    +        }
    +
    +        @Override
    +        public void data(InputStream inputStream) throws RejectException, 
TooMuchDataException, IOException {
    +            // Start counting the timer...
    +
    +            StopWatch watch = new StopWatch(false);
    +
    +            SMTPServer server = messageContext.getSMTPServer();
    +
    +            ByteArrayOutputStream baos = new ByteArrayOutputStream();
    +
    +            byte [] buffer = new byte[1024];
    +            int rd;
    +
    +            while ((rd = inputStream.read(buffer, 0, buffer.length)) != 
-1) {
    +                baos.write(buffer, 0, rd);
    +            }
    +            if (baos.getBufferLength() > server.getMaxMessageSize()) {
    +                throw new TooMuchDataException("Data exceeds the amount 
allowed.");
    +            }
    +
    +            baos.flush();
    +            this.messageBody = baos.toByteArray();
    +
    +
    +            X509Certificate[] certificates = new X509Certificate[]{};
    --- End diff --
    
    @JPercivall 
    
    Great feedback. Learned lots from it. Truly thankful!
    
    Following your feedback I changed a bit the approach I took with ListenSMTP.
    
    Instead of processing messages at `done()` (i.e. after the acknowledgment 
back to client) I moved most of the processing of the message into the `data()` 
method which gets called immediately after the sending client sends the end of 
data command (i.e. the ".") but before the SMTP server acknowledge the message 
arrival.
    
    I also added some throws to inform the SMTP client to retry (via SMTP error 
451) whenever an error occurs. This should move the server closer to a 
"at-least once" processor. 
    
    My limited knowledge (i.e. nearly lack of) of concurrent programming is 
seriously limiting my endeavours but hopefully this processor will get there!
    
    May I ask you to have a look ?
    
    Core of the changes are here.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

Reply via email to