Hi Could you add some logging in the finally block? As shown below:
} finally { // need to ensure we release resources try { if (folder.isOpen()) { LOG.info("Closing folder in finally block"); folder.close(true); } } catch (Exception e) { // some mail servers will lock the folder so we ignore in this case (CAMEL-1263) LOG.debug("Could not close mailbox folder: " + folder.getName(), e); } I would like to see if the finally block closes the folder, before the mail onCompletion task is executed. On Wed, Sep 14, 2011 at 7:53 AM, kevin707 <wenbo...@vip.sina.com> wrote: > Thanks Claus, > > It is nice to hear from you. > > We are using grails plugin routing 1.1.3,which targets camel 2.5.0. > > Following your advice, we copied 2.8.0 code to 2.5.0 and build with mvn. > still have problem: > > 2011-09-14 12:25:57,042 [Camel Thread 34 - seda://email_process] ERROR > seda.SedaConsumer - Error processing exchange. Exchange[Message: > TWFpbCBEZWxpdmVyeSBTdWJzeXN0ZW0gPE1BSUxFUi1EQUVNT05AaXBlYy5jbj4gICBUaGUgb3JpZ2luYWwgbWVzc2FnZSB3YXMgcmVjZWl2ZWQgYXQgRnJpLCA5IFNlcCAyMDExIDExOjA2OjQ1ICswODAwDQpmcm9tIFsyMDIuMTM2LjYyLjI0M10NCg0KICAgLS0tLS0gVGhlIGZvbGxvd2luZyBhZGRyZXNzZXMgaGFkIHBlcm1hbmVudCBmYXRhbCBlcnJvcnMgLS0tLS0NCjxzYWxlcy1hbXNoQGFjZW1vbGQuY29tPg0KICAgIChyZWFzb246IDQ1MSA0LjQuMSByZXBseTogcmVhZCBlcnJvciBmcm9tIG1haWwuYWNlbW9sZC5jb20uKQ0KDQogICAtLS0tLSBUcmFuc2NyaXB0IG9mIHNlc3Npb24gZm9sbG93cyAtLS0tLQ0KNDUxIDQuNC4xIHJlcGx5OiByZWFkIGVycm9yIGZyb20gbWFpbC5hY2Vtb2xkLmNvbS4NCjxzYWxlcy1hbXNoQGFjZW1vbGQuY29tPi4uLiBEZWZlcnJlZDogQ29ubmVjdGlvbiByZXNldCBieSBtYWlsLmFjZW1vbGQuY29tLg0KTWVzc2FnZSBjb3VsZCBub3QgYmUgZGVsaXZlcmVkIGZvciA1IGRheXMNCk1lc3NhZ2Ugd2lsbCBiZSBkZWxldGVkIGZyb20gcXVldWUNCg==]. > Caused by: [javax.mail.FolderClosedException - null] > javax.mail.FolderClosedException > at com.sun.mail.imap.IMAPMessage.getProtocol(IMAPMessage.java:133) > at com.sun.mail.imap.IMAPMessage.setFlags(IMAPMessage.java:843) > at javax.mail.Message.setFlag(Message.java:565) > at > org.apache.camel.component.mail.MailConsumer.processCommit(MailConsumer.java:275) > at > org.apache.camel.component.mail.MailConsumer$1.onComplete(MailConsumer.java:172) > at > org.apache.camel.util.UnitOfWorkHelper.doneSynchronizations(UnitOfWorkHelper.java:55) > at > org.apache.camel.impl.DefaultUnitOfWork.done(DefaultUnitOfWork.java:172) > at > org.apache.camel.processor.UnitOfWorkProcessor.doneUow(UnitOfWorkProcessor.java:121) > at > org.apache.camel.processor.UnitOfWorkProcessor.access$000(UnitOfWorkProcessor.java:36) > at > org.apache.camel.processor.UnitOfWorkProcessor$1.done(UnitOfWorkProcessor.java:106) > at > org.apache.camel.processor.DefaultChannel$1.done(DefaultChannel.java:262) > at > org.apache.camel.processor.RedeliveryErrorHandler.processErrorHandler(RedeliveryErrorHandler.java:330) > at > org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:208) > at > org.apache.camel.processor.DefaultChannel.process(DefaultChannel.java:256) > at > org.apache.camel.processor.UnitOfWorkProcessor.process(UnitOfWorkProcessor.java:99) > at > org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:70) > at > org.apache.camel.processor.DelegateAsyncProcessor.processNext(DelegateAsyncProcessor.java:98) > at > org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:89) > at > org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:68) > at > org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:70) > at > org.apache.camel.component.seda.SedaConsumer.sendToConsumers(SedaConsumer.java:169) > at > org.apache.camel.component.seda.SedaConsumer.run(SedaConsumer.java:111) > at > java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908) > at java.lang.Thread.run(Thread.java:662) > > > the MailConsumer.java: > /** > * Licensed to the Apache Software Foundation (ASF) under one or more > * contributor license agreements. See the NOTICE file distributed with > * this work for additional information regarding copyright ownership. > * The ASF licenses this file to You under the Apache License, Version 2.0 > * (the "License"); you may not use this file except in compliance with > * the License. You may obtain a copy of the License at > * > * http://www.apache.org/licenses/LICENSE-2.0 > * > * Unless required by applicable law or agreed to in writing, software > * distributed under the License is distributed on an "AS IS" BASIS, > * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. > * See the License for the specific language governing permissions and > * limitations under the License. > */ > package org.apache.camel.component.mail; > > import java.util.Enumeration; > import java.util.LinkedList; > import java.util.Queue; > import java.util.UUID; > import javax.mail.Flags; > import javax.mail.Folder; > import javax.mail.FolderNotFoundException; > import javax.mail.Header; > import javax.mail.Message; > import javax.mail.MessagingException; > import javax.mail.Store; > import javax.mail.search.FlagTerm; > > import org.apache.camel.BatchConsumer; > import org.apache.camel.Exchange; > import org.apache.camel.Processor; > import org.apache.camel.ShutdownRunningTask; > import org.apache.camel.impl.ScheduledPollConsumer; > import org.apache.camel.spi.ShutdownAware; > import org.apache.camel.spi.Synchronization; > import org.apache.camel.util.CastUtils; > import org.apache.camel.util.ObjectHelper; > import org.slf4j.Logger; > import org.slf4j.LoggerFactory; > import org.springframework.mail.javamail.JavaMailSenderImpl; > > /** > * A {@link org.apache.camel.Consumer Consumer} which consumes messages from > JavaMail using a > * {@link javax.mail.Transport Transport} and dispatches them to the {@link > Processor} > */ > public class MailConsumer extends ScheduledPollConsumer implements > BatchConsumer, ShutdownAware { > public static final String POP3_UID = "CamelPop3Uid"; > public static final long DEFAULT_CONSUMER_DELAY = 60 * 1000L; > private static final transient Logger LOG = > LoggerFactory.getLogger(MailConsumer.class); > > private final JavaMailSenderImpl sender; > private Folder folder; > private Store store; > private int maxMessagesPerPoll; > private volatile ShutdownRunningTask shutdownRunningTask; > private volatile int pendingExchanges; > > public MailConsumer(MailEndpoint endpoint, Processor processor, > JavaMailSenderImpl sender) { > super(endpoint, processor); > this.sender = sender; > } > > @Override > protected void doStart() throws Exception { > super.doStart(); > } > > @Override > protected void doStop() throws Exception { > if (folder != null && folder.isOpen()) { > folder.close(true); > } > if (store != null && store.isConnected()) { > store.close(); > } > > super.doStop(); > } > > protected int poll() throws Exception { > // must reset for each poll > shutdownRunningTask = null; > pendingExchanges = 0; > int polledMessages = 0; > > ensureIsConnected(); > > if (store == null || folder == null) { > throw new IllegalStateException("MailConsumer did not connect > properly to the MailStore: " > + > getEndpoint().getConfiguration().getMailStoreLogInformation()); > } > > if (LOG.isDebugEnabled()) { > LOG.debug("Polling mailbox folder: " + > getEndpoint().getConfiguration().getMailStoreLogInformation()); > } > > if (getEndpoint().getConfiguration().getFetchSize() == 0) { > LOG.warn("Fetch size is 0 meaning the configuration is set to > poll no new messages at all. Camel will skip this poll."); > return 0; > } > > // ensure folder is open > if (!folder.isOpen()) { > folder.open(Folder.READ_WRITE); > } > > try { > int count = folder.getMessageCount(); > if (count > 0) { > Message[] messages; > > // should we process all messages or only unseen messages > if (getEndpoint().getConfiguration().isUnseen()) { > messages = folder.search(new FlagTerm(new > Flags(Flags.Flag.SEEN), false)); > } else { > messages = folder.getMessages(); > } > > polledMessages = > processBatch(CastUtils.cast(createExchanges(messages))); > } else if (count == -1) { > throw new MessagingException("Folder: " + > folder.getFullName() + " is closed"); > } > } catch (Exception e) { > handleException(e); > } finally { > // need to ensure we release resources > try { > if (folder.isOpen()) { > folder.close(true); > } > } catch (Exception e) { > // some mail servers will lock the folder so we ignore in > this case (CAMEL-1263) > LOG.debug("Could not close mailbox folder: " + > folder.getName(), e); > } > } > > return polledMessages; > } > > public void setMaxMessagesPerPoll(int maxMessagesPerPoll) { > this.maxMessagesPerPoll = maxMessagesPerPoll; > } > > public int processBatch(Queue exchanges) throws Exception { > int total = exchanges.size(); > > // limit if needed > if (maxMessagesPerPoll > 0 && total > maxMessagesPerPoll) { > LOG.debug("Limiting to maximum messages to poll {} as there was > {} messages in this poll.", maxMessagesPerPoll, total); > total = maxMessagesPerPoll; > } > > for (int index = 0; index < total && isBatchAllowed(); index++) { > // only loop if we are started (allowed to run) > Exchange exchange = ObjectHelper.cast(Exchange.class, > exchanges.poll()); > // add current index and total as properties > exchange.setProperty(Exchange.BATCH_INDEX, index); > exchange.setProperty(Exchange.BATCH_SIZE, total); > exchange.setProperty(Exchange.BATCH_COMPLETE, index == total - > 1); > > // update pending number of exchanges > pendingExchanges = total - index - 1; > > // must use the original message in case we need to workaround a > charset issue when extracting mail content > final Message mail = > exchange.getIn(MailMessage.class).getOriginalMessage(); > > // add on completion to handle after work when the exchange is > done > exchange.addOnCompletion(new Synchronization() { > public void onComplete(Exchange exchange) { > processCommit(mail, exchange); > } > > public void onFailure(Exchange exchange) { > processRollback(mail, exchange); > } > > @Override > public String toString() { > return "MailConsumerOnCompletion"; > } > }); > > // process the exchange > processExchange(exchange); > } > > return total; > } > > public boolean deferShutdown(ShutdownRunningTask shutdownRunningTask) { > // store a reference what to do in case when shutting down and we > have pending messages > this.shutdownRunningTask = shutdownRunningTask; > // do not defer shutdown > return false; > } > > public int getPendingExchangesSize() { > // only return the real pending size in case we are configured to > complete all tasks > if (ShutdownRunningTask.CompleteAllTasks == shutdownRunningTask) { > return pendingExchanges; > } else { > return 0; > } > } > > public void prepareShutdown() { > // noop > } > > public boolean isBatchAllowed() { > // stop if we are not running > boolean answer = isRunAllowed(); > if (!answer) { > return false; > } > > if (shutdownRunningTask == null) { > // we are not shutting down so continue to run > return true; > } > > // we are shutting down so only continue if we are configured to > complete all tasks > return ShutdownRunningTask.CompleteAllTasks == shutdownRunningTask; > } > > protected Queue<Exchange> createExchanges(Message[] messages) throws > MessagingException { > Queue<Exchange> answer = new LinkedList<Exchange>(); > > int fetchSize = getEndpoint().getConfiguration().getFetchSize(); > int count = fetchSize == -1 ? messages.length : Math.min(fetchSize, > messages.length); > > if (LOG.isDebugEnabled()) { > LOG.debug("Fetching {} messages. Total {} messages.", count, > messages.length); > } > > for (int i = 0; i < count; i++) { > Message message = messages[i]; > if (!message.getFlags().contains(Flags.Flag.DELETED)) { > Exchange exchange = getEndpoint().createExchange(message); > > // If the protocol is POP3 we need to remember the uid on > the exchange > // so we can find the mail message again later to be able to > delete it > if > (getEndpoint().getConfiguration().getProtocol().startsWith("pop3")) { > String uid = generatePop3Uid(message); > if (uid != null) { > exchange.setProperty(POP3_UID, uid); > LOG.trace("POP3 mail message using uid {}", uid); > } > } > answer.add(exchange); > } else { > if (LOG.isDebugEnabled()) { > LOG.debug("Skipping message as it was flagged as > deleted: {}", MailUtils.dumpMessage(message)); > } > } > } > > return answer; > } > > /** > * Strategy to process the mail message. > */ > protected void processExchange(Exchange exchange) throws Exception { > if (LOG.isDebugEnabled()) { > MailMessage msg = (MailMessage) exchange.getIn(); > LOG.debug("Processing message: {}", > MailUtils.dumpMessage(msg.getMessage())); > } > getProcessor().process(exchange); > } > > /** > * Strategy to flag the message after being processed. > * > * @param mail the mail message > * @param exchange the exchange > */ > protected void processCommit(Message mail, Exchange exchange) { > try { > // ensure folder is open > if (!folder.isOpen()) { > folder.open(Folder.READ_WRITE); > } > > > > // If the protocol is POP3, the message needs to be synced with > the folder via the UID. > // Otherwise setting the DELETE/SEEN flag won't delete the > message. > String uid = (String) exchange.removeProperty(POP3_UID); > if (uid != null) { > int count = folder.getMessageCount(); > Message found = null; > LOG.trace("Looking for POP3Message with UID {} from folder > with {} mails", uid, count); > for (int i = 1; i <= count; ++i) { > Message msg = folder.getMessage(i); > if (uid.equals(generatePop3Uid(msg))) { > LOG.debug("Found POP3Message with UID {} from folder > with {} mails", uid, count); > found = msg; > break; > } > } > > if (found == null) { > boolean delete = > getEndpoint().getConfiguration().isDelete(); > LOG.warn("POP3message not found in folder. Message > cannot be marked as " + (delete ? "DELETED" : "SEEN")); > } else { > mail = found; > } > } > > if (getEndpoint().getConfiguration().isDelete()) { > LOG.trace("Exchange processed, so flagging message as > DELETED"); > mail.setFlag(Flags.Flag.DELETED, true); > } else { > LOG.trace("Exchange processed, so flagging message as > SEEN"); > mail.setFlag(Flags.Flag.SEEN, true); > } > } catch (MessagingException e) { > LOG.warn("Error occurred during flagging message as > DELETED/SEEN", e); > exchange.setException(e); > } > } > > /** > * Strategy when processing the exchange failed. > * > * @param mail the mail message > * @param exchange the exchange > */ > protected void processRollback(Message mail, Exchange exchange) { > Exception cause = exchange.getException(); > if (cause != null) { > LOG.warn("Exchange failed, so rolling back message status: " + > exchange, cause); > } else { > LOG.warn("Exchange failed, so rolling back message status: " + > exchange); > } > } > > /** > * Generates an UID of the POP3Message > * > * @param message the POP3Message > * @return the generated uid > */ > protected String generatePop3Uid(Message message) { > String uid = null; > > // create an UID based on message headers on the POP3Message, that > ought > // to be unique > StringBuilder buffer = new StringBuilder(); > try { > Enumeration it = message.getAllHeaders(); > while (it.hasMoreElements()) { > Header header = (Header)it.nextElement(); > > buffer.append(header.getName()).append("=").append(header.getValue()).append("\n"); > } > if (buffer.length() > 0) { > LOG.debug("Generating UID from the following:\n" + buffer); > uid = > UUID.nameUUIDFromBytes(buffer.toString().getBytes()).toString(); > } > } catch (MessagingException e) { > LOG.warn("Cannot reader headers from mail message. This > exception will be ignored.", e); > } > > return uid; > } > > private void ensureIsConnected() throws MessagingException { > MailConfiguration config = getEndpoint().getConfiguration(); > > boolean connected = false; > try { > if (store != null && store.isConnected()) { > connected = true; > } > } catch (Exception e) { > LOG.debug("Exception while testing for is connected to > MailStore: " > + > getEndpoint().getConfiguration().getMailStoreLogInformation() > + ". Caused by: " + e.getMessage(), e); > } > > if (!connected) { > // ensure resources get recreated on reconnection > store = null; > folder = null; > > if (LOG.isDebugEnabled()) { > LOG.debug("Connecting to MailStore: {}", > getEndpoint().getConfiguration().getMailStoreLogInformation()); > } > store = sender.getSession().getStore(config.getProtocol()); > store.connect(config.getHost(), config.getPort(), > config.getUsername(), config.getPassword()); > } > > if (folder == null) { > if (LOG.isDebugEnabled()) { > LOG.debug("Getting folder {}", config.getFolderName()); > } > folder = store.getFolder(config.getFolderName()); > if (folder == null || !folder.exists()) { > throw new FolderNotFoundException(folder, "Folder not found > or invalid: " + config.getFolderName()); > } > } > } > > @Override > public MailEndpoint getEndpoint() { > return (MailEndpoint) super.getEndpoint(); > } > > } > > > as for the mailsession, we are trying to test it and still no idea. > > Regards, > KEVIN > > > > > > -- > View this message in context: > http://camel.465427.n5.nabble.com/camel-mail-imap-javax-mail-FolderClosedException-tp4796608p4801671.html > Sent from the Camel - Users mailing list archive at Nabble.com. > -- Claus Ibsen ----------------- FuseSource Email: cib...@fusesource.com Web: http://fusesource.com Twitter: davsclaus, fusenews Blog: http://davsclaus.blogspot.com/ Author of Camel in Action: http://www.manning.com/ibsen/