ACCUMULO-3208 Integration test for the OrIterator and cleanup. The OrIterator was in very poor shape and had almost no documentation describing what it actually does.
Closes apache/accumulo#247 Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/5ac15742 Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/5ac15742 Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/5ac15742 Branch: refs/heads/master Commit: 5ac1574243f4445399dbc76da9392fb393f7f69e Parents: 8c0f03a Author: Josh Elser <els...@apache.org> Authored: Sun Apr 9 22:45:56 2017 -0400 Committer: Josh Elser <els...@apache.org> Committed: Thu Apr 13 12:42:38 2017 -0400 ---------------------------------------------------------------------- .../accumulo/core/iterators/OrIterator.java | 263 ++++++++----- .../org/apache/accumulo/test/OrIteratorIT.java | 389 +++++++++++++++++++ 2 files changed, 551 insertions(+), 101 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/5ac15742/core/src/main/java/org/apache/accumulo/core/iterators/OrIterator.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/iterators/OrIterator.java b/core/src/main/java/org/apache/accumulo/core/iterators/OrIterator.java index 43ed5ed..c75bd54 100644 --- a/core/src/main/java/org/apache/accumulo/core/iterators/OrIterator.java +++ b/core/src/main/java/org/apache/accumulo/core/iterators/OrIterator.java @@ -18,9 +18,12 @@ package org.apache.accumulo.core.iterators; import java.io.IOException; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.HashMap; import java.util.Iterator; +import java.util.List; import java.util.Map; import java.util.Objects; import java.util.PriorityQueue; @@ -30,36 +33,78 @@ import org.apache.accumulo.core.data.ByteSequence; import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Range; import org.apache.accumulo.core.data.Value; +import 
org.apache.commons.lang.StringUtils; import org.apache.hadoop.io.Text; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** - * An iterator that handles "OR" query constructs on the server side. This code has been adapted/merged from Heap and Multi Iterators. + * An iterator that provides a sorted-iteration of column qualifiers for a set of column families in a row. It is important to note that this iterator + * <em>does not</em> adhere to the contract set forth by the {@link SortedKeyValueIterator}. It returns Keys in {@code row+colqual} order instead of + * {@code row+colfam+colqual} order. This is required for the implementation of this iterator (to work in conjunction with the {@code IntersectingIterator}) but + * is a code-smell. This iterator should only be used at query time, never at compaction time. + * + * The table structure should have the following form: + * + * <pre> + * row term:docId => value + * </pre> + * + * Users configuring this iterator must set the option {@link #COLUMNS_KEY}. This value is a comma-separated list of column families that should be "OR"'ed + * together. 
+ * + * For example, given the following data and a value of {@code or.iterator.columns="steve,bob"} in the iterator options map: + * + * <pre> + * row1 bob:4 + * row1 george:2 + * row1 steve:3 + * row2 bob:9 + * row2 frank:8 + * row2 steve:12 + * row3 michael:15 + * row3 steve:20 + * </pre> + * + * Would return: + * + * <pre> + * row1 steve:3 + * row1 bob:4 + * row2 bob:9 + * row2 steve:12 + * row3 steve:20 + * </pre> */ -public class OrIterator implements SortedKeyValueIterator<Key,Value> { +public class OrIterator implements SortedKeyValueIterator<Key,Value>, OptionDescriber { + private static final Logger LOG = LoggerFactory.getLogger(OrIterator.class); + public static final String COLUMNS_KEY = "or.iterator.columns"; private TermSource currentTerm; - private ArrayList<TermSource> sources; + private List<TermSource> sources; private PriorityQueue<TermSource> sorted = new PriorityQueue<>(5); - private static final Text nullText = new Text(); - private static final Key nullKey = new Key(); protected static class TermSource implements Comparable<TermSource> { - public SortedKeyValueIterator<Key,Value> iter; - public Text term; - public Collection<ByteSequence> seekColfams; + private final SortedKeyValueIterator<Key,Value> iter; + private final Text term; + private final Collection<ByteSequence> seekColfams; + private Range currentRange; public TermSource(TermSource other) { - this.iter = other.iter; - this.term = other.term; - this.seekColfams = other.seekColfams; + this.iter = Objects.requireNonNull(other.iter); + this.term = Objects.requireNonNull(other.term); + this.seekColfams = Objects.requireNonNull(other.seekColfams); + this.currentRange = Objects.requireNonNull(other.currentRange); } public TermSource(SortedKeyValueIterator<Key,Value> iter, Text term) { - this.iter = iter; - this.term = term; + this.iter = Objects.requireNonNull(iter); + this.term = Objects.requireNonNull(term); // The desired column families for this source is the term itself 
this.seekColfams = Collections.<ByteSequence> singletonList(new ArrayByteSequence(term.getBytes(), 0, term.getLength())); + // No current range until we're seek()'ed for the first time + this.currentRange = null; } @Override @@ -69,7 +114,7 @@ public class OrIterator implements SortedKeyValueIterator<Key,Value> { @Override public boolean equals(Object obj) { - return obj == this || (obj != null && obj instanceof TermSource && 0 == compareTo((TermSource) obj)); + return obj == this || (obj instanceof TermSource && 0 == compareTo((TermSource) obj)); } @Override @@ -80,17 +125,54 @@ public class OrIterator implements SortedKeyValueIterator<Key,Value> { // sorted after they have been determined to be valid. return this.iter.getTopKey().compareColumnQualifier(o.iter.getTopKey().getColumnQualifier()); } + + /** + * Converts the given {@code Range} into the correct {@code Range} for this TermSource (per this expected table structure) and then seeks this TermSource's + * SKVI. + */ + public void seek(Range originalRange) throws IOException { + // the infinite start key is equivalent to a null startKey on the Range. + if (!originalRange.isInfiniteStartKey()) { + Key originalStartKey = originalRange.getStartKey(); + // Pivot the provided range into the range for this term + Key newKey = new Key(originalStartKey.getRow(), term, originalStartKey.getColumnQualifier(), originalStartKey.getTimestamp()); + // Construct the new range, preserving the other attributes on the provided range. 
+ currentRange = new Range(newKey, originalRange.isStartKeyInclusive(), originalRange.getEndKey(), originalRange.isEndKeyInclusive()); + } else { + currentRange = originalRange; + } + LOG.trace("Seeking {} to {}", this, currentRange); + iter.seek(currentRange, seekColfams, true); + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append("TermSource{term=").append(term).append(", currentRange=").append(currentRange).append("}"); + return sb.toString(); + } + + /** + * @return True if there is a valid topKey which falls into the range this TermSource's iterator was last seeked to, false otherwise. + */ + boolean hasEntryForTerm() { + if (!iter.hasTop()) { + return false; + } + return currentRange.contains(iter.getTopKey()); + } } public OrIterator() { - this.sources = new ArrayList<>(); + this.sources = Collections.emptyList(); } private OrIterator(OrIterator other, IteratorEnvironment env) { - this.sources = new ArrayList<>(); + ArrayList<TermSource> copiedSources = new ArrayList<>(); for (TermSource TS : other.sources) - this.sources.add(new TermSource(TS.iter.deepCopy(env), TS.term)); + copiedSources.add(new TermSource(TS.iter.deepCopy(env), new Text(TS.term))); + this.sources = Collections.unmodifiableList(copiedSources); } @Override @@ -98,41 +180,48 @@ public class OrIterator implements SortedKeyValueIterator<Key,Value> { return new OrIterator(this, env); } - public void addTerm(SortedKeyValueIterator<Key,Value> source, Text term, IteratorEnvironment env) { - this.sources.add(new TermSource(source.deepCopy(env), term)); + public void setTerms(SortedKeyValueIterator<Key,Value> source, Collection<String> terms, IteratorEnvironment env) { + ArrayList<TermSource> newTerms = new ArrayList<>(); + for (String term : terms) { + newTerms.add(new TermSource(source.deepCopy(env), new Text(term))); + } + this.sources = Collections.unmodifiableList(newTerms); } @Override final public void next() throws IOException { - + 
LOG.trace("next()"); if (currentTerm == null) return; // Advance currentTerm currentTerm.iter.next(); - // See if currentTerm is still valid, remove if not - if (!(currentTerm.iter.hasTop()) || ((currentTerm.term != null) && (currentTerm.term.compareTo(currentTerm.iter.getTopKey().getColumnFamily()) != 0))) - currentTerm = null; + // Avoid computing this multiple times + final boolean currentTermHasMoreEntries = currentTerm.hasEntryForTerm(); // optimization. // if size == 0, currentTerm is the only item left, // OR there are no items left. // In either case, we don't need to use the PriorityQueue - if (sorted.size() > 0) { - // sort the term back in - if (currentTerm != null) + if (!sorted.isEmpty()) { + // Add the currentTerm back to the heap to let it sort it with the rest + if (currentTermHasMoreEntries) { sorted.add(currentTerm); - // and get the current top item out. + } + // Let the heap return the next value to inspect currentTerm = sorted.poll(); - } + } else if (!currentTermHasMoreEntries) { + // This currentTerm source was our last TermSource and it ran out of results + currentTerm = null; + } // else, currentTerm is the last TermSource and it has more results } @Override public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive) throws IOException { - + LOG.trace("seek() range={}", range); // If sources.size is 0, there is nothing to process, so just return. - if (sources.size() == 0) { + if (sources.isEmpty()) { currentTerm = null; return; } @@ -141,32 +230,13 @@ public class OrIterator implements SortedKeyValueIterator<Key,Value> { // Yes, this is lots of duplicate code, but the speed works... // and we don't have a priority queue of size 0 or 1. 
if (sources.size() == 1) { + currentTerm = sources.get(0); + currentTerm.seek(range); - if (currentTerm == null) - currentTerm = sources.get(0); - Range newRange = null; - - if (range != null) { - if ((range.getStartKey() == null) || (range.getStartKey().getRow() == null)) - newRange = range; - else { - Key newKey = null; - if (range.getStartKey().getColumnQualifier() == null) - newKey = new Key(range.getStartKey().getRow(), (currentTerm.term == null) ? nullText : currentTerm.term); - else - newKey = new Key(range.getStartKey().getRow(), (currentTerm.term == null) ? nullText : currentTerm.term, range.getStartKey().getColumnQualifier()); - newRange = new Range((newKey == null) ? nullKey : newKey, true, range.getEndKey(), false); - } - } - currentTerm.iter.seek(newRange, currentTerm.seekColfams, true); - - // If there is no top key - // OR we are: - // 1) NOT an iterator - // 2) we have seeked into the next term (ie: seek man, get man001) - // then ignore it as a valid source - if (!(currentTerm.iter.hasTop()) || ((currentTerm.term != null) && (currentTerm.term.compareTo(currentTerm.iter.getTopKey().getColumnFamily()) != 0))) + if (!currentTerm.hasEntryForTerm()) { + // Signifies that there are no possible results for this range. currentTerm = null; + } // Otherwise, source is valid. return; @@ -175,78 +245,69 @@ public class OrIterator implements SortedKeyValueIterator<Key,Value> { // Clear the PriorityQueue so that we can re-populate it. sorted.clear(); - // This check is put in here to guard against the "initial seek" - // crashing us because the topkey term does not match. - // Note: It is safe to do the "sources.size() == 1" above - // because an Or must have at least two elements. 
- if (currentTerm == null) { - for (TermSource TS : sources) { - TS.iter.seek(range, TS.seekColfams, true); - - if ((TS.iter.hasTop()) && ((TS.term != null) && (TS.term.compareTo(TS.iter.getTopKey().getColumnFamily()) == 0))) - sorted.add(TS); - } - currentTerm = sorted.poll(); - return; - } - - TermSource TS = null; Iterator<TermSource> iter = sources.iterator(); // For each term, seek forward. // if a hit is not found, delete it from future searches. while (iter.hasNext()) { - TS = iter.next(); - Range newRange = null; - - if (range != null) { - if ((range.getStartKey() == null) || (range.getStartKey().getRow() == null)) - newRange = range; - else { - Key newKey = null; - if (range.getStartKey().getColumnQualifier() == null) - newKey = new Key(range.getStartKey().getRow(), (TS.term == null) ? nullText : TS.term); - else - newKey = new Key(range.getStartKey().getRow(), (TS.term == null) ? nullText : TS.term, range.getStartKey().getColumnQualifier()); - newRange = new Range((newKey == null) ? nullKey : newKey, true, range.getEndKey(), false); - } + TermSource ts = iter.next(); + // Pivot the provided range into the correct range for this TermSource and seek the TS. + ts.seek(range); + + if (ts.hasEntryForTerm()) { + LOG.trace("Retaining TermSource for {}", ts); + // Otherwise, source is valid. Add it to the sources. + sorted.add(ts); + } else { + LOG.trace("Not adding TermSource to heap for {}", ts); } - - // Seek only to the term for this source as a column family - TS.iter.seek(newRange, TS.seekColfams, true); - - // If there is no top key - // OR we are: - // 1) NOT an iterator - // 2) we have seeked into the next term (ie: seek man, get man001) - // then ignore it as a valid source - if (!(TS.iter.hasTop()) || ((TS.term != null) && (TS.term.compareTo(TS.iter.getTopKey().getColumnFamily()) != 0))) - iter.remove(); - - // Otherwise, source is valid. Add it to the sources. - sorted.add(TS); } // And set currentTerm = the next valid key/term. 
+ // If the heap is empty, it returns null which signals iteration to cease currentTerm = sorted.poll(); } @Override final public Key getTopKey() { - return currentTerm.iter.getTopKey(); + final Key k = currentTerm.iter.getTopKey(); + LOG.trace("getTopKey() = {}", k); + return k; } @Override final public Value getTopValue() { - return currentTerm.iter.getTopValue(); + final Value v = currentTerm.iter.getTopValue(); + LOG.trace("getTopValue() = {}", v); + return v; } @Override final public boolean hasTop() { + LOG.trace("hasTop()"); return currentTerm != null; } @Override public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options, IteratorEnvironment env) throws IOException { - throw new UnsupportedOperationException(); + LOG.trace("init()"); + String columnsValue = options.get(COLUMNS_KEY); + if (null == columnsValue) { + throw new IllegalArgumentException(COLUMNS_KEY + " was not provided in the iterator configuration"); + } + String[] columns = StringUtils.split(columnsValue, ','); + setTerms(source, Arrays.asList(columns), env); + LOG.trace("Set sources: {}", this.sources); + } + + @Override + public IteratorOptions describeOptions() { + Map<String,String> options = new HashMap<>(); + options.put(COLUMNS_KEY, "A comma-separated list of families"); + return new IteratorOptions("OrIterator", "Produces a sorted stream of qualifiers based on families", options, Collections.<String> emptyList()); + } + + @Override + public boolean validateOptions(Map<String,String> options) { + return null != options.get(COLUMNS_KEY); } } http://git-wip-us.apache.org/repos/asf/accumulo/blob/5ac15742/test/src/test/java/org/apache/accumulo/test/OrIteratorIT.java ---------------------------------------------------------------------- diff --git a/test/src/test/java/org/apache/accumulo/test/OrIteratorIT.java b/test/src/test/java/org/apache/accumulo/test/OrIteratorIT.java new file mode 100644 index 0000000..d0fb12c --- /dev/null +++ 
b/test/src/test/java/org/apache/accumulo/test/OrIteratorIT.java @@ -0,0 +1,389 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.TreeSet; + +import org.apache.accumulo.core.client.BatchScanner; +import org.apache.accumulo.core.client.BatchWriter; +import org.apache.accumulo.core.client.BatchWriterConfig; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.IteratorSetting; +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.data.PartialKey; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.data.Value; +import 
org.apache.accumulo.core.iterators.OrIterator; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.harness.AccumuloClusterIT; +import org.apache.hadoop.io.Text; +import org.junit.Test; + +public class OrIteratorIT extends AccumuloClusterIT { + private static final String EMPTY = ""; + + @Override + public int defaultTimeoutSeconds() { + return 60; + } + + @Test + public void testMultipleRowsInTablet() throws Exception { + final Connector conn = getConnector(); + final String tableName = getUniqueNames(1)[0]; + conn.tableOperations().create(tableName); + + BatchWriter bw = null; + try { + bw = conn.createBatchWriter(tableName, new BatchWriterConfig()); + Mutation m = new Mutation("row1"); + m.put("bob", "2", EMPTY); + m.put("frank", "3", EMPTY); + m.put("steve", "1", EMPTY); + bw.addMutation(m); + + m = new Mutation("row2"); + m.put("bob", "7", EMPTY); + m.put("eddie", "4", EMPTY); + m.put("mort", "6", EMPTY); + m.put("zed", "5", EMPTY); + bw.addMutation(m); + } finally { + if (null != bw) { + bw.close(); + } + } + + IteratorSetting is = new IteratorSetting(50, OrIterator.class); + is.addOption(OrIterator.COLUMNS_KEY, "mort,frank"); + Map<String,String> expectedData = new HashMap<>(); + expectedData.put("frank", "3"); + expectedData.put("mort", "6"); + + BatchScanner bs = null; + try { + bs = conn.createBatchScanner(tableName, Authorizations.EMPTY, 1); + Set<Range> ranges = new HashSet<>(Arrays.asList(Range.exact("row1"), Range.exact("row2"))); + bs.setRanges(ranges); + bs.addScanIterator(is); + for (Entry<Key,Value> entry : bs) { + String term = entry.getKey().getColumnFamily().toString(); + String expectedDocId = expectedData.remove(term); + assertNotNull("Found unexpected term: " + term, expectedDocId); + assertEquals(expectedDocId, entry.getKey().getColumnQualifier().toString()); + } + assertTrue("Expected no leftover entries but saw " + expectedData, expectedData.isEmpty()); + } finally { + if (null != bs) { + bs.close(); + } + 
} + } + + @Test + public void testMultipleTablets() throws Exception { + final Connector conn = getConnector(); + final String tableName = getUniqueNames(1)[0]; + conn.tableOperations().create(tableName); + + BatchWriter bw = null; + try { + bw = conn.createBatchWriter(tableName, new BatchWriterConfig()); + Mutation m = new Mutation("row1"); + m.put("bob", "2", EMPTY); + m.put("frank", "3", EMPTY); + m.put("steve", "1", EMPTY); + bw.addMutation(m); + + m = new Mutation("row2"); + m.put("bob", "7", EMPTY); + m.put("eddie", "4", EMPTY); + m.put("mort", "6", EMPTY); + m.put("zed", "5", EMPTY); + bw.addMutation(m); + + m = new Mutation("row3"); + m.put("carl", "9", EMPTY); + m.put("george", "8", EMPTY); + m.put("nick", "3", EMPTY); + m.put("zed", "1", EMPTY); + bw.addMutation(m); + } finally { + if (null != bw) { + bw.close(); + } + } + + conn.tableOperations().addSplits(tableName, new TreeSet<>(Arrays.asList(new Text("row2"), new Text("row3")))); + + IteratorSetting is = new IteratorSetting(50, OrIterator.class); + is.addOption(OrIterator.COLUMNS_KEY, "mort,frank,nick"); + Map<String,String> expectedData = new HashMap<>(); + expectedData.put("frank", "3"); + expectedData.put("mort", "6"); + expectedData.put("nick", "3"); + + BatchScanner bs = null; + try { + bs = conn.createBatchScanner(tableName, Authorizations.EMPTY, 1); + bs.setRanges(Collections.singleton(new Range())); + bs.addScanIterator(is); + for (Entry<Key,Value> entry : bs) { + String term = entry.getKey().getColumnFamily().toString(); + String expectedDocId = expectedData.remove(term); + assertNotNull("Found unexpected term: " + term, expectedDocId); + assertEquals(expectedDocId, entry.getKey().getColumnQualifier().toString()); + } + assertTrue("Expected no leftover entries but saw " + expectedData, expectedData.isEmpty()); + } finally { + if (null != bs) { + bs.close(); + } + } + } + + @Test + public void testSingleLargeRow() throws Exception { + final Connector conn = getConnector(); + final String 
tableName = getUniqueNames(1)[0]; + conn.tableOperations().create(tableName); + conn.tableOperations().setProperty(tableName, Property.TABLE_SCAN_MAXMEM.getKey(), "1"); + + BatchWriter bw = null; + try { + bw = conn.createBatchWriter(tableName, new BatchWriterConfig()); + Mutation m = new Mutation("row1"); + m.put("bob", "02", EMPTY); + m.put("carl", "07", EMPTY); + m.put("eddie", "04", EMPTY); + m.put("frank", "03", EMPTY); + m.put("greg", "15", EMPTY); + m.put("mort", "06", EMPTY); + m.put("nick", "12", EMPTY); + m.put("richard", "18", EMPTY); + m.put("steve", "01", EMPTY); + m.put("ted", "11", EMPTY); + m.put("zed", "05", EMPTY); + bw.addMutation(m); + } finally { + if (null != bw) { + bw.close(); + } + } + + IteratorSetting is = new IteratorSetting(50, OrIterator.class); + is.addOption(OrIterator.COLUMNS_KEY, "richard,carl,frank,nick,eddie,zed"); + Map<String,String> expectedData = new HashMap<>(); + expectedData.put("frank", "03"); + expectedData.put("eddie", "04"); + expectedData.put("zed", "05"); + expectedData.put("carl", "07"); + expectedData.put("nick", "12"); + expectedData.put("richard", "18"); + + BatchScanner bs = null; + try { + bs = conn.createBatchScanner(tableName, Authorizations.EMPTY, 1); + bs.setRanges(Collections.singleton(new Range())); + bs.addScanIterator(is); + for (Entry<Key,Value> entry : bs) { + String term = entry.getKey().getColumnFamily().toString(); + String expectedDocId = expectedData.remove(term); + assertNotNull("Found unexpected term: " + term + " or the docId was unexpectedly null", expectedDocId); + assertEquals(expectedDocId, entry.getKey().getColumnQualifier().toString()); + } + assertTrue("Expected no leftover entries but saw " + expectedData, expectedData.isEmpty()); + } finally { + if (null != bs) { + bs.close(); + } + } + } + + @Test + public void testNoMatchesForTable() throws Exception { + final Connector conn = getConnector(); + final String tableName = getUniqueNames(1)[0]; + 
conn.tableOperations().create(tableName); + + BatchWriter bw = null; + try { + bw = conn.createBatchWriter(tableName, new BatchWriterConfig()); + Mutation m = new Mutation("row1"); + m.put("bob", "02", EMPTY); + m.put("carl", "07", EMPTY); + m.put("eddie", "04", EMPTY); + m.put("frank", "03", EMPTY); + m.put("greg", "15", EMPTY); + m.put("mort", "06", EMPTY); + m.put("nick", "12", EMPTY); + m.put("richard", "18", EMPTY); + m.put("steve", "01", EMPTY); + m.put("ted", "11", EMPTY); + m.put("zed", "05", EMPTY); + bw.addMutation(m); + } finally { + if (null != bw) { + bw.close(); + } + } + + IteratorSetting is = new IteratorSetting(50, OrIterator.class); + is.addOption(OrIterator.COLUMNS_KEY, "theresa,sally"); + Map<String,String> expectedData = Collections.emptyMap(); + + BatchScanner bs = null; + try { + bs = conn.createBatchScanner(tableName, Authorizations.EMPTY, 1); + bs.setRanges(Collections.singleton(new Range())); + bs.addScanIterator(is); + for (Entry<Key,Value> entry : bs) { + String term = entry.getKey().getColumnFamily().toString(); + String expectedDocId = expectedData.remove(term); + assertNotNull("Found unexpected term: " + term + " or the docId was unexpectedly null", expectedDocId); + assertEquals(expectedDocId, entry.getKey().getColumnQualifier().toString()); + } + assertTrue("Expected no leftover entries but saw " + expectedData, expectedData.isEmpty()); + } finally { + if (null != bs) { + bs.close(); + } + } + } + + @Test + public void testNoMatchesInSingleTablet() throws Exception { + final Connector conn = getConnector(); + final String tableName = getUniqueNames(1)[0]; + conn.tableOperations().create(tableName); + + BatchWriter bw = null; + try { + bw = conn.createBatchWriter(tableName, new BatchWriterConfig()); + Mutation m = new Mutation("row1"); + m.put("bob", "02", EMPTY); + m.put("carl", "07", EMPTY); + m.put("eddie", "04", EMPTY); + bw.addMutation(m); + + m = new Mutation("row2"); + m.put("frank", "03", EMPTY); + m.put("greg", "15", EMPTY); 
+ m.put("mort", "06", EMPTY); + m.put("nick", "12", EMPTY); + bw.addMutation(m); + + m = new Mutation("row3"); + m.put("richard", "18", EMPTY); + m.put("steve", "01", EMPTY); + m.put("ted", "11", EMPTY); + m.put("zed", "05", EMPTY); + bw.addMutation(m); + } finally { + if (null != bw) { + bw.close(); + } + } + + IteratorSetting is = new IteratorSetting(50, OrIterator.class); + is.addOption(OrIterator.COLUMNS_KEY, "bob,eddie,steve,zed"); + Map<String,String> expectedData = new HashMap<>(); + expectedData.put("bob", "02"); + expectedData.put("eddie", "04"); + expectedData.put("zed", "05"); + expectedData.put("steve", "01"); + + // Split each row into its own tablet + conn.tableOperations().addSplits(tableName, new TreeSet<>(Arrays.asList(new Text("row2"), new Text("row3")))); + + BatchScanner bs = null; + try { + bs = conn.createBatchScanner(tableName, Authorizations.EMPTY, 1); + bs.setRanges(Collections.singleton(new Range())); + bs.addScanIterator(is); + for (Entry<Key,Value> entry : bs) { + String term = entry.getKey().getColumnFamily().toString(); + String expectedDocId = expectedData.remove(term); + assertNotNull("Found unexpected term: " + term + " or the docId was unexpectedly null", expectedDocId); + assertEquals(expectedDocId, entry.getKey().getColumnQualifier().toString()); + } + assertTrue("Expected no leftover entries but saw " + expectedData, expectedData.isEmpty()); + } finally { + if (null != bs) { + bs.close(); + } + } + } + + @Test + public void testResultOrder() throws Exception { + final Connector conn = getConnector(); + final String tableName = getUniqueNames(1)[0]; + conn.tableOperations().create(tableName); + + BatchWriter bw = null; + try { + bw = conn.createBatchWriter(tableName, new BatchWriterConfig()); + Mutation m = new Mutation("row1"); + m.put("bob", "2", EMPTY); + m.put("frank", "3", EMPTY); + m.put("steve", "1", EMPTY); + bw.addMutation(m); + } finally { + if (null != bw) { + bw.close(); + } + } + + IteratorSetting is = new 
IteratorSetting(50, OrIterator.class); + is.addOption(OrIterator.COLUMNS_KEY, "bob,steve"); + + Scanner s = null; + try { + s = conn.createScanner(tableName, Authorizations.EMPTY); + s.addScanIterator(is); + Iterator<Entry<Key,Value>> iter = s.iterator(); + assertTrue(iter.hasNext()); + Key k = iter.next().getKey(); + assertEquals("Actual key was " + k, 0, k.compareTo(new Key("row1", "steve", "1"), PartialKey.ROW_COLFAM_COLQUAL)); + assertTrue(iter.hasNext()); + k = iter.next().getKey(); + assertEquals("Actual key was " + k, 0, k.compareTo(new Key("row1", "bob", "2"), PartialKey.ROW_COLFAM_COLQUAL)); + assertFalse(iter.hasNext()); + } finally { + if (null != s) { + s.close(); + } + } + } +}