http://git-wip-us.apache.org/repos/asf/logging-log4j2/blob/78fe32c0/log4j-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumePersistentManager.java
----------------------------------------------------------------------
diff --git a/log4j-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumePersistentManager.java b/log4j-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumePersistentManager.java
index ba03629..4711ae0 100644
--- a/log4j-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumePersistentManager.java
+++ b/log4j-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumePersistentManager.java
@@ -1,848 +1,851 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache license, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the license for the specific language governing permissions and
- * limitations under the license.
- */
-package org.apache.logging.log4j.flume.appender;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.File;
-import java.nio.charset.Charset;
-import java.nio.charset.StandardCharsets;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-
-import javax.crypto.Cipher;
-import javax.crypto.SecretKey;
-
-import org.apache.flume.Event;
-import org.apache.flume.event.SimpleEvent;
-import org.apache.logging.log4j.LoggingException;
-import org.apache.logging.log4j.core.appender.ManagerFactory;
-import org.apache.logging.log4j.core.config.Property;
-import org.apache.logging.log4j.core.config.plugins.util.PluginManager;
-import org.apache.logging.log4j.core.config.plugins.util.PluginType;
-import org.apache.logging.log4j.core.util.ExecutorServices;
-import org.apache.logging.log4j.core.util.FileUtils;
-import org.apache.logging.log4j.core.util.Log4jThread;
-import org.apache.logging.log4j.core.util.Log4jThreadFactory;
-import org.apache.logging.log4j.core.util.SecretKeyProvider;
-import org.apache.logging.log4j.util.Strings;
-
-import com.sleepycat.je.Cursor;
-import com.sleepycat.je.CursorConfig;
-import com.sleepycat.je.Database;
-import com.sleepycat.je.DatabaseConfig;
-import com.sleepycat.je.DatabaseEntry;
-import com.sleepycat.je.Environment;
-import com.sleepycat.je.EnvironmentConfig;
-import com.sleepycat.je.LockConflictException;
-import com.sleepycat.je.LockMode;
-import com.sleepycat.je.OperationStatus;
-import com.sleepycat.je.StatsConfig;
-import com.sleepycat.je.Transaction;
-
-/**
- * Manager that persists data to Berkeley DB before passing it on to Flume.
- */
-public class FlumePersistentManager extends FlumeAvroManager {
-
- /** Attribute name for the key provider. */
- public static final String KEY_PROVIDER = "keyProvider";
-
- private static final Charset UTF8 = StandardCharsets.UTF_8;
-
- private static final String DEFAULT_DATA_DIR = ".log4j/flumeData";
-
- private static final long SHUTDOWN_WAIT_MILLIS = 60000;
-
- private static final long LOCK_TIMEOUT_SLEEP_MILLIS = 500;
-
- private static BDBManagerFactory factory = new BDBManagerFactory();
-
- private final Database database;
-
- private final Environment environment;
-
- private final WriterThread worker;
-
- private final Gate gate = new Gate();
-
- private final SecretKey secretKey;
-
- private final int lockTimeoutRetryCount;
-
- private final ExecutorService threadPool;
-
- private final AtomicLong dbCount = new AtomicLong();
-
- /**
- * Constructor
- * @param name The unique name of this manager.
- * @param shortName Original name for the Manager.
- * @param agents An array of Agents.
- * @param batchSize The number of events to include in a batch.
- * @param retries The number of times to retry connecting before giving up.
-     * @param connectionTimeout The amount of time to wait for a connection to be established.
-     * @param requestTimeout The amount of time to wait for a response to a request.
-     * @param delay The amount of time to wait between retries.
-     * @param database The database to write to.
-     * @param environment The database environment.
-     * @param secretKey The SecretKey to use for encryption.
-     * @param lockTimeoutRetryCount The number of times to retry a lock timeout.
- */
-    protected FlumePersistentManager(final String name, final String shortName, final Agent[] agents,
-                                     final int batchSize, final int retries, final int connectionTimeout,
-                                     final int requestTimeout, final int delay, final Database database,
-                                     final Environment environment, final SecretKey secretKey,
-                                     final int lockTimeoutRetryCount) {
-        super(name, shortName, agents, batchSize, delay, retries, connectionTimeout, requestTimeout);
- this.database = database;
- this.environment = environment;
- dbCount.set(database.count());
-        this.worker = new WriterThread(database, environment, this, gate, batchSize, secretKey, dbCount,
-            lockTimeoutRetryCount);
-        this.worker.start();
-        this.secretKey = secretKey;
-        this.threadPool = Executors.newCachedThreadPool(Log4jThreadFactory.createDaemonThreadFactory("Flume"));
- this.lockTimeoutRetryCount = lockTimeoutRetryCount;
- }
-
-
- /**
- * Returns a FlumeAvroManager.
- * @param name The name of the manager.
- * @param agents The agents to use.
- * @param properties Properties to pass to the Manager.
- * @param batchSize The number of events to include in a batch.
- * @param retries The number of times to retry connecting before giving up.
-     * @param connectionTimeout The amount of time to wait to establish a connection.
-     * @param requestTimeout The amount of time to wait for a response to a request.
-     * @param delayMillis Amount of time to delay before delivering a batch.
-     * @param lockTimeoutRetryCount The number of times to retry after a lock timeout.
- * @param dataDir The location of the Berkeley database.
- * @return A FlumeAvroManager.
- */
-    public static FlumePersistentManager getManager(final String name, final Agent[] agents,
-                                                    final Property[] properties, int batchSize, final int retries,
-                                                    final int connectionTimeout, final int requestTimeout,
-                                                    final int delayMillis, final int lockTimeoutRetryCount,
-                                                    final String dataDir) {
-        if (agents == null || agents.length == 0) {
-            throw new IllegalArgumentException("At least one agent is required");
-        }
-
-        if (batchSize <= 0) {
-            batchSize = 1;
-        }
-        final String dataDirectory = Strings.isEmpty(dataDir) ? DEFAULT_DATA_DIR : dataDir;
-
- final StringBuilder sb = new StringBuilder("FlumePersistent[");
- boolean first = true;
- for (final Agent agent : agents) {
- if (!first) {
- sb.append(',');
- }
- sb.append(agent.getHost()).append(':').append(agent.getPort());
- first = false;
- }
- sb.append(']');
- sb.append(' ').append(dataDirectory);
-        return getManager(sb.toString(), factory, new FactoryData(name, agents, batchSize, retries,
-            connectionTimeout, requestTimeout, delayMillis, lockTimeoutRetryCount, dataDir, properties));
- }
-
- @Override
- public void send(final Event event) {
- if (worker.isShutdown()) {
- throw new LoggingException("Unable to record event");
- }
-
- final Map<String, String> headers = event.getHeaders();
- final byte[] keyData = headers.get(FlumeEvent.GUID).getBytes(UTF8);
- try {
- final ByteArrayOutputStream baos = new ByteArrayOutputStream();
- final DataOutputStream daos = new DataOutputStream(baos);
- daos.writeInt(event.getBody().length);
- daos.write(event.getBody(), 0, event.getBody().length);
- daos.writeInt(event.getHeaders().size());
- for (final Map.Entry<String, String> entry : headers.entrySet()) {
- daos.writeUTF(entry.getKey());
- daos.writeUTF(entry.getValue());
- }
- byte[] eventData = baos.toByteArray();
- if (secretKey != null) {
- final Cipher cipher = Cipher.getInstance("AES");
- cipher.init(Cipher.ENCRYPT_MODE, secretKey);
- eventData = cipher.doFinal(eventData);
- }
-            final Future<Integer> future = threadPool.submit(new BDBWriter(keyData, eventData, environment, database,
-                gate, dbCount, getBatchSize(), lockTimeoutRetryCount));
- boolean interrupted = false;
- int ieCount = 0;
- do {
- try {
- future.get();
- } catch (final InterruptedException ie) {
- interrupted = true;
- ++ieCount;
- }
- } while (interrupted && ieCount <= 1);
-
- } catch (final Exception ex) {
-            throw new LoggingException("Exception occurred writing log event", ex);
- }
- }
-
- @Override
- protected void releaseSub(final long timeout, final TimeUnit timeUnit) {
- LOGGER.debug("Shutting down FlumePersistentManager");
- worker.shutdown();
- final long requestedTimeoutMillis = timeUnit.toMillis(timeout);
-        final long shutdownWaitMillis = requestedTimeoutMillis < 0 ? SHUTDOWN_WAIT_MILLIS : requestedTimeoutMillis;
- try {
- worker.join(shutdownWaitMillis);
- } catch (final InterruptedException ie) {
- // Ignore the exception and shutdown.
- }
-        ExecutorServices.shutdown(threadPool, shutdownWaitMillis, TimeUnit.MILLISECONDS, toString());
- try {
- worker.join();
- } catch (final InterruptedException ex) {
- logDebug("interrupted while waiting for worker to complete", ex);
- }
- try {
-            LOGGER.debug("FlumePersistenceManager dataset status: {}", database.getStats(new StatsConfig()));
- database.close();
- } catch (final Exception ex) {
- logWarn("Failed to close database", ex);
- }
- try {
- environment.cleanLog();
- environment.close();
- } catch (final Exception ex) {
- logWarn("Failed to close environment", ex);
- }
- super.releaseSub(timeout, timeUnit);
- }
-
- private void doSend(final SimpleEvent event) {
- LOGGER.debug("Sending event to Flume");
- super.send(event);
- }
-
- /**
-     * Thread for writing to Berkeley DB to avoid having interrupts close the database.
- */
- private static class BDBWriter implements Callable<Integer> {
- private final byte[] eventData;
- private final byte[] keyData;
- private final Environment environment;
- private final Database database;
- private final Gate gate;
- private final AtomicLong dbCount;
- private final long batchSize;
- private final int lockTimeoutRetryCount;
-
-        public BDBWriter(final byte[] keyData, final byte[] eventData, final Environment environment,
-                         final Database database, final Gate gate, final AtomicLong dbCount, final long batchSize,
-                         final int lockTimeoutRetryCount) {
- this.keyData = keyData;
- this.eventData = eventData;
- this.environment = environment;
- this.database = database;
- this.gate = gate;
- this.dbCount = dbCount;
- this.batchSize = batchSize;
- this.lockTimeoutRetryCount = lockTimeoutRetryCount;
- }
-
- @Override
- public Integer call() throws Exception {
- final DatabaseEntry key = new DatabaseEntry(keyData);
- final DatabaseEntry data = new DatabaseEntry(eventData);
- Exception exception = null;
-            for (int retryIndex = 0; retryIndex < lockTimeoutRetryCount; ++retryIndex) {
- Transaction txn = null;
- try {
- txn = environment.beginTransaction(null, null);
- try {
- database.put(txn, key, data);
- txn.commit();
- txn = null;
- if (dbCount.incrementAndGet() >= batchSize) {
- gate.open();
- }
- exception = null;
- break;
- } catch (final LockConflictException lce) {
- exception = lce;
- // Fall through and retry.
- } catch (final Exception ex) {
- if (txn != null) {
- txn.abort();
- }
- throw ex;
- } finally {
- if (txn != null) {
- txn.abort();
- txn = null;
- }
- }
- } catch (final LockConflictException lce) {
- exception = lce;
- if (txn != null) {
- try {
- txn.abort();
- txn = null;
- } catch (final Exception ex) {
-                            LOGGER.trace("Ignoring exception while aborting transaction during lock conflict.");
- }
- }
-
- }
- try {
- Thread.sleep(LOCK_TIMEOUT_SLEEP_MILLIS);
- } catch (final InterruptedException ie) {
- // Ignore the error
- }
- }
- if (exception != null) {
- throw exception;
- }
- return eventData.length;
- }
- }
-
- /**
- * Factory data.
- */
- private static class FactoryData {
- private final String name;
- private final Agent[] agents;
- private final int batchSize;
- private final String dataDir;
- private final int retries;
- private final int connectionTimeout;
- private final int requestTimeout;
- private final int delayMillis;
- private final int lockTimeoutRetryCount;
- private final Property[] properties;
-
- /**
- * Constructor.
- * @param name The name of the Appender.
- * @param agents The agents.
- * @param batchSize The number of events to include in a batch.
- * @param dataDir The directory for data.
- */
-        public FactoryData(final String name, final Agent[] agents, final int batchSize, final int retries,
-                           final int connectionTimeout, final int requestTimeout, final int delayMillis,
-                           final int lockTimeoutRetryCount, final String dataDir, final Property[] properties) {
- this.name = name;
- this.agents = agents;
- this.batchSize = batchSize;
- this.dataDir = dataDir;
- this.retries = retries;
- this.connectionTimeout = connectionTimeout;
- this.requestTimeout = requestTimeout;
- this.delayMillis = delayMillis;
- this.lockTimeoutRetryCount = lockTimeoutRetryCount;
- this.properties = properties;
- }
- }
-
- /**
- * Avro Manager Factory.
- */
-    private static class BDBManagerFactory implements ManagerFactory<FlumePersistentManager, FactoryData> {
-
- /**
-         * Create the FlumePersistentManager.
-         * @param name The name of the entity to manage.
-         * @param data The data required to create the entity.
-         * @return The FlumePersistentManager.
- */
- @Override
-        public FlumePersistentManager createManager(final String name, final FactoryData data) {
- SecretKey secretKey = null;
- Database database = null;
- Environment environment = null;
-
- final Map<String, String> properties = new HashMap<>();
- if (data.properties != null) {
- for (final Property property : data.properties) {
- properties.put(property.getName(), property.getValue());
- }
- }
-
- try {
- final File dir = new File(data.dataDir);
- FileUtils.mkdir(dir, true);
- final EnvironmentConfig dbEnvConfig = new EnvironmentConfig();
- dbEnvConfig.setTransactional(true);
- dbEnvConfig.setAllowCreate(true);
- dbEnvConfig.setLockTimeout(5, TimeUnit.SECONDS);
- environment = new Environment(dir, dbEnvConfig);
- final DatabaseConfig dbConfig = new DatabaseConfig();
- dbConfig.setTransactional(true);
- dbConfig.setAllowCreate(true);
- database = environment.openDatabase(null, name, dbConfig);
- } catch (final Exception ex) {
- LOGGER.error("Could not create FlumePersistentManager", ex);
-                // For consistency, close database as well as environment even though it should never happen since the
-                // database is the last thing in the block above, but this does guard against a future line being
-                // inserted at the end that would bomb (like some debug logging).
- if (database != null) {
- database.close();
- database = null;
- }
- if (environment != null) {
- environment.close();
- environment = null;
- }
- return null;
- }
-
- try {
- String key = null;
-                for (final Map.Entry<String, String> entry : properties.entrySet()) {
- if (entry.getKey().equalsIgnoreCase(KEY_PROVIDER)) {
- key = entry.getValue();
- break;
- }
- }
- if (key != null) {
-                    final PluginManager manager = new PluginManager("KeyProvider");
-                    manager.collectPlugins();
-                    final Map<String, PluginType<?>> plugins = manager.getPlugins();
-                    if (plugins != null) {
-                        boolean found = false;
-                        for (final Map.Entry<String, PluginType<?>> entry : plugins.entrySet()) {
-                            if (entry.getKey().equalsIgnoreCase(key)) {
-                                found = true;
-                                final Class<?> cl = entry.getValue().getPluginClass();
-                                try {
-                                    final SecretKeyProvider provider = (SecretKeyProvider) cl.newInstance();
-                                    secretKey = provider.getSecretKey();
-                                    LOGGER.debug("Persisting events using SecretKeyProvider {}", cl.getName());
- } catch (final Exception ex) {
-                                    LOGGER.error("Unable to create SecretKeyProvider {}, encryption will be disabled",
-                                        cl.getName());
- }
- break;
- }
- }
- if (!found) {
-                            LOGGER.error("Unable to locate SecretKey provider {}, encryption will be disabled", key);
-                        }
-                    } else {
-                        LOGGER.error("Unable to locate SecretKey provider {}, encryption will be disabled", key);
- }
- }
- } catch (final Exception ex) {
-                LOGGER.warn("Error setting up encryption - encryption will be disabled", ex);
- }
-            return new FlumePersistentManager(name, data.name, data.agents, data.batchSize, data.retries,
-                data.connectionTimeout, data.requestTimeout, data.delayMillis, database, environment, secretKey,
-                data.lockTimeoutRetryCount);
- }
- }
-
- /**
- * Thread that sends data to Flume and pulls it from Berkeley DB.
- */
- private static class WriterThread extends Log4jThread {
- private volatile boolean shutdown = false;
- private final Database database;
- private final Environment environment;
- private final FlumePersistentManager manager;
- private final Gate gate;
- private final SecretKey secretKey;
- private final int batchSize;
- private final AtomicLong dbCounter;
- private final int lockTimeoutRetryCount;
-
-        public WriterThread(final Database database, final Environment environment,
-                            final FlumePersistentManager manager, final Gate gate, final int batchsize,
-                            final SecretKey secretKey, final AtomicLong dbCount, final int lockTimeoutRetryCount) {
- super("FlumePersistentManager-Writer");
- this.database = database;
- this.environment = environment;
- this.manager = manager;
- this.gate = gate;
- this.batchSize = batchsize;
- this.secretKey = secretKey;
- this.setDaemon(true);
- this.dbCounter = dbCount;
- this.lockTimeoutRetryCount = lockTimeoutRetryCount;
- }
-
- public void shutdown() {
- LOGGER.debug("Writer thread shutting down");
- this.shutdown = true;
- gate.open();
- }
-
- public boolean isShutdown() {
- return shutdown;
- }
-
- @Override
- public void run() {
-            LOGGER.trace("WriterThread started - batch size = " + batchSize + ", delayMillis = " + manager.getDelayMillis());
-            long nextBatchMillis = System.currentTimeMillis() + manager.getDelayMillis();
-            while (!shutdown) {
-                final long nowMillis = System.currentTimeMillis();
-                final long dbCount = database.count();
-                dbCounter.set(dbCount);
-                if (dbCount >= batchSize || dbCount > 0 && nextBatchMillis <= nowMillis) {
- nextBatchMillis = nowMillis + manager.getDelayMillis();
- try {
- boolean errors = false;
- final DatabaseEntry key = new DatabaseEntry();
- final DatabaseEntry data = new DatabaseEntry();
-
- gate.close();
- OperationStatus status;
- if (batchSize > 1) {
- try {
- errors = sendBatch(key, data);
- } catch (final Exception ex) {
- break;
- }
- } else {
- Exception exception = null;
-                            for (int retryIndex = 0; retryIndex < lockTimeoutRetryCount; ++retryIndex) {
- exception = null;
- Transaction txn = null;
- Cursor cursor = null;
- try {
-                                    txn = environment.beginTransaction(null, null);
-                                    cursor = database.openCursor(txn, null);
-                                    try {
-                                        status = cursor.getFirst(key, data, LockMode.RMW);
-                                        while (status == OperationStatus.SUCCESS) {
-                                            final SimpleEvent event = createEvent(data);
- if (event != null) {
- try {
- manager.doSend(event);
- } catch (final Exception ioe) {
- errors = true;
-                                                    LOGGER.error("Error sending event", ioe);
-                                                    break;
-                                                }
-                                                try {
-                                                    cursor.delete();
-                                                } catch (final Exception ex) {
-                                                    LOGGER.error("Unable to delete event", ex);
-                                                }
-                                            }
-                                            status = cursor.getNext(key, data, LockMode.RMW);
- }
- if (cursor != null) {
- cursor.close();
- cursor = null;
- }
- txn.commit();
- txn = null;
- dbCounter.decrementAndGet();
- exception = null;
- break;
- } catch (final LockConflictException lce) {
- exception = lce;
- // Fall through and retry.
- } catch (final Exception ex) {
-                                        LOGGER.error("Error reading or writing to database", ex);
- shutdown = true;
- break;
- } finally {
- if (cursor != null) {
- cursor.close();
- cursor = null;
- }
- if (txn != null) {
- txn.abort();
- txn = null;
- }
- }
- } catch (final LockConflictException lce) {
- exception = lce;
- if (cursor != null) {
- try {
- cursor.close();
- cursor = null;
- } catch (final Exception ex) {
-                                            LOGGER.trace("Ignored exception closing cursor during lock conflict.");
- }
- }
- if (txn != null) {
- try {
- txn.abort();
- txn = null;
- } catch (final Exception ex) {
-                                            LOGGER.trace("Ignored exception aborting tx during lock conflict.");
- }
- }
- }
- try {
- Thread.sleep(LOCK_TIMEOUT_SLEEP_MILLIS);
- } catch (final InterruptedException ie) {
- // Ignore the error
- }
- }
- if (exception != null) {
-                                LOGGER.error("Unable to read or update data base", exception);
- }
- }
- if (errors) {
- Thread.sleep(manager.getDelayMillis());
- continue;
- }
- } catch (final Exception ex) {
-                        LOGGER.warn("WriterThread encountered an exception. Continuing.", ex);
- }
- } else {
- if (nextBatchMillis <= nowMillis) {
- nextBatchMillis = nowMillis + manager.getDelayMillis();
- }
- try {
- final long interval = nextBatchMillis - nowMillis;
- gate.waitForOpen(interval);
- } catch (final InterruptedException ie) {
- LOGGER.warn("WriterThread interrupted, continuing");
- } catch (final Exception ex) {
-                        LOGGER.error("WriterThread encountered an exception waiting for work", ex);
- break;
- }
- }
- }
-
- if (batchSize > 1 && database.count() > 0) {
- final DatabaseEntry key = new DatabaseEntry();
- final DatabaseEntry data = new DatabaseEntry();
- try {
- sendBatch(key, data);
- } catch (final Exception ex) {
- LOGGER.warn("Unable to write final batch");
- }
- }
- LOGGER.trace("WriterThread exiting");
- }
-
-        private boolean sendBatch(DatabaseEntry key, final DatabaseEntry data) throws Exception {
-            boolean errors = false;
-            OperationStatus status;
-            Cursor cursor = null;
-            try {
-                final BatchEvent batch = new BatchEvent();
-                for (int retryIndex = 0; retryIndex < lockTimeoutRetryCount; ++retryIndex) {
-                    try {
-                        cursor = database.openCursor(null, CursorConfig.DEFAULT);
-                        status = cursor.getFirst(key, data, null);
-
-                        for (int i = 0; status == OperationStatus.SUCCESS && i < batchSize; ++i) {
-                            final SimpleEvent event = createEvent(data);
-                            if (event != null) {
-                                batch.addEvent(event);
-                            }
-                            status = cursor.getNext(key, data, null);
- }
- break;
- } catch (final LockConflictException lce) {
- if (cursor != null) {
- try {
- cursor.close();
- cursor = null;
- } catch (final Exception ex) {
-                                LOGGER.trace("Ignored exception closing cursor during lock conflict.");
- }
- }
- }
- }
-
- try {
- manager.send(batch);
- } catch (final Exception ioe) {
- LOGGER.error("Error sending events", ioe);
- errors = true;
- }
- if (!errors) {
- if (cursor != null) {
- cursor.close();
- cursor = null;
- }
- Transaction txn = null;
- Exception exception = null;
-                    for (int retryIndex = 0; retryIndex < lockTimeoutRetryCount; ++retryIndex) {
- try {
- txn = environment.beginTransaction(null, null);
- try {
- for (final Event event : batch.getEvents()) {
- try {
-                                    final Map<String, String> headers = event.getHeaders();
-                                    key = new DatabaseEntry(headers.get(FlumeEvent.GUID).getBytes(UTF8));
- database.delete(txn, key);
- } catch (final Exception ex) {
-                                    LOGGER.error("Error deleting key from database", ex);
- }
- }
- txn.commit();
- long count = dbCounter.get();
-                            while (!dbCounter.compareAndSet(count, count - batch.getEvents().size())) {
- count = dbCounter.get();
- }
- exception = null;
- break;
- } catch (final LockConflictException lce) {
- exception = lce;
- if (cursor != null) {
- try {
- cursor.close();
- cursor = null;
- } catch (final Exception ex) {
-                                        LOGGER.trace("Ignored exception closing cursor during lock conflict.");
- }
- }
- if (txn != null) {
- try {
- txn.abort();
- txn = null;
- } catch (final Exception ex) {
-                                        LOGGER.trace("Ignored exception aborting transaction during lock conflict.");
- }
- }
- } catch (final Exception ex) {
-                            LOGGER.error("Unable to commit transaction", ex);
- if (txn != null) {
- txn.abort();
- }
- }
- } catch (final LockConflictException lce) {
- exception = lce;
- if (cursor != null) {
- try {
- cursor.close();
- cursor = null;
- } catch (final Exception ex) {
-                                    LOGGER.trace("Ignored exception closing cursor during lock conflict.");
- }
- }
- if (txn != null) {
- try {
- txn.abort();
- txn = null;
- } catch (final Exception ex) {
-                                    LOGGER.trace("Ignored exception aborting transaction during lock conflict.");
- }
- }
- } finally {
- if (cursor != null) {
- cursor.close();
- cursor = null;
- }
- if (txn != null) {
- txn.abort();
- txn = null;
- }
- }
- try {
- Thread.sleep(LOCK_TIMEOUT_SLEEP_MILLIS);
- } catch (final InterruptedException ie) {
- // Ignore the error
- }
- }
- if (exception != null) {
-                    LOGGER.error("Unable to delete events from data base", exception);
- }
- }
- } catch (final Exception ex) {
- LOGGER.error("Error reading database", ex);
- shutdown = true;
- throw ex;
- } finally {
- if (cursor != null) {
- cursor.close();
- }
- }
-
- return errors;
- }
-
- private SimpleEvent createEvent(final DatabaseEntry data) {
- final SimpleEvent event = new SimpleEvent();
- try {
- byte[] eventData = data.getData();
- if (secretKey != null) {
- final Cipher cipher = Cipher.getInstance("AES");
- cipher.init(Cipher.DECRYPT_MODE, secretKey);
- eventData = cipher.doFinal(eventData);
- }
-                final ByteArrayInputStream bais = new ByteArrayInputStream(eventData);
- final DataInputStream dais = new DataInputStream(bais);
- int length = dais.readInt();
- final byte[] bytes = new byte[length];
- dais.read(bytes, 0, length);
- event.setBody(bytes);
- length = dais.readInt();
- final Map<String, String> map = new HashMap<>(length);
- for (int i = 0; i < length; ++i) {
- final String headerKey = dais.readUTF();
- final String value = dais.readUTF();
- map.put(headerKey, value);
- }
- event.setHeaders(map);
- return event;
- } catch (final Exception ex) {
- LOGGER.error("Error retrieving event", ex);
- return null;
- }
- }
-
- }
-
- /**
- * An internal class.
- */
- private static class Gate {
-
- private boolean isOpen = false;
-
- public boolean isOpen() {
- return isOpen;
- }
-
- public synchronized void open() {
- isOpen = true;
- notifyAll();
- }
-
- public synchronized void close() {
- isOpen = false;
- }
-
-        public synchronized void waitForOpen(final long timeout) throws InterruptedException {
- wait(timeout);
- }
- }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache license, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the license for the specific language governing permissions and
+ * limitations under the license.
+ */
+package org.apache.logging.log4j.flume.appender;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.crypto.Cipher;
+import javax.crypto.SecretKey;
+
+import org.apache.flume.Event;
+import org.apache.flume.event.SimpleEvent;
+import org.apache.logging.log4j.LoggingException;
+import org.apache.logging.log4j.core.appender.ManagerFactory;
+import org.apache.logging.log4j.core.config.Property;
+import org.apache.logging.log4j.core.config.plugins.util.PluginManager;
+import org.apache.logging.log4j.core.config.plugins.util.PluginType;
+import org.apache.logging.log4j.core.util.ExecutorServices;
+import org.apache.logging.log4j.core.util.FileUtils;
+import org.apache.logging.log4j.core.util.Log4jThread;
+import org.apache.logging.log4j.core.util.Log4jThreadFactory;
+import org.apache.logging.log4j.core.util.SecretKeyProvider;
+import org.apache.logging.log4j.util.Strings;
+
+import com.sleepycat.je.Cursor;
+import com.sleepycat.je.CursorConfig;
+import com.sleepycat.je.Database;
+import com.sleepycat.je.DatabaseConfig;
+import com.sleepycat.je.DatabaseEntry;
+import com.sleepycat.je.Environment;
+import com.sleepycat.je.EnvironmentConfig;
+import com.sleepycat.je.LockConflictException;
+import com.sleepycat.je.LockMode;
+import com.sleepycat.je.OperationStatus;
+import com.sleepycat.je.StatsConfig;
+import com.sleepycat.je.Transaction;
+
+/**
+ * Manager that persists data to Berkeley DB before passing it on to Flume.
+ */
+public class FlumePersistentManager extends FlumeAvroManager {
+
+ /** Attribute name for the key provider. */
+ public static final String KEY_PROVIDER = "keyProvider";
+
+ private static final Charset UTF8 = StandardCharsets.UTF_8;
+
+ private static final String DEFAULT_DATA_DIR = ".log4j/flumeData";
+
+ private static final long SHUTDOWN_WAIT_MILLIS = 60000;
+
+ private static final long LOCK_TIMEOUT_SLEEP_MILLIS = 500;
+
+ private static BDBManagerFactory factory = new BDBManagerFactory();
+
+ private final Database database;
+
+ private final Environment environment;
+
+ private final WriterThread worker;
+
+ private final Gate gate = new Gate();
+
+ private final SecretKey secretKey;
+
+ private final int lockTimeoutRetryCount;
+
+ private final ExecutorService threadPool;
+
+ private final AtomicLong dbCount = new AtomicLong();
+
+ /**
+ * Constructor
+ * @param name The unique name of this manager.
+ * @param shortName Original name for the Manager.
+ * @param agents An array of Agents.
+ * @param batchSize The number of events to include in a batch.
+ * @param retries The number of times to retry connecting before giving up.
+     * @param connectionTimeout The amount of time to wait for a connection to be established.
+     * @param requestTimeout The amount of time to wait for a response to a request.
+     * @param delay The amount of time to wait between retries.
+     * @param database The database to write to.
+     * @param environment The database environment.
+     * @param secretKey The SecretKey to use for encryption.
+     * @param lockTimeoutRetryCount The number of times to retry a lock timeout.
+ */
+    protected FlumePersistentManager(final String name, final String shortName, final Agent[] agents,
+                                     final int batchSize, final int retries, final int connectionTimeout,
+                                     final int requestTimeout, final int delay, final Database database,
+                                     final Environment environment, final SecretKey secretKey,
+                                     final int lockTimeoutRetryCount) {
+        super(name, shortName, agents, batchSize, delay, retries, connectionTimeout, requestTimeout);
+ this.database = database;
+ this.environment = environment;
+ dbCount.set(database.count());
+        this.worker = new WriterThread(database, environment, this, gate, batchSize, secretKey, dbCount,
+            lockTimeoutRetryCount);
+        this.worker.start();
+        this.secretKey = secretKey;
+        this.threadPool = Executors.newCachedThreadPool(Log4jThreadFactory.createDaemonThreadFactory("Flume"));
+ this.lockTimeoutRetryCount = lockTimeoutRetryCount;
+ }
+
+
+ /**
+ * Returns a FlumeAvroManager.
+ * @param name The name of the manager.
+ * @param agents The agents to use.
+ * @param properties Properties to pass to the Manager.
+ * @param batchSize The number of events to include in a batch.
+ * @param retries The number of times to retry connecting before giving up.
+     * @param connectionTimeout The amount of time to wait to establish a connection.
+     * @param requestTimeout The amount of time to wait for a response to a request.
+     * @param delayMillis Amount of time to delay before delivering a batch.
+     * @param lockTimeoutRetryCount The number of times to retry after a lock timeout.
+ * @param dataDir The location of the Berkeley database.
+ * @return A FlumeAvroManager.
+ */
+    public static FlumePersistentManager getManager(final String name, final Agent[] agents,
+                                                    final Property[] properties, int batchSize, final int retries,
+                                                    final int connectionTimeout, final int requestTimeout,
+                                                    final int delayMillis, final int lockTimeoutRetryCount,
+                                                    final String dataDir) {
+        if (agents == null || agents.length == 0) {
+            throw new IllegalArgumentException("At least one agent is required");
+        }
+
+        if (batchSize <= 0) {
+            batchSize = 1;
+        }
+        final String dataDirectory = Strings.isEmpty(dataDir) ? DEFAULT_DATA_DIR : dataDir;
+
+ final StringBuilder sb = new StringBuilder("FlumePersistent[");
+ boolean first = true;
+ for (final Agent agent : agents) {
+ if (!first) {
+ sb.append(',');
+ }
+ sb.append(agent.getHost()).append(':').append(agent.getPort());
+ first = false;
+ }
+ sb.append(']');
+ sb.append(' ').append(dataDirectory);
+        return getManager(sb.toString(), factory, new FactoryData(name, agents, batchSize, retries,
+            connectionTimeout, requestTimeout, delayMillis, lockTimeoutRetryCount, dataDir, properties));
+ }
+
+ @Override
+ public void send(final Event event) {
+ if (worker.isShutdown()) {
+ throw new LoggingException("Unable to record event");
+ }
+
+ final Map<String, String> headers = event.getHeaders();
+ final byte[] keyData = headers.get(FlumeEvent.GUID).getBytes(UTF8);
+ try {
+ final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ final DataOutputStream daos = new DataOutputStream(baos);
+ daos.writeInt(event.getBody().length);
+ daos.write(event.getBody(), 0, event.getBody().length);
+ daos.writeInt(event.getHeaders().size());
+ for (final Map.Entry<String, String> entry : headers.entrySet()) {
+ daos.writeUTF(entry.getKey());
+ daos.writeUTF(entry.getValue());
+ }
+ byte[] eventData = baos.toByteArray();
+ if (secretKey != null) {
+ final Cipher cipher = Cipher.getInstance("AES");
+ cipher.init(Cipher.ENCRYPT_MODE, secretKey);
+ eventData = cipher.doFinal(eventData);
+ }
+            final Future<Integer> future = threadPool.submit(new BDBWriter(keyData, eventData, environment, database,
+                gate, dbCount, getBatchSize(), lockTimeoutRetryCount));
+ boolean interrupted = false;
+ int ieCount = 0;
+ do {
+ try {
+ future.get();
+ } catch (final InterruptedException ie) {
+ interrupted = true;
+ ++ieCount;
+ }
+ } while (interrupted && ieCount <= 1);
+
+ } catch (final Exception ex) {
+            throw new LoggingException("Exception occurred writing log event", ex);
+ }
+ }
+
+ @Override
+ protected boolean releaseSub(final long timeout, final TimeUnit timeUnit) {
+ boolean closed = true;
+ LOGGER.debug("Shutting down FlumePersistentManager");
+ worker.shutdown();
+ final long requestedTimeoutMillis = timeUnit.toMillis(timeout);
+        final long shutdownWaitMillis = requestedTimeoutMillis < 0 ? SHUTDOWN_WAIT_MILLIS : requestedTimeoutMillis;
+ try {
+ worker.join(shutdownWaitMillis);
+ } catch (final InterruptedException ie) {
+ // Ignore the exception and shutdown.
+ }
+        ExecutorServices.shutdown(threadPool, shutdownWaitMillis, TimeUnit.MILLISECONDS, toString());
+ try {
+ worker.join();
+ } catch (final InterruptedException ex) {
+ logDebug("interrupted while waiting for worker to complete", ex);
+ }
+ try {
+            LOGGER.debug("FlumePersistenceManager dataset status: {}", database.getStats(new StatsConfig()));
+ database.close();
+ } catch (final Exception ex) {
+ logWarn("Failed to close database", ex);
+ closed = false;
+ }
+ try {
+ environment.cleanLog();
+ environment.close();
+ } catch (final Exception ex) {
+ logWarn("Failed to close environment", ex);
+ closed = false;
+ }
+ return closed && super.releaseSub(timeout, timeUnit);
+ }
+
+ private void doSend(final SimpleEvent event) {
+ LOGGER.debug("Sending event to Flume");
+ super.send(event);
+ }
+
+ /**
+     * Thread for writing to Berkeley DB to avoid having interrupts close the database.
+ */
+ private static class BDBWriter implements Callable<Integer> {
+ private final byte[] eventData;
+ private final byte[] keyData;
+ private final Environment environment;
+ private final Database database;
+ private final Gate gate;
+ private final AtomicLong dbCount;
+ private final long batchSize;
+ private final int lockTimeoutRetryCount;
+
+        public BDBWriter(final byte[] keyData, final byte[] eventData, final Environment environment,
+                         final Database database, final Gate gate, final AtomicLong dbCount, final long batchSize,
+                         final int lockTimeoutRetryCount) {
+ this.keyData = keyData;
+ this.eventData = eventData;
+ this.environment = environment;
+ this.database = database;
+ this.gate = gate;
+ this.dbCount = dbCount;
+ this.batchSize = batchSize;
+ this.lockTimeoutRetryCount = lockTimeoutRetryCount;
+ }
+
+ @Override
+ public Integer call() throws Exception {
+ final DatabaseEntry key = new DatabaseEntry(keyData);
+ final DatabaseEntry data = new DatabaseEntry(eventData);
+ Exception exception = null;
+            for (int retryIndex = 0; retryIndex < lockTimeoutRetryCount; ++retryIndex) {
+ Transaction txn = null;
+ try {
+ txn = environment.beginTransaction(null, null);
+ try {
+ database.put(txn, key, data);
+ txn.commit();
+ txn = null;
+ if (dbCount.incrementAndGet() >= batchSize) {
+ gate.open();
+ }
+ exception = null;
+ break;
+ } catch (final LockConflictException lce) {
+ exception = lce;
+ // Fall through and retry.
+ } catch (final Exception ex) {
+ if (txn != null) {
+ txn.abort();
+ }
+ throw ex;
+ } finally {
+ if (txn != null) {
+ txn.abort();
+ txn = null;
+ }
+ }
+ } catch (final LockConflictException lce) {
+ exception = lce;
+ if (txn != null) {
+ try {
+ txn.abort();
+ txn = null;
+ } catch (final Exception ex) {
+                            LOGGER.trace("Ignoring exception while aborting transaction during lock conflict.");
+ }
+ }
+
+ }
+ try {
+ Thread.sleep(LOCK_TIMEOUT_SLEEP_MILLIS);
+ } catch (final InterruptedException ie) {
+ // Ignore the error
+ }
+ }
+ if (exception != null) {
+ throw exception;
+ }
+ return eventData.length;
+ }
+ }
+
+ /**
+ * Factory data.
+ */
+ private static class FactoryData {
+ private final String name;
+ private final Agent[] agents;
+ private final int batchSize;
+ private final String dataDir;
+ private final int retries;
+ private final int connectionTimeout;
+ private final int requestTimeout;
+ private final int delayMillis;
+ private final int lockTimeoutRetryCount;
+ private final Property[] properties;
+
+ /**
+ * Constructor.
+ * @param name The name of the Appender.
+ * @param agents The agents.
+ * @param batchSize The number of events to include in a batch.
+ * @param dataDir The directory for data.
+ */
+        public FactoryData(final String name, final Agent[] agents, final int batchSize, final int retries,
+                           final int connectionTimeout, final int requestTimeout, final int delayMillis,
+                           final int lockTimeoutRetryCount, final String dataDir, final Property[] properties) {
+ this.name = name;
+ this.agents = agents;
+ this.batchSize = batchSize;
+ this.dataDir = dataDir;
+ this.retries = retries;
+ this.connectionTimeout = connectionTimeout;
+ this.requestTimeout = requestTimeout;
+ this.delayMillis = delayMillis;
+ this.lockTimeoutRetryCount = lockTimeoutRetryCount;
+ this.properties = properties;
+ }
+ }
+
+ /**
+ * Avro Manager Factory.
+ */
+    private static class BDBManagerFactory implements ManagerFactory<FlumePersistentManager, FactoryData> {
+
+ /**
+         * Create the FlumePersistentManager.
+         * @param name The name of the entity to manage.
+         * @param data The data required to create the entity.
+         * @return The FlumePersistentManager.
+ */
+ @Override
+        public FlumePersistentManager createManager(final String name, final FactoryData data) {
+ SecretKey secretKey = null;
+ Database database = null;
+ Environment environment = null;
+
+ final Map<String, String> properties = new HashMap<>();
+ if (data.properties != null) {
+ for (final Property property : data.properties) {
+ properties.put(property.getName(), property.getValue());
+ }
+ }
+
+ try {
+ final File dir = new File(data.dataDir);
+ FileUtils.mkdir(dir, true);
+ final EnvironmentConfig dbEnvConfig = new EnvironmentConfig();
+ dbEnvConfig.setTransactional(true);
+ dbEnvConfig.setAllowCreate(true);
+ dbEnvConfig.setLockTimeout(5, TimeUnit.SECONDS);
+ environment = new Environment(dir, dbEnvConfig);
+ final DatabaseConfig dbConfig = new DatabaseConfig();
+ dbConfig.setTransactional(true);
+ dbConfig.setAllowCreate(true);
+ database = environment.openDatabase(null, name, dbConfig);
+ } catch (final Exception ex) {
+ LOGGER.error("Could not create FlumePersistentManager", ex);
+                // For consistency, close database as well as environment even though it should never happen since the
+                // database is the last thing in the block above, but this does guard against a future line being
+                // inserted at the end that would bomb (like some debug logging).
+ if (database != null) {
+ database.close();
+ database = null;
+ }
+ if (environment != null) {
+ environment.close();
+ environment = null;
+ }
+ return null;
+ }
+
+ try {
+ String key = null;
+                for (final Map.Entry<String, String> entry : properties.entrySet()) {
+ if (entry.getKey().equalsIgnoreCase(KEY_PROVIDER)) {
+ key = entry.getValue();
+ break;
+ }
+ }
+ if (key != null) {
+                    final PluginManager manager = new PluginManager("KeyProvider");
+                    manager.collectPlugins();
+                    final Map<String, PluginType<?>> plugins = manager.getPlugins();
+                    if (plugins != null) {
+                        boolean found = false;
+                        for (final Map.Entry<String, PluginType<?>> entry : plugins.entrySet()) {
+                            if (entry.getKey().equalsIgnoreCase(key)) {
+                                found = true;
+                                final Class<?> cl = entry.getValue().getPluginClass();
+                                try {
+                                    final SecretKeyProvider provider = (SecretKeyProvider) cl.newInstance();
+                                    secretKey = provider.getSecretKey();
+                                    LOGGER.debug("Persisting events using SecretKeyProvider {}", cl.getName());
+ } catch (final Exception ex) {
+                                    LOGGER.error("Unable to create SecretKeyProvider {}, encryption will be disabled",
+                                        cl.getName());
+ }
+ break;
+ }
+ }
+ if (!found) {
+                            LOGGER.error("Unable to locate SecretKey provider {}, encryption will be disabled", key);
+                        }
+                    } else {
+                        LOGGER.error("Unable to locate SecretKey provider {}, encryption will be disabled", key);
+ }
+ }
+ } catch (final Exception ex) {
+                LOGGER.warn("Error setting up encryption - encryption will be disabled", ex);
+ }
+            return new FlumePersistentManager(name, data.name, data.agents, data.batchSize, data.retries,
+                data.connectionTimeout, data.requestTimeout, data.delayMillis, database, environment, secretKey,
+                data.lockTimeoutRetryCount);
+ }
+ }
+
+ /**
+ * Thread that sends data to Flume and pulls it from Berkeley DB.
+ */
+ private static class WriterThread extends Log4jThread {
+ private volatile boolean shutdown = false;
+ private final Database database;
+ private final Environment environment;
+ private final FlumePersistentManager manager;
+ private final Gate gate;
+ private final SecretKey secretKey;
+ private final int batchSize;
+ private final AtomicLong dbCounter;
+ private final int lockTimeoutRetryCount;
+
+        public WriterThread(final Database database, final Environment environment,
+                            final FlumePersistentManager manager, final Gate gate, final int batchsize,
+                            final SecretKey secretKey, final AtomicLong dbCount, final int lockTimeoutRetryCount) {
+ super("FlumePersistentManager-Writer");
+ this.database = database;
+ this.environment = environment;
+ this.manager = manager;
+ this.gate = gate;
+ this.batchSize = batchsize;
+ this.secretKey = secretKey;
+ this.setDaemon(true);
+ this.dbCounter = dbCount;
+ this.lockTimeoutRetryCount = lockTimeoutRetryCount;
+ }
+
+ public void shutdown() {
+ LOGGER.debug("Writer thread shutting down");
+ this.shutdown = true;
+ gate.open();
+ }
+
+ public boolean isShutdown() {
+ return shutdown;
+ }
+
+ @Override
+ public void run() {
+            LOGGER.trace("WriterThread started - batch size = " + batchSize + ", delayMillis = " + manager.getDelayMillis());
+            long nextBatchMillis = System.currentTimeMillis() + manager.getDelayMillis();
+            while (!shutdown) {
+                final long nowMillis = System.currentTimeMillis();
+                final long dbCount = database.count();
+                dbCounter.set(dbCount);
+                if (dbCount >= batchSize || dbCount > 0 && nextBatchMillis <= nowMillis) {
+ nextBatchMillis = nowMillis + manager.getDelayMillis();
+ try {
+ boolean errors = false;
+ final DatabaseEntry key = new DatabaseEntry();
+ final DatabaseEntry data = new DatabaseEntry();
+
+ gate.close();
+ OperationStatus status;
+ if (batchSize > 1) {
+ try {
+ errors = sendBatch(key, data);
+ } catch (final Exception ex) {
+ break;
+ }
+ } else {
+ Exception exception = null;
+                            for (int retryIndex = 0; retryIndex < lockTimeoutRetryCount; ++retryIndex) {
+ exception = null;
+ Transaction txn = null;
+ Cursor cursor = null;
+ try {
+                                    txn = environment.beginTransaction(null, null);
+                                    cursor = database.openCursor(txn, null);
+                                    try {
+                                        status = cursor.getFirst(key, data, LockMode.RMW);
+                                        while (status == OperationStatus.SUCCESS) {
+                                            final SimpleEvent event = createEvent(data);
+ if (event != null) {
+ try {
+ manager.doSend(event);
+ } catch (final Exception ioe) {
+ errors = true;
+                                                    LOGGER.error("Error sending event", ioe);
+                                                    break;
+                                                }
+                                                try {
+                                                    cursor.delete();
+                                                } catch (final Exception ex) {
+                                                    LOGGER.error("Unable to delete event", ex);
+                                                }
+                                            }
+                                            status = cursor.getNext(key, data, LockMode.RMW);
+ }
+ if (cursor != null) {
+ cursor.close();
+ cursor = null;
+ }
+ txn.commit();
+ txn = null;
+ dbCounter.decrementAndGet();
+ exception = null;
+ break;
+ } catch (final LockConflictException lce) {
+ exception = lce;
+ // Fall through and retry.
+ } catch (final Exception ex) {
+                                        LOGGER.error("Error reading or writing to database", ex);
+ shutdown = true;
+ break;
+ } finally {
+ if (cursor != null) {
+ cursor.close();
+ cursor = null;
+ }
+ if (txn != null) {
+ txn.abort();
+ txn = null;
+ }
+ }
+ } catch (final LockConflictException lce) {
+ exception = lce;
+ if (cursor != null) {
+ try {
+ cursor.close();
+ cursor = null;
+ } catch (final Exception ex) {
+                                            LOGGER.trace("Ignored exception closing cursor during lock conflict.");
+ }
+ }
+ if (txn != null) {
+ try {
+ txn.abort();
+ txn = null;
+ } catch (final Exception ex) {
+                                            LOGGER.trace("Ignored exception aborting tx during lock conflict.");
+ }
+ }
+ }
+ try {
+ Thread.sleep(LOCK_TIMEOUT_SLEEP_MILLIS);
+ } catch (final InterruptedException ie) {
+ // Ignore the error
+ }
+ }
+ if (exception != null) {
+                                LOGGER.error("Unable to read or update data base", exception);
+ }
+ }
+ if (errors) {
+ Thread.sleep(manager.getDelayMillis());
+ continue;
+ }
+ } catch (final Exception ex) {
+                        LOGGER.warn("WriterThread encountered an exception. Continuing.", ex);
+ }
+ } else {
+ if (nextBatchMillis <= nowMillis) {
+ nextBatchMillis = nowMillis + manager.getDelayMillis();
+ }
+ try {
+ final long interval = nextBatchMillis - nowMillis;
+ gate.waitForOpen(interval);
+ } catch (final InterruptedException ie) {
+ LOGGER.warn("WriterThread interrupted, continuing");
+ } catch (final Exception ex) {
+                        LOGGER.error("WriterThread encountered an exception waiting for work", ex);
+ break;
+ }
+ }
+ }
+
+ if (batchSize > 1 && database.count() > 0) {
+ final DatabaseEntry key = new DatabaseEntry();
+ final DatabaseEntry data = new DatabaseEntry();
+ try {
+ sendBatch(key, data);
+ } catch (final Exception ex) {
+ LOGGER.warn("Unable to write final batch");
+ }
+ }
+ LOGGER.trace("WriterThread exiting");
+ }
+
+        private boolean sendBatch(DatabaseEntry key, final DatabaseEntry data) throws Exception {
+            boolean errors = false;
+            OperationStatus status;
+            Cursor cursor = null;
+            try {
+                final BatchEvent batch = new BatchEvent();
+                for (int retryIndex = 0; retryIndex < lockTimeoutRetryCount; ++retryIndex) {
+                    try {
+                        cursor = database.openCursor(null, CursorConfig.DEFAULT);
+                        status = cursor.getFirst(key, data, null);
+
+                        for (int i = 0; status == OperationStatus.SUCCESS && i < batchSize; ++i) {
+                            final SimpleEvent event = createEvent(data);
+                            if (event != null) {
+                                batch.addEvent(event);
+                            }
+                            status = cursor.getNext(key, data, null);
+ }
+ break;
+ } catch (final LockConflictException lce) {
+ if (cursor != null) {
+ try {
+ cursor.close();
+ cursor = null;
+ } catch (final Exception ex) {
+                                LOGGER.trace("Ignored exception closing cursor during lock conflict.");
+ }
+ }
+ }
+ }
+
+ try {
+ manager.send(batch);
+ } catch (final Exception ioe) {
+ LOGGER.error("Error sending events", ioe);
+ errors = true;
+ }
+ if (!errors) {
+ if (cursor != null) {
+ cursor.close();
+ cursor = null;
+ }
+ Transaction txn = null;
+ Exception exception = null;
+                    for (int retryIndex = 0; retryIndex < lockTimeoutRetryCount; ++retryIndex) {
+ try {
+ txn = environment.beginTransaction(null, null);
+ try {
+ for (final Event event : batch.getEvents()) {
+ try {
+                                    final Map<String, String> headers = event.getHeaders();
+                                    key = new DatabaseEntry(headers.get(FlumeEvent.GUID).getBytes(UTF8));
+ database.delete(txn, key);
+ } catch (final Exception ex) {
+                                    LOGGER.error("Error deleting key from database", ex);
+ }
+ }
+ txn.commit();
+ long count = dbCounter.get();
+                            while (!dbCounter.compareAndSet(count, count - batch.getEvents().size())) {
+ count = dbCounter.get();
+ }
+ exception = null;
+ break;
+ } catch (final LockConflictException lce) {
+ exception = lce;
+ if (cursor != null) {
+ try {
+ cursor.close();
+ cursor = null;
+ } catch (final Exception ex) {
+                                        LOGGER.trace("Ignored exception closing cursor during lock conflict.");
+ }
+ }
+ if (txn != null) {
+ try {
+ txn.abort();
+ txn = null;
+ } catch (final Exception ex) {
+                                        LOGGER.trace("Ignored exception aborting transaction during lock conflict.");
+ }
+ }
+ } catch (final Exception ex) {
+                            LOGGER.error("Unable to commit transaction", ex);
+ if (txn != null) {
+ txn.abort();
+ }
+ }
+ } catch (final LockConflictException lce) {
+ exception = lce;
+ if (cursor != null) {
+ try {
+ cursor.close();
+ cursor = null;
+ } catch (final Exception ex) {
+                                    LOGGER.trace("Ignored exception closing cursor during lock conflict.");
+ }
+ }
+ if (txn != null) {
+ try {
+ txn.abort();
+ txn = null;
+ } catch (final Exception ex) {
+                                    LOGGER.trace("Ignored exception aborting transaction during lock conflict.");
+ }
+ }
+ } finally {
+ if (cursor != null) {
+ cursor.close();
+ cursor = null;
+ }
+ if (txn != null) {
+ txn.abort();
+ txn = null;
+ }
+ }
+ try {
+ Thread.sleep(LOCK_TIMEOUT_SLEEP_MILLIS);
+ } catch (final InterruptedException ie) {
+ // Ignore the error
+ }
+ }
+ if (exception != null) {
+                    LOGGER.error("Unable to delete events from data base", exception);
+ }
+ }
+ } catch (final Exception ex) {
+ LOGGER.error("Error reading database", ex);
+ shutdown = true;
+ throw ex;
+ } finally {
+ if (cursor != null) {
+ cursor.close();
+ }
+ }
+
+ return errors;
+ }
+
+ private SimpleEvent createEvent(final DatabaseEntry data) {
+ final SimpleEvent event = new SimpleEvent();
+ try {
+ byte[] eventData = data.getData();
+ if (secretKey != null) {
+ final Cipher cipher = Cipher.getInstance("AES");
+ cipher.init(Cipher.DECRYPT_MODE, secretKey);
+ eventData = cipher.doFinal(eventData);
+ }
+                final ByteArrayInputStream bais = new ByteArrayInputStream(eventData);
+ final DataInputStream dais = new DataInputStream(bais);
+ int length = dais.readInt();
+ final byte[] bytes = new byte[length];
+ dais.read(bytes, 0, length);
+ event.setBody(bytes);
+ length = dais.readInt();
+ final Map<String, String> map = new HashMap<>(length);
+ for (int i = 0; i < length; ++i) {
+ final String headerKey = dais.readUTF();
+ final String value = dais.readUTF();
+ map.put(headerKey, value);
+ }
+ event.setHeaders(map);
+ return event;
+ } catch (final Exception ex) {
+ LOGGER.error("Error retrieving event", ex);
+ return null;
+ }
+ }
+
+ }
+
+ /**
+ * An internal class.
+ */
+ private static class Gate {
+
+ private boolean isOpen = false;
+
+ public boolean isOpen() {
+ return isOpen;
+ }
+
+ public synchronized void open() {
+ isOpen = true;
+ notifyAll();
+ }
+
+ public synchronized void close() {
+ isOpen = false;
+ }
+
+        public synchronized void waitForOpen(final long timeout) throws InterruptedException {
+ wait(timeout);
+ }
+ }
+}
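
The one functional change in this hunk is the signature of releaseSub, which now returns boolean instead of void: each failure to close the Berkeley DB Database or Environment is recorded instead of only logged, and the method reports whether everything shut down cleanly, combining that flag with the superclass result. A minimal standalone sketch of the same aggregate-and-report close pattern follows; the names (Resource, releaseAll) are illustrative, not the Log4j API.

    // Each resource is closed in its own try block so one failure does not
    // prevent the remaining resources from closing; the method reports
    // whether every resource shut down cleanly, as the new releaseSub does.
    final class CloseAggregationSketch {

        interface Resource {
            void close() throws Exception;
        }

        static boolean releaseAll(final Resource database, final Resource environment) {
            boolean closed = true;
            try {
                database.close();
            } catch (final Exception ex) {
                System.err.println("Failed to close database: " + ex);
                closed = false; // remember the failure, keep going
            }
            try {
                environment.close();
            } catch (final Exception ex) {
                System.err.println("Failed to close environment: " + ex);
                closed = false;
            }
            return closed; // true only if every resource closed cleanly
        }
    }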