Repository: incubator-fluo-recipes
Updated Branches:
  refs/heads/master d7347ee15 -> 6f5177363

Fixes #102 - Simplify Accumulo export queue recipe

* AccumuloExporter is now an abstract class that is implemented by the user and handles writing
  the mutations given by the user to Accumulo.
* AccumuloExport objects are no longer placed on the queue
* Renamed SharedBatchWriter to AccumuloWriter
* AccumuloExportQueue class contains code for configuring the queue


Project: http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/commit/6f517736
Tree: http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/tree/6f517736
Diff: http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/diff/6f517736

Branch: refs/heads/master
Commit: 6f5177363ca309c35403fd822e3270559e9dfdbd
Parents: d7347ee
Author: Mike Walch <mwa...@apache.org>
Authored: Wed Sep 7 12:04:44 2016 -0400
Committer: Mike Walch <mwa...@apache.org>
Committed: Wed Sep 21 12:30:22 2016 -0400

----------------------------------------------------------------------
 docs/accumulo-export-queue.md                   |  89 ++++++
 docs/accumulo-export.md                         |  57 ----
 docs/export-queue.md                            |   2 +-
 modules/accumulo/pom.xml                        |   4 -
 .../recipes/accumulo/export/AccumuloExport.java |  37 ---
 .../accumulo/export/AccumuloExportQueue.java    | 278 +++++++++++++++++++
 .../accumulo/export/AccumuloExporter.java       |  45 +-
 .../accumulo/export/AccumuloReplicator.java     |  83 ++++++
 .../accumulo/export/DifferenceExport.java       | 101 -------
 .../accumulo/export/ReplicationExport.java      |  82 ------
 .../accumulo/export/SharedBatchWriter.java      | 149 ----------
 .../fluo/recipes/accumulo/export/TableInfo.java |  36 ---
 .../accumulo/export/AccumuloExportTest.java     | 102 +++++++
 .../accumulo/export/DifferenceExportTest.java   | 105 -------
 .../fluo/recipes/core/export/ExportQueue.java   |   4 +
 .../recipes/test/export/AccumuloExporterIT.java |  32 +--
 .../test/export/AccumuloReplicatorIT.java       |  47 ++--
 .../recipes/test/export/SimpleExporter.java     |  33 +++
 .../fluo/recipes/test/export/TestExport.java    |  40 ---
 19 files changed, 641 insertions(+), 685 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/6f517736/docs/accumulo-export-queue.md
----------------------------------------------------------------------
diff --git a/docs/accumulo-export-queue.md b/docs/accumulo-export-queue.md
new file mode 100644
index 0000000..dde04fb
--- /dev/null
+++ b/docs/accumulo-export-queue.md
@@ -0,0 +1,89 @@
+# Accumulo Export Queue Specialization
+
+## Background
+
+The [Export Queue Recipe][1] provides a generic foundation for building an export mechanism for any
+external data store. The [AccumuloExportQueue] provides an implementation of this recipe for
+Accumulo. The [AccumuloExportQueue] is located in the 'fluo-recipes-accumulo' module and provides
+the following functionality:
+
+ * Safely batches writes to Accumulo made by multiple transactions exporting data.
+ * Stores Accumulo connection information in Fluo configuration, making it accessible by Export
+   Observers running on other nodes.
+ * Provides utility code that makes it easier and shorter to code common Accumulo export patterns.
+
+## Example Use
+
+Exporting to Accumulo is easy. Follow the steps below:
+
+1. Implement a class that extends [AccumuloExporter]. This class will process exported objects that
+   are placed on your export queue. For example, the `SimpleExporter` class below processes String
+   key/value exports and generates mutations for Accumulo.
+
+   ```java
+   public class SimpleExporter extends AccumuloExporter<String, String> {
+
+     @Override
+     protected Collection<Mutation> processExport(SequencedExport<String, String> export) {
+       Mutation m = new Mutation(export.getKey());
+       m.put("cf", "cq", export.getSequence(), export.getValue());
+       return Collections.singleton(m);
+     }
+   }
+   ```
+
+2. With a `SimpleExporter` created, configure an [AccumuloExportQueue] to use `SimpleExporter` and
+   give it information on how to connect to Accumulo.
+
+   ```java
+
+   FluoConfiguration fluoConfig = ...;
+
+   // Set Accumulo configuration
+   String instance = // Name of Accumulo instance exporting to
+   String zookeepers = // Zookeepers used by Accumulo instance exporting to
+   String user = // Accumulo username, user that can write to exportTable
+   String password = // Accumulo user password
+   String exportTable = // Name of table to export to
+
+   // Configure Accumulo export queue
+   AccumuloExportQueue.configure(fluoConfig, new ExportQueue.Options(EXPORT_QUEUE_ID,
+       SimpleExporter.class.getName(), String.class.getName(), String.class.getName(), numMapBuckets),
+       new AccumuloExportQueue.Options(instance, zookeepers, user, password, exportTable));
+
+   // Initialize Fluo using fluoConfig
+   ```
+
+3. Export queues can be retrieved in Fluo observers and objects can be added to them:
+
+   ```java
+   public class MyObserver extends AbstractObserver {
+
+     ExportQueue<String, String> exportQ;
+
+     @Override
+     public void init(Context context) throws Exception {
+       exportQ = ExportQueue.getInstance(EXPORT_QUEUE_ID, context.getAppConfiguration());
+     }
+
+     @Override
+     public void process(TransactionBase tx, Bytes row, Column col) {
+       // Read some data and do some work
+
+       // Add results to export queue
+       String key = // key that identifies export
+       String value = // object to export
+       exportQ.add(tx, key, value);
+     }
+   }
+   ```
+
+## Other use cases
+
+[AccumuloReplicator] is a specialized [AccumuloExporter] that replicates a Fluo table to Accumulo.
+
+[1]: export-queue.md
+[AccumuloExportQueue]: ../modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/export/AccumuloExportQueue.java
+[AccumuloExporter]: ../modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/export/AccumuloExporter.java
+[AccumuloReplicator]: ../modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/export/AccumuloReplicator.java
+

http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/6f517736/docs/accumulo-export.md
----------------------------------------------------------------------
diff --git a/docs/accumulo-export.md b/docs/accumulo-export.md
deleted file mode 100644
index 4604f2c..0000000
--- a/docs/accumulo-export.md
+++ /dev/null
@@ -1,57 +0,0 @@
-# Accumulo Export Queue Specialization
-
-## Background
-
-The [Export Queue Recipe][1] provides a generic foundation for building export
-mechanism to any external data store. A specific Export Queue implementation
-for Accumulo is provided in the Fluo Recipes Accumulo module.
-
-This implementation provides the following functionality :
-
- * Safely batches writes to Accumulo made by multiple transactions exporting data.
- * Stores Accumulo connection information in Fluo configuration, making it accessible by Export Observers running on other nodes.
- * Provides utility code that make it easier and shorter to code common Accumulo export patterns.
-
-## Example Use
-
-Exporting to Accumulo is easy. Only two things need to be done.
-
- * Configure the export queue.
- * Implement [AccumuloExport][2] with custom code that generates Accumulo mutations.
-
-The following code shows how to configure an Export Queue that will write to an
-external Accumulo table.
-
-```java
-
-FluoConfiguration fluoConfig = ...;
-
-//Configure an export queue to use the classes Fluo Recipes provides for
-//exporting to Accumulo
-ExportQueue.configure(fluoConfig, new ExportQueue.Options(EXPORT_QUEUE_ID,
-    AccumuloExporter.class.getName(), String.class.getName(), AccumuloExport.class.getName(),
-    numMapBuckets));
-
-String instance = //Name of accumulo instance exporting to
-String zookeepers = //zookeepers used by Accumulo instance exporting to
-String user = //Accumulo username, user that can write to exportTable
-String password = //Accumulo user password
-String exportTable = //Name of table to export to
-
-//Configure the Accumulo table to export to.
-AccumuloExporter.setExportTableInfo(fluoConfig, EXPORT_QUEUE_ID,
-    new TableInfo(instance, zookeepers, user, password, exportTable));
-
-//initialize Fluo using fluoConfig
-
-```
-
-After the export queue is initialized as specified above, any Object that
-implements [AccumuloExport][2] can be added to the queue. For the common
-pattern of deleting old data and inserting new data, consider extending
-[DifferenceExport][3].
-
-[1]: export-queue.md
-[2]: ../modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/export/AccumuloExport.java
-[3]: ../modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/export/DifferenceExport.java
-

http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/6f517736/docs/export-queue.md
----------------------------------------------------------------------
diff --git a/docs/export-queue.md b/docs/export-queue.md
index 0120113..c366e9d 100644
--- a/docs/export-queue.md
+++ b/docs/export-queue.md
@@ -281,5 +281,5 @@ example of write skew mentioned in the Percolater paper.
 
 [1]: ../modules/core/src/main/java/org/apache/fluo/recipes/core/export/Exporter.java
 [2]: https://en.wikipedia.org/wiki/Serializability
-[3]: accumulo-export.md
+[3]: accumulo-export-queue.md
 

http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/6f517736/modules/accumulo/pom.xml
----------------------------------------------------------------------
diff --git a/modules/accumulo/pom.xml b/modules/accumulo/pom.xml
index 03df1e5..945b9fc 100644
--- a/modules/accumulo/pom.xml
+++ b/modules/accumulo/pom.xml
@@ -25,10 +25,6 @@
   <name>Fluo Recipes Accumulo</name>
   <dependencies>
     <dependency>
-      <groupId>com.google.guava</groupId>
-      <artifactId>guava</artifactId>
-    </dependency>
-    <dependency>
       <groupId>commons-configuration</groupId>
       <artifactId>commons-configuration</artifactId>
     </dependency>

http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/6f517736/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/export/AccumuloExport.java
----------------------------------------------------------------------
diff --git a/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/export/AccumuloExport.java b/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/export/AccumuloExport.java
deleted file mode 100644
index 9b31b7c..0000000
--- a/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/export/AccumuloExport.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
You may obtain a - * copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. - */ - -package org.apache.fluo.recipes.accumulo.export; - -import java.util.Collection; - -import org.apache.accumulo.core.data.Mutation; - -/** - * Implemented by users to export data to Accumulo. - * - * @param <K> Export queue key type - * @since 1.0.0 - */ -public interface AccumuloExport<K> { - - /** - * Creates mutations for export from user's data - * - * @param key Export queue key - * @param seq Export sequence number - */ - Collection<Mutation> toMutations(K key, long seq); -} http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/6f517736/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/export/AccumuloExportQueue.java ---------------------------------------------------------------------- diff --git a/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/export/AccumuloExportQueue.java b/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/export/AccumuloExportQueue.java new file mode 100644 index 0000000..fdba5f3 --- /dev/null +++ b/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/export/AccumuloExportQueue.java @@ -0,0 +1,278 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. 
You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.fluo.recipes.accumulo.export; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.LinkedBlockingQueue; + +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.BatchWriter; +import org.apache.accumulo.core.client.BatchWriterConfig; +import org.apache.accumulo.core.client.ClientConfiguration; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.MutationsRejectedException; +import org.apache.accumulo.core.client.TableExistsException; +import org.apache.accumulo.core.client.TableNotFoundException; +import org.apache.accumulo.core.client.ZooKeeperInstance; +import org.apache.accumulo.core.client.security.tokens.PasswordToken; +import org.apache.accumulo.core.data.Mutation; +import org.apache.fluo.api.config.FluoConfiguration; +import org.apache.fluo.api.config.SimpleConfiguration; +import org.apache.fluo.api.data.Bytes; +import org.apache.fluo.api.data.Column; +import org.apache.fluo.api.data.RowColumn; +import org.apache.fluo.recipes.core.export.ExportQueue; + +public class AccumuloExportQueue { + + /** + * Configures AccumuloExportQueue + * + * @param fc Fluo configuration + * @param eqo Export queue options + * @param ao Accumulo export queue options + */ + public static void configure(FluoConfiguration fc, ExportQueue.Options 
eqo, Options ao) { + ExportQueue.configure(fc, eqo); + AccumuloWriter.setConfig(fc.getAppConfiguration(), eqo.getQueueId(), ao); + } + + /** + * Generates Accumulo mutations by comparing the differences between a RowColumn/Bytes map that is + * generated for old and new data and represents how the data should exist in Accumulo. When + * comparing each row/column/value (RCV) of old and new data, mutations are generated using the + * following rules: + * <ul> + * <li>If old and new data have the same RCV, nothing is done. + * <li>If old and new data have same row/column but different values, an update mutation is + * created for the row/column. + * <li>If old data has a row/column that is not in the new data, a delete mutation is generated. + * <li>If new data has a row/column that is not in the old data, an insert mutation is generated. + * <li>Only one mutation is generated per row. + * <li>The export sequence number is used for the timestamp in the mutation. + * </ul> + * + * @param oldData Map containing old row/column data + * @param newData Map containing new row/column data + * @param seq Export sequence number + */ + public static Collection<Mutation> generateMutations(long seq, Map<RowColumn, Bytes> oldData, + Map<RowColumn, Bytes> newData) { + Map<Bytes, Mutation> mutationMap = new HashMap<>(); + for (Map.Entry<RowColumn, Bytes> entry : oldData.entrySet()) { + RowColumn rc = entry.getKey(); + if (!newData.containsKey(rc)) { + Mutation m = mutationMap.computeIfAbsent(rc.getRow(), r -> new Mutation(r.toArray())); + m.putDelete(rc.getColumn().getFamily().toArray(), rc.getColumn().getQualifier().toArray(), + seq); + } + } + for (Map.Entry<RowColumn, Bytes> entry : newData.entrySet()) { + RowColumn rc = entry.getKey(); + Column col = rc.getColumn(); + Bytes newVal = entry.getValue(); + Bytes oldVal = oldData.get(rc); + if (oldVal == null || !oldVal.equals(newVal)) { + Mutation m = mutationMap.computeIfAbsent(rc.getRow(), r -> new Mutation(r.toArray())); + 
+        m.put(col.getFamily().toArray(), col.getQualifier().toArray(), seq, newVal.toArray());
+      }
+    }
+    return mutationMap.values();
+  }
+
+  /**
+   * Writes mutations to Accumulo using a shared batch writer
+   *
+   * @since 1.0.0
+   */
+  static class AccumuloWriter {
+
+    private static class Mutations {
+      List<Mutation> mutations;
+      CountDownLatch cdl = new CountDownLatch(1);
+
+      Mutations(Collection<Mutation> mutations) {
+        this.mutations = new ArrayList<>(mutations);
+      }
+    }
+
+    /**
+     * Sets AccumuloWriter config in app configuration
+     */
+    static void setConfig(SimpleConfiguration sc, String id, Options ac) {
+      String prefix = "recipes.accumulo.writer." + id;
+      sc.setProperty(prefix + ".instance", ac.instanceName);
+      sc.setProperty(prefix + ".zookeepers", ac.zookeepers);
+      sc.setProperty(prefix + ".user", ac.user);
+      sc.setProperty(prefix + ".password", ac.password);
+      sc.setProperty(prefix + ".table", ac.table);
+    }
+
+    /**
+     * Gets Accumulo Options from app configuration
+     */
+    static Options getConfig(SimpleConfiguration sc, String id) {
+      String prefix = "recipes.accumulo.writer." + id;
+      String instanceName = sc.getString(prefix + ".instance");
+      String zookeepers = sc.getString(prefix + ".zookeepers");
+      String user = sc.getString(prefix + ".user");
+      String password = sc.getString(prefix + ".password");
+      String table = sc.getString(prefix + ".table");
+      return new Options(instanceName, zookeepers, user, password, table);
+    }
+
+    private static class ExportTask implements Runnable {
+
+      private BatchWriter bw;
+
+      ExportTask(String instanceName, String zookeepers, String user, String password, String table)
+          throws TableNotFoundException, AccumuloException, AccumuloSecurityException {
+        ZooKeeperInstance zki =
+            new ZooKeeperInstance(new ClientConfiguration().withInstance(instanceName).withZkHosts(
+                zookeepers));
+
+        // TODO need to close batch writer
+        Connector conn = zki.getConnector(user, new PasswordToken(password));
+        try {
+          bw = conn.createBatchWriter(table, new BatchWriterConfig());
+        } catch (TableNotFoundException tnfe) {
+          try {
+            conn.tableOperations().create(table);
+          } catch (TableExistsException e) {
+            // nothing to do
+          }
+
+          bw = conn.createBatchWriter(table, new BatchWriterConfig());
+        }
+      }
+
+      @Override
+      public void run() {
+
+        ArrayList<Mutations> exports = new ArrayList<>();
+
+        while (true) {
+          try {
+            exports.clear();
+
+            // gather export from all threads that have placed an item on the queue
+            exports.add(exportQueue.take());
+            exportQueue.drainTo(exports);
+
+            for (Mutations ml : exports) {
+              bw.addMutations(ml.mutations);
+            }
+
+            bw.flush();
+
+            // notify all threads waiting after flushing
+            for (Mutations ml : exports) {
+              ml.cdl.countDown();
+            }
+
+          } catch (InterruptedException | MutationsRejectedException e) {
+            throw new RuntimeException(e);
+          }
+        }
+      }
+
+    }
+
+    private static LinkedBlockingQueue<Mutations> exportQueue = null;
+
+    private AccumuloWriter(String instanceName, String zookeepers, String user, String password,
+        String table) {
+
+      // TODO: fix this write to static and remove findbugs max rank override in pom.xml
+      exportQueue = new LinkedBlockingQueue<>(10000);
+
+      try {
+        Thread queueProcessingTask =
+            new Thread(new ExportTask(instanceName, zookeepers, user, password, table));
+        queueProcessingTask.setDaemon(true);
+        queueProcessingTask.start();
+      } catch (Exception e) {
+        throw new IllegalStateException(e);
+      }
+    }
+
+    private static Map<String, AccumuloWriter> exporters = new HashMap<>();
+
+
+    static AccumuloWriter getInstance(SimpleConfiguration sc, String id) {
+      return getInstance(getConfig(sc, id));
+    }
+
+    static AccumuloWriter getInstance(Options ac) {
+      return getInstance(ac.instanceName, ac.zookeepers, ac.user, ac.password, ac.table);
+    }
+
+    static synchronized AccumuloWriter getInstance(String instanceName, String zookeepers,
+        String user, String password, String table) {
+
+      String key =
+          instanceName + ":" + zookeepers + ":" + user + ":" + password.hashCode() + ":" + table;
+
+      AccumuloWriter ret = exporters.get(key);
+
+      if (ret == null) {
+        ret = new AccumuloWriter(instanceName, zookeepers, user, password, table);
+        exporters.put(key, ret);
+      }
+
+      return ret;
+    }
+
+    void write(Collection<Mutation> mutations) {
+      Mutations work = new Mutations(mutations);
+      exportQueue.add(work);
+      try {
+        work.cdl.await();
+      } catch (InterruptedException e) {
+        throw new RuntimeException(e);
+      }
+    }
+
+  }
+
+  /**
+   * Accumulo export queue options
+   *
+   * @since 1.0.0
+   */
+  public static class Options {
+    String instanceName;
+    String zookeepers;
+    String user;
+    String password;
+    String table;
+
+    public Options(String instanceName, String zookeepers, String user, String password,
+        String table) {
+      this.instanceName = instanceName;
+      this.zookeepers = zookeepers;
+      this.user = user;
+      this.password = password;
+      this.table = table;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/6f517736/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/export/AccumuloExporter.java
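The difference rules listed in the `generateMutations()` Javadoc of `AccumuloExportQueue` above can be tried out without Fluo or Accumulo on the classpath. The following is a minimal sketch under stated assumptions: the `DiffSketch` class and its `diff`, `rowOps`, and `column` helpers are hypothetical and not part of Fluo Recipes; cells are keyed as `"row:column"` strings (assuming rows never contain `:`), and each operation is a plain string instead of an Accumulo `Mutation`. Grouping operations by row mirrors the "only one mutation per row" rule, and `seq` stands in for the export sequence number used as the timestamp.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch, not part of Fluo Recipes: applies the difference rules documented on
// AccumuloExportQueue.generateMutations() to plain strings instead of Fluo/Accumulo types.
// Cells are keyed as "row:column"; this assumes row names never contain ':'.
public class DiffSketch {

  // Returns row -> (column -> operation). Grouping by row mirrors "one mutation per row".
  static Map<String, Map<String, String>> diff(long seq, Map<String, String> oldData,
      Map<String, String> newData) {
    Map<String, Map<String, String>> mutations = new TreeMap<>();
    for (String cell : oldData.keySet()) {
      if (!newData.containsKey(cell)) {
        // Cell only exists in the old data: delete it, using seq as the timestamp.
        rowOps(mutations, cell).put(column(cell), "delete@" + seq);
      }
    }
    for (Map.Entry<String, String> e : newData.entrySet()) {
      String oldVal = oldData.get(e.getKey());
      if (oldVal == null || !oldVal.equals(e.getValue())) {
        // Cell is new or its value changed: write the new value with timestamp seq.
        rowOps(mutations, e.getKey()).put(column(e.getKey()), "put(" + e.getValue() + ")@" + seq);
      }
    }
    // Cells whose old and new values are identical never reach this map.
    return mutations;
  }

  // Finds (or creates) the per-row operation map for the row part of a "row:column" key.
  private static Map<String, String> rowOps(Map<String, Map<String, String>> mutations,
      String cell) {
    String row = cell.substring(0, cell.indexOf(':'));
    return mutations.computeIfAbsent(row, r -> new TreeMap<>());
  }

  // Extracts the column part of a "row:column" key.
  private static String column(String cell) {
    return cell.substring(cell.indexOf(':') + 1);
  }

  public static void main(String[] args) {
    Map<String, String> oldData = new TreeMap<>();
    oldData.put("r1:cf:a", "1");
    oldData.put("r1:cf:b", "2");
    oldData.put("r2:cf:a", "3");

    Map<String, String> newData = new TreeMap<>();
    newData.put("r1:cf:a", "1"); // unchanged
    newData.put("r1:cf:b", "5"); // updated
    newData.put("r3:cf:a", "7"); // inserted (r2:cf:a is absent, so it is deleted)

    // prints {r1={cf:b=put(5)@42}, r2={cf:a=delete@42}, r3={cf:a=put(7)@42}}
    System.out.println(diff(42, oldData, newData));
  }
}
```

The unchanged cell `r1:cf:a` produces no operation at all; only added, updated, or removed cells would be handed to the writer, which is the point of exporting a difference rather than the full new state.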
----------------------------------------------------------------------
diff --git a/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/export/AccumuloExporter.java b/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/export/AccumuloExporter.java
index 7e48d12..54fc8ca 100644
--- a/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/export/AccumuloExporter.java
+++ b/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/export/AccumuloExporter.java
@@ -16,60 +16,45 @@
 package org.apache.fluo.recipes.accumulo.export;
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Iterator;
 
 import org.apache.accumulo.core.data.Mutation;
-import org.apache.fluo.api.config.FluoConfiguration;
-import org.apache.fluo.api.config.SimpleConfiguration;
 import org.apache.fluo.api.observer.Observer.Context;
 import org.apache.fluo.recipes.core.export.Exporter;
 import org.apache.fluo.recipes.core.export.SequencedExport;
 
 /**
- * An {@link Exporter} that takes {@link AccumuloExport} objects and writes mutations to Accumulo
+ * An Accumulo-specific {@link Exporter} that writes mutations to Accumulo using a
+ * {@link AccumuloExportQueue.AccumuloWriter}
  *
- * @param <K> Export queue key type
  * @since 1.0.0
  */
-public class AccumuloExporter<K> extends Exporter<K, AccumuloExport<K>> {
+public abstract class AccumuloExporter<K, V> extends Exporter<K, V> {
 
-  private SharedBatchWriter sbw;
+  private AccumuloExportQueue.AccumuloWriter accumuloWriter;
 
   @Override
   public void init(String queueId, Context context) throws Exception {
-
-    SimpleConfiguration appConf = context.getAppConfiguration();
-
-    String instanceName = appConf.getString("recipes.accumuloExporter." + queueId + ".instance");
-    String zookeepers = appConf.getString("recipes.accumuloExporter." + queueId + ".zookeepers");
-    String user = appConf.getString("recipes.accumuloExporter." + queueId + ".user");
-    // TODO look into using delegation token
-    String password = appConf.getString("recipes.accumuloExporter." + queueId + ".password");
-    String table = appConf.getString("recipes.accumuloExporter." + queueId + ".table");
-
-    sbw = SharedBatchWriter.getInstance(instanceName, zookeepers, user, password, table);
-  }
-
-  public static void setExportTableInfo(FluoConfiguration fconf, String queueId, TableInfo ti) {
-    SimpleConfiguration appConf = fconf.getAppConfiguration();
-    appConf.setProperty("recipes.accumuloExporter." + queueId + ".instance", ti.instanceName);
-    appConf.setProperty("recipes.accumuloExporter." + queueId + ".zookeepers", ti.zookeepers);
-    appConf.setProperty("recipes.accumuloExporter." + queueId + ".user", ti.user);
-    appConf.setProperty("recipes.accumuloExporter." + queueId + ".password", ti.password);
-    appConf.setProperty("recipes.accumuloExporter." + queueId + ".table", ti.table);
+    accumuloWriter =
+        AccumuloExportQueue.AccumuloWriter.getInstance(context.getAppConfiguration(), queueId);
   }
 
   @Override
-  protected void processExports(Iterator<SequencedExport<K, AccumuloExport<K>>> exports) {
+  protected void processExports(Iterator<SequencedExport<K, V>> exports) {
+
     ArrayList<Mutation> buffer = new ArrayList<>();
 
     while (exports.hasNext()) {
-      SequencedExport<K, AccumuloExport<K>> export = exports.next();
-      buffer.addAll(export.getValue().toMutations(export.getKey(), export.getSequence()));
+      SequencedExport<K, V> export = exports.next();
+      buffer.addAll(processExport(export));
     }
 
     if (buffer.size() > 0) {
-      sbw.write(buffer);
+      accumuloWriter.write(buffer);
     }
   }
+
+  protected abstract Collection<Mutation> processExport(SequencedExport<K, V> export);
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/6f517736/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/export/AccumuloReplicator.java
----------------------------------------------------------------------
diff --git a/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/export/AccumuloReplicator.java b/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/export/AccumuloReplicator.java
new file mode 100644
index 0000000..aaea742
--- /dev/null
+++ b/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/export/AccumuloReplicator.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.recipes.accumulo.export;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.function.Predicate;
+
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.security.ColumnVisibility;
+import org.apache.fluo.api.data.Bytes;
+import org.apache.fluo.api.data.Column;
+import org.apache.fluo.recipes.core.export.SequencedExport;
+import org.apache.fluo.recipes.core.transaction.LogEntry;
+import org.apache.fluo.recipes.core.transaction.TxLog;
+
+/**
+ * An {@link AccumuloExporter} that replicates data to Accumulo using a {@link TxLog}
+ */
+public class AccumuloReplicator extends AccumuloExporter<String, TxLog> {
+
+  @Override
+  protected Collection<Mutation> processExport(SequencedExport<String, TxLog> export) {
+    return generateMutations(export.getSequence(), export.getValue());
+  }
+
+  /**
+   * Returns LogEntry filter for Accumulo replication
+   */
+  public static Predicate<LogEntry> getFilter() {
+    return le -> le.getOp().equals(LogEntry.Operation.DELETE)
+        || le.getOp().equals(LogEntry.Operation.SET);
+  }
+
+  /**
+   * Generates Accumulo mutations from a Transaction log. Used to Replicate Fluo table to Accumulo.
+   *
+   * @param txLog Transaction log
+   * @param seq Export sequence number
+   * @return Collection of mutations
+   */
+  public static Collection<Mutation> generateMutations(long seq, TxLog txLog) {
+    Map<Bytes, Mutation> mutationMap = new HashMap<>();
+    for (LogEntry le : txLog.getLogEntries()) {
+      LogEntry.Operation op = le.getOp();
+      Column col = le.getColumn();
+      byte[] cf = col.getFamily().toArray();
+      byte[] cq = col.getQualifier().toArray();
+      byte[] cv = col.getVisibility().toArray();
+      if (op.equals(LogEntry.Operation.DELETE) || op.equals(LogEntry.Operation.SET)) {
+        Mutation m = mutationMap.computeIfAbsent(le.getRow(), k -> new Mutation(k.toArray()));
+        if (op.equals(LogEntry.Operation.DELETE)) {
+          if (col.isVisibilitySet()) {
+            m.putDelete(cf, cq, new ColumnVisibility(cv), seq);
+          } else {
+            m.putDelete(cf, cq, seq);
+          }
+        } else {
+          if (col.isVisibilitySet()) {
+            m.put(cf, cq, new ColumnVisibility(cv), seq, le.getValue().toArray());
+          } else {
+            m.put(cf, cq, seq, le.getValue().toArray());
+          }
+        }
+      }
+    }
+    return mutationMap.values();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/6f517736/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/export/DifferenceExport.java
----------------------------------------------------------------------
diff --git a/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/export/DifferenceExport.java b/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/export/DifferenceExport.java
deleted file mode 100644
index d6fefa7..0000000
--- a/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/export/DifferenceExport.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.fluo.recipes.accumulo.export;
-
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Optional;
-
-import com.google.common.base.Preconditions;
-import org.apache.accumulo.core.data.Mutation;
-import org.apache.fluo.api.data.Bytes;
-import org.apache.fluo.api.data.Column;
-import org.apache.fluo.api.data.RowColumn;
-
-/**
- * Implemented by users to export data to Accumulo by comparing the differences between a
- * RowColumn/Bytes map that is generated for old and new data and represents how the data should
- * exist in Accumulo. When comparing each row/column/value (RCV) of old and new data, mutations are
- * generated using the following rules:
- * <ul>
- * <li>If old and new data have the same RCV, nothing is done.
- * <li>If old and new data have same row/column but different values, an update mutation is created
- * for the row/column.
- * <li>If old data has a row/column that is not in the new data, a delete mutation is generated.
- * <li>If new data has a row/column that is not in the old data, an insert mutation is generated.
- * <li>Only one mutation is generated per row.
- * <li>The export sequence number is used for the timestamp in the mutation.
- * </ul>
- *
- * @param <K> Export queue key type
- * @param <V> Type of export value object used to generate data
- * @since 1.0.0
- */
-public abstract class DifferenceExport<K, V> implements AccumuloExport<K> {
-
-  private Optional<V> oldVal;
-  private Optional<V> newVal;
-
-  public DifferenceExport() {}
-
-  public DifferenceExport(Optional<V> oldVal, Optional<V> newVal) {
-    Objects.requireNonNull(oldVal);
-    Objects.requireNonNull(newVal);
-    Preconditions.checkArgument(oldVal.isPresent() || newVal.isPresent(),
-        "At least one value must be set");
-    this.oldVal = oldVal;
-    this.newVal = newVal;
-  }
-
-  /**
-   * Generates RowColumn/Bytes map of how data should exist in Accumulo. This map is generated for
-   * old and new data and compared to create export mutations that will be written to Accumulo.
-   *
-   * @param key Export queue key
-   * @param val Export value object
-   * @return RowColumn/Bytes map of how data should exist in Accumulo
-   */
-  protected abstract Map<RowColumn, Bytes> generateData(K key, Optional<V> val);
-
-  @Override
-  public Collection<Mutation> toMutations(K key, long seq) {
-    Map<RowColumn, Bytes> oldData = generateData(key, oldVal);
-    Map<RowColumn, Bytes> newData = generateData(key, newVal);
-
-    Map<Bytes, Mutation> mutationMap = new HashMap<>();
-    for (Map.Entry<RowColumn, Bytes> entry : oldData.entrySet()) {
-      RowColumn rc = entry.getKey();
-      if (!newData.containsKey(rc)) {
-        Mutation m = mutationMap.computeIfAbsent(rc.getRow(), r -> new Mutation(r.toArray()));
-        m.putDelete(rc.getColumn().getFamily().toArray(), rc.getColumn().getQualifier().toArray(),
-            seq);
-      }
-    }
-    for (Map.Entry<RowColumn, Bytes> entry : newData.entrySet()) {
-      RowColumn rc = entry.getKey();
-      Column col = rc.getColumn();
-      Bytes newVal = entry.getValue();
-      Bytes oldVal = oldData.get(rc);
-      if (oldVal == null || !oldVal.equals(newVal)) {
-        Mutation m = mutationMap.computeIfAbsent(rc.getRow(), r -> new Mutation(r.toArray()));
-        m.put(col.getFamily().toArray(), col.getQualifier().toArray(), seq, newVal.toArray());
-      }
-    }
-    return mutationMap.values();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/6f517736/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/export/ReplicationExport.java
----------------------------------------------------------------------
diff --git a/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/export/ReplicationExport.java b/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/export/ReplicationExport.java
deleted file mode 100644
index 9276c6d..0000000
--- a/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/export/ReplicationExport.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */ - -package org.apache.fluo.recipes.accumulo.export; - -import java.util.Collection; -import java.util.HashMap; -import java.util.Map; -import java.util.Objects; -import java.util.function.Predicate; - -import org.apache.accumulo.core.data.Mutation; -import org.apache.accumulo.core.security.ColumnVisibility; -import org.apache.fluo.api.data.Bytes; -import org.apache.fluo.api.data.Column; -import org.apache.fluo.recipes.core.transaction.LogEntry; -import org.apache.fluo.recipes.core.transaction.TxLog; - -/** - * An implementation of {@link AccumuloExport} that replicates a Fluo table to Accumulo using a - * TxLog - * - * @param <K> Type of export queue key - * @since 1.0.0 - */ -public class ReplicationExport<K> implements AccumuloExport<K> { - - private TxLog txLog; - - public ReplicationExport() {} - - public ReplicationExport(TxLog txLog) { - Objects.requireNonNull(txLog); - this.txLog = txLog; - } - - public static Predicate<LogEntry> getFilter() { - return le -> le.getOp().equals(LogEntry.Operation.DELETE) - || le.getOp().equals(LogEntry.Operation.SET); - } - - @Override - public Collection<Mutation> toMutations(K key, long seq) { - Map<Bytes, Mutation> mutationMap = new HashMap<>(); - for (LogEntry le : txLog.getLogEntries()) { - LogEntry.Operation op = le.getOp(); - Column col = le.getColumn(); - byte[] cf = col.getFamily().toArray(); - byte[] cq = col.getQualifier().toArray(); - byte[] cv = col.getVisibility().toArray(); - if (op.equals(LogEntry.Operation.DELETE) || op.equals(LogEntry.Operation.SET)) { - Mutation m = mutationMap.computeIfAbsent(le.getRow(), k -> new Mutation(k.toArray())); - if (op.equals(LogEntry.Operation.DELETE)) { - if (col.isVisibilitySet()) { - m.putDelete(cf, cq, new ColumnVisibility(cv), seq); - } else { - m.putDelete(cf, cq, seq); - } - } else { - if (col.isVisibilitySet()) { - m.put(cf, cq, new ColumnVisibility(cv), seq, le.getValue().toArray()); - } else { - m.put(cf, cq, seq, le.getValue().toArray()); - } - } - } - } - return 
mutationMap.values(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/6f517736/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/export/SharedBatchWriter.java ---------------------------------------------------------------------- diff --git a/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/export/SharedBatchWriter.java b/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/export/SharedBatchWriter.java deleted file mode 100644 index 9d189e6..0000000 --- a/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/export/SharedBatchWriter.java +++ /dev/null @@ -1,149 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license - * agreements. See the NOTICE file distributed with this work for additional information regarding - * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. You may obtain a - * copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. 
- */ - -package org.apache.fluo.recipes.accumulo.export; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.LinkedBlockingQueue; - -import org.apache.accumulo.core.client.AccumuloException; -import org.apache.accumulo.core.client.AccumuloSecurityException; -import org.apache.accumulo.core.client.BatchWriter; -import org.apache.accumulo.core.client.BatchWriterConfig; -import org.apache.accumulo.core.client.ClientConfiguration; -import org.apache.accumulo.core.client.Connector; -import org.apache.accumulo.core.client.MutationsRejectedException; -import org.apache.accumulo.core.client.TableExistsException; -import org.apache.accumulo.core.client.TableNotFoundException; -import org.apache.accumulo.core.client.ZooKeeperInstance; -import org.apache.accumulo.core.client.security.tokens.PasswordToken; -import org.apache.accumulo.core.data.Mutation; - -class SharedBatchWriter { - - private static class Mutations { - - List<Mutation> mutations; - CountDownLatch cdl = new CountDownLatch(1); - - Mutations(Collection<Mutation> mutations) { - this.mutations = new ArrayList<>(mutations); - } - } - - private static class ExportTask implements Runnable { - - private BatchWriter bw; - - ExportTask(String instanceName, String zookeepers, String user, String password, String table) - throws TableNotFoundException, AccumuloException, AccumuloSecurityException { - ZooKeeperInstance zki = - new ZooKeeperInstance(new ClientConfiguration().withInstance(instanceName).withZkHosts( - zookeepers)); - - // TODO need to close batch writer - Connector conn = zki.getConnector(user, new PasswordToken(password)); - try { - bw = conn.createBatchWriter(table, new BatchWriterConfig()); - } catch (TableNotFoundException tnfe) { - try { - conn.tableOperations().create(table); - } catch (TableExistsException e) { - // nothing to do - } - - bw = 
conn.createBatchWriter(table, new BatchWriterConfig()); - } - } - - @Override - public void run() { - - ArrayList<Mutations> exports = new ArrayList<>(); - - while (true) { - try { - exports.clear(); - - // gather export from all threads that have placed an item on the queue - exports.add(exportQueue.take()); - exportQueue.drainTo(exports); - - for (Mutations ml : exports) { - bw.addMutations(ml.mutations); - } - - bw.flush(); - - // notify all threads waiting after flushing - for (Mutations ml : exports) { - ml.cdl.countDown(); - } - - } catch (InterruptedException | MutationsRejectedException e) { - throw new RuntimeException(e); - } - } - } - - } - - private static LinkedBlockingQueue<Mutations> exportQueue = null; - - private SharedBatchWriter(String instanceName, String zookeepers, String user, String password, - String table) throws Exception { - - // TODO: fix this write to static and remove findbugs max rank override in pom.xml - exportQueue = new LinkedBlockingQueue<>(10000); - Thread queueProcessingTask = - new Thread(new ExportTask(instanceName, zookeepers, user, password, table)); - - queueProcessingTask.setDaemon(true); - queueProcessingTask.start(); - } - - private static Map<String, SharedBatchWriter> exporters = new HashMap<>(); - - static synchronized SharedBatchWriter getInstance(String instanceName, String zookeepers, - String user, String password, String table) throws Exception { - - String key = - instanceName + ":" + zookeepers + ":" + user + ":" + password.hashCode() + ":" + table; - - SharedBatchWriter ret = exporters.get(key); - - if (ret == null) { - ret = new SharedBatchWriter(instanceName, zookeepers, user, password, table); - exporters.put(key, ret); - } - - return ret; - } - - void write(Collection<Mutation> mutations) { - Mutations work = new Mutations(mutations); - exportQueue.add(work); - try { - work.cdl.await(); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - } -} 
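The deleted SharedBatchWriter used a group-commit pattern. Each exporting thread enqueued its mutations and then blocked on a CountDownLatch. A single daemon thread took one queued item, drained everything else already waiting, wrote it all through one BatchWriter, flushed once, and then released every waiting caller. The sketch below models that pattern using only java.util.concurrent. It is a minimal illustration under stated assumptions, not code from this commit. In particular, it is not the AccumuloWriter class that replaces SharedBatchWriter. The GroupCommitWriter class and its Consumer-based sink are hypothetical: the sink stands in for BatchWriter.addMutations() followed by flush(). The sketch also keeps its queue per instance, whereas the deleted constructor wrote a static field, a problem its own TODO flags.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

/** Hypothetical model of the group-commit pattern from the deleted SharedBatchWriter. */
class GroupCommitWriter<T> {

  // One caller's unit of work: its items plus a latch the caller waits on.
  private static class Batch<E> {
    final List<E> items;
    final CountDownLatch done = new CountDownLatch(1);

    Batch(Collection<E> items) {
      this.items = new ArrayList<>(items);
    }
  }

  // Per-instance queue with the same capacity the deleted code used.
  private final LinkedBlockingQueue<Batch<T>> queue = new LinkedBlockingQueue<>(10000);

  /**
   * @param sink hypothetical stand-in for BatchWriter.addMutations() + flush(); it receives
   *        every item drained in one round and must persist them before returning
   */
  GroupCommitWriter(Consumer<List<T>> sink) {
    Thread worker = new Thread(() -> {
      List<Batch<T>> round = new ArrayList<>();
      try {
        while (true) {
          round.clear();
          round.add(queue.take()); // block until at least one caller has work
          queue.drainTo(round); // grab whatever else is already waiting
          List<T> combined = new ArrayList<>();
          for (Batch<T> b : round) {
            combined.addAll(b.items);
          }
          sink.accept(combined); // a single write + flush for the whole round
          for (Batch<T> b : round) {
            b.done.countDown(); // release every caller whose items were flushed
          }
        }
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    });
    worker.setDaemon(true);
    worker.start();
  }

  /** Blocks the calling thread until its items have been handed to the sink. */
  void write(Collection<T> items) throws InterruptedException {
    Batch<T> b = new Batch<>(items);
    queue.put(b);
    b.done.await();
  }
}
```

Because the sink runs before any latch is released, a caller that returns from write() knows its items were included in a completed flush. Concurrent callers share that flush instead of each paying for one.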
http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/6f517736/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/export/TableInfo.java ---------------------------------------------------------------------- diff --git a/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/export/TableInfo.java b/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/export/TableInfo.java deleted file mode 100644 index 9da6773..0000000 --- a/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/export/TableInfo.java +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license - * agreements. See the NOTICE file distributed with this work for additional information regarding - * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. You may obtain a - * copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. 
- */ - -package org.apache.fluo.recipes.accumulo.export; - -/** - * @since 1.0.0 - */ -public class TableInfo { - String instanceName; - String zookeepers; - String user; - String password; - String table; - - public TableInfo(String instanceName, String zookeepers, String user, String password, - String table) { - this.instanceName = instanceName; - this.zookeepers = zookeepers; - this.user = user; - this.password = password; - this.table = table; - } -} http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/6f517736/modules/accumulo/src/test/java/org/apache/fluo/recipes/accumulo/export/AccumuloExportTest.java ---------------------------------------------------------------------- diff --git a/modules/accumulo/src/test/java/org/apache/fluo/recipes/accumulo/export/AccumuloExportTest.java b/modules/accumulo/src/test/java/org/apache/fluo/recipes/accumulo/export/AccumuloExportTest.java new file mode 100644 index 0000000..e4be08a --- /dev/null +++ b/modules/accumulo/src/test/java/org/apache/fluo/recipes/accumulo/export/AccumuloExportTest.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. 
+ */ + +package org.apache.fluo.recipes.accumulo.export; + +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; + +import org.apache.accumulo.core.data.Mutation; +import org.apache.fluo.api.data.Bytes; +import org.apache.fluo.api.data.Column; +import org.apache.fluo.api.data.RowColumn; +import org.junit.Assert; +import org.junit.Test; + +public class AccumuloExportTest { + + public static Map<RowColumn, Bytes> genData(String key, Optional<String> val) { + if (!val.isPresent()) { + return Collections.emptyMap(); + } + Map<RowColumn, Bytes> rcMap = new HashMap<>(); + String data = val.get(); + for (int i = 0; i < data.length(); i++) { + char c = data.charAt(i); + rcMap.put(new RowColumn("r:" + key, new Column("cf:" + c)), Bytes.of("v:" + c)); + } + return rcMap; + } + + public static Collection<Mutation> genMutations(String key, long seq, Optional<String> oldVal, + Optional<String> newVal) { + return AccumuloExportQueue.generateMutations(seq, genData(key, oldVal), genData(key, newVal)); + } + + public static Mutation makePut(String key, String val, long seq) { + Mutation m = new Mutation("r:" + key); + addPut(m, key, val, seq); + return m; + } + + public static void addPut(Mutation m, String key, String val, long seq) { + m.put("cf:" + val, "", seq, "v:" + val); + } + + public static Mutation makeDel(String key, String val, long seq) { + Mutation m = new Mutation("r:" + key); + addDel(m, key, val, seq); + return m; + } + + public static void addDel(Mutation m, String key, String val, long seq) { + m.putDelete("cf:" + val, "", seq); + } + + @Test + public void testDifferenceExport() { + Collection<Mutation> mutations; + + mutations = genMutations("k1", 1, Optional.empty(), Optional.of("a")); + Assert.assertEquals(1, mutations.size()); + Assert.assertTrue(mutations.contains(makePut("k1", "a", 1))); + + mutations = genMutations("k2", 2, Optional.of("ab"), Optional.of("ab")); + 
Assert.assertEquals(0, mutations.size()); + + mutations = genMutations("k2", 2, Optional.of("b"), Optional.of("ab")); + Assert.assertEquals(1, mutations.size()); + Assert.assertTrue(mutations.contains(makePut("k2", "a", 2))); + + mutations = genMutations("k3", 3, Optional.of("c"), Optional.of("d")); + Assert.assertEquals(1, mutations.size()); + Mutation m = makeDel("k3", "c", 3); + addPut(m, "k3", "d", 3); + Assert.assertTrue(mutations.contains(m)); + + mutations = genMutations("k4", 4, Optional.of("e"), Optional.empty()); + Assert.assertEquals(1, mutations.size()); + Assert.assertTrue(mutations.contains(makeDel("k4", "e", 4))); + + mutations = genMutations("k5", 5, Optional.of("ef"), Optional.of("fg")); + Assert.assertEquals(1, mutations.size()); + m = makeDel("k5", "e", 5); + addPut(m, "k5", "g", 5); + Assert.assertTrue(mutations.contains(m)); + } +} http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/6f517736/modules/accumulo/src/test/java/org/apache/fluo/recipes/accumulo/export/DifferenceExportTest.java ---------------------------------------------------------------------- diff --git a/modules/accumulo/src/test/java/org/apache/fluo/recipes/accumulo/export/DifferenceExportTest.java b/modules/accumulo/src/test/java/org/apache/fluo/recipes/accumulo/export/DifferenceExportTest.java deleted file mode 100644 index a6e1188..0000000 --- a/modules/accumulo/src/test/java/org/apache/fluo/recipes/accumulo/export/DifferenceExportTest.java +++ /dev/null @@ -1,105 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license - * agreements. See the NOTICE file distributed with this work for additional information regarding - * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. 
You may obtain a - * copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. - */ - -package org.apache.fluo.recipes.accumulo.export; - -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.Map; -import java.util.Optional; - -import org.apache.accumulo.core.data.Mutation; -import org.apache.fluo.api.data.Bytes; -import org.apache.fluo.api.data.Column; -import org.apache.fluo.api.data.RowColumn; -import org.junit.Assert; -import org.junit.Test; - -public class DifferenceExportTest { - - public class DiffExport extends DifferenceExport<String, String> { - - DiffExport(Optional<String> oldV, Optional<String> newV) { - super(oldV, newV); - } - - @Override - protected Map<RowColumn, Bytes> generateData(String key, Optional<String> val) { - if (!val.isPresent()) { - return Collections.emptyMap(); - } - Map<RowColumn, Bytes> rcMap = new HashMap<>(); - String data = val.get(); - for (int i = 0; i < data.length(); i++) { - char c = data.charAt(i); - rcMap.put(new RowColumn("r:" + key, new Column("cf:" + c)), Bytes.of("v:" + c)); - } - return rcMap; - } - } - - public static Mutation makePut(String key, String val, long seq) { - Mutation m = new Mutation("r:" + key); - addPut(m, key, val, seq); - return m; - } - - public static void addPut(Mutation m, String key, String val, long seq) { - m.put("cf:" + val, "", seq, "v:" + val); - } - - public static Mutation makeDel(String key, String val, long seq) { - Mutation m = new Mutation("r:" + key); - addDel(m, key, val, seq); - return m; - } - - public static void addDel(Mutation m, String key, String val, long seq) { - m.putDelete("cf:" + 
val, "", seq); - } - - @Test - public void testDifferenceExport() { - Collection<Mutation> mutations; - - mutations = new DiffExport(Optional.empty(), Optional.of("a")).toMutations("k1", 1); - Assert.assertEquals(1, mutations.size()); - Assert.assertTrue(mutations.contains(makePut("k1", "a", 1))); - - mutations = new DiffExport(Optional.of("ab"), Optional.of("ab")).toMutations("k2", 2); - Assert.assertEquals(0, mutations.size()); - - mutations = new DiffExport(Optional.of("b"), Optional.of("ab")).toMutations("k2", 2); - Assert.assertEquals(1, mutations.size()); - Assert.assertTrue(mutations.contains(makePut("k2", "a", 2))); - - mutations = new DiffExport(Optional.of("c"), Optional.of("d")).toMutations("k3", 3); - Assert.assertEquals(1, mutations.size()); - Mutation m = makeDel("k3", "c", 3); - addPut(m, "k3", "d", 3); - Assert.assertTrue(mutations.contains(m)); - - mutations = new DiffExport(Optional.of("e"), Optional.empty()).toMutations("k4", 4); - Assert.assertEquals(1, mutations.size()); - Assert.assertTrue(mutations.contains(makeDel("k4", "e", 4))); - - mutations = new DiffExport(Optional.of("ef"), Optional.of("fg")).toMutations("k5", 5); - Assert.assertEquals(1, mutations.size()); - m = makeDel("k5", "e", 5); - addPut(m, "k5", "g", 5); - Assert.assertTrue(mutations.contains(m)); - } -} http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/6f517736/modules/core/src/main/java/org/apache/fluo/recipes/core/export/ExportQueue.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/fluo/recipes/core/export/ExportQueue.java b/modules/core/src/main/java/org/apache/fluo/recipes/core/export/ExportQueue.java index 81d277b..fcc3a74 100644 --- a/modules/core/src/main/java/org/apache/fluo/recipes/core/export/ExportQueue.java +++ b/modules/core/src/main/java/org/apache/fluo/recipes/core/export/ExportQueue.java @@ -262,6 +262,10 @@ public class ExportQueue<K, V> { return bucketsPerTablet; } + 
public String getQueueId() { + return queueId; + } + void save(SimpleConfiguration appConfig) { appConfig.setProperty(PREFIX + queueId + ".buckets", numBuckets + ""); appConfig.setProperty(PREFIX + queueId + ".exporter", exporterType + ""); http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/6f517736/modules/test/src/test/java/org/apache/fluo/recipes/test/export/AccumuloExporterIT.java ---------------------------------------------------------------------- diff --git a/modules/test/src/test/java/org/apache/fluo/recipes/test/export/AccumuloExporterIT.java b/modules/test/src/test/java/org/apache/fluo/recipes/test/export/AccumuloExporterIT.java index 38986df..38985f1 100644 --- a/modules/test/src/test/java/org/apache/fluo/recipes/test/export/AccumuloExporterIT.java +++ b/modules/test/src/test/java/org/apache/fluo/recipes/test/export/AccumuloExporterIT.java @@ -29,10 +29,8 @@ import org.apache.accumulo.minicluster.MiniAccumuloCluster; import org.apache.fluo.api.client.FluoClient; import org.apache.fluo.api.client.FluoFactory; import org.apache.fluo.api.client.Transaction; -import org.apache.fluo.api.config.FluoConfiguration; import org.apache.fluo.api.mini.MiniFluo; -import org.apache.fluo.recipes.accumulo.export.AccumuloExporter; -import org.apache.fluo.recipes.accumulo.export.TableInfo; +import org.apache.fluo.recipes.accumulo.export.AccumuloExportQueue; import org.apache.fluo.recipes.core.export.ExportQueue; import org.apache.fluo.recipes.test.AccumuloExportITBase; import org.apache.hadoop.io.Text; @@ -41,30 +39,28 @@ import org.junit.Test; public class AccumuloExporterIT extends AccumuloExportITBase { - private String et; + private String exportTable; public static final String QUEUE_ID = "aeqt"; @Override public void preFluoInitHook() throws Exception { - FluoConfiguration fluoConfig = getFluoConfiguration(); - ExportQueue.configure(fluoConfig, - new ExportQueue.Options(QUEUE_ID, AccumuloExporter.class.getName(), String.class.getName(), - 
TestExport.class.getName(), 5).setBucketsPerTablet(1)); - // create and configure export table - et = "export" + tableCounter.getAndIncrement(); - getAccumuloConnector().tableOperations().create(et); + exportTable = "export" + tableCounter.getAndIncrement(); + getAccumuloConnector().tableOperations().create(exportTable); + MiniAccumuloCluster miniAccumulo = getMiniAccumuloCluster(); - AccumuloExporter.setExportTableInfo(fluoConfig, QUEUE_ID, - new TableInfo(miniAccumulo.getInstanceName(), miniAccumulo.getZooKeepers(), ACCUMULO_USER, - ACCUMULO_PASSWORD, et)); + + AccumuloExportQueue.configure(getFluoConfiguration(), new ExportQueue.Options(QUEUE_ID, + SimpleExporter.class.getName(), String.class.getName(), String.class.getName(), 5) + .setBucketsPerTablet(1), new AccumuloExportQueue.Options(miniAccumulo.getInstanceName(), + miniAccumulo.getZooKeepers(), ACCUMULO_USER, ACCUMULO_PASSWORD, exportTable)); } @Test public void testAccumuloExport() throws Exception { - ExportQueue<String, TestExport> teq = + ExportQueue<String, String> teq = ExportQueue.getInstance(QUEUE_ID, getFluoConfiguration().getAppConfiguration()); Assert.assertEquals(6, getFluoSplits().size()); @@ -122,9 +118,9 @@ public class AccumuloExporterIT extends AccumuloExportITBase { } } - private void export(ExportQueue<String, TestExport> teq, Transaction tx, + private void export(ExportQueue<String, String> teq, Transaction tx, Map<String, String> expected, String k, String v) { - teq.add(tx, k, new TestExport(v)); + teq.add(tx, k, v); expected.put(k, v); } @@ -134,7 +130,7 @@ public class AccumuloExporterIT extends AccumuloExportITBase { } private Map<String, String> getExports() throws Exception { - Scanner scanner = getAccumuloConnector().createScanner(et, Authorizations.EMPTY); + Scanner scanner = getAccumuloConnector().createScanner(exportTable, Authorizations.EMPTY); Map<String, String> ret = new HashMap<>(); for (Entry<Key, Value> entry : scanner) { 
http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/6f517736/modules/test/src/test/java/org/apache/fluo/recipes/test/export/AccumuloReplicatorIT.java ---------------------------------------------------------------------- diff --git a/modules/test/src/test/java/org/apache/fluo/recipes/test/export/AccumuloReplicatorIT.java b/modules/test/src/test/java/org/apache/fluo/recipes/test/export/AccumuloReplicatorIT.java index 432fa4d..e5b4e1a 100644 --- a/modules/test/src/test/java/org/apache/fluo/recipes/test/export/AccumuloReplicatorIT.java +++ b/modules/test/src/test/java/org/apache/fluo/recipes/test/export/AccumuloReplicatorIT.java @@ -27,13 +27,11 @@ import org.apache.accumulo.minicluster.MiniAccumuloCluster; import org.apache.fluo.api.client.FluoClient; import org.apache.fluo.api.client.FluoFactory; import org.apache.fluo.api.client.Transaction; -import org.apache.fluo.api.data.Bytes; import org.apache.fluo.api.mini.MiniFluo; -import org.apache.fluo.recipes.accumulo.export.AccumuloExport; -import org.apache.fluo.recipes.accumulo.export.AccumuloExporter; -import org.apache.fluo.recipes.accumulo.export.ReplicationExport; -import org.apache.fluo.recipes.accumulo.export.TableInfo; +import org.apache.fluo.recipes.accumulo.export.AccumuloExportQueue; +import org.apache.fluo.recipes.accumulo.export.AccumuloReplicator; import org.apache.fluo.recipes.core.export.ExportQueue; +import org.apache.fluo.recipes.core.transaction.TxLog; import org.apache.fluo.recipes.test.AccumuloExportITBase; import org.apache.fluo.recipes.core.transaction.RecordingTransaction; import org.apache.fluo.recipes.core.types.StringEncoder; @@ -44,32 +42,31 @@ import org.junit.Test; public class AccumuloReplicatorIT extends AccumuloExportITBase { - private String et; - public static final String QUEUE_ID = "aeqt"; + private String exportTable; + public static final String QUEUE_ID = "repq"; private TypeLayer tl = new TypeLayer(new StringEncoder()); @Override public void preFluoInitHook() 
throws Exception { - ExportQueue - .configure( - getFluoConfiguration(), - new ExportQueue.Options(QUEUE_ID, AccumuloExporter.class.getName(), Bytes.class - .getName(), AccumuloExport.class.getName(), 5)); // create and configure export table - et = "export" + tableCounter.getAndIncrement(); - getAccumuloConnector().tableOperations().create(et); + exportTable = "export" + tableCounter.getAndIncrement(); + getAccumuloConnector().tableOperations().create(exportTable); MiniAccumuloCluster miniAccumulo = getMiniAccumuloCluster(); - AccumuloExporter.setExportTableInfo(getFluoConfiguration(), QUEUE_ID, new TableInfo( - miniAccumulo.getInstanceName(), miniAccumulo.getZooKeepers(), ACCUMULO_USER, - ACCUMULO_PASSWORD, et)); + + AccumuloExportQueue.configure( + getFluoConfiguration(), + new ExportQueue.Options(QUEUE_ID, AccumuloReplicator.class.getName(), String.class + .getName(), TxLog.class.getName(), 5), + new AccumuloExportQueue.Options(miniAccumulo.getInstanceName(), miniAccumulo + .getZooKeepers(), ACCUMULO_USER, ACCUMULO_PASSWORD, exportTable)); } @Test public void testAccumuloReplicator() throws Exception { - ExportQueue<Bytes, AccumuloExport<?>> eq = + ExportQueue<String, TxLog> eq = ExportQueue.getInstance(QUEUE_ID, getFluoConfiguration().getAppConfiguration()); MiniFluo miniFluo = getMiniFluo(); @@ -78,12 +75,12 @@ public class AccumuloReplicatorIT extends AccumuloExportITBase { Map<String, String> expected = new HashMap<>(); try (Transaction tx = fc.newTransaction()) { - RecordingTransaction rtx = RecordingTransaction.wrap(tx, ReplicationExport.getFilter()); + RecordingTransaction rtx = RecordingTransaction.wrap(tx, AccumuloReplicator.getFilter()); TypedTransaction ttx = tl.wrap(rtx); write(ttx, expected, "k1", "v1"); write(ttx, expected, "k2", "v2"); write(ttx, expected, "k3", "v3"); - eq.add(tx, Bytes.of("q1"), new ReplicationExport<>(rtx.getTxLog())); + eq.add(tx, "q1", rtx.getTxLog()); tx.commit(); } @@ -91,13 +88,13 @@ public class AccumuloReplicatorIT 
extends AccumuloExportITBase { Assert.assertEquals(expected, getExports()); try (Transaction tx = fc.newTransaction()) { - RecordingTransaction rtx = RecordingTransaction.wrap(tx, ReplicationExport.getFilter()); + RecordingTransaction rtx = RecordingTransaction.wrap(tx, AccumuloReplicator.getFilter()); TypedTransaction ttx = tl.wrap(rtx); write(ttx, expected, "k1", "v4"); delete(ttx, expected, "k3"); write(ttx, expected, "k2", "v5"); write(ttx, expected, "k4", "v6"); - eq.add(tx, Bytes.of("q1"), new ReplicationExport<>(rtx.getTxLog())); + eq.add(tx, "q1", rtx.getTxLog()); tx.commit(); } @@ -105,13 +102,13 @@ public class AccumuloReplicatorIT extends AccumuloExportITBase { Assert.assertEquals(expected, getExports()); try (Transaction tx = fc.newTransaction()) { - RecordingTransaction rtx = RecordingTransaction.wrap(tx, ReplicationExport.getFilter()); + RecordingTransaction rtx = RecordingTransaction.wrap(tx, AccumuloReplicator.getFilter()); TypedTransaction ttx = tl.wrap(rtx); write(ttx, expected, "k2", "v7"); write(ttx, expected, "k3", "v8"); delete(ttx, expected, "k1"); delete(ttx, expected, "k4"); - eq.add(tx, Bytes.of("q1"), new ReplicationExport<>(rtx.getTxLog())); + eq.add(tx, "q1", rtx.getTxLog()); tx.commit(); } @@ -131,7 +128,7 @@ public class AccumuloReplicatorIT extends AccumuloExportITBase { } private Map<String, String> getExports() throws Exception { - Scanner scanner = getAccumuloConnector().createScanner(et, Authorizations.EMPTY); + Scanner scanner = getAccumuloConnector().createScanner(exportTable, Authorizations.EMPTY); Map<String, String> ret = new HashMap<>(); for (Entry<Key, Value> entry : scanner) { http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/6f517736/modules/test/src/test/java/org/apache/fluo/recipes/test/export/SimpleExporter.java ---------------------------------------------------------------------- diff --git a/modules/test/src/test/java/org/apache/fluo/recipes/test/export/SimpleExporter.java 
b/modules/test/src/test/java/org/apache/fluo/recipes/test/export/SimpleExporter.java new file mode 100644 index 0000000..5aa5812 --- /dev/null +++ b/modules/test/src/test/java/org/apache/fluo/recipes/test/export/SimpleExporter.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. 
+ */ + +package org.apache.fluo.recipes.test.export; + +import java.util.Collection; +import java.util.Collections; + +import org.apache.accumulo.core.data.Mutation; +import org.apache.fluo.recipes.accumulo.export.AccumuloExporter; +import org.apache.fluo.recipes.core.export.SequencedExport; + +public class SimpleExporter extends AccumuloExporter<String, String> { + + @Override + protected Collection<Mutation> processExport(SequencedExport<String, String> export) { + Mutation m = new Mutation(export.getKey()); + m.put("cf", "cq", export.getSequence(), export.getValue()); + return Collections.singleton(m); + } +} http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/6f517736/modules/test/src/test/java/org/apache/fluo/recipes/test/export/TestExport.java ---------------------------------------------------------------------- diff --git a/modules/test/src/test/java/org/apache/fluo/recipes/test/export/TestExport.java b/modules/test/src/test/java/org/apache/fluo/recipes/test/export/TestExport.java deleted file mode 100644 index 9132e0b..0000000 --- a/modules/test/src/test/java/org/apache/fluo/recipes/test/export/TestExport.java +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license - * agreements. See the NOTICE file distributed with this work for additional information regarding - * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. You may obtain a - * copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. 
- */ - -package org.apache.fluo.recipes.test.export; - -import java.util.Collection; -import java.util.Collections; - -import org.apache.accumulo.core.data.Mutation; -import org.apache.fluo.recipes.accumulo.export.AccumuloExport; - -public class TestExport implements AccumuloExport<String> { - - private String value; - - public TestExport() {} - - public TestExport(String value) { - this.value = value; - } - - @Override - public Collection<Mutation> toMutations(String key, long seq) { - Mutation m = new Mutation(key); - m.put("cf", "cq", seq, value); - return Collections.singletonList(m); - } -}
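The deleted ReplicationExport.toMutations turned a TxLog into at most one mutation per row. SET entries became puts and DELETE entries became deletes, each stamped with the export sequence number. Any other operation was skipped, matching getFilter(), which keeps only SET and DELETE while recording. This commit replaces that class with AccumuloReplicator. The following stdlib-only model of the loop is an illustration, not AccumuloReplicator's code. LogEntrySketch and its Operation enum are hypothetical stand-ins for org.apache.fluo.recipes.core.transaction.LogEntry. The GET member exists only to exercise the skip branch, and column visibility is omitted.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Stdlib-only model of replicating a transaction log; not the fluo-recipes implementation. */
class ReplicationSketch {

  /** Hypothetical stand-in for LogEntry.Operation. */
  enum Operation { GET, SET, DELETE }

  /** Hypothetical stand-in for a LogEntry: operation, row, column and (for SET) value. */
  static final class LogEntrySketch {
    final Operation op;
    final String row;
    final String col;
    final String value;

    LogEntrySketch(Operation op, String row, String col, String value) {
      this.op = op;
      this.row = row;
      this.col = col;
      this.value = value;
    }
  }

  /** Groups SET/DELETE entries by row; each list models one Accumulo Mutation. */
  static Map<String, List<String>> toMutations(List<LogEntrySketch> log, long seq) {
    Map<String, List<String>> mutations = new LinkedHashMap<>();
    for (LogEntrySketch le : log) {
      if (le.op == Operation.SET) {
        mutations.computeIfAbsent(le.row, r -> new ArrayList<>())
            .add("put " + le.col + "@" + seq + "=" + le.value);
      } else if (le.op == Operation.DELETE) {
        mutations.computeIfAbsent(le.row, r -> new ArrayList<>())
            .add("del " + le.col + "@" + seq);
      }
      // Any other operation (e.g. GET) creates no mutation, as in the deleted code.
    }
    return mutations;
  }
}
```

Using the sequence number as the timestamp means a later export of the same key carries a newer timestamp. That timestamp, not the order in which batches reach Accumulo, decides which write is visible.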