Updated Branches: refs/heads/ACCUMULO-391 [created] 75cccccb3
http://git-wip-us.apache.org/repos/asf/accumulo/blob/d41fdb19/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormatTest.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormatTest.java b/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormatTest.java index c9539c4..0f6655a 100644 --- a/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormatTest.java +++ b/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormatTest.java @@ -16,27 +16,40 @@ */ package org.apache.accumulo.core.client.mapreduce; -import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; import java.io.ByteArrayOutputStream; import java.io.DataOutputStream; import java.io.IOException; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; import java.util.List; +import java.util.Map; +import org.apache.accumulo.core.client.AccumuloSecurityException; import org.apache.accumulo.core.client.BatchWriter; import org.apache.accumulo.core.client.BatchWriterConfig; import org.apache.accumulo.core.client.Connector; import org.apache.accumulo.core.client.IteratorSetting; import org.apache.accumulo.core.client.mock.MockInstance; import org.apache.accumulo.core.client.security.tokens.PasswordToken; +import org.apache.accumulo.core.conf.TableQueryConfig; import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.data.Range; import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.iterators.user.RegExFilter; import 
org.apache.accumulo.core.iterators.user.WholeRowIterator; +import org.apache.accumulo.core.security.Authorizations; import org.apache.accumulo.core.util.CachedConfiguration; +import org.apache.accumulo.core.util.Pair; import org.apache.commons.codec.binary.Base64; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; @@ -49,11 +62,12 @@ import org.apache.hadoop.util.ToolRunner; import org.junit.Test; public class AccumuloInputFormatTest { - + private static final String PREFIX = AccumuloInputFormatTest.class.getSimpleName(); private static final String INSTANCE_NAME = PREFIX + "_mapreduce_instance"; private static final String TEST_TABLE_1 = PREFIX + "_mapreduce_table_1"; - + private static final String TEST_TABLE_2 = PREFIX + "_mapreduce_table_2"; + /** * Check that the iterator configuration is getting stored in the Job conf correctly. * @@ -62,7 +76,7 @@ public class AccumuloInputFormatTest { @Test public void testSetIterator() throws IOException { Job job = new Job(); - + IteratorSetting is = new IteratorSetting(1, "WholeRow", "org.apache.accumulo.core.iterators.WholeRowIterator"); AccumuloInputFormat.addIterator(job, is); Configuration conf = job.getConfiguration(); @@ -71,36 +85,37 @@ public class AccumuloInputFormatTest { String iterators = conf.get("AccumuloInputFormat.ScanOpts.Iterators"); assertEquals(new String(Base64.encodeBase64(baos.toByteArray())), iterators); } - + @Test public void testAddIterator() throws IOException { Job job = new Job(); - + + AccumuloInputFormat.addIterator(job, new IteratorSetting(1, "WholeRow", WholeRowIterator.class)); AccumuloInputFormat.addIterator(job, new IteratorSetting(2, "Versions", "org.apache.accumulo.core.iterators.VersioningIterator")); IteratorSetting iter = new IteratorSetting(3, "Count", "org.apache.accumulo.core.iterators.CountingIterator"); iter.addOption("v1", "1"); iter.addOption("junk", "\0omg:!\\xyzzy"); AccumuloInputFormat.addIterator(job, iter); - + List<IteratorSetting> 
list = AccumuloInputFormat.getIterators(job); - + // Check the list size assertTrue(list.size() == 3); - + // Walk the list and make sure our settings are correct IteratorSetting setting = list.get(0); assertEquals(1, setting.getPriority()); assertEquals("org.apache.accumulo.core.iterators.user.WholeRowIterator", setting.getIteratorClass()); assertEquals("WholeRow", setting.getName()); assertEquals(0, setting.getOptions().size()); - + setting = list.get(1); assertEquals(2, setting.getPriority()); assertEquals("org.apache.accumulo.core.iterators.VersioningIterator", setting.getIteratorClass()); assertEquals("Versions", setting.getName()); assertEquals(0, setting.getOptions().size()); - + setting = list.get(2); assertEquals(3, setting.getPriority()); assertEquals("org.apache.accumulo.core.iterators.CountingIterator", setting.getIteratorClass()); @@ -109,7 +124,7 @@ public class AccumuloInputFormatTest { assertEquals("1", setting.getOptions().get("v1")); assertEquals("\0omg:!\\xyzzy", setting.getOptions().get("junk")); } - + /** * Test adding iterator options where the keys and values contain both the FIELD_SEPARATOR character (':') and ITERATOR_SEPARATOR (',') characters. There * should be no exceptions thrown when trying to parse these types of option entries. 
@@ -124,12 +139,12 @@ public class AccumuloInputFormatTest { someSetting.addOption(key, value); Job job = new Job(); AccumuloInputFormat.addIterator(job, someSetting); - + List<IteratorSetting> list = AccumuloInputFormat.getIterators(job); assertEquals(1, list.size()); assertEquals(1, list.get(0).getOptions().size()); assertEquals(list.get(0).getOptions().get(key), value); - + someSetting.addOption(key + "2", value); someSetting.setPriority(2); someSetting.setName("it2"); @@ -142,7 +157,7 @@ public class AccumuloInputFormatTest { assertEquals(list.get(1).getOptions().get(key), value); assertEquals(list.get(1).getOptions().get(key + "2"), value); } - + /** * Test getting iterator settings for multiple iterators set * @@ -151,69 +166,71 @@ public class AccumuloInputFormatTest { @Test public void testGetIteratorSettings() throws IOException { Job job = new Job(); - + AccumuloInputFormat.addIterator(job, new IteratorSetting(1, "WholeRow", "org.apache.accumulo.core.iterators.WholeRowIterator")); AccumuloInputFormat.addIterator(job, new IteratorSetting(2, "Versions", "org.apache.accumulo.core.iterators.VersioningIterator")); AccumuloInputFormat.addIterator(job, new IteratorSetting(3, "Count", "org.apache.accumulo.core.iterators.CountingIterator")); - + List<IteratorSetting> list = AccumuloInputFormat.getIterators(job); - + // Check the list size assertTrue(list.size() == 3); - + // Walk the list and make sure our settings are correct IteratorSetting setting = list.get(0); assertEquals(1, setting.getPriority()); assertEquals("org.apache.accumulo.core.iterators.WholeRowIterator", setting.getIteratorClass()); assertEquals("WholeRow", setting.getName()); - + setting = list.get(1); assertEquals(2, setting.getPriority()); assertEquals("org.apache.accumulo.core.iterators.VersioningIterator", setting.getIteratorClass()); assertEquals("Versions", setting.getName()); - + setting = list.get(2); assertEquals(3, setting.getPriority()); 
assertEquals("org.apache.accumulo.core.iterators.CountingIterator", setting.getIteratorClass()); assertEquals("Count", setting.getName()); - + } - + @Test public void testSetRegex() throws IOException { Job job = new Job(); - + String regex = ">\"*%<>\'\\"; - + IteratorSetting is = new IteratorSetting(50, regex, RegExFilter.class); RegExFilter.setRegexs(is, regex, null, null, null, false); AccumuloInputFormat.addIterator(job, is); - + assertTrue(regex.equals(AccumuloInputFormat.getIterators(job).get(0).getName())); } - + private static AssertionError e1 = null; private static AssertionError e2 = null; - + private static class MRTester extends Configured implements Tool { + private static class TestMapper extends Mapper<Key,Value,Key,Value> { Key key = null; int count = 0; - + @Override protected void map(Key k, Value v, Context context) throws IOException, InterruptedException { try { + String tableName = ((InputFormatBase.RangeInputSplit) context.getInputSplit()).getTableName(); if (key != null) assertEquals(key.getRow().toString(), new String(v.get())); - assertEquals(k.getRow(), new Text(String.format("%09x", count + 1))); - assertEquals(new String(v.get()), String.format("%09x", count)); + assertEquals(new Text(String.format("%s_%09x", tableName, count + 1)), k.getRow()); + assertEquals(String.format("%s_%09x", tableName, count), new String(v.get())); } catch (AssertionError e) { e1 = e; } key = new Key(k); count++; } - + @Override protected void cleanup(Context context) throws IOException, InterruptedException { try { @@ -223,59 +240,198 @@ public class AccumuloInputFormatTest { } } } - + @Override public int run(String[] args) throws Exception { - - if (args.length != 3) { + + if (args.length != 4) { throw new IllegalArgumentException("Usage : " + MRTester.class.getName() + " <user> <pass> <table1> <table2>"); } - + String user = args[0]; String pass = args[1]; - String table = args[2]; + String table1 = args[2]; + String table2 = args[3]; + Job job = new 
Job(getConf(), this.getClass().getSimpleName() + "_" + System.currentTimeMillis()); job.setJarByClass(this.getClass()); - + job.setInputFormatClass(AccumuloInputFormat.class); - + AccumuloInputFormat.setConnectorInfo(job, user, new PasswordToken(pass)); - AccumuloInputFormat.setInputTableName(job, table); + +// AccumuloInputFormat.setInputTableNames(job, Arrays.asList(new String[]{table1, table2})); + TableQueryConfig tableConfig1 = new TableQueryConfig(table1); + TableQueryConfig tableConfig2 = new TableQueryConfig(table2); + + AccumuloInputFormat.setTableQueryConfigurations(job, tableConfig1, tableConfig2); AccumuloInputFormat.setMockInstance(job, INSTANCE_NAME); - + job.setMapperClass(TestMapper.class); job.setMapOutputKeyClass(Key.class); job.setMapOutputValueClass(Value.class); job.setOutputFormatClass(NullOutputFormat.class); - + job.setNumReduceTasks(0); - + job.waitForCompletion(true); - + return job.isSuccessful() ? 0 : 1; } - + public static void main(String[] args) throws Exception { assertEquals(0, ToolRunner.run(CachedConfiguration.getInstance(), new MRTester(), args)); } } - + + /** + * Generate incrementing counts and attach table name to the key/value so that order and multi-table data can be verified. 
+ */ @Test public void testMap() throws Exception { MockInstance mockInstance = new MockInstance(INSTANCE_NAME); Connector c = mockInstance.getConnector("root", new PasswordToken("")); c.tableOperations().create(TEST_TABLE_1); + c.tableOperations().create(TEST_TABLE_2); BatchWriter bw = c.createBatchWriter(TEST_TABLE_1, new BatchWriterConfig()); + BatchWriter bw2 = c.createBatchWriter(TEST_TABLE_2, new BatchWriterConfig()); for (int i = 0; i < 100; i++) { - Mutation m = new Mutation(new Text(String.format("%09x", i + 1))); - m.put(new Text(), new Text(), new Value(String.format("%09x", i).getBytes())); - bw.addMutation(m); + Mutation t1m = new Mutation(new Text(String.format("%s_%09x", TEST_TABLE_1, i + 1))); + t1m.put(new Text(), new Text(), new Value(String.format("%s_%09x", TEST_TABLE_1, i).getBytes())); + bw.addMutation(t1m); + Mutation t2m = new Mutation(new Text(String.format("%s_%09x", TEST_TABLE_2, i + 1))); + t2m.put(new Text(), new Text(), new Value(String.format("%s_%09x", TEST_TABLE_2, i).getBytes())); + bw2.addMutation(t2m); } bw.close(); - - MRTester.main(new String[] {"root", "", TEST_TABLE_1}); + bw2.close(); + + MRTester.main(new String[] {"root", "", TEST_TABLE_1, TEST_TABLE_2}); assertNull(e1); assertNull(e2); } + + /** + * Asserts that the configuration contains the expected ranges for the tables. 
+ */ + @Test + public void testMultitableRangeSerialization() throws Throwable { + List<String> tables = Arrays.asList("t1", "t2", "t3"); + Job job = new Job(new Configuration()); + job.setInputFormatClass(AccumuloInputFormat.class); + job.setMapperClass(MRTester.TestMapper.class); + job.setNumReduceTasks(0); + AccumuloInputFormat.setConnectorInfo(job, "root", new PasswordToken(new byte[0])); + AccumuloInputFormat.setScanAuthorizations(job, new Authorizations()); + AccumuloInputFormat.setMockInstance(job, "testmapinstance"); + + HashMap<String,Collection<Range>> tblRanges = new HashMap<String,Collection<Range>>(); + for (String tbl : tables) { + List<Range> ranges = Arrays.asList(new Range("a", "b"), new Range("c", "d"), new Range("e", "f")); + tblRanges.put(tbl, ranges); + } + + Range defaultRange = new Range("0", "1"); + + try { + AccumuloInputFormat.setRanges(job, tblRanges); + fail("Exception should have been thrown."); + } catch(IllegalStateException e) {} + + AccumuloInputFormat.setInputTableNames(job, tables); + + // set a default range + AccumuloInputFormat.setRanges(job, Collections.singleton(defaultRange)); + AccumuloInputFormat.setRanges(job, tblRanges); + Map<String,List<Range>> configuredRanges = AccumuloInputFormat.getRanges(job); + + for (Map.Entry<String,List<Range>> cfgRange : configuredRanges.entrySet()) { + String tbl = cfgRange.getKey(); + HashSet<Range> originalRanges = new HashSet<Range>(tblRanges.remove(tbl)); + originalRanges.add(defaultRange); + HashSet<Range> retrievedRanges = new HashSet<Range>(cfgRange.getValue()); + assertEquals(originalRanges.size(), retrievedRanges.size()); + assertTrue(originalRanges.containsAll(retrievedRanges)); + assertTrue(retrievedRanges.containsAll(originalRanges)); + } + } + + /** + * Asserts that the configuration contains the expected iterators for the tables. 
+ */ + @Test + public void testMultitableIteratorSerialization() throws Throwable { + HashSet<String> tables = new HashSet<String>(Arrays.asList("t1", "t2")); + Job job = new Job(new Configuration()); + job.setInputFormatClass(AccumuloInputFormat.class); + job.setMapperClass(MRTester.TestMapper.class); + job.setNumReduceTasks(0); + AccumuloInputFormat.setConnectorInfo(job, "root", new PasswordToken(new byte[0])); + AccumuloInputFormat.setScanAuthorizations(job, new Authorizations()); + + // create + set iterators on configuration and build expected reference set + IteratorSetting isetting1 = new IteratorSetting(1, "name1", "class1"); + IteratorSetting isetting2 = new IteratorSetting(2, "name2", "class2"); + IteratorSetting isetting3 = new IteratorSetting(2, "name3", "class3"); + + try { + AccumuloInputFormat.addIterator(job, "t1", isetting1); + fail("Exception should have been thrown."); + } catch(IllegalStateException e) {} + + AccumuloInputFormat.setInputTableNames(job, tables); + + AccumuloInputFormat.addIterator(job, "t1", isetting1); + AccumuloInputFormat.addIterator(job, "t2", isetting2); + AccumuloInputFormat.addIterator(job, isetting3); + + // verify per-table iterators + List<IteratorSetting> t1iters = AccumuloInputFormat.getIterators(job, "t1"); + List<IteratorSetting> t2iters = AccumuloInputFormat.getIterators(job, "t2"); + assertFalse(t1iters.isEmpty()); + assertEquals(isetting1, t1iters.get(1)); + assertEquals(isetting3, t1iters.get(0)); + assertEquals(isetting2, t2iters.get(1)); + assertEquals(isetting3, t2iters.get(0)); + } + + @Test + public void testMultitableColumnSerialization() throws IOException, AccumuloSecurityException { + HashSet<String> tables = new HashSet<String>(Arrays.asList("t1", "t2")); + Job job = new Job(new Configuration()); + job.setInputFormatClass(AccumuloInputFormat.class); + job.setMapperClass(MRTester.TestMapper.class); + job.setNumReduceTasks(0); + AccumuloInputFormat.setConnectorInfo(job, "root", new PasswordToken(new 
byte[0])); + + AccumuloInputFormat.setScanAuthorizations(job, new Authorizations()); + + Map<String,Collection<Pair<Text,Text>>> columns = new HashMap<String,Collection<Pair<Text,Text>>>(); + HashSet<Pair<Text,Text>> t1cols = new HashSet<Pair<Text,Text>>(); + t1cols.add(new Pair(new Text("a"), new Text("b"))); + HashSet<Pair<Text,Text>> t2cols = new HashSet<Pair<Text,Text>>(); + t2cols.add(new Pair(new Text("b"), new Text("c"))); + columns.put("t1", t1cols); + columns.put("t2", t2cols); + + Pair<Text,Text> defaultColumn = new Pair(new Text("c"), new Text("d")); + + try { + AccumuloInputFormat.fetchColumns(job, columns); + fail("Exception should have been thrown."); + } catch(IllegalStateException e) {} + + AccumuloInputFormat.setInputTableNames(job, tables); + + AccumuloInputFormat.fetchColumns(job, Collections.singleton(defaultColumn)); + AccumuloInputFormat.fetchColumns(job, columns); + + columns.get("t1").add(defaultColumn); + columns.get("t2").add(defaultColumn); + + Collection<Pair<Text,Text>> t1actual = AccumuloInputFormat.getFetchedColumns(job, "t1"); + assertEquals(columns.get("t1"), t1actual); + Collection<Pair<Text,Text>> t2actual = AccumuloInputFormat.getFetchedColumns(job, "t2"); + assertEquals(columns.get("t2"), t2actual); + } } http://git-wip-us.apache.org/repos/asf/accumulo/blob/d41fdb19/core/src/test/java/org/apache/accumulo/core/conf/TableQueryConfigTest.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/accumulo/core/conf/TableQueryConfigTest.java b/core/src/test/java/org/apache/accumulo/core/conf/TableQueryConfigTest.java new file mode 100644 index 0000000..1d5d187 --- /dev/null +++ b/core/src/test/java/org/apache/accumulo/core/conf/TableQueryConfigTest.java @@ -0,0 +1,93 @@ +package org.apache.accumulo.core.conf; + +import static org.junit.Assert.assertEquals; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import 
java.io.DataOutputStream; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +import org.apache.accumulo.core.client.IteratorSetting; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.util.Pair; +import org.apache.hadoop.io.Text; +import org.junit.Before; +import org.junit.Test; + +public class TableQueryConfigTest { + + private static final String TEST_TABLE = "TEST_TABLE"; + private TableQueryConfig tableQueryConfig; + + @Before + public void setUp() { + tableQueryConfig = new TableQueryConfig(TEST_TABLE); + } + + @Test + public void testSerialization_OnlyTable() throws IOException { + byte[] serialized = serialize(tableQueryConfig); + ByteArrayInputStream bais = new ByteArrayInputStream(serialized); + TableQueryConfig actualConfig = new TableQueryConfig(new DataInputStream(bais)); + bais.close(); + + assertEquals(tableQueryConfig, actualConfig); + } + + @Test + public void testSerialization_ranges() throws IOException { + List<Range> ranges = new ArrayList<Range>(); + ranges.add(new Range("a", "b")); + ranges.add(new Range("c", "d")); + tableQueryConfig.setRanges(ranges); + + byte[] serialized = serialize(tableQueryConfig); + ByteArrayInputStream bais = new ByteArrayInputStream(serialized); + TableQueryConfig actualConfig = new TableQueryConfig(new DataInputStream(bais)); + bais.close(); + + assertEquals(ranges, actualConfig.getRanges()); + } + + @Test + public void testSerialization_columns() throws IOException { + Set<Pair<Text,Text>> columns = new HashSet<Pair<Text,Text>>(); + columns.add(new Pair<Text,Text>(new Text("cf1"), new Text("cq1"))); + columns.add(new Pair<Text,Text>(new Text("cf2"), null)); + tableQueryConfig.setColumns(columns); + + byte[] serialized = serialize(tableQueryConfig); + ByteArrayInputStream bais = new ByteArrayInputStream(serialized); + TableQueryConfig actualConfig = new TableQueryConfig(new DataInputStream(bais)); + 
bais.close(); + + assertEquals(actualConfig.getColumns(), columns); + } + + @Test + public void testSerialization_iterators() throws IOException { + List<IteratorSetting> settings = new ArrayList<IteratorSetting>(); + settings.add(new IteratorSetting(50, "iter", "iterclass")); + settings.add(new IteratorSetting(55, "iter2", "iterclass2")); + tableQueryConfig.setIterators(settings); + byte[] serialized = serialize(tableQueryConfig); + ByteArrayInputStream bais = new ByteArrayInputStream(serialized); + TableQueryConfig actualConfig = new TableQueryConfig(new DataInputStream(bais)); + bais.close(); + + assertEquals(actualConfig.getIterators(), settings); + } + + private byte[] serialize(TableQueryConfig tableQueryConfig) throws IOException { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + tableQueryConfig.write(new DataOutputStream(baos)); + baos.close(); + + return baos.toByteArray(); + } +}
