http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8acd24b5/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/exporter/KafkaExporterExecutor.java ---------------------------------------------------------------------- diff --git a/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/exporter/KafkaExporterExecutor.java b/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/exporter/KafkaExporterExecutor.java index 3b639e9..815a794 100644 --- a/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/exporter/KafkaExporterExecutor.java +++ b/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/exporter/KafkaExporterExecutor.java @@ -20,57 +20,54 @@ package org.apache.rya.periodic.notification.exporter; import java.util.ArrayList; import java.util.List; +import java.util.Objects; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import org.apache.kafka.clients.producer.KafkaProducer; -import org.apache.log4j.Logger; -import org.apache.rya.periodic.notification.api.BindingSetExporter; import org.apache.rya.periodic.notification.api.BindingSetRecord; import org.apache.rya.periodic.notification.api.LifeCycle; import org.openrdf.query.BindingSet; - -import com.google.common.base.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** - * Executor service that runs {@link KafkaPeriodicBindingSetExporter}s. + * Executor service that runs {@link KafkaPeriodicBindingSetExporter}s. * */ public class KafkaExporterExecutor implements LifeCycle { - private static final Logger log = Logger.getLogger(BindingSetExporter.class); - private KafkaProducer<String, BindingSet> producer; - private BlockingQueue<BindingSetRecord> bindingSets; + private static final Logger log = LoggerFactory.getLogger(KafkaExporterExecutor.class); + private final KafkaProducer<String, BindingSet> producer; + private final BlockingQueue<BindingSetRecord> bindingSets; private ExecutorService executor; - private List<KafkaPeriodicBindingSetExporter> exporters; - private int num_Threads; + private final List<KafkaPeriodicBindingSetExporter> exporters; + private final int numThreads; private boolean running = false; /** * Creates a KafkaExporterExecutor for exporting periodic query results to Kafka. * @param producer for publishing results to Kafka - * @param num_Threads number of threads used to publish results + * @param numThreads number of threads used to publish results * @param bindingSets - work queue containing {@link BindingSet}s to be published */ - public KafkaExporterExecutor(KafkaProducer<String, BindingSet> producer, int num_Threads, BlockingQueue<BindingSetRecord> bindingSets) { - Preconditions.checkNotNull(producer); - Preconditions.checkNotNull(bindingSets); - this.producer = producer; - this.bindingSets = bindingSets; - this.num_Threads = num_Threads; + public KafkaExporterExecutor(final KafkaProducer<String, BindingSet> producer, final int numThreads, final BlockingQueue<BindingSetRecord> bindingSets) { + this.producer = Objects.requireNonNull(producer); + this.bindingSets = Objects.requireNonNull(bindingSets); + this.numThreads = numThreads; this.exporters = new ArrayList<>(); } @Override public void start() { if (!running) { - executor = Executors.newFixedThreadPool(num_Threads); + executor = Executors.newFixedThreadPool(numThreads); - for (int threadNumber = 0; threadNumber < num_Threads; threadNumber++) { - log.info("Creating exporter:" + threadNumber); - KafkaPeriodicBindingSetExporter exporter = new KafkaPeriodicBindingSetExporter(producer, threadNumber, bindingSets); + for (int threadNumber = 0; threadNumber < numThreads; threadNumber++) { + log.info("Creating exporter: {}", threadNumber); + final KafkaPeriodicBindingSetExporter exporter = new KafkaPeriodicBindingSetExporter(producer, threadNumber, bindingSets); exporters.add(exporter); executor.submit(exporter); } @@ -98,7 +95,7 @@ public class KafkaExporterExecutor implements LifeCycle { log.info("Timed out waiting for consumer threads to shut down, exiting uncleanly"); executor.shutdownNow(); } - } catch (InterruptedException e) { + } catch (final InterruptedException e) { log.info("Interrupted during shutdown, exiting uncleanly"); } }
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8acd24b5/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/exporter/KafkaPeriodicBindingSetExporter.java ---------------------------------------------------------------------- diff --git a/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/exporter/KafkaPeriodicBindingSetExporter.java b/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/exporter/KafkaPeriodicBindingSetExporter.java index 5397618..c343116 100644 --- a/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/exporter/KafkaPeriodicBindingSetExporter.java +++ b/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/exporter/KafkaPeriodicBindingSetExporter.java @@ -18,46 +18,42 @@ */ package org.apache.rya.periodic.notification.exporter; +import java.util.Objects; import java.util.concurrent.BlockingQueue; -import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; -import org.apache.log4j.Logger; import org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants; import org.apache.rya.periodic.notification.api.BindingSetExporter; import org.apache.rya.periodic.notification.api.BindingSetRecord; import org.apache.rya.periodic.notification.api.BindingSetRecordExportException; import org.openrdf.model.Literal; import org.openrdf.query.BindingSet; - -import com.google.common.base.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Object that exports {@link BindingSet}s to the Kafka topic indicated by * the {@link BindingSetRecord}. - * + * */ public class KafkaPeriodicBindingSetExporter implements BindingSetExporter, Runnable { - private static final Logger log = Logger.getLogger(BindingSetExporter.class); - private KafkaProducer<String, BindingSet> producer; - private BlockingQueue<BindingSetRecord> bindingSets; - private AtomicBoolean closed = new AtomicBoolean(false); - private int threadNumber; + private static final Logger log = LoggerFactory.getLogger(KafkaPeriodicBindingSetExporter.class); + private final KafkaProducer<String, BindingSet> producer; + private final BlockingQueue<BindingSetRecord> bindingSets; + private final AtomicBoolean closed = new AtomicBoolean(false); + private final int threadNumber; - public KafkaPeriodicBindingSetExporter(KafkaProducer<String, BindingSet> producer, int threadNumber, - BlockingQueue<BindingSetRecord> bindingSets) { - Preconditions.checkNotNull(producer); - Preconditions.checkNotNull(bindingSets); + public KafkaPeriodicBindingSetExporter(final KafkaProducer<String, BindingSet> producer, final int threadNumber, + final BlockingQueue<BindingSetRecord> bindingSets) { this.threadNumber = threadNumber; - this.producer = producer; - this.bindingSets = bindingSets; + this.producer = Objects.requireNonNull(producer); + this.bindingSets = Objects.requireNonNull(bindingSets); } /** @@ -65,18 +61,21 @@ public class KafkaPeriodicBindingSetExporter implements BindingSetExporter, Runn * the indicated BindingSetRecord and the BindingSet is then exported to the topic. */ @Override - public void exportNotification(BindingSetRecord record) throws BindingSetRecordExportException { - String bindingName = IncrementalUpdateConstants.PERIODIC_BIN_ID; - BindingSet bindingSet = record.getBindingSet(); - String topic = record.getTopic(); - long binId = ((Literal) bindingSet.getValue(bindingName)).longValue(); - final Future<RecordMetadata> future = producer - .send(new ProducerRecord<String, BindingSet>(topic, Long.toString(binId), bindingSet)); + public void exportNotification(final BindingSetRecord record) throws BindingSetRecordExportException { try { + log.info("Exporting {} records to Kafka to topic: {}", record.getBindingSet().size(), record.getTopic()); + final String bindingName = IncrementalUpdateConstants.PERIODIC_BIN_ID; + + final BindingSet bindingSet = record.getBindingSet(); + final String topic = record.getTopic(); + final long binId = ((Literal) bindingSet.getValue(bindingName)).longValue(); + + final Future<RecordMetadata> future = producer + .send(new ProducerRecord<String, BindingSet>(topic, Long.toString(binId), bindingSet)); //wait for confirmation that results have been received future.get(5, TimeUnit.SECONDS); - } catch (InterruptedException | ExecutionException | TimeoutException e) { - throw new BindingSetRecordExportException(e.getMessage()); + } catch (final Exception e) { // catch all possible exceptional behavior and throw as our checked exception. + throw new BindingSetRecordExportException(e.getMessage(), e); } } @@ -87,11 +86,11 @@ public class KafkaPeriodicBindingSetExporter implements BindingSetExporter, Runn exportNotification(bindingSets.take()); } } catch (InterruptedException | BindingSetRecordExportException e) { - log.trace("Thread " + threadNumber + " is unable to process message."); + log.warn("Thread " + threadNumber + " is unable to process message.", e); } } - - + + public void shutdown() { closed.set(true); } http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8acd24b5/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/processor/NotificationProcessorExecutor.java ---------------------------------------------------------------------- diff --git a/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/processor/NotificationProcessorExecutor.java b/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/processor/NotificationProcessorExecutor.java index a9a5ad1..e589141 100644 --- a/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/processor/NotificationProcessorExecutor.java +++ b/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/processor/NotificationProcessorExecutor.java @@ -20,36 +20,43 @@ import java.util.ArrayList; import java.util.List; +import java.util.Objects; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; -import org.apache.log4j.Logger; import org.apache.rya.indexing.pcj.storage.PeriodicQueryResultStorage; import org.apache.rya.periodic.notification.api.BindingSetRecord; import org.apache.rya.periodic.notification.api.LifeCycle; import org.apache.rya.periodic.notification.api.NodeBin; import org.apache.rya.periodic.notification.notification.TimestampedNotification; - -import com.google.common.base.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Executor service that runs {@link TimestampedNotificationProcessor}s with basic * functionality for starting, stopping, and determining whether notification processors are - * being executed. + * being executed. * */ public class NotificationProcessorExecutor implements LifeCycle { - private static final Logger log = Logger.getLogger(TimestampedNotificationProcessor.class); - private BlockingQueue<TimestampedNotification> notifications; // notifications - private BlockingQueue<NodeBin> bins; // entries to delete from Fluo - private BlockingQueue<BindingSetRecord> bindingSets; // query results to - // export - private PeriodicQueryResultStorage periodicStorage; - private List<TimestampedNotificationProcessor> processors; - private int numberThreads; + private static final Logger log = LoggerFactory.getLogger(NotificationProcessorExecutor.class); + private final BlockingQueue<TimestampedNotification> notifications; + + /** + * entries to delete from Fluo + */ + private final BlockingQueue<NodeBin> bins; + + /** + * query results to export + */ + private final BlockingQueue<BindingSetRecord> bindingSets; + private final PeriodicQueryResultStorage periodicStorage; + private final List<TimestampedNotificationProcessor> processors; + private final int numberThreads; private ExecutorService executor; private boolean running = false; @@ -61,11 +68,11 @@ public class NotificationProcessorExecutor implements LifeCycle { * @param bindingSets - results read from the storage layer to be exported * @param numberThreads - number of threads used for processing */ - public NotificationProcessorExecutor(PeriodicQueryResultStorage periodicStorage, BlockingQueue<TimestampedNotification> notifications, - BlockingQueue<NodeBin> bins, BlockingQueue<BindingSetRecord> bindingSets, int numberThreads) { - this.notifications = Preconditions.checkNotNull(notifications); - this.bins = Preconditions.checkNotNull(bins); - this.bindingSets = Preconditions.checkNotNull(bindingSets); + public NotificationProcessorExecutor(final PeriodicQueryResultStorage periodicStorage, final BlockingQueue<TimestampedNotification> notifications, + final BlockingQueue<NodeBin> bins, final BlockingQueue<BindingSetRecord> bindingSets, final int numberThreads) { + this.notifications = Objects.requireNonNull(notifications); + this.bins = Objects.requireNonNull(bins); + this.bindingSets = Objects.requireNonNull(bindingSets); this.periodicStorage = periodicStorage; this.numberThreads = numberThreads; processors = new ArrayList<>(); @@ -76,8 +83,8 @@ public class NotificationProcessorExecutor implements LifeCycle { if (!running) { executor = Executors.newFixedThreadPool(numberThreads); for (int threadNumber = 0; threadNumber < numberThreads; threadNumber++) { - log.info("Creating exporter:" + threadNumber); - TimestampedNotificationProcessor processor = TimestampedNotificationProcessor.builder().setBindingSets(bindingSets) + log.info("Creating processor for thread: {}", threadNumber); + final TimestampedNotificationProcessor processor = TimestampedNotificationProcessor.builder().setBindingSets(bindingSets) .setBins(bins).setPeriodicStorage(periodicStorage).setNotifications(notifications).setThreadNumber(threadNumber) .build(); processors.add(processor); @@ -97,11 +104,11 @@ public class NotificationProcessorExecutor implements LifeCycle { } running = false; try { - if (!executor.awaitTermination(5000, TimeUnit.MILLISECONDS)) { + if (!executor.awaitTermination(5, TimeUnit.SECONDS)) { log.info("Timed out waiting for consumer threads to shut down, exiting uncleanly"); executor.shutdownNow(); } - } catch (InterruptedException e) { + } catch (final InterruptedException e) { log.info("Interrupted during shutdown, exiting uncleanly"); } } http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8acd24b5/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/processor/TimestampedNotificationProcessor.java ---------------------------------------------------------------------- diff --git a/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/processor/TimestampedNotificationProcessor.java b/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/processor/TimestampedNotificationProcessor.java index 8b65683..ae586da 100644 --- a/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/processor/TimestampedNotificationProcessor.java +++ b/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/processor/TimestampedNotificationProcessor.java @@ -22,7 +22,6 @@ import java.util.Optional; import java.util.concurrent.BlockingQueue; import java.util.concurrent.atomic.AtomicBoolean; -import org.apache.log4j.Logger; import org.apache.rya.indexing.pcj.storage.PeriodicQueryResultStorage; import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.CloseableIterator; import org.apache.rya.periodic.notification.api.BinPruner; @@ -32,6 +31,8 @@ import org.apache.rya.periodic.notification.api.NotificationProcessor; import org.apache.rya.periodic.notification.exporter.KafkaPeriodicBindingSetExporter; import org.apache.rya.periodic.notification.notification.TimestampedNotification; import org.openrdf.query.BindingSet; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import com.google.common.base.Preconditions; @@ -46,19 +47,30 @@ import com.google.common.base.Preconditions; */ public class TimestampedNotificationProcessor implements NotificationProcessor, Runnable { - private static final Logger log = Logger.getLogger(TimestampedNotificationProcessor.class); - private PeriodicQueryResultStorage periodicStorage; - private BlockingQueue<TimestampedNotification> notifications; // notifications - // to process - private BlockingQueue<NodeBin> bins; // entries to delete from Fluo - private BlockingQueue<BindingSetRecord> bindingSets; // query results to export - private AtomicBoolean closed = new AtomicBoolean(false); - private int threadNumber; - - - public TimestampedNotificationProcessor(PeriodicQueryResultStorage periodicStorage, - BlockingQueue<TimestampedNotification> notifications, BlockingQueue<NodeBin> bins, BlockingQueue<BindingSetRecord> bindingSets, - int threadNumber) { + private static final Logger log = LoggerFactory.getLogger(TimestampedNotificationProcessor.class); + private final PeriodicQueryResultStorage periodicStorage; + + /** + * notifications to process + */ + private final BlockingQueue<TimestampedNotification> notifications; + + /** + * entries to delete from Fluo + */ + private final BlockingQueue<NodeBin> bins; + + /** + * query results to export + */ + private final BlockingQueue<BindingSetRecord> bindingSets; + private final AtomicBoolean closed = new AtomicBoolean(false); + private final int threadNumber; + + + public TimestampedNotificationProcessor(final PeriodicQueryResultStorage periodicStorage, + final BlockingQueue<TimestampedNotification> notifications, final BlockingQueue<NodeBin> bins, final BlockingQueue<BindingSetRecord> bindingSets, + final int threadNumber) { this.notifications = Preconditions.checkNotNull(notifications); this.bins = Preconditions.checkNotNull(bins); this.bindingSets = Preconditions.checkNotNull(bindingSets); @@ -75,13 +87,13 @@ public class TimestampedNotificationProcessor implements NotificationProcessor, * bins can be deleted from Fluo and Accumulo. */ @Override - public void processNotification(TimestampedNotification notification) { + public void processNotification(final TimestampedNotification notification) { - String id = notification.getId(); - long ts = notification.getTimestamp().getTime(); - long period = notification.getPeriod(); - long bin = getBinFromTimestamp(ts, period); - NodeBin nodeBin = new NodeBin(id, bin); + final String id = notification.getId(); + final long ts = notification.getTimestamp().getTime(); + final long period = notification.getPeriod(); + final long bin = getBinFromTimestamp(ts, period); + final NodeBin nodeBin = new NodeBin(id, bin); try (CloseableIterator<BindingSet> iter = periodicStorage.listResults(id, Optional.of(bin));) { @@ -91,20 +103,20 @@ public class TimestampedNotificationProcessor implements NotificationProcessor, // add NodeBin to BinPruner queue so that bin can be deleted from // Fluo and Accumulo bins.add(nodeBin); - } catch (Exception e) { - log.debug("Encountered error: " + e.getMessage() + " while accessing periodic results for bin: " + bin + " for query: " + id); + } catch (final Exception e) { + log.warn("Encountered exception while accessing periodic results for bin: " + bin + " for query: " + id, e); } } /** * Computes left bin end point containing event time ts - * + * * @param ts - event time * @param start - time that periodic event began * @param period - length of period * @return left bin end point containing event time ts */ - private long getBinFromTimestamp(long ts, long period) { + private long getBinFromTimestamp(final long ts, final long period) { Preconditions.checkArgument(period > 0); return (ts / period) * period; } @@ -115,13 +127,13 @@ public class TimestampedNotificationProcessor implements NotificationProcessor, while(!closed.get()) { processNotification(notifications.take()); } - } catch (Exception e) { - log.trace("Thread_" + threadNumber + " is unable to process next notification."); + } catch (final Exception e) { + log.warn("Thread {} is unable to process next notification.", threadNumber); throw new RuntimeException(e); } } - + public void shutdown() { closed.set(true); } @@ -130,7 +142,7 @@ public class TimestampedNotificationProcessor implements NotificationProcessor, return new Builder(); } - + public static class Builder { @@ -138,7 +150,7 @@ public class TimestampedNotificationProcessor implements NotificationProcessor, private BlockingQueue<TimestampedNotification> notifications; // notifications to process private BlockingQueue<NodeBin> bins; // entries to delete from Fluo private BlockingQueue<BindingSetRecord> bindingSets; // query results to export - + private int threadNumber; /** @@ -146,7 +158,7 @@ public class TimestampedNotificationProcessor implements NotificationProcessor, * @param notifications - work queue containing notifications to be processed * @return this Builder for chaining method calls */ - public Builder setNotifications(BlockingQueue<TimestampedNotification> notifications) { + public Builder setNotifications(final BlockingQueue<TimestampedNotification> notifications) { this.notifications = notifications; return this; } @@ -156,7 +168,7 @@ public class TimestampedNotificationProcessor implements NotificationProcessor, * @param bins - work queue containing NodeBins to be pruned * @return this Builder for chaining method calls */ - public Builder setBins(BlockingQueue<NodeBin> bins) { + public Builder setBins(final BlockingQueue<NodeBin> bins) { this.bins = bins; return this; } @@ -166,7 +178,7 @@ public class TimestampedNotificationProcessor implements NotificationProcessor, * @param bindingSets - work queue containing BindingSets to be exported * @return this Builder for chaining method calls */ - public Builder setBindingSets(BlockingQueue<BindingSetRecord> bindingSets) { + public Builder setBindingSets(final BlockingQueue<BindingSetRecord> bindingSets) { this.bindingSets = bindingSets; return this; } @@ -176,17 +188,17 @@ public class TimestampedNotificationProcessor implements NotificationProcessor, * @param threadNumber - number of threads used by this processor * @return - number of threads used by this processor */ - public Builder setThreadNumber(int threadNumber) { + public Builder setThreadNumber(final int threadNumber) { this.threadNumber = threadNumber; return this; } - + /** * Set the PeriodicStorage layer * @param periodicStorage - periodic storage layer that periodic results are read from * @return - this Builder for chaining method calls */ - public Builder setPeriodicStorage(PeriodicQueryResultStorage periodicStorage) { + public Builder setPeriodicStorage(final PeriodicQueryResultStorage periodicStorage) { this.periodicStorage = periodicStorage; return this; } http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8acd24b5/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/pruner/AccumuloBinPruner.java ---------------------------------------------------------------------- diff --git a/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/pruner/AccumuloBinPruner.java b/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/pruner/AccumuloBinPruner.java index a9403c2..f3d5a8d 100644 --- a/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/pruner/AccumuloBinPruner.java +++ b/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/pruner/AccumuloBinPruner.java @@ -18,25 +18,25 @@ */ package org.apache.rya.periodic.notification.pruner; -import org.apache.log4j.Logger; +import java.util.Objects; + import org.apache.rya.indexing.pcj.storage.PeriodicQueryResultStorage; import org.apache.rya.indexing.pcj.storage.PeriodicQueryStorageException; import org.apache.rya.periodic.notification.api.BinPruner; import org.apache.rya.periodic.notification.api.NodeBin; - -import com.google.common.base.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Deletes BindingSets from time bins in the indicated PCJ table */ public class AccumuloBinPruner implements BinPruner { - private static final Logger log = Logger.getLogger(AccumuloBinPruner.class); - private PeriodicQueryResultStorage periodicStorage; + private static final Logger log = LoggerFactory.getLogger(AccumuloBinPruner.class); + private final PeriodicQueryResultStorage periodicStorage; - public AccumuloBinPruner(PeriodicQueryResultStorage periodicStorage) { - Preconditions.checkNotNull(periodicStorage); - this.periodicStorage = periodicStorage; + public AccumuloBinPruner(final PeriodicQueryResultStorage periodicStorage) { + this.periodicStorage = Objects.requireNonNull(periodicStorage); } /** @@ -44,20 +44,20 @@ public class AccumuloBinPruner implements BinPruner { * table indicated by the id. It is assumed that all BindingSet entries for * the corresponding bin are written to the PCJ table so that the bin Id * occurs first. - * + * * @param id * - pcj table id * @param bin * - temporal bin the BindingSets are contained in */ @Override - public void pruneBindingSetBin(NodeBin nodeBin) { - Preconditions.checkNotNull(nodeBin); - String id = nodeBin.getNodeId(); - long bin = nodeBin.getBin(); + public void pruneBindingSetBin(final NodeBin nodeBin) { + Objects.requireNonNull(nodeBin); + final String id = nodeBin.getNodeId(); + final long bin = nodeBin.getBin(); try { periodicStorage.deletePeriodicQueryResults(id, bin); - } catch (PeriodicQueryStorageException e) { + } catch (final PeriodicQueryStorageException e) { log.trace("Unable to delete results from Peroidic Table: " + id + " for bin: " + bin); throw new RuntimeException(e); } http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8acd24b5/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/pruner/FluoBinPruner.java ---------------------------------------------------------------------- diff --git a/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/pruner/FluoBinPruner.java b/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/pruner/FluoBinPruner.java index bee9c02..0562180 100644 --- a/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/pruner/FluoBinPruner.java +++ b/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/pruner/FluoBinPruner.java @@ -23,13 +23,15 @@ import org.apache.fluo.api.client.Transaction; import org.apache.fluo.api.data.Bytes; import org.apache.fluo.api.data.Column; import org.apache.fluo.api.data.Span; -import org.apache.log4j.Logger; import org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants; import org.apache.rya.indexing.pcj.fluo.app.NodeType; import org.apache.rya.indexing.pcj.fluo.app.batch.BatchInformationDAO; import org.apache.rya.indexing.pcj.fluo.app.batch.SpanBatchDeleteInformation; import org.apache.rya.periodic.notification.api.BinPruner; import org.apache.rya.periodic.notification.api.NodeBin; +import org.openrdf.query.BindingSet; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import com.google.common.base.Optional; @@ -38,35 +40,35 @@ import com.google.common.base.Optional; */ public class FluoBinPruner implements BinPruner { - private static final Logger log = Logger.getLogger(FluoBinPruner.class); - private FluoClient client; + private static final Logger log = LoggerFactory.getLogger(FluoBinPruner.class); + private final FluoClient client; - public FluoBinPruner(FluoClient client) { + public FluoBinPruner(final FluoClient client) { this.client = client; } /** * This method deletes BindingSets in the specified bin from the BindingSet * Column of the indicated Fluo nodeId - * + * * @param id * - Fluo nodeId * @param bin * - bin id */ @Override - public void pruneBindingSetBin(NodeBin nodeBin) { - String id = nodeBin.getNodeId(); - long bin = nodeBin.getBin(); + public void pruneBindingSetBin(final NodeBin nodeBin) { + final String id = nodeBin.getNodeId(); + final long bin = nodeBin.getBin(); try (Transaction tx = client.newTransaction()) { - Optional<NodeType> type = NodeType.fromNodeId(id); + final Optional<NodeType> type = NodeType.fromNodeId(id); if (!type.isPresent()) { log.trace("Unable to determine NodeType from id: " + id); throw new RuntimeException(); } - Column batchInfoColumn = type.get().getResultColumn(); - String batchInfoSpanPrefix = id + IncrementalUpdateConstants.NODEID_BS_DELIM + bin; - SpanBatchDeleteInformation batchInfo = SpanBatchDeleteInformation.builder().setColumn(batchInfoColumn) + final Column batchInfoColumn = type.get().getResultColumn(); + final String batchInfoSpanPrefix = id + IncrementalUpdateConstants.NODEID_BS_DELIM + bin; + final SpanBatchDeleteInformation batchInfo = SpanBatchDeleteInformation.builder().setColumn(batchInfoColumn) .setSpan(Span.prefix(Bytes.of(batchInfoSpanPrefix))).build(); BatchInformationDAO.addBatch(tx, id, batchInfo); tx.commit(); http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8acd24b5/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/pruner/PeriodicQueryPruner.java ---------------------------------------------------------------------- diff --git a/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/pruner/PeriodicQueryPruner.java b/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/pruner/PeriodicQueryPruner.java index 327154a..c21710a 100644 --- a/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/pruner/PeriodicQueryPruner.java +++ b/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/pruner/PeriodicQueryPruner.java @@ -19,6 +19,7 @@ package org.apache.rya.periodic.notification.pruner; import java.util.HashSet; +import java.util.Objects; import java.util.Set; import java.util.concurrent.BlockingQueue; import java.util.concurrent.atomic.AtomicBoolean; @@ -26,13 +27,12 @@ import java.util.concurrent.atomic.AtomicBoolean; import org.apache.fluo.api.client.FluoClient; import org.apache.fluo.api.client.Snapshot; import org.apache.fluo.api.client.SnapshotBase; -import org.apache.log4j.Logger; import org.apache.rya.indexing.pcj.fluo.app.NodeType; import org.apache.rya.indexing.pcj.fluo.app.util.PeriodicQueryUtil; import org.apache.rya.periodic.notification.api.BinPruner; import org.apache.rya.periodic.notification.api.NodeBin; - -import com.google.common.base.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Implementation of {@link BinPruner} that deletes old, already processed @@ -42,63 +42,63 @@ import com.google.common.base.Preconditions; */ public class PeriodicQueryPruner implements BinPruner, Runnable { - private static final Logger log = Logger.getLogger(PeriodicQueryPruner.class); - private FluoClient client; - private AccumuloBinPruner accPruner; - private FluoBinPruner fluoPruner; - private BlockingQueue<NodeBin> bins; - private AtomicBoolean closed = new AtomicBoolean(false); - private int threadNumber; + private static final Logger log = LoggerFactory.getLogger(PeriodicQueryPruner.class); + private final FluoClient client; + private final AccumuloBinPruner accPruner; + private final FluoBinPruner fluoPruner; + private final BlockingQueue<NodeBin> bins; + private final AtomicBoolean closed = new AtomicBoolean(false); + private final int threadNumber; - public PeriodicQueryPruner(FluoBinPruner fluoPruner, AccumuloBinPruner accPruner, FluoClient client, BlockingQueue<NodeBin> bins, int threadNumber) { - this.fluoPruner = Preconditions.checkNotNull(fluoPruner); - this.accPruner = Preconditions.checkNotNull(accPruner); - this.client = Preconditions.checkNotNull(client); - this.bins = Preconditions.checkNotNull(bins); + public PeriodicQueryPruner(final FluoBinPruner fluoPruner, final AccumuloBinPruner accPruner, final FluoClient client, final BlockingQueue<NodeBin> bins, final int threadNumber) { + this.fluoPruner = Objects.requireNonNull(fluoPruner); + this.accPruner = Objects.requireNonNull(accPruner); + this.client = Objects.requireNonNull(client); + this.bins = Objects.requireNonNull(bins); this.threadNumber = threadNumber; } - + @Override public void run() { try { while (!closed.get()) { pruneBindingSetBin(bins.take()); } - } catch (InterruptedException e) { - log.trace("Thread " + threadNumber + " is unable to prune the next message."); + } catch (final InterruptedException e) { + log.warn("Thread {} is unable to prune the next message.", threadNumber); throw new RuntimeException(e); } } - + /** * Prunes BindingSet bins from the Rya Fluo Application in addition to the BindingSet * bins created in the PCJ tables associated with the give query id. - * @param id - QueryResult Id for the Rya Fluo application + * @param id - QueryResult Id for the Rya Fluo application * @param bin - bin id for bins to be deleted */ @Override - public void pruneBindingSetBin(NodeBin nodeBin) { - String pcjId = nodeBin.getNodeId(); - long bin = nodeBin.getBin(); + public void pruneBindingSetBin(final NodeBin nodeBin) { + final String pcjId = nodeBin.getNodeId(); + final long bin = nodeBin.getBin(); try(Snapshot sx = client.newSnapshot()) { - String queryId = NodeType.generateNewIdForType(NodeType.QUERY, pcjId); - Set<String> fluoIds = getNodeIdsFromResultId(sx, queryId); + final String queryId = NodeType.generateNewIdForType(NodeType.QUERY, pcjId); + final Set<String> fluoIds = getNodeIdsFromResultId(sx, queryId); accPruner.pruneBindingSetBin(nodeBin); - for(String fluoId: fluoIds) { + for(final String fluoId: fluoIds) { fluoPruner.pruneBindingSetBin(new NodeBin(fluoId, bin)); } - } catch (Exception e) { - log.trace("Could not successfully initialize PeriodicQueryBinPruner."); + } catch (final Exception e) { + log.warn("Could not successfully initialize PeriodicQueryBinPruner.", e); } } - - + + public void shutdown() { closed.set(true); } - private Set<String> getNodeIdsFromResultId(SnapshotBase sx, String id) { - Set<String> ids = new HashSet<>(); + private Set<String> getNodeIdsFromResultId(final SnapshotBase sx, final String id) { + final Set<String> ids = new HashSet<>(); PeriodicQueryUtil.getPeriodicQueryNodeAncestorIds(sx, id, ids); return ids; } http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8acd24b5/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/pruner/PeriodicQueryPrunerExecutor.java ---------------------------------------------------------------------- diff --git a/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/pruner/PeriodicQueryPrunerExecutor.java b/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/pruner/PeriodicQueryPrunerExecutor.java index 1c11f96..d3e87c6 100644 --- a/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/pruner/PeriodicQueryPrunerExecutor.java +++ b/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/pruner/PeriodicQueryPrunerExecutor.java @@ -26,10 +26,11 @@ import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import org.apache.fluo.api.client.FluoClient; -import org.apache.log4j.Logger; import org.apache.rya.indexing.pcj.storage.PeriodicQueryResultStorage; import org.apache.rya.periodic.notification.api.LifeCycle; import org.apache.rya.periodic.notification.api.NodeBin; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import com.google.common.base.Preconditions; @@ -39,17 +40,17 @@ import com.google.common.base.Preconditions; */ public class PeriodicQueryPrunerExecutor implements LifeCycle { - private static final Logger log = Logger.getLogger(PeriodicQueryPrunerExecutor.class); - private FluoClient client; - private int numThreads; - private ExecutorService executor; - private BlockingQueue<NodeBin> bins; - private PeriodicQueryResultStorage periodicStorage; - private List<PeriodicQueryPruner> pruners; + private static final Logger log = LoggerFactory.getLogger(PeriodicQueryPrunerExecutor.class); + private final FluoClient client; + private final int numThreads; + private final ExecutorService executor; + private final BlockingQueue<NodeBin> bins; + private final PeriodicQueryResultStorage periodicStorage; + private final List<PeriodicQueryPruner> pruners; private boolean running = false; - public PeriodicQueryPrunerExecutor(PeriodicQueryResultStorage periodicStorage, FluoClient client, int numThreads, - BlockingQueue<NodeBin> bins) { + public PeriodicQueryPrunerExecutor(final PeriodicQueryResultStorage periodicStorage, final FluoClient client, final int numThreads, + final BlockingQueue<NodeBin> bins) { Preconditions.checkArgument(numThreads > 0); this.periodicStorage = periodicStorage; this.numThreads = numThreads; @@ -62,11 +63,11 @@ public class PeriodicQueryPrunerExecutor implements LifeCycle { @Override public void start() { if (!running) { - AccumuloBinPruner accPruner = new AccumuloBinPruner(periodicStorage); - FluoBinPruner fluoPruner = new FluoBinPruner(client); + final AccumuloBinPruner accPruner = new AccumuloBinPruner(periodicStorage); + final FluoBinPruner fluoPruner = new FluoBinPruner(client); for (int threadNumber = 0; threadNumber < numThreads; threadNumber++) { - PeriodicQueryPruner pruner = new PeriodicQueryPruner(fluoPruner, accPruner, client, bins, threadNumber); + final PeriodicQueryPruner pruner = new PeriodicQueryPruner(fluoPruner, accPruner, client, bins, threadNumber); pruners.add(pruner); executor.submit(pruner); } @@ -87,11 +88,11 @@ public class PeriodicQueryPrunerExecutor implements LifeCycle { running = false; } try { - if (!executor.awaitTermination(5000, TimeUnit.MILLISECONDS)) { + if (!executor.awaitTermination(5, TimeUnit.SECONDS)) { log.info("Timed out waiting for consumer threads to shut down, exiting uncleanly"); executor.shutdownNow(); } - } catch (InterruptedException e) { + } catch (final InterruptedException e) { log.info("Interrupted during shutdown, exiting uncleanly"); } } http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8acd24b5/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/registration/kafka/KafkaNotificationProvider.java ---------------------------------------------------------------------- diff --git a/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/registration/kafka/KafkaNotificationProvider.java b/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/registration/kafka/KafkaNotificationProvider.java index f5cd13a..f741d20 100644 --- a/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/registration/kafka/KafkaNotificationProvider.java +++ b/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/registration/kafka/KafkaNotificationProvider.java @@ -42,11 +42,11 @@ import org.slf4j.LoggerFactory; */ public class KafkaNotificationProvider implements LifeCycle { private static final Logger LOG = LoggerFactory.getLogger(KafkaNotificationProvider.class); - private String topic; + private final String topic; private ExecutorService executor; - private NotificationCoordinatorExecutor coord; - private Properties props; - private int numThreads; + private final NotificationCoordinatorExecutor coord; + private final Properties props; + private final int numThreads; private boolean running = false; Deserializer<String> keyDe; Deserializer<CommandNotification> valDe; @@ -54,15 +54,15 @@ public class KafkaNotificationProvider implements LifeCycle { /** * Create KafkaNotificationProvider for reading new notification requests form Kafka - * @param topic - notification topic + * @param topic - notification topic * @param keyDe - Kafka message key deserializer * @param valDe - Kafka message value deserializer * @param props - properties used to creates a {@link KafkaConsumer} * @param coord - {@link NotificationCoordinatorExecutor} for managing and generating notifications * @param numThreads - number of threads used by this notification provider */ - public KafkaNotificationProvider(String topic, Deserializer<String> keyDe, Deserializer<CommandNotification> valDe, Properties props, - NotificationCoordinatorExecutor coord, int numThreads) { + public KafkaNotificationProvider(final String topic, final Deserializer<String> keyDe, final Deserializer<CommandNotification> valDe, final Properties props, + final NotificationCoordinatorExecutor coord, final int numThreads) { this.coord = coord; this.numThreads = numThreads; this.topic = topic; @@ -75,7 +75,7 @@ public class KafkaNotificationProvider implements LifeCycle { @Override public void stop() { if (consumers != null && consumers.size() > 0) { - for (PeriodicNotificationConsumer consumer : consumers) { + for (final PeriodicNotificationConsumer consumer : consumers) { consumer.shutdown(); } } @@ -88,11 +88,12 @@ public class KafkaNotificationProvider implements LifeCycle { LOG.info("Timed out waiting for consumer threads to shut down, exiting uncleanly"); executor.shutdownNow(); } - } catch (InterruptedException e) { + } catch (final InterruptedException e) { LOG.info("Interrupted during shutdown, exiting uncleanly"); } } + @Override public void start() { if (!running) { if (!coord.currentlyRunning()) { @@ -102,14 +103,12 @@ public class KafkaNotificationProvider implements LifeCycle { executor = Executors.newFixedThreadPool(numThreads); // now create consumers to consume the messages - int threadNumber = 0; - for (int i = 0; i < numThreads; i++) { - LOG.info("Creating consumer:" + threadNumber); - KafkaConsumer<String, CommandNotification> consumer = new KafkaConsumer<String, CommandNotification>(props, keyDe, valDe); - PeriodicNotificationConsumer periodicConsumer = new PeriodicNotificationConsumer(topic, consumer, threadNumber, coord); + for (int threadNumber = 0; threadNumber < numThreads; threadNumber++) { + LOG.info("Creating consumer: {} for Kafka topic: {}", threadNumber, topic); + final KafkaConsumer<String, CommandNotification> consumer = new KafkaConsumer<String, CommandNotification>(props, keyDe, valDe); + final PeriodicNotificationConsumer periodicConsumer = new PeriodicNotificationConsumer(topic, consumer, threadNumber, coord); consumers.add(periodicConsumer); executor.submit(periodicConsumer); - threadNumber++; } running = true; } http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8acd24b5/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/registration/kafka/PeriodicNotificationConsumer.java ---------------------------------------------------------------------- diff --git a/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/registration/kafka/PeriodicNotificationConsumer.java b/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/registration/kafka/PeriodicNotificationConsumer.java index 6785ce8..f8fbed8 100644 --- a/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/registration/kafka/PeriodicNotificationConsumer.java +++ b/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/registration/kafka/PeriodicNotificationConsumer.java @@ -25,9 +25,10 @@ import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.errors.WakeupException; -import org.apache.log4j.Logger; import org.apache.rya.periodic.notification.api.NotificationCoordinatorExecutor; import org.apache.rya.periodic.notification.notification.CommandNotification; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Consumer for the {@link KafkaNotificationProvider}. This consumer pull messages @@ -35,52 +36,55 @@ import org.apache.rya.periodic.notification.notification.CommandNotification; * */ public class PeriodicNotificationConsumer implements Runnable { - private KafkaConsumer<String, CommandNotification> consumer; - private int m_threadNumber; - private String topic; + private final KafkaConsumer<String, CommandNotification> consumer; + private final int threadNumber; + private final String topic; private final AtomicBoolean closed = new AtomicBoolean(false); - private NotificationCoordinatorExecutor coord; - private static final Logger LOG = Logger.getLogger(PeriodicNotificationConsumer.class); + private final NotificationCoordinatorExecutor coord; + private static final Logger LOG = LoggerFactory.getLogger(PeriodicNotificationConsumer.class); /** * Creates a new PeriodicNotificationConsumer for consuming new notification requests from * Kafka. * @param topic - new notification topic * @param consumer - consumer for pulling new requests from Kafka - * @param a_threadNumber - number of consumer threads to be used + * @param threadNumber - an identifier for this thread. * @param coord - notification coordinator for managing and generating notifications */ - public PeriodicNotificationConsumer(String topic, KafkaConsumer<String, CommandNotification> consumer, int a_threadNumber, - NotificationCoordinatorExecutor coord) { + public PeriodicNotificationConsumer(final String topic, final KafkaConsumer<String, CommandNotification> consumer, final int threadNumber, + final NotificationCoordinatorExecutor coord) { this.topic = topic; - m_threadNumber = a_threadNumber; + this.threadNumber = threadNumber; this.consumer = consumer; this.coord = coord; } + @Override public void run() { - + try { - LOG.info("Creating kafka stream for consumer:" + m_threadNumber); + LOG.info("Configuring KafkaConsumer on thread: {} to subscribe to topic: {}", threadNumber, topic); consumer.subscribe(Arrays.asList(topic)); while (!closed.get()) { - ConsumerRecords<String, CommandNotification> records = consumer.poll(10000); + final ConsumerRecords<String, CommandNotification> records = consumer.poll(10000); // Handle new records - for(ConsumerRecord<String, CommandNotification> record: records) { - CommandNotification notification = record.value(); - LOG.info("Thread " + m_threadNumber + " is adding notification " + notification + " to queue."); - LOG.info("Message: " + notification); + for(final ConsumerRecord<String, CommandNotification> record: records) { + final CommandNotification notification = record.value(); + LOG.info("Thread {} is adding notification: {}", threadNumber, notification); coord.processNextCommandNotification(notification); } } - } catch (WakeupException e) { + LOG.info("Finished polling."); + } catch (final WakeupException e) { // Ignore exception if closing - if (!closed.get()) throw e; + if (!closed.get()) { + throw e; + } } finally { consumer.close(); } } - + public void shutdown() { closed.set(true); consumer.wakeup(); http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8acd24b5/extras/periodic.notification/tests/pom.xml ---------------------------------------------------------------------- diff --git a/extras/periodic.notification/tests/pom.xml b/extras/periodic.notification/tests/pom.xml index feb1f0f..3f0c413 100644 --- a/extras/periodic.notification/tests/pom.xml +++ b/extras/periodic.notification/tests/pom.xml @@ -1,14 +1,22 @@ -<?xml version="1.0" encoding="utf-8"?> -<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor - license agreements. See the NOTICE file distributed with this work for additional - information regarding copyright ownership. The ASF licenses this file to - you under the Apache License, Version 2.0 (the "License"); you may not use - this file except in compliance with the License. You may obtain a copy of - the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required - by applicable law or agreed to in writing, software distributed under the - License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS - OF ANY KIND, either express or implied. See the License for the specific - language governing permissions and limitations under the License. --> +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. +--> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8acd24b5/extras/periodic.notification/tests/src/test/java/org/apache/rya/periodic/notification/application/PeriodicNotificationApplicationIT.java ---------------------------------------------------------------------- diff --git a/extras/periodic.notification/tests/src/test/java/org/apache/rya/periodic/notification/application/PeriodicNotificationApplicationIT.java b/extras/periodic.notification/tests/src/test/java/org/apache/rya/periodic/notification/application/PeriodicNotificationApplicationIT.java index 3b6062f..92e3276 100644 --- a/extras/periodic.notification/tests/src/test/java/org/apache/rya/periodic/notification/application/PeriodicNotificationApplicationIT.java +++ b/extras/periodic.notification/tests/src/test/java/org/apache/rya/periodic/notification/application/PeriodicNotificationApplicationIT.java @@ -126,7 +126,7 @@ public class PeriodicNotificationApplicationIT extends RyaExportITBase { producer = new KafkaProducer<>(kafkaProps, new StringSerializer(), new CommandNotificationSerializer()); //extract kafka specific properties from application config - app = PeriodicNotificationApplicationFactory.getPeriodicApplication(props); + app = PeriodicNotificationApplicationFactory.getPeriodicApplication(conf); registrar = new KafkaNotificationRegistrationClient(conf.getNotificationTopic(), producer); } http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8acd24b5/extras/periodic.notification/tests/src/test/resources/notification.properties ---------------------------------------------------------------------- diff --git a/extras/periodic.notification/tests/src/test/resources/notification.properties b/extras/periodic.notification/tests/src/test/resources/notification.properties index 4b25b93..b5f2a90 100644 --- a/extras/periodic.notification/tests/src/test/resources/notification.properties +++ b/extras/periodic.notification/tests/src/test/resources/notification.properties @@ -1,4 +1,3 @@ -# # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information @@ -15,21 +14,21 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. -#/ + accumulo.auths= accumulo.instance="instance" accumulo.user="root" accumulo.password="secret" accumulo.rya.prefix="rya_" accumulo.zookeepers= -fluo.app.name="fluo_app" -fluo.table.name="fluo_table" -kafka.bootstrap.servers=127.0.0.1:9092 -kafka.notification.topic=notifications -kafka.notification.client.id=consumer0 -kafka.notification.group.id=group0 -cep.coordinator.threads=1 -cep.producer.threads=1 -cep.exporter.threads=1 -cep.processor.threads=1 -cep.pruner.threads=1 \ No newline at end of file +rya.pcj.fluo.app.name="fluo_app" +rya.pcj.fluo.table.name="fluo_table" +rya.periodic.notification.kafka.bootstrap.servers=127.0.0.1:9092 +rya.periodic.notification.kafka.topic=notifications +rya.periodic.notification.kafka.client.id=consumer0 +rya.periodic.notification.kafka.group.id=group0 +rya.periodic.notification.coordinator.threads=1 +rya.periodic.notification.producer.threads=1 +rya.periodic.notification.exporter.threads=1 +rya.periodic.notification.processor.threads=1 +rya.periodic.notification.pruner.threads=1 \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8acd24b5/extras/periodic.notification/twill.yarn/README.md ---------------------------------------------------------------------- diff --git a/extras/periodic.notification/twill.yarn/README.md b/extras/periodic.notification/twill.yarn/README.md new file mode 100644 index 0000000..0475c9c --- /dev/null +++ b/extras/periodic.notification/twill.yarn/README.md @@ -0,0 +1,18 @@ +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. +--> http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8acd24b5/extras/periodic.notification/twill.yarn/pom.xml ---------------------------------------------------------------------- diff --git a/extras/periodic.notification/twill.yarn/pom.xml b/extras/periodic.notification/twill.yarn/pom.xml new file mode 100644 index 0000000..7b9fd02 --- /dev/null +++ b/extras/periodic.notification/twill.yarn/pom.xml @@ -0,0 +1,98 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.rya</groupId> + <artifactId>rya.periodic.notification.parent</artifactId> + <version>3.2.12-incubating-SNAPSHOT</version> + </parent> + + <artifactId>rya.periodic.notification.twill.yarn</artifactId> + + <name>Apache Rya Periodic Notification Service on Twill on YARN </name> + <description>Twill Application for executing the Apache Rya Periodic Notification Service on YARN</description> + + <dependencies> + <dependency> + <groupId>com.beust</groupId> + <artifactId>jcommander</artifactId> + </dependency> + <dependency> + <groupId>com.google.guava</groupId> + <artifactId>guava</artifactId> + <version>13.0.1</version> <!-- Override Rya's DependencyManagement. The Twill runtime requires 11.0 < guava < 14.0 (Service.listener) (AbstractExecutionThreadService.getServiceName) --> + </dependency> + <dependency> + <groupId>org.apache.twill</groupId> + <artifactId>twill-yarn</artifactId> + <version>0.12.0</version> + </dependency> + <dependency> + <groupId>org.apache.rya</groupId> + <artifactId>rya.periodic.notification.twill</artifactId> + <version>3.2.12-incubating-SNAPSHOT</version> + <classifier>twill-app</classifier> + <scope>provided</scope> <!-- prevent these dependencies from showing up on the classpath --> + </dependency> + + <!-- Add Accumulo as a runtime dependency --> + <dependency> + <groupId>org.apache.accumulo</groupId> + <artifactId>accumulo-core</artifactId> + <scope>runtime</scope> + <exclusions> + <!-- exclude logging implementations --> + <exclusion> + <groupId>commons-logging</groupId> + <artifactId>commons-logging</artifactId> + </exclusion> + <exclusion> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-log4j12</artifactId> + </exclusion> + </exclusions> + </dependency> + </dependencies> + + <build> + <plugins> + <plugin> + <artifactId>maven-assembly-plugin</artifactId> + <executions> + <execution> + <id>create-binary-distribution</id> + <goals> + <goal>single</goal> + </goals> + <phase>package</phase> + <configuration> + <tarLongFileMode>gnu</tarLongFileMode> + <descriptors> + <descriptor>src/main/assembly/binary-release.xml</descriptor> + </descriptors> + </configuration> + </execution> + </executions> + </plugin> + </plugins> + </build> +</project> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8acd24b5/extras/periodic.notification/twill.yarn/src/main/assembly/binary-release.xml ---------------------------------------------------------------------- diff --git a/extras/periodic.notification/twill.yarn/src/main/assembly/binary-release.xml b/extras/periodic.notification/twill.yarn/src/main/assembly/binary-release.xml new file mode 100644 index 0000000..a3bba3e --- /dev/null +++ b/extras/periodic.notification/twill.yarn/src/main/assembly/binary-release.xml @@ -0,0 +1,30 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. +--> +<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.3" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.3 http://maven.apache.org/xsd/assembly-1.1.3.xsd"> + <id>bin</id> + <formats> + <format>tar.gz</format> + </formats> + <includeBaseDirectory>true</includeBaseDirectory> + <componentDescriptors> + <componentDescriptor>src/main/assembly/component-release.xml</componentDescriptor> + </componentDescriptors> +</assembly> http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8acd24b5/extras/periodic.notification/twill.yarn/src/main/assembly/component-release.xml ---------------------------------------------------------------------- diff --git a/extras/periodic.notification/twill.yarn/src/main/assembly/component-release.xml b/extras/periodic.notification/twill.yarn/src/main/assembly/component-release.xml new file mode 100644 index 0000000..8aabf93 --- /dev/null +++ b/extras/periodic.notification/twill.yarn/src/main/assembly/component-release.xml @@ -0,0 +1,104 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. +--> +<component + xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/component/1.1.3" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/component/1.1.3 http://maven.apache.org/xsd/component-1.1.3.xsd"> + <fileSets> + <fileSet> + <directory>src/main/scripts</directory> + <outputDirectory>bin</outputDirectory> + <directoryMode>0755</directoryMode> + <fileMode>0755</fileMode> + <includes> + <include>*.sh</include> + </includes> + <lineEnding>unix</lineEnding> + <filtered>true</filtered> + </fileSet> + <fileSet> + <directory>src/main/config</directory> + <outputDirectory>conf</outputDirectory> + <directoryMode>0755</directoryMode> + <fileMode>0644</fileMode> + <includes> + <include>*.xml</include> + <include>*.properties</include> + <include>hadoop/*.xml</include> + </includes> + <lineEnding>unix</lineEnding> + </fileSet> + <fileSet> + <directory>src/main/config</directory> + <outputDirectory>conf</outputDirectory> + <directoryMode>0755</directoryMode> + <fileMode>0755</fileMode> + <includes> + <include>*.sh</include> + </includes> + <lineEnding>unix</lineEnding> + </fileSet> + + <!-- create an empty directory for log files --> + <fileSet> + <directory>src/main/assembly</directory> + <outputDirectory>logs</outputDirectory> + <directoryMode>755</directoryMode> + <excludes> + <exclude>*</exclude> + </excludes> + </fileSet> + </fileSets> + <dependencySets> + <dependencySet> + <outputDirectory>lib</outputDirectory> + <useTransitiveFiltering>true</useTransitiveFiltering> + <scope>runtime</scope> + <!-- exclude hadoop, zookeeper jars --> + <excludes> + <exclude>org.apache.accumulo:*</exclude> + <exclude>org.apache.hadoop:*</exclude> + <exclude>org.apache.zookeeper:*</exclude> + </excludes> + </dependencySet> + <dependencySet> + <outputDirectory>lib</outputDirectory> + <scope>provided</scope> + <includes> + <include>org.apache.rya:rya.periodic.notification.twill:*:twill-app</include> + </includes> + </dependencySet> + <dependencySet> + <outputDirectory>lib-ahz</outputDirectory> + <useTransitiveFiltering>true</useTransitiveFiltering> + <scope>runtime</scope> + <!-- store the hadoop, zookeeper jars in a specific dir --> + <includes> + <include>org.apache.accumulo:*</include> + <include>org.apache.hadoop:*</include> + <include>org.apache.zookeeper:*</include> + </includes> + <excludes> + <exclude>log4j:log4j</exclude> <!-- twill uses logback & slf4j. --> + </excludes> + </dependencySet> + + </dependencySets> +</component> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8acd24b5/extras/periodic.notification/twill.yarn/src/main/config/hadoop/core-site.xml ---------------------------------------------------------------------- diff --git a/extras/periodic.notification/twill.yarn/src/main/config/hadoop/core-site.xml b/extras/periodic.notification/twill.yarn/src/main/config/hadoop/core-site.xml new file mode 100644 index 0000000..fa31186 --- /dev/null +++ b/extras/periodic.notification/twill.yarn/src/main/config/hadoop/core-site.xml @@ -0,0 +1,25 @@ +<?xml version="1.0"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<?xml-stylesheet type="text/xsl" href="configuration.xsl"?> + +<configuration> + <property> + <name>fs.defaultFS</name> + <value>hdfs://hdfs-namenode-host:8020/</value> + </property> +</configuration> http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8acd24b5/extras/periodic.notification/twill.yarn/src/main/config/hadoop/yarn-site.xml ---------------------------------------------------------------------- diff --git a/extras/periodic.notification/twill.yarn/src/main/config/hadoop/yarn-site.xml b/extras/periodic.notification/twill.yarn/src/main/config/hadoop/yarn-site.xml new file mode 100644 index 0000000..a9de6dc --- /dev/null +++ b/extras/periodic.notification/twill.yarn/src/main/config/hadoop/yarn-site.xml @@ -0,0 +1,25 @@ +<?xml version="1.0"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<?xml-stylesheet type="text/xsl" href="configuration.xsl"?> + +<configuration> + <property> + <name>yarn.resourcemanager.hostname</name> + <value>yarn-resourcemanager-host</value> + </property> +</configuration> http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8acd24b5/extras/periodic.notification/twill.yarn/src/main/config/logback.xml ---------------------------------------------------------------------- diff --git a/extras/periodic.notification/twill.yarn/src/main/config/logback.xml b/extras/periodic.notification/twill.yarn/src/main/config/logback.xml new file mode 100644 index 0000000..ed36338 --- /dev/null +++ b/extras/periodic.notification/twill.yarn/src/main/config/logback.xml @@ -0,0 +1,57 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. +--> +<configuration> + + <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender"> + <!-- encoders are assigned the type ch.qos.logback.classic.encoder.PatternLayoutEncoder by default --> + <encoder> + <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{48} - %msg%n</pattern> + </encoder> + </appender> + + <appender name="FILE" class="ch.qos.logback.core.FileAppender"> + <file>logs/PeriodicNotificationApp.log</file> + <append>true</append> + <!-- encoders are assigned the type ch.qos.logback.classic.encoder.PatternLayoutEncoder by default --> + <encoder> + <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{48} - %msg%n</pattern> + <!-- set immediateFlush to false for much higher logging throughput --> + <immediateFlush>false</immediateFlush> + <charset>UTF-8</charset> + </encoder> + </appender> + + <logger name="org.apache.rya" level="DEBUG" /> + <logger name="org.apache.accumulo" level="INFO" /> + <logger name="org.apache.hadoop" level="INFO" /> + <logger name="org.apache.fluo" level="INFO" /> + <logger name="fluo.tx" level="INFO" /> + <logger name="kafka" level="INFO" /> + <logger name="org.apache.kafka" level="INFO" /> + <logger name="org.apache.zookeeper" level="INFO" /> + <logger name="org.apache.curator" level="INFO" /> + <logger name="org.apache.twill" level="INFO" /> + + <root level="DEBUG"> + <appender-ref ref="STDOUT" /> + <appender-ref ref="FILE" /> + </root> + +</configuration> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8acd24b5/extras/periodic.notification/twill.yarn/src/main/config/notification.properties ---------------------------------------------------------------------- diff --git a/extras/periodic.notification/twill.yarn/src/main/config/notification.properties b/extras/periodic.notification/twill.yarn/src/main/config/notification.properties new file mode 100644 index 0000000..6d7b47b --- /dev/null +++ b/extras/periodic.notification/twill.yarn/src/main/config/notification.properties @@ -0,0 +1,67 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# String of Accumulo authorizations. +#accumulo.auths= + +# Accumulo instance name (required) +accumulo.instance= + +# Accumulo user (required) +accumulo.user= + +# Accumulo password (required) +accumulo.password= + +# Prefix for Accumulo backed Rya instance. +#accumulo.rya.prefix=rya_ + +# Zookeepers for underlying Accumulo instance (required) +accumulo.zookeepers= + +# Name of Fluo Application (required) +rya.pcj.fluo.app.name= + +# Name of Fluo Table (required) +rya.pcj.fluo.table.name= + +# Kafka Bootstrap servers for Producers and Consumers (required) +rya.periodic.notification.kafka.bootstrap.servers= + +# Topic to which new Periodic Notifications are published. +#rya.periodic.notification.kafka.topic=notifications + +# Client Id for notification topic. +#rya.periodic.notification.kafka.client.id=consumer0 + +# Group Id for notification topic. +#rya.periodic.notification.kafka.group.id=group0 + +# Number of threads used by coordinator. +#rya.periodic.notification.coordinator.threads=1 + +# Number of threads used by producer. +#rya.periodic.notification.producer.threads=1 + +# Number of threads used by exporter. +#rya.periodic.notification.exporter.threads=1 + +# Number of threads used by processor. +#rya.periodic.notification.processor.threads=1 + +# Number of threads used by pruner. +#rya.periodic.notification.pruner.threads=1 \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8acd24b5/extras/periodic.notification/twill.yarn/src/main/config/twill-env.sh ---------------------------------------------------------------------- diff --git a/extras/periodic.notification/twill.yarn/src/main/config/twill-env.sh b/extras/periodic.notification/twill.yarn/src/main/config/twill-env.sh new file mode 100644 index 0000000..f8716de --- /dev/null +++ b/extras/periodic.notification/twill.yarn/src/main/config/twill-env.sh @@ -0,0 +1,63 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + + +#addToClasspath() +#{ +# local dir=$1 +# local filterRegex=$2 + + +# if [ ! -d "$dir" ]; then +# echo "ERROR $dir does not exist or not a directory" +# exit 1 +# fi + +# for jar in $dir/*.jar; do +# if ! [[ $jar =~ $filterRegex ]]; then +# CLASSPATH="$CLASSPATH:$jar" +# fi +# done +#} + +loadLocalAHZLibraries() +{ +# Specify a hadoop configuration directory to use for connecting to the YARN cluster. +# At a minimum, this directory should contain core-site.xml and yarn-site.xml. +#HADOOP_CONF_DIR=/etc/hadoop/conf +HADOOP_CONF_DIR=conf/hadoop + +# use local libraries (lib-ahz) for the classpath and append the hadoop conf directory +TWILL_CP=lib/*:lib-ahz/*:$HADOOP_CONF_DIR +} + +loadSystemAHZLibraries() +{ +#EXCLUDE_RE="(.*log4j.*)|(.*asm.*)|(.*guava.*)|(.*gson.*)" +#addToClasspath "$ACCUMULO_HOME/lib" $EXCLUDE_RE + +# or use the hadoop classpath as specified by your path's hadoop command +# TODO add excludes for jars that should not be included +HADOOP_CP=$(hadoop classpath) + +# use the +TWILL_CP=lib/*:$HADOOP_CP +} + +# Select which method to use for resolving Accumulo, Hadoop, and Zookeeper libraries and configuration. +loadLocalAHZLibraries +#loadSystemAHZLibraries