http://git-wip-us.apache.org/repos/asf/logging-log4j2/blob/78fe32c0/log4j-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumePersistentManager.java
----------------------------------------------------------------------
diff --git a/log4j-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumePersistentManager.java b/log4j-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumePersistentManager.java
index ba03629..4711ae0 100644
--- a/log4j-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumePersistentManager.java
+++ b/log4j-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumePersistentManager.java
@@ -1,848 +1,851 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache license, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the license for the specific language governing permissions and
- * limitations under the license.
- */
-package org.apache.logging.log4j.flume.appender;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.File;
-import java.nio.charset.Charset;
-import java.nio.charset.StandardCharsets;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-
-import javax.crypto.Cipher;
-import javax.crypto.SecretKey;
-
-import org.apache.flume.Event;
-import org.apache.flume.event.SimpleEvent;
-import org.apache.logging.log4j.LoggingException;
-import org.apache.logging.log4j.core.appender.ManagerFactory;
-import org.apache.logging.log4j.core.config.Property;
-import org.apache.logging.log4j.core.config.plugins.util.PluginManager;
-import org.apache.logging.log4j.core.config.plugins.util.PluginType;
-import org.apache.logging.log4j.core.util.ExecutorServices;
-import org.apache.logging.log4j.core.util.FileUtils;
-import org.apache.logging.log4j.core.util.Log4jThread;
-import org.apache.logging.log4j.core.util.Log4jThreadFactory;
-import org.apache.logging.log4j.core.util.SecretKeyProvider;
-import org.apache.logging.log4j.util.Strings;
-
-import com.sleepycat.je.Cursor;
-import com.sleepycat.je.CursorConfig;
-import com.sleepycat.je.Database;
-import com.sleepycat.je.DatabaseConfig;
-import com.sleepycat.je.DatabaseEntry;
-import com.sleepycat.je.Environment;
-import com.sleepycat.je.EnvironmentConfig;
-import com.sleepycat.je.LockConflictException;
-import com.sleepycat.je.LockMode;
-import com.sleepycat.je.OperationStatus;
-import com.sleepycat.je.StatsConfig;
-import com.sleepycat.je.Transaction;
-
-/**
- * Manager that persists data to Berkeley DB before passing it on to Flume.
- */
-public class FlumePersistentManager extends FlumeAvroManager {
-
-    /** Attribute name for the key provider. */
-    public static final String KEY_PROVIDER = "keyProvider";
-
-    private static final Charset UTF8 = StandardCharsets.UTF_8;
-
-    private static final String DEFAULT_DATA_DIR = ".log4j/flumeData";
-
-    private static final long SHUTDOWN_WAIT_MILLIS = 60000;
-
-    private static final long LOCK_TIMEOUT_SLEEP_MILLIS = 500;
-
-    private static BDBManagerFactory factory = new BDBManagerFactory();
-
-    private final Database database;
-
-    private final Environment environment;
-
-    private final WriterThread worker;
-
-    private final Gate gate = new Gate();
-
-    private final SecretKey secretKey;
-
-    private final int lockTimeoutRetryCount;
-
-    private final ExecutorService threadPool;
-
-    private final AtomicLong dbCount = new AtomicLong();
-
-    /**
-     * Constructor
-     * @param name The unique name of this manager.
-     * @param shortName Original name for the Manager.
-     * @param agents An array of Agents.
-     * @param batchSize The number of events to include in a batch.
-     * @param retries The number of times to retry connecting before giving up.
-     * @param connectionTimeout The amount of time to wait for a connection to be established.
-     * @param requestTimeout The amount of time to wair for a response to a request.
-     * @param delay The amount of time to wait between retries.
-     * @param database The database to write to.
-     * @param environment The database environment.
-     * @param secretKey The SecretKey to use for encryption.
-     * @param lockTimeoutRetryCount The number of times to retry a lock timeout.
-     */
-    protected FlumePersistentManager(final String name, final String shortName, final Agent[] agents,
-                                     final int batchSize, final int retries, final int connectionTimeout,
-                                     final int requestTimeout, final int delay, final Database database,
-                                     final Environment environment, final SecretKey secretKey,
-                                     final int lockTimeoutRetryCount) {
-        super(name, shortName, agents, batchSize, delay, retries, connectionTimeout, requestTimeout);
-        this.database = database;
-        this.environment = environment;
-        dbCount.set(database.count());
-        this.worker = new WriterThread(database, environment, this, gate, batchSize, secretKey, dbCount,
-            lockTimeoutRetryCount);
-        this.worker.start();
-        this.secretKey = secretKey;
-        this.threadPool = Executors.newCachedThreadPool(Log4jThreadFactory.createDaemonThreadFactory("Flume"));
-        this.lockTimeoutRetryCount = lockTimeoutRetryCount;
-    }
-
-
-    /**
-     * Returns a FlumeAvroManager.
-     * @param name The name of the manager.
-     * @param agents The agents to use.
-     * @param properties Properties to pass to the Manager.
-     * @param batchSize The number of events to include in a batch.
-     * @param retries The number of times to retry connecting before giving up.
-     * @param connectionTimeout The amount of time to wait to establish a connection.
-     * @param requestTimeout The amount of time to wait for a response to a request.
-     * @param delayMillis Amount of time to delay before delivering a batch.
-     * @param lockTimeoutRetryCount The number of times to retry after a lock timeout.
-     * @param dataDir The location of the Berkeley database.
-     * @return A FlumeAvroManager.
-     */
-    public static FlumePersistentManager getManager(final String name, final Agent[] agents,
-                                                    final Property[] properties, int batchSize, final int retries,
-                                                    final int connectionTimeout, final int requestTimeout,
-                                                    final int delayMillis, final int lockTimeoutRetryCount,
-                                                    final String dataDir) {
-        if (agents == null || agents.length == 0) {
-            throw new IllegalArgumentException("At least one agent is required");
-        }
-
-        if (batchSize <= 0) {
-            batchSize = 1;
-        }
-        final String dataDirectory = Strings.isEmpty(dataDir) ? DEFAULT_DATA_DIR : dataDir;
-
-        final StringBuilder sb = new StringBuilder("FlumePersistent[");
-        boolean first = true;
-        for (final Agent agent : agents) {
-            if (!first) {
-                sb.append(',');
-            }
-            sb.append(agent.getHost()).append(':').append(agent.getPort());
-            first = false;
-        }
-        sb.append(']');
-        sb.append(' ').append(dataDirectory);
-        return getManager(sb.toString(), factory, new FactoryData(name, agents, batchSize, retries,
-            connectionTimeout, requestTimeout, delayMillis, lockTimeoutRetryCount, dataDir, properties));
-    }
-
-    @Override
-    public void send(final Event event)  {
-        if (worker.isShutdown()) {
-            throw new LoggingException("Unable to record event");
-        }
-
-        final Map<String, String> headers = event.getHeaders();
-        final byte[] keyData = headers.get(FlumeEvent.GUID).getBytes(UTF8);
-        try {
-            final ByteArrayOutputStream baos = new ByteArrayOutputStream();
-            final DataOutputStream daos = new DataOutputStream(baos);
-            daos.writeInt(event.getBody().length);
-            daos.write(event.getBody(), 0, event.getBody().length);
-            daos.writeInt(event.getHeaders().size());
-            for (final Map.Entry<String, String> entry : headers.entrySet()) {
-                daos.writeUTF(entry.getKey());
-                daos.writeUTF(entry.getValue());
-            }
-            byte[] eventData = baos.toByteArray();
-            if (secretKey != null) {
-                final Cipher cipher = Cipher.getInstance("AES");
-                cipher.init(Cipher.ENCRYPT_MODE, secretKey);
-                eventData = cipher.doFinal(eventData);
-            }
-            final Future<Integer> future = threadPool.submit(new BDBWriter(keyData, eventData, environment, database,
-                gate, dbCount, getBatchSize(), lockTimeoutRetryCount));
-            boolean interrupted = false;
-            int ieCount = 0;
-            do {
-                try {
-                    future.get();
-                } catch (final InterruptedException ie) {
-                    interrupted = true;
-                    ++ieCount;
-                }
-            } while (interrupted && ieCount <= 1);
-
-        } catch (final Exception ex) {
-            throw new LoggingException("Exception occurred writing log event", ex);
-        }
-    }
-
-    @Override
-    protected void releaseSub(final long timeout, final TimeUnit timeUnit) {
-        LOGGER.debug("Shutting down FlumePersistentManager");
-        worker.shutdown();
-        final long requestedTimeoutMillis = timeUnit.toMillis(timeout);
-        final long shutdownWaitMillis = requestedTimeoutMillis < 0 ? SHUTDOWN_WAIT_MILLIS : requestedTimeoutMillis;
-               try {
-            worker.join(shutdownWaitMillis);
-        } catch (final InterruptedException ie) {
-            // Ignore the exception and shutdown.
-        }
-        ExecutorServices.shutdown(threadPool, shutdownWaitMillis, TimeUnit.MILLISECONDS, toString());
-        try {
-            worker.join();
-        } catch (final InterruptedException ex) {
-            logDebug("interrupted while waiting for worker to complete", ex);
-        }
-        try {
-            LOGGER.debug("FlumePersistenceManager dataset status: {}", database.getStats(new StatsConfig()));
-            database.close();
-        } catch (final Exception ex) {
-            logWarn("Failed to close database", ex);
-        }
-        try {
-            environment.cleanLog();
-            environment.close();
-        } catch (final Exception ex) {
-            logWarn("Failed to close environment", ex);
-        }
-        super.releaseSub(timeout, timeUnit);
-    }
-
-    private void doSend(final SimpleEvent event) {
-        LOGGER.debug("Sending event to Flume");
-        super.send(event);
-    }
-
-    /**
-     * Thread for writing to Berkeley DB to avoid having interrupts close the database.
-     */
-    private static class BDBWriter implements Callable<Integer> {
-        private final byte[] eventData;
-        private final byte[] keyData;
-        private final Environment environment;
-        private final Database database;
-        private final Gate gate;
-        private final AtomicLong dbCount;
-        private final long batchSize;
-        private final int lockTimeoutRetryCount;
-
-        public BDBWriter(final byte[] keyData, final byte[] eventData, final Environment environment,
-                         final Database database, final Gate gate, final AtomicLong dbCount, final long batchSize,
-                         final int lockTimeoutRetryCount) {
-            this.keyData = keyData;
-            this.eventData = eventData;
-            this.environment = environment;
-            this.database = database;
-            this.gate = gate;
-            this.dbCount = dbCount;
-            this.batchSize = batchSize;
-            this.lockTimeoutRetryCount = lockTimeoutRetryCount;
-        }
-
-        @Override
-        public Integer call() throws Exception {
-            final DatabaseEntry key = new DatabaseEntry(keyData);
-            final DatabaseEntry data = new DatabaseEntry(eventData);
-            Exception exception = null;
-            for (int retryIndex = 0; retryIndex < lockTimeoutRetryCount; ++retryIndex) {
-                Transaction txn = null;
-                try {
-                    txn = environment.beginTransaction(null, null);
-                    try {
-                        database.put(txn, key, data);
-                        txn.commit();
-                        txn = null;
-                        if (dbCount.incrementAndGet() >= batchSize) {
-                            gate.open();
-                        }
-                        exception = null;
-                        break;
-                    } catch (final LockConflictException lce) {
-                        exception = lce;
-                        // Fall through and retry.
-                    } catch (final Exception ex) {
-                        if (txn != null) {
-                            txn.abort();
-                        }
-                        throw ex;
-                    } finally {
-                        if (txn != null) {
-                            txn.abort();
-                            txn = null;
-                        }
-                    }
-                } catch (final LockConflictException lce) {
-                    exception = lce;
-                    if (txn != null) {
-                        try {
-                            txn.abort();
-                            txn = null;
-                        } catch (final Exception ex) {
-                            LOGGER.trace("Ignoring exception while aborting transaction during lock conflict.");
-                        }
-                    }
-
-                }
-                try {
-                    Thread.sleep(LOCK_TIMEOUT_SLEEP_MILLIS);
-                } catch (final InterruptedException ie) {
-                    // Ignore the error
-                }
-            }
-            if (exception != null) {
-                throw exception;
-            }
-            return eventData.length;
-        }
-    }
-
-    /**
-     * Factory data.
-     */
-    private static class FactoryData {
-        private final String name;
-        private final Agent[] agents;
-        private final int batchSize;
-        private final String dataDir;
-        private final int retries;
-        private final int connectionTimeout;
-        private final int requestTimeout;
-        private final int delayMillis;
-        private final int lockTimeoutRetryCount;
-        private final Property[] properties;
-
-        /**
-         * Constructor.
-         * @param name The name of the Appender.
-         * @param agents The agents.
-         * @param batchSize The number of events to include in a batch.
-         * @param dataDir The directory for data.
-         */
-        public FactoryData(final String name, final Agent[] agents, final int batchSize, final int retries,
-                           final int connectionTimeout, final int requestTimeout, final int delayMillis,
-                           final int lockTimeoutRetryCount, final String dataDir, final Property[] properties) {
-            this.name = name;
-            this.agents = agents;
-            this.batchSize = batchSize;
-            this.dataDir = dataDir;
-            this.retries = retries;
-            this.connectionTimeout = connectionTimeout;
-            this.requestTimeout = requestTimeout;
-            this.delayMillis = delayMillis;
-            this.lockTimeoutRetryCount = lockTimeoutRetryCount;
-            this.properties = properties;
-        }
-    }
-
-    /**
-     * Avro Manager Factory.
-     */
-    private static class BDBManagerFactory implements ManagerFactory<FlumePersistentManager, FactoryData> {
-
-        /**
-         * Create the FlumeKratiManager.
-         * @param name The name of the entity to manage.
-         * @param data The data required to create the entity.
-         * @return The FlumeKratiManager.
-         */
-        @Override
-        public FlumePersistentManager createManager(final String name, final FactoryData data) {
-            SecretKey secretKey = null;
-            Database database = null;
-            Environment environment = null;
-
-            final Map<String, String> properties = new HashMap<>();
-            if (data.properties != null) {
-                for (final Property property : data.properties) {
-                    properties.put(property.getName(), property.getValue());
-                }
-            }
-
-            try {
-                final File dir = new File(data.dataDir);
-                FileUtils.mkdir(dir, true);
-                final EnvironmentConfig dbEnvConfig = new EnvironmentConfig();
-                dbEnvConfig.setTransactional(true);
-                dbEnvConfig.setAllowCreate(true);
-                dbEnvConfig.setLockTimeout(5, TimeUnit.SECONDS);
-                environment = new Environment(dir, dbEnvConfig);
-                final DatabaseConfig dbConfig = new DatabaseConfig();
-                dbConfig.setTransactional(true);
-                dbConfig.setAllowCreate(true);
-                database = environment.openDatabase(null, name, dbConfig);
-            } catch (final Exception ex) {
-                LOGGER.error("Could not create FlumePersistentManager", ex);
-                // For consistency, close database as well as environment even though it should never happen since the
-                // database is that last thing in the block above, but this does guard against a future line being
-                // inserted at the end that would bomb (like some debug logging).
-                if (database != null) {
-                    database.close();
-                    database = null;
-                }
-                if (environment != null) {
-                    environment.close();
-                    environment = null;
-                }
-                return null;
-            }
-
-            try {
-                String key = null;
-                for (final Map.Entry<String, String> entry : properties.entrySet()) {
-                    if (entry.getKey().equalsIgnoreCase(KEY_PROVIDER)) {
-                        key = entry.getValue();
-                        break;
-                    }
-                }
-                if (key != null) {
-                    final PluginManager manager = new PluginManager("KeyProvider");
-                    manager.collectPlugins();
-                    final Map<String, PluginType<?>> plugins = manager.getPlugins();
-                    if (plugins != null) {
-                        boolean found = false;
-                        for (final Map.Entry<String, PluginType<?>> entry : plugins.entrySet()) {
-                            if (entry.getKey().equalsIgnoreCase(key)) {
-                                found = true;
-                                final Class<?> cl = entry.getValue().getPluginClass();
-                                try {
-                                    final SecretKeyProvider provider = (SecretKeyProvider) cl.newInstance();
-                                    secretKey = provider.getSecretKey();
-                                    LOGGER.debug("Persisting events using SecretKeyProvider {}", cl.getName());
-                                } catch (final Exception ex) {
-                                    LOGGER.error("Unable to create SecretKeyProvider {}, encryption will be disabled",
-                                        cl.getName());
-                                }
-                                break;
-                            }
-                        }
-                        if (!found) {
-                            LOGGER.error("Unable to locate SecretKey provider {}, encryption will be disabled", key);
-                        }
-                    } else {
-                        LOGGER.error("Unable to locate SecretKey provider {}, encryption will be disabled", key);
-                    }
-                }
-            } catch (final Exception ex) {
-                LOGGER.warn("Error setting up encryption - encryption will be disabled", ex);
-            }
-            return new FlumePersistentManager(name, data.name, data.agents, data.batchSize, data.retries,
-                data.connectionTimeout, data.requestTimeout, data.delayMillis, database, environment, secretKey,
-                data.lockTimeoutRetryCount);
-        }
-    }
-
-    /**
-     * Thread that sends data to Flume and pulls it from Berkeley DB.
-     */
-    private static class WriterThread extends Log4jThread  {
-        private volatile boolean shutdown = false;
-        private final Database database;
-        private final Environment environment;
-        private final FlumePersistentManager manager;
-        private final Gate gate;
-        private final SecretKey secretKey;
-        private final int batchSize;
-        private final AtomicLong dbCounter;
-        private final int lockTimeoutRetryCount;
-
-        public WriterThread(final Database database, final Environment environment,
-                            final FlumePersistentManager manager, final Gate gate, final int batchsize,
-                            final SecretKey secretKey, final AtomicLong dbCount, final int lockTimeoutRetryCount) {
-            super("FlumePersistentManager-Writer");
-            this.database = database;
-            this.environment = environment;
-            this.manager = manager;
-            this.gate = gate;
-            this.batchSize = batchsize;
-            this.secretKey = secretKey;
-            this.setDaemon(true);
-            this.dbCounter = dbCount;
-            this.lockTimeoutRetryCount = lockTimeoutRetryCount;
-        }
-
-        public void shutdown() {
-            LOGGER.debug("Writer thread shutting down");
-            this.shutdown = true;
-            gate.open();
-        }
-
-        public boolean isShutdown() {
-            return shutdown;
-        }
-
-        @Override
-        public void run() {
-            LOGGER.trace("WriterThread started - batch size = " + batchSize + ", delayMillis = " + manager.getDelayMillis());
-            long nextBatchMillis = System.currentTimeMillis() + manager.getDelayMillis();
-            while (!shutdown) {
-                final long nowMillis = System.currentTimeMillis();
-                final long dbCount = database.count();
-                dbCounter.set(dbCount);
-                if (dbCount >= batchSize || dbCount > 0 && nextBatchMillis <= nowMillis) {
-                    nextBatchMillis = nowMillis + manager.getDelayMillis();
-                    try {
-                        boolean errors = false;
-                        final DatabaseEntry key = new DatabaseEntry();
-                        final DatabaseEntry data = new DatabaseEntry();
-
-                        gate.close();
-                        OperationStatus status;
-                        if (batchSize > 1) {
-                            try {
-                                errors = sendBatch(key, data);
-                            } catch (final Exception ex) {
-                                break;
-                            }
-                        } else {
-                            Exception exception = null;
-                            for (int retryIndex = 0; retryIndex < lockTimeoutRetryCount; ++retryIndex) {
-                                exception = null;
-                                Transaction txn = null;
-                                Cursor cursor = null;
-                                try {
-                                    txn = environment.beginTransaction(null, null);
-                                    cursor = database.openCursor(txn, null);
-                                    try {
-                                        status = cursor.getFirst(key, data, LockMode.RMW);
-                                        while (status == OperationStatus.SUCCESS) {
-                                            final SimpleEvent event = createEvent(data);
-                                            if (event != null) {
-                                                try {
-                                                    manager.doSend(event);
-                                                } catch (final Exception ioe) {
-                                                    errors = true;
-                                                    LOGGER.error("Error sending event", ioe);
-                                                    break;
-                                                }
-                                                try {
-                                                    cursor.delete();
-                                                } catch (final Exception ex) {
-                                                    LOGGER.error("Unable to delete event", ex);
-                                                }
-                                            }
-                                            status = cursor.getNext(key, data, LockMode.RMW);
-                                        }
-                                        if (cursor != null) {
-                                            cursor.close();
-                                            cursor = null;
-                                        }
-                                        txn.commit();
-                                        txn = null;
-                                        dbCounter.decrementAndGet();
-                                        exception = null;
-                                        break;
-                                    } catch (final LockConflictException lce) {
-                                        exception = lce;
-                                        // Fall through and retry.
-                                    } catch (final Exception ex) {
-                                        LOGGER.error("Error reading or writing to database", ex);
-                                        shutdown = true;
-                                        break;
-                                    } finally {
-                                        if (cursor != null) {
-                                            cursor.close();
-                                            cursor = null;
-                                        }
-                                        if (txn != null) {
-                                            txn.abort();
-                                            txn = null;
-                                        }
-                                    }
-                                } catch (final LockConflictException lce) {
-                                    exception = lce;
-                                    if (cursor != null) {
-                                        try {
-                                            cursor.close();
-                                            cursor = null;
-                                        } catch (final Exception ex) {
-                                            LOGGER.trace("Ignored exception closing cursor during lock conflict.");
-                                        }
-                                    }
-                                    if (txn != null) {
-                                        try {
-                                            txn.abort();
-                                            txn = null;
-                                        } catch (final Exception ex) {
-                                            LOGGER.trace("Ignored exception aborting tx during lock conflict.");
-                                        }
-                                    }
-                                }
-                                try {
-                                    Thread.sleep(LOCK_TIMEOUT_SLEEP_MILLIS);
-                                } catch (final InterruptedException ie) {
-                                    // Ignore the error
-                                }
-                            }
-                            if (exception != null) {
-                                LOGGER.error("Unable to read or update data base", exception);
-                            }
-                        }
-                        if (errors) {
-                            Thread.sleep(manager.getDelayMillis());
-                            continue;
-                        }
-                    } catch (final Exception ex) {
-                        LOGGER.warn("WriterThread encountered an exception. Continuing.", ex);
-                    }
-                } else {
-                    if (nextBatchMillis <= nowMillis) {
-                        nextBatchMillis = nowMillis + manager.getDelayMillis();
-                    }
-                    try {
-                        final long interval = nextBatchMillis - nowMillis;
-                        gate.waitForOpen(interval);
-                    } catch (final InterruptedException ie) {
-                        LOGGER.warn("WriterThread interrupted, continuing");
-                    } catch (final Exception ex) {
-                        LOGGER.error("WriterThread encountered an exception waiting for work", ex);
-                        break;
-                    }
-                }
-            }
-
-            if (batchSize > 1 && database.count() > 0) {
-                final DatabaseEntry key = new DatabaseEntry();
-                final DatabaseEntry data = new DatabaseEntry();
-                try {
-                    sendBatch(key, data);
-                } catch (final Exception ex) {
-                    LOGGER.warn("Unable to write final batch");
-                }
-            }
-            LOGGER.trace("WriterThread exiting");
-        }
-
-        private boolean sendBatch(DatabaseEntry key, final DatabaseEntry data) throws Exception {
-            boolean errors = false;
-            OperationStatus status;
-            Cursor cursor = null;
-            try {
-               final BatchEvent batch = new BatchEvent();
-               for (int retryIndex = 0; retryIndex < lockTimeoutRetryCount; ++retryIndex) {
-                       try {
-                               cursor = database.openCursor(null, CursorConfig.DEFAULT);
-                               status = cursor.getFirst(key, data, null);
-
-                               for (int i = 0; status == OperationStatus.SUCCESS && i < batchSize; ++i) {
-                                       final SimpleEvent event = createEvent(data);
-                                       if (event != null) {
-                                               batch.addEvent(event);
-                                       }
-                                       status = cursor.getNext(key, data, 
null);
-                               }
-                               break;
-                       } catch (final LockConflictException lce) {
-                               if (cursor != null) {
-                                       try {
-                                cursor.close();
-                                cursor = null;
-                            } catch (final Exception ex) {
-                                LOGGER.trace("Ignored exception closing cursor 
during lock conflict.");
-                            }
-                        }
-                    }
-               }
-
-                try {
-                    manager.send(batch);
-                } catch (final Exception ioe) {
-                    LOGGER.error("Error sending events", ioe);
-                    errors = true;
-                }
-                if (!errors) {
-                       if (cursor != null) {
-                           cursor.close();
-                           cursor = null;
-                       }
-                    Transaction txn = null;
-                    Exception exception = null;
-                    for (int retryIndex = 0; retryIndex < 
lockTimeoutRetryCount; ++retryIndex) {
-                        try {
-                            txn = environment.beginTransaction(null, null);
-                            try {
-                                for (final Event event : batch.getEvents()) {
-                                    try {
-                                        final Map<String, String> headers = 
event.getHeaders();
-                                        key = new 
DatabaseEntry(headers.get(FlumeEvent.GUID).getBytes(UTF8));
-                                        database.delete(txn, key);
-                                    } catch (final Exception ex) {
-                                        LOGGER.error("Error deleting key from 
database", ex);
-                                    }
-                                }
-                                txn.commit();
-                                long count = dbCounter.get();
-                                while (!dbCounter.compareAndSet(count, count - 
batch.getEvents().size())) {
-                                    count = dbCounter.get();
-                                }
-                                exception = null;
-                                break;
-                            } catch (final LockConflictException lce) {
-                                exception = lce;
-                                if (cursor != null) {
-                                    try {
-                                        cursor.close();
-                                        cursor = null;
-                                    } catch (final Exception ex) {
-                                        LOGGER.trace("Ignored exception 
closing cursor during lock conflict.");
-                                    }
-                                }
-                                if (txn != null) {
-                                    try {
-                                        txn.abort();
-                                        txn = null;
-                                    } catch (final Exception ex) {
-                                        LOGGER.trace("Ignored exception 
aborting transaction during lock conflict.");
-                                    }
-                                }
-                            } catch (final Exception ex) {
-                                LOGGER.error("Unable to commit transaction", 
ex);
-                                if (txn != null) {
-                                    txn.abort();
-                                }
-                            }
-                        } catch (final LockConflictException lce) {
-                            exception = lce;
-                            if (cursor != null) {
-                                try {
-                                    cursor.close();
-                                    cursor = null;
-                                } catch (final Exception ex) {
-                                    LOGGER.trace("Ignored exception closing 
cursor during lock conflict.");
-                                }
-                            }
-                            if (txn != null) {
-                                try {
-                                    txn.abort();
-                                    txn = null;
-                                } catch (final Exception ex) {
-                                    LOGGER.trace("Ignored exception aborting 
transaction during lock conflict.");
-                                }
-                            }
-                        } finally {
-                            if (cursor != null) {
-                                cursor.close();
-                                cursor = null;
-                            }
-                            if (txn != null) {
-                                txn.abort();
-                                txn = null;
-                            }
-                        }
-                        try {
-                            Thread.sleep(LOCK_TIMEOUT_SLEEP_MILLIS);
-                        } catch (final InterruptedException ie) {
-                            // Ignore the error
-                        }
-                    }
-                    if (exception != null) {
-                        LOGGER.error("Unable to delete events from data base", 
exception);
-                    }
-                }
-            } catch (final Exception ex) {
-                LOGGER.error("Error reading database", ex);
-                shutdown = true;
-                throw ex;
-            } finally {
-                if (cursor != null) {
-                    cursor.close();
-                }
-            }
-
-            return errors;
-        }
-
-        private SimpleEvent createEvent(final DatabaseEntry data) {
-            final SimpleEvent event = new SimpleEvent();
-            try {
-                byte[] eventData = data.getData();
-                if (secretKey != null) {
-                    final Cipher cipher = Cipher.getInstance("AES");
-                    cipher.init(Cipher.DECRYPT_MODE, secretKey);
-                    eventData = cipher.doFinal(eventData);
-                }
-                final ByteArrayInputStream bais = new 
ByteArrayInputStream(eventData);
-                final DataInputStream dais = new DataInputStream(bais);
-                int length = dais.readInt();
-                final byte[] bytes = new byte[length];
-                dais.read(bytes, 0, length);
-                event.setBody(bytes);
-                length = dais.readInt();
-                final Map<String, String> map = new HashMap<>(length);
-                for (int i = 0; i < length; ++i) {
-                    final String headerKey = dais.readUTF();
-                    final String value = dais.readUTF();
-                    map.put(headerKey, value);
-                }
-                event.setHeaders(map);
-                return event;
-            } catch (final Exception ex) {
-                LOGGER.error("Error retrieving event", ex);
-                return null;
-            }
-        }
-
-    }
-
-    /**
-     * An internal class.
-     */
-    private static class Gate {
-
-        private boolean isOpen = false;
-
-        public boolean isOpen() {
-            return isOpen;
-        }
-
-        public synchronized void open() {
-            isOpen = true;
-            notifyAll();
-        }
-
-        public synchronized void close() {
-            isOpen = false;
-        }
-
-        public synchronized void waitForOpen(final long timeout) throws 
InterruptedException {
-            wait(timeout);
-        }
-    }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache license, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the license for the specific language governing permissions and
+ * limitations under the license.
+ */
+package org.apache.logging.log4j.flume.appender;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.crypto.Cipher;
+import javax.crypto.SecretKey;
+
+import org.apache.flume.Event;
+import org.apache.flume.event.SimpleEvent;
+import org.apache.logging.log4j.LoggingException;
+import org.apache.logging.log4j.core.appender.ManagerFactory;
+import org.apache.logging.log4j.core.config.Property;
+import org.apache.logging.log4j.core.config.plugins.util.PluginManager;
+import org.apache.logging.log4j.core.config.plugins.util.PluginType;
+import org.apache.logging.log4j.core.util.ExecutorServices;
+import org.apache.logging.log4j.core.util.FileUtils;
+import org.apache.logging.log4j.core.util.Log4jThread;
+import org.apache.logging.log4j.core.util.Log4jThreadFactory;
+import org.apache.logging.log4j.core.util.SecretKeyProvider;
+import org.apache.logging.log4j.util.Strings;
+
+import com.sleepycat.je.Cursor;
+import com.sleepycat.je.CursorConfig;
+import com.sleepycat.je.Database;
+import com.sleepycat.je.DatabaseConfig;
+import com.sleepycat.je.DatabaseEntry;
+import com.sleepycat.je.Environment;
+import com.sleepycat.je.EnvironmentConfig;
+import com.sleepycat.je.LockConflictException;
+import com.sleepycat.je.LockMode;
+import com.sleepycat.je.OperationStatus;
+import com.sleepycat.je.StatsConfig;
+import com.sleepycat.je.Transaction;
+
+/**
+ * Manager that persists data to Berkeley DB before passing it on to Flume.
+ */
+public class FlumePersistentManager extends FlumeAvroManager {
+
+    /** Attribute name for the key provider. */
+    public static final String KEY_PROVIDER = "keyProvider";
+
+    private static final Charset UTF8 = StandardCharsets.UTF_8;
+
+    private static final String DEFAULT_DATA_DIR = ".log4j/flumeData";
+
+    private static final long SHUTDOWN_WAIT_MILLIS = 60000;
+
+    private static final long LOCK_TIMEOUT_SLEEP_MILLIS = 500;
+
+    private static BDBManagerFactory factory = new BDBManagerFactory();
+
+    private final Database database;
+
+    private final Environment environment;
+
+    private final WriterThread worker;
+
+    private final Gate gate = new Gate();
+
+    private final SecretKey secretKey;
+
+    private final int lockTimeoutRetryCount;
+
+    private final ExecutorService threadPool;
+
+    private final AtomicLong dbCount = new AtomicLong();
+
+    /**
+     * Creates the manager, records the current number of stored events and starts the writer thread.
+     *
+     * @param name The unique name of this manager.
+     * @param shortName Original name for the Manager.
+     * @param agents An array of Agents.
+     * @param batchSize The number of events to include in a batch.
+     * @param retries The number of times to retry connecting before giving up.
+     * @param connectionTimeout The amount of time to wait for a connection to be established.
+     * @param requestTimeout The amount of time to wait for a response to a request.
+     * @param delay The amount of time to wait between retries.
+     * @param database The database to write to.
+     * @param environment The database environment.
+     * @param secretKey The SecretKey to use for encryption.
+     * @param lockTimeoutRetryCount The number of times to retry a lock timeout.
+     */
+    protected FlumePersistentManager(final String name, final String shortName, final Agent[] agents,
+                                     final int batchSize, final int retries, final int connectionTimeout,
+                                     final int requestTimeout, final int delay, final Database database,
+                                     final Environment environment, final SecretKey secretKey,
+                                     final int lockTimeoutRetryCount) {
+        super(name, shortName, agents, batchSize, delay, retries, connectionTimeout, requestTimeout);
+        this.database = database;
+        this.environment = environment;
+        this.secretKey = secretKey;
+        this.lockTimeoutRetryCount = lockTimeoutRetryCount;
+        this.threadPool = Executors.newCachedThreadPool(Log4jThreadFactory.createDaemonThreadFactory("Flume"));
+        dbCount.set(database.count());
+        this.worker = new WriterThread(database, environment, this, gate, batchSize, secretKey, dbCount,
+            lockTimeoutRetryCount);
+        // Start the worker last: it receives a reference to this manager, so every field must be
+        // assigned before the thread can begin running.
+        this.worker.start();
+    }
+
+
+    /**
+     * Returns a FlumePersistentManager.
+     * @param name The name of the manager.
+     * @param agents The agents to use.
+     * @param properties Properties to pass to the Manager.
+     * @param batchSize The number of events to include in a batch. Values less than 1 are treated as 1.
+     * @param retries The number of times to retry connecting before giving up.
+     * @param connectionTimeout The amount of time to wait to establish a connection.
+     * @param requestTimeout The amount of time to wait for a response to a request.
+     * @param delayMillis Amount of time to delay before delivering a batch.
+     * @param lockTimeoutRetryCount The number of times to retry after a lock timeout.
+     * @param dataDir The location of the Berkeley database. When null or empty the default
+     *                directory ({@code .log4j/flumeData}) is used.
+     * @return A FlumePersistentManager.
+     * @throws IllegalArgumentException if no agents are supplied.
+     */
+    public static FlumePersistentManager getManager(final String name, final Agent[] agents,
+                                                    final Property[] properties, int batchSize, final int retries,
+                                                    final int connectionTimeout, final int requestTimeout,
+                                                    final int delayMillis, final int lockTimeoutRetryCount,
+                                                    final String dataDir) {
+        if (agents == null || agents.length == 0) {
+            throw new IllegalArgumentException("At least one agent is required");
+        }
+
+        if (batchSize <= 0) {
+            batchSize = 1;
+        }
+        final String dataDirectory = Strings.isEmpty(dataDir) ? DEFAULT_DATA_DIR : dataDir;
+
+        // The manager key is built from every agent's host:port plus the data directory.
+        final StringBuilder sb = new StringBuilder("FlumePersistent[");
+        boolean first = true;
+        for (final Agent agent : agents) {
+            if (!first) {
+                sb.append(',');
+            }
+            sb.append(agent.getHost()).append(':').append(agent.getPort());
+            first = false;
+        }
+        sb.append(']');
+        sb.append(' ').append(dataDirectory);
+        // Pass the resolved directory, not the raw argument, so the default applies when dataDir
+        // is null or empty and the factory opens the same directory the manager name advertises.
+        return getManager(sb.toString(), factory, new FactoryData(name, agents, batchSize, retries,
+            connectionTimeout, requestTimeout, delayMillis, lockTimeoutRetryCount, dataDirectory, properties));
+    }
+
+    /**
+     * Serializes the event (body, then headers), encrypts it when a secret key is configured, and
+     * hands it to a pool thread that stores it in the database under the event's GUID header.
+     * The calling thread waits for that write to finish.
+     *
+     * @param event The event to persist.
+     * @throws LoggingException if the writer thread has shut down or the event cannot be written.
+     */
+    @Override
+    public void send(final Event event)  {
+        if (worker.isShutdown()) {
+            throw new LoggingException("Unable to record event");
+        }
+
+        final Map<String, String> headers = event.getHeaders();
+        final byte[] keyData = headers.get(FlumeEvent.GUID).getBytes(UTF8);
+        try {
+            final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+            final DataOutputStream daos = new DataOutputStream(baos);
+            final byte[] body = event.getBody();
+            daos.writeInt(body.length);
+            daos.write(body, 0, body.length);
+            daos.writeInt(headers.size());
+            for (final Map.Entry<String, String> entry : headers.entrySet()) {
+                daos.writeUTF(entry.getKey());
+                daos.writeUTF(entry.getValue());
+            }
+            byte[] eventData = baos.toByteArray();
+            if (secretKey != null) {
+                final Cipher cipher = Cipher.getInstance("AES");
+                cipher.init(Cipher.ENCRYPT_MODE, secretKey);
+                eventData = cipher.doFinal(eventData);
+            }
+            final Future<Integer> future = threadPool.submit(new BDBWriter(keyData, eventData, environment, database,
+                gate, dbCount, getBatchSize(), lockTimeoutRetryCount));
+            // Wait for the write, tolerating one interruption; a second interruption stops the wait.
+            boolean interrupted;
+            int ieCount = 0;
+            do {
+                // Reset on every pass: otherwise a successful get() after one interruption would
+                // leave the loop condition true and spin forever.
+                interrupted = false;
+                try {
+                    future.get();
+                } catch (final InterruptedException ie) {
+                    // NOTE(review): the interrupt status is not restored here; confirm whether
+                    // callers rely on that before changing it.
+                    interrupted = true;
+                    ++ieCount;
+                }
+            } while (interrupted && ieCount <= 1);
+
+        } catch (final Exception ex) {
+            throw new LoggingException("Exception occurred writing log event", ex);
+        }
+    }
+
+    /**
+     * Stops the writer thread and the thread pool, then closes the database and its environment.
+     *
+     * @param timeout The requested wait time; a negative value selects the default shutdown wait.
+     * @param timeUnit The unit of {@code timeout}.
+     * @return true only if the database and the environment were both closed without error and the
+     *         superclass release also succeeded.
+     */
+    @Override
+    protected boolean releaseSub(final long timeout, final TimeUnit timeUnit) {
+        LOGGER.debug("Shutting down FlumePersistentManager");
+        worker.shutdown();
+
+        final long requestedMillis = timeUnit.toMillis(timeout);
+        final long waitMillis;
+        if (requestedMillis < 0) {
+            waitMillis = SHUTDOWN_WAIT_MILLIS;
+        } else {
+            waitMillis = requestedMillis;
+        }
+
+        try {
+            worker.join(waitMillis);
+        } catch (final InterruptedException ie) {
+            // Ignore the exception and shutdown.
+        }
+        ExecutorServices.shutdown(threadPool, waitMillis, TimeUnit.MILLISECONDS, toString());
+        // Wait (without a time limit) for the worker to finish before the database is closed.
+        try {
+            worker.join();
+        } catch (final InterruptedException ex) {
+            logDebug("interrupted while waiting for worker to complete", ex);
+        }
+
+        boolean databaseClosed = true;
+        try {
+            LOGGER.debug("FlumePersistenceManager dataset status: {}", database.getStats(new StatsConfig()));
+            database.close();
+        } catch (final Exception ex) {
+            logWarn("Failed to close database", ex);
+            databaseClosed = false;
+        }
+
+        boolean environmentClosed = true;
+        try {
+            environment.cleanLog();
+            environment.close();
+        } catch (final Exception ex) {
+            logWarn("Failed to close environment", ex);
+            environmentClosed = false;
+        }
+
+        // The superclass release is skipped when either close failed.
+        if (!databaseClosed || !environmentClosed) {
+            return false;
+        }
+        return super.releaseSub(timeout, timeUnit);
+    }
+
+    /**
+     * Logs a debug message and passes a single event to the superclass {@code send}.
+     * Called by the writer thread for each stored event when events are not batched.
+     *
+     * @param event The event to send.
+     */
+    private void doSend(final SimpleEvent event) {
+        LOGGER.debug("Sending event to Flume");
+        super.send(event);
+    }
+
+    /**
+     * Thread for writing to Berkeley DB to avoid having interrupts close the database.
+     * <p>
+     * Each instance stores one key/data pair inside its own transaction, retrying when a
+     * lock conflict is reported.
+     * </p>
+     */
+    private static class BDBWriter implements Callable<Integer> {
+        private final byte[] eventData;
+        private final byte[] keyData;
+        private final Environment environment;
+        private final Database database;
+        private final Gate gate;
+        private final AtomicLong dbCount;
+        private final long batchSize;
+        private final int lockTimeoutRetryCount;
+
+        /**
+         * Constructor.
+         * @param keyData The bytes of the record key.
+         * @param eventData The bytes of the record value.
+         * @param environment The database environment used to begin transactions.
+         * @param database The database the record is written to.
+         * @param gate The gate opened once the stored-event count reaches the batch size.
+         * @param dbCount Counter of stored events, incremented after each successful commit.
+         * @param batchSize The count at which the gate is opened.
+         * @param lockTimeoutRetryCount The maximum number of write attempts.
+         */
+        public BDBWriter(final byte[] keyData, final byte[] eventData, final 
Environment environment,
+                         final Database database, final Gate gate, final 
AtomicLong dbCount, final long batchSize,
+                         final int lockTimeoutRetryCount) {
+            this.keyData = keyData;
+            this.eventData = eventData;
+            this.environment = environment;
+            this.database = database;
+            this.gate = gate;
+            this.dbCount = dbCount;
+            this.batchSize = batchSize;
+            this.lockTimeoutRetryCount = lockTimeoutRetryCount;
+        }
+
+        /**
+         * Writes the record, making at most {@code lockTimeoutRetryCount} attempts.
+         *
+         * @return The length of the event data.
+         * @throws Exception the last lock conflict if no attempt succeeded, or any other
+         *         exception raised while writing.
+         */
+        // NOTE(review): when lockTimeoutRetryCount is 0 or negative the loop body never runs and
+        // the method returns normally without writing anything - confirm callers never pass that.
+        @Override
+        public Integer call() throws Exception {
+            final DatabaseEntry key = new DatabaseEntry(keyData);
+            final DatabaseEntry data = new DatabaseEntry(eventData);
+            // Holds the most recent lock conflict; cleared once an attempt succeeds.
+            Exception exception = null;
+            for (int retryIndex = 0; retryIndex < lockTimeoutRetryCount; 
++retryIndex) {
+                Transaction txn = null;
+                try {
+                    txn = environment.beginTransaction(null, null);
+                    try {
+                        database.put(txn, key, data);
+                        txn.commit();
+                        // Cleared so the finally block below does not abort the committed transaction.
+                        txn = null;
+                        if (dbCount.incrementAndGet() >= batchSize) {
+                            gate.open();
+                        }
+                        exception = null;
+                        break;
+                    } catch (final LockConflictException lce) {
+                        exception = lce;
+                        // Fall through and retry.
+                    } catch (final Exception ex) {
+                        // Any other failure is not retried: abort and propagate to the caller.
+                        if (txn != null) {
+                            txn.abort();
+                        }
+                        throw ex;
+                    } finally {
+                        // Reached with a non-null txn only when the attempt did not commit.
+                        if (txn != null) {
+                            txn.abort();
+                            txn = null;
+                        }
+                    }
+                } catch (final LockConflictException lce) {
+                    // A lock conflict raised outside the inner try (by beginTransaction or by an abort).
+                    exception = lce;
+                    if (txn != null) {
+                        try {
+                            txn.abort();
+                            txn = null;
+                        } catch (final Exception ex) {
+                            LOGGER.trace("Ignoring exception while aborting 
transaction during lock conflict.");
+                        }
+                    }
+
+                }
+                // Only reached after a failed attempt (success exits via break); pause before the
+                // next pass. The pause also happens after the final failed attempt.
+                try {
+                    Thread.sleep(LOCK_TIMEOUT_SLEEP_MILLIS);
+                } catch (final InterruptedException ie) {
+                    // Ignore the error
+                }
+            }
+            if (exception != null) {
+                throw exception;
+            }
+            return eventData.length;
+        }
+    }
+
+    /**
+     * Immutable bundle of the settings handed to the manager factory.
+     */
+    private static class FactoryData {
+        private final String name;
+        private final Agent[] agents;
+        private final int batchSize;
+        private final String dataDir;
+        private final int retries;
+        private final int connectionTimeout;
+        private final int requestTimeout;
+        private final int delayMillis;
+        private final int lockTimeoutRetryCount;
+        private final Property[] properties;
+
+        /**
+         * Constructor.
+         * @param name The name of the Appender.
+         * @param agents The agents.
+         * @param batchSize The number of events to include in a batch.
+         * @param retries The number of times to retry connecting before giving up.
+         * @param connectionTimeout The amount of time to wait to establish a connection.
+         * @param requestTimeout The amount of time to wait for a response to a request.
+         * @param delayMillis Amount of time to delay before delivering a batch.
+         * @param lockTimeoutRetryCount The number of times to retry after a lock timeout.
+         * @param dataDir The directory for data.
+         * @param properties Properties to pass to the Manager.
+         */
+        public FactoryData(final String name, final Agent[] agents, final int batchSize, final int retries,
+                           final int connectionTimeout, final int requestTimeout, final int delayMillis,
+                           final int lockTimeoutRetryCount, final String dataDir, final Property[] properties) {
+            // Identity and targets.
+            this.name = name;
+            this.agents = agents;
+            this.properties = properties;
+            // Batching and storage.
+            this.batchSize = batchSize;
+            this.delayMillis = delayMillis;
+            this.dataDir = dataDir;
+            this.lockTimeoutRetryCount = lockTimeoutRetryCount;
+            // Connection handling.
+            this.retries = retries;
+            this.connectionTimeout = connectionTimeout;
+            this.requestTimeout = requestTimeout;
+        }
+    }
+
+    /**
+     * Factory that opens the Berkeley DB environment and database and creates the
+     * FlumePersistentManager that uses them.
+     */
+    private static class BDBManagerFactory implements ManagerFactory<FlumePersistentManager, FactoryData> {
+
+        /**
+         * Create the FlumePersistentManager.
+         * @param name The name of the entity to manage; also used as the database name.
+         * @param data The data required to create the entity.
+         * @return The FlumePersistentManager, or null if the database could not be opened.
+         */
+        @Override
+        public FlumePersistentManager createManager(final String name, final FactoryData data) {
+            SecretKey secretKey = null;
+            Database database = null;
+            Environment environment = null;
+
+            final Map<String, String> properties = new HashMap<>();
+            if (data.properties != null) {
+                for (final Property property : data.properties) {
+                    properties.put(property.getName(), property.getValue());
+                }
+            }
+
+            try {
+                final File dir = new File(data.dataDir);
+                FileUtils.mkdir(dir, true);
+                final EnvironmentConfig dbEnvConfig = new EnvironmentConfig();
+                dbEnvConfig.setTransactional(true);
+                dbEnvConfig.setAllowCreate(true);
+                dbEnvConfig.setLockTimeout(5, TimeUnit.SECONDS);
+                environment = new Environment(dir, dbEnvConfig);
+                final DatabaseConfig dbConfig = new DatabaseConfig();
+                dbConfig.setTransactional(true);
+                dbConfig.setAllowCreate(true);
+                database = environment.openDatabase(null, name, dbConfig);
+            } catch (final Exception ex) {
+                LOGGER.error("Could not create FlumePersistentManager", ex);
+                // For consistency, close database as well as environment even though it should never
+                // happen since the database is the last thing in the block above, but this does guard
+                // against a future line being inserted at the end that would bomb (like some debug logging).
+                // Each close is guarded so that a failure in one cannot skip the other or escape
+                // this method in place of the null return.
+                if (database != null) {
+                    try {
+                        database.close();
+                    } catch (final Exception closeEx) {
+                        LOGGER.warn("Failed to close database", closeEx);
+                    }
+                }
+                if (environment != null) {
+                    try {
+                        environment.close();
+                    } catch (final Exception closeEx) {
+                        LOGGER.warn("Failed to close environment", closeEx);
+                    }
+                }
+                return null;
+            }
+
+            // Optional encryption: look up the plugin named by the keyProvider property
+            // (property and plugin names are matched case-insensitively).
+            try {
+                String key = null;
+                for (final Map.Entry<String, String> entry : properties.entrySet()) {
+                    if (entry.getKey().equalsIgnoreCase(KEY_PROVIDER)) {
+                        key = entry.getValue();
+                        break;
+                    }
+                }
+                if (key != null) {
+                    final PluginManager manager = new PluginManager("KeyProvider");
+                    manager.collectPlugins();
+                    final Map<String, PluginType<?>> plugins = manager.getPlugins();
+                    if (plugins != null) {
+                        boolean found = false;
+                        for (final Map.Entry<String, PluginType<?>> entry : plugins.entrySet()) {
+                            if (entry.getKey().equalsIgnoreCase(key)) {
+                                found = true;
+                                final Class<?> cl = entry.getValue().getPluginClass();
+                                try {
+                                    final SecretKeyProvider provider = (SecretKeyProvider) cl.newInstance();
+                                    secretKey = provider.getSecretKey();
+                                    LOGGER.debug("Persisting events using SecretKeyProvider {}", cl.getName());
+                                } catch (final Exception ex) {
+                                    // Pass the exception as the trailing argument so the cause is logged.
+                                    LOGGER.error("Unable to create SecretKeyProvider {}, encryption will be disabled",
+                                        cl.getName(), ex);
+                                }
+                                break;
+                            }
+                        }
+                        if (!found) {
+                            LOGGER.error("Unable to locate SecretKey provider {}, encryption will be disabled", key);
+                        }
+                    } else {
+                        LOGGER.error("Unable to locate SecretKey provider {}, encryption will be disabled", key);
+                    }
+                }
+            } catch (final Exception ex) {
+                LOGGER.warn("Error setting up encryption - encryption will be disabled", ex);
+            }
+            return new FlumePersistentManager(name, data.name, data.agents, data.batchSize, data.retries,
+                data.connectionTimeout, data.requestTimeout, data.delayMillis, database, environment, secretKey,
+                data.lockTimeoutRetryCount);
+        }
+    }
+
+    /**
+     * Thread that sends data to Flume and pulls it from Berkeley DB.
+     */
+    private static class WriterThread extends Log4jThread  {
+        private volatile boolean shutdown = false;
+        private final Database database;
+        private final Environment environment;
+        private final FlumePersistentManager manager;
+        private final Gate gate;
+        private final SecretKey secretKey;
+        private final int batchSize;
+        private final AtomicLong dbCounter;
+        private final int lockTimeoutRetryCount;
+
+        public WriterThread(final Database database, final Environment 
environment,
+                            final FlumePersistentManager manager, final Gate 
gate, final int batchsize,
+                            final SecretKey secretKey, final AtomicLong 
dbCount, final int lockTimeoutRetryCount) {
+            super("FlumePersistentManager-Writer");
+            this.database = database;
+            this.environment = environment;
+            this.manager = manager;
+            this.gate = gate;
+            this.batchSize = batchsize;
+            this.secretKey = secretKey;
+            this.setDaemon(true);
+            this.dbCounter = dbCount;
+            this.lockTimeoutRetryCount = lockTimeoutRetryCount;
+        }
+
+        public void shutdown() {
+            LOGGER.debug("Writer thread shutting down");
+            this.shutdown = true;
+            gate.open();
+        }
+
+        public boolean isShutdown() {
+            return shutdown;
+        }
+
+        @Override
+        public void run() {
+            LOGGER.trace("WriterThread started - batch size = " + batchSize + 
", delayMillis = " + manager.getDelayMillis());
+            long nextBatchMillis = System.currentTimeMillis() + 
manager.getDelayMillis();
+            while (!shutdown) {
+                final long nowMillis = System.currentTimeMillis();
+                final long dbCount = database.count();
+                dbCounter.set(dbCount);
+                if (dbCount >= batchSize || dbCount > 0 && nextBatchMillis <= 
nowMillis) {
+                    nextBatchMillis = nowMillis + manager.getDelayMillis();
+                    try {
+                        boolean errors = false;
+                        final DatabaseEntry key = new DatabaseEntry();
+                        final DatabaseEntry data = new DatabaseEntry();
+
+                        gate.close();
+                        OperationStatus status;
+                        if (batchSize > 1) {
+                            try {
+                                errors = sendBatch(key, data);
+                            } catch (final Exception ex) {
+                                break;
+                            }
+                        } else {
+                            Exception exception = null;
+                            for (int retryIndex = 0; retryIndex < 
lockTimeoutRetryCount; ++retryIndex) {
+                                exception = null;
+                                Transaction txn = null;
+                                Cursor cursor = null;
+                                try {
+                                    txn = environment.beginTransaction(null, 
null);
+                                    cursor = database.openCursor(txn, null);
+                                    try {
+                                        status = cursor.getFirst(key, data, 
LockMode.RMW);
+                                        while (status == 
OperationStatus.SUCCESS) {
+                                            final SimpleEvent event = 
createEvent(data);
+                                            if (event != null) {
+                                                try {
+                                                    manager.doSend(event);
+                                                } catch (final Exception ioe) {
+                                                    errors = true;
+                                                    LOGGER.error("Error 
sending event", ioe);
+                                                    break;
+                                                }
+                                                try {
+                                                    cursor.delete();
+                                                } catch (final Exception ex) {
+                                                    LOGGER.error("Unable to 
delete event", ex);
+                                                }
+                                            }
+                                            status = cursor.getNext(key, data, 
LockMode.RMW);
+                                        }
+                                        if (cursor != null) {
+                                            cursor.close();
+                                            cursor = null;
+                                        }
+                                        txn.commit();
+                                        txn = null;
+                                        dbCounter.decrementAndGet();
+                                        exception = null;
+                                        break;
+                                    } catch (final LockConflictException lce) {
+                                        exception = lce;
+                                        // Fall through and retry.
+                                    } catch (final Exception ex) {
+                                        LOGGER.error("Error reading or writing 
to database", ex);
+                                        shutdown = true;
+                                        break;
+                                    } finally {
+                                        if (cursor != null) {
+                                            cursor.close();
+                                            cursor = null;
+                                        }
+                                        if (txn != null) {
+                                            txn.abort();
+                                            txn = null;
+                                        }
+                                    }
+                                } catch (final LockConflictException lce) {
+                                    exception = lce;
+                                    if (cursor != null) {
+                                        try {
+                                            cursor.close();
+                                            cursor = null;
+                                        } catch (final Exception ex) {
+                                            LOGGER.trace("Ignored exception 
closing cursor during lock conflict.");
+                                        }
+                                    }
+                                    if (txn != null) {
+                                        try {
+                                            txn.abort();
+                                            txn = null;
+                                        } catch (final Exception ex) {
+                                            LOGGER.trace("Ignored exception 
aborting tx during lock conflict.");
+                                        }
+                                    }
+                                }
+                                try {
+                                    Thread.sleep(LOCK_TIMEOUT_SLEEP_MILLIS);
+                                } catch (final InterruptedException ie) {
+                                    // Ignore the error
+                                }
+                            }
+                            if (exception != null) {
+                                LOGGER.error("Unable to read or update data 
base", exception);
+                            }
+                        }
+                        if (errors) {
+                            Thread.sleep(manager.getDelayMillis());
+                            continue;
+                        }
+                    } catch (final Exception ex) {
+                        LOGGER.warn("WriterThread encountered an exception. 
Continuing.", ex);
+                    }
+                } else {
+                    if (nextBatchMillis <= nowMillis) {
+                        nextBatchMillis = nowMillis + manager.getDelayMillis();
+                    }
+                    try {
+                        final long interval = nextBatchMillis - nowMillis;
+                        gate.waitForOpen(interval);
+                    } catch (final InterruptedException ie) {
+                        LOGGER.warn("WriterThread interrupted, continuing");
+                    } catch (final Exception ex) {
+                        LOGGER.error("WriterThread encountered an exception 
waiting for work", ex);
+                        break;
+                    }
+                }
+            }
+
+            if (batchSize > 1 && database.count() > 0) {
+                final DatabaseEntry key = new DatabaseEntry();
+                final DatabaseEntry data = new DatabaseEntry();
+                try {
+                    sendBatch(key, data);
+                } catch (final Exception ex) {
+                    LOGGER.warn("Unable to write final batch");
+                }
+            }
+            LOGGER.trace("WriterThread exiting");
+        }
+
+        private boolean sendBatch(DatabaseEntry key, final DatabaseEntry data) 
throws Exception {
+            boolean errors = false;
+            OperationStatus status;
+            Cursor cursor = null;
+            try {
+               final BatchEvent batch = new BatchEvent();
+               for (int retryIndex = 0; retryIndex < lockTimeoutRetryCount; 
++retryIndex) {
+                       try {
+                               cursor = database.openCursor(null, 
CursorConfig.DEFAULT);
+                               status = cursor.getFirst(key, data, null);
+
+                               for (int i = 0; status == 
OperationStatus.SUCCESS && i < batchSize; ++i) {
+                                       final SimpleEvent event = 
createEvent(data);
+                                       if (event != null) {
+                                               batch.addEvent(event);
+                                       }
+                                       status = cursor.getNext(key, data, 
null);
+                               }
+                               break;
+                       } catch (final LockConflictException lce) {
+                               if (cursor != null) {
+                                       try {
+                                cursor.close();
+                                cursor = null;
+                            } catch (final Exception ex) {
+                                LOGGER.trace("Ignored exception closing cursor 
during lock conflict.");
+                            }
+                        }
+                    }
+               }
+
+                try {
+                    manager.send(batch);
+                } catch (final Exception ioe) {
+                    LOGGER.error("Error sending events", ioe);
+                    errors = true;
+                }
+                if (!errors) {
+                       if (cursor != null) {
+                           cursor.close();
+                           cursor = null;
+                       }
+                    Transaction txn = null;
+                    Exception exception = null;
+                    for (int retryIndex = 0; retryIndex < 
lockTimeoutRetryCount; ++retryIndex) {
+                        try {
+                            txn = environment.beginTransaction(null, null);
+                            try {
+                                for (final Event event : batch.getEvents()) {
+                                    try {
+                                        final Map<String, String> headers = 
event.getHeaders();
+                                        key = new 
DatabaseEntry(headers.get(FlumeEvent.GUID).getBytes(UTF8));
+                                        database.delete(txn, key);
+                                    } catch (final Exception ex) {
+                                        LOGGER.error("Error deleting key from 
database", ex);
+                                    }
+                                }
+                                txn.commit();
+                                long count = dbCounter.get();
+                                while (!dbCounter.compareAndSet(count, count - 
batch.getEvents().size())) {
+                                    count = dbCounter.get();
+                                }
+                                exception = null;
+                                break;
+                            } catch (final LockConflictException lce) {
+                                exception = lce;
+                                if (cursor != null) {
+                                    try {
+                                        cursor.close();
+                                        cursor = null;
+                                    } catch (final Exception ex) {
+                                        LOGGER.trace("Ignored exception 
closing cursor during lock conflict.");
+                                    }
+                                }
+                                if (txn != null) {
+                                    try {
+                                        txn.abort();
+                                        txn = null;
+                                    } catch (final Exception ex) {
+                                        LOGGER.trace("Ignored exception 
aborting transaction during lock conflict.");
+                                    }
+                                }
+                            } catch (final Exception ex) {
+                                LOGGER.error("Unable to commit transaction", 
ex);
+                                if (txn != null) {
+                                    txn.abort();
+                                }
+                            }
+                        } catch (final LockConflictException lce) {
+                            exception = lce;
+                            if (cursor != null) {
+                                try {
+                                    cursor.close();
+                                    cursor = null;
+                                } catch (final Exception ex) {
+                                    LOGGER.trace("Ignored exception closing 
cursor during lock conflict.");
+                                }
+                            }
+                            if (txn != null) {
+                                try {
+                                    txn.abort();
+                                    txn = null;
+                                } catch (final Exception ex) {
+                                    LOGGER.trace("Ignored exception aborting 
transaction during lock conflict.");
+                                }
+                            }
+                        } finally {
+                            if (cursor != null) {
+                                cursor.close();
+                                cursor = null;
+                            }
+                            if (txn != null) {
+                                txn.abort();
+                                txn = null;
+                            }
+                        }
+                        try {
+                            Thread.sleep(LOCK_TIMEOUT_SLEEP_MILLIS);
+                        } catch (final InterruptedException ie) {
+                            // Ignore the error
+                        }
+                    }
+                    if (exception != null) {
+                        LOGGER.error("Unable to delete events from data base", 
exception);
+                    }
+                }
+            } catch (final Exception ex) {
+                LOGGER.error("Error reading database", ex);
+                shutdown = true;
+                throw ex;
+            } finally {
+                if (cursor != null) {
+                    cursor.close();
+                }
+            }
+
+            return errors;
+        }
+
+        /**
+         * Rebuilds an event from a stored database record.
+         * <p>
+         * The record is decrypted first when a secret key is configured. The payload is then read as a
+         * 4-byte body length, the body bytes, a 4-byte header count, and that many key/value pairs in
+         * {@code DataInputStream.readUTF} format.
+         *
+         * @param data the database entry holding the serialized (and possibly encrypted) event.
+         * @return the reconstructed event, or {@code null} if the record could not be decrypted or decoded;
+         *         the failure is logged.
+         */
+        private SimpleEvent createEvent(final DatabaseEntry data) {
+            final SimpleEvent event = new SimpleEvent();
+            try {
+                byte[] eventData = data.getData();
+                if (secretKey != null) {
+                    final Cipher cipher = Cipher.getInstance("AES");
+                    cipher.init(Cipher.DECRYPT_MODE, secretKey);
+                    eventData = cipher.doFinal(eventData);
+                }
+                final DataInputStream dais = new DataInputStream(new ByteArrayInputStream(eventData));
+                final int bodyLength = dais.readInt();
+                final byte[] body = new byte[bodyLength];
+                // readFully fails with EOFException on a truncated record; a plain read() could
+                // return fewer bytes than requested and leave the rest of the body zero-filled.
+                dais.readFully(body);
+                event.setBody(body);
+                final int headerCount = dais.readInt();
+                final Map<String, String> headers = new HashMap<>(headerCount);
+                for (int i = 0; i < headerCount; ++i) {
+                    final String headerKey = dais.readUTF();
+                    final String value = dais.readUTF();
+                    headers.put(headerKey, value);
+                }
+                event.setHeaders(headers);
+                return event;
+            } catch (final Exception ex) {
+                LOGGER.error("Error retrieving event", ex);
+                return null;
+            }
+        }
+
+    }
+
+    /**
+     * A small open/closed flag with a timed wait, guarded by the monitor of the Gate instance.
+     * <p>
+     * {@link #open()} sets the flag and notifies every thread blocked in {@link #waitForOpen(long)}.
+     * All accessors are synchronized so the flag is read and written under the same lock.
+     */
+    private static class Gate {
+
+        private boolean isOpen = false;
+
+        /**
+         * Returns the current state of the gate.
+         *
+         * @return {@code true} if the gate was opened and has not been closed since.
+         */
+        public synchronized boolean isOpen() {
+            return isOpen;
+        }
+
+        /**
+         * Marks the gate as open and wakes all threads waiting in {@link #waitForOpen(long)}.
+         */
+        public synchronized void open() {
+            isOpen = true;
+            notifyAll();
+        }
+
+        /**
+         * Marks the gate as closed. Waiting threads are not notified.
+         */
+        public synchronized void close() {
+            isOpen = false;
+        }
+
+        /**
+         * Blocks until another thread calls {@link #open()} or the timeout elapses.
+         * <p>
+         * The open flag is not consulted: the call always waits, even if the gate is already open, and it
+         * may return without the gate having been opened (timeout or spurious wakeup). Callers must
+         * re-check their own condition afterwards.
+         *
+         * @param timeout the maximum time to wait in milliseconds; {@code 0} waits until notified.
+         * @throws InterruptedException if the waiting thread is interrupted.
+         */
+        public synchronized void waitForOpen(final long timeout) throws InterruptedException {
+            wait(timeout);
+        }
+    }
+}

Reply via email to