Author: kturner Date: Thu Jan 17 16:28:48 2013 New Revision: 1434762 URL: http://svn.apache.org/viewvc?rev=1434762&view=rev Log: ACCUMULO-956 checkin of patch from Brian Loss
Added: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/iterators/user/KeyTransformingIterator.java accumulo/trunk/core/src/test/java/org/apache/accumulo/core/iterators/user/KeyTransformingIteratorTest.java Added: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/iterators/user/KeyTransformingIterator.java URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/iterators/user/KeyTransformingIterator.java?rev=1434762&view=auto ============================================================================== --- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/iterators/user/KeyTransformingIterator.java (added) +++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/iterators/user/KeyTransformingIterator.java Thu Jan 17 16:28:48 2013 @@ -0,0 +1,599 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.accumulo.core.iterators.user; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashSet; +import java.util.Map; + +import org.apache.accumulo.core.data.ByteSequence; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.PartialKey; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.iterators.IteratorEnvironment; +import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope; +import org.apache.accumulo.core.iterators.OptionDescriber; +import org.apache.accumulo.core.iterators.SortedKeyValueIterator; +import org.apache.accumulo.core.iterators.WrappingIterator; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.core.security.ColumnVisibility; +import org.apache.accumulo.core.security.VisibilityEvaluator; +import org.apache.accumulo.core.security.VisibilityParseException; +import org.apache.accumulo.core.util.BadArgumentException; +import org.apache.accumulo.core.util.Pair; +import org.apache.commons.collections.map.LRUMap; +import org.apache.hadoop.io.Text; +import org.apache.log4j.Logger; + +/** + * The KeyTransformingIterator allows portions of a key (except for the row) + * to be transformed. This iterator handles the details that come with modifying + * keys (i.e., that the sort order could change). In order to do so, however, + * the iterator must put all keys sharing the same prefix in memory. Prefix + * is defined as the parts of the key that are not modified by this iterator. + * That is, if the iterator modifies column qualifier and timestamp, then the + * prefix is row and column family. In that case, the iterator must load all + * column qualifiers for each row/column family pair into memory. 
Given this + * constraint, care must be taken by users of this iterator to ensure it is + * not run in such a way that will overrun memory in a tablet server. + * <p> + * If the implementing iterator is transforming column families, then it + * must also override {@link #untransformColumnFamilies(Collection)} to handle + * the case when column families are fetched at scan time. The fetched column + * families will/must be in the transformed space, and the untransformed column + * families need to be passed to this iterator's source. If it is not possible + * to write a reverse transformation (e.g., the column family transformation + * depends on the row value or something like that), then the iterator must + * not fetch specific column families (or only fetch column families that are + * known to not transform at all). + * <p> + * If the implementing iterator is transforming column visibilities, then + * users must be careful NOT to fetch column qualifiers from the scanner. + * The reason for this is due to ACCUMULO-??? (insert issue number). + * <p> + * If the implementing iterator is transforming column visibilities, then the + * user should be sure to supply authorizations via the {@link #AUTH_OPT} + * iterator option (note that this is only necessary for scan scope iterators). + * The supplied authorizations should be in the transformed space, but the + * authorizations supplied to the scanner should be in the untransformed + * space. That is, if the iterator transforms A to 1, B to 2, C to 3, etc, + * then the auths supplied when the scanner is constructed should be A,B,C,... + * and the auths supplied to the iterator should be 1,2,3,... The reason + * for this is that the scanner performs security filtering before this + * iterator is called, so the authorizations need to be in the original + * untransformed space. 
Since the iterator can transform visibilities, it is + * possible that it could produce visibilities that the user cannot see, + * so the transformed keys must be tested to ensure the user is allowed to view + * them. Note that this test is not necessary when the iterator is not used + * in the scan scope since no security filtering is performed during major and + * minor compactions. It should also be noted that this iterator implements the + * security filtering rather than relying on a follow-on iterator to do it so + * that we ensure the test is performed. + */ +abstract public class KeyTransformingIterator extends WrappingIterator implements OptionDescriber { + public static final String AUTH_OPT = "authorizations"; + protected Logger log = Logger.getLogger(getClass()); + + protected ArrayList<Pair<Key,Value>> keys = new ArrayList<Pair<Key,Value>>(); + protected int keyPos = -1; + protected boolean scanning; + protected Range seekRange; + protected Collection<ByteSequence> seekColumnFamilies; + protected boolean seekColumnFamiliesInclusive; + + private VisibilityEvaluator ve = null; + private LRUMap visibleCache = null; + private LRUMap parsedVisibilitiesCache = null; + + private static Comparator<Pair<Key,Value>> keyComparator = new Comparator<Pair<Key,Value>>() { + @Override + public int compare(Pair<Key,Value> o1, Pair<Key,Value> o2) { + return o1.getFirst().compareTo(o2.getFirst()); + } + }; + + public KeyTransformingIterator() { + } + + @Override + public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options, IteratorEnvironment env) throws IOException { + super.init(source, options, env); + scanning = IteratorScope.scan.equals(env.getIteratorScope()); + if (scanning) { + String auths = options.get(AUTH_OPT); + if (auths != null && !auths.isEmpty()) { + ve = new VisibilityEvaluator(new Authorizations(auths.split(","))); + visibleCache = new LRUMap(100); + } + } + + parsedVisibilitiesCache = new LRUMap(100); + } + + @Override + 
public IteratorOptions describeOptions() { + String desc = "This iterator allows keys to be transformed."; + String authDesc = "Comma-separated list of user's scan authorizations. " + + "If excluded or empty, then no visibility check is performed on transformed keys."; + return new IteratorOptions(getClass().getSimpleName(), desc, Collections.singletonMap(AUTH_OPT, authDesc), null); + } + + @Override + public boolean validateOptions(Map<String,String> options) { + return true; + } + + @Override + public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) { + KeyTransformingIterator copy; + + try { + copy = getClass().newInstance(); + } catch (Exception e) { + throw new RuntimeException(e); + } + + copy.setSource(getSource().deepCopy(env)); + + copy.scanning = scanning; + copy.keyPos = keyPos; + copy.keys.addAll(keys); + copy.seekRange = (seekRange == null) ? null : new Range(seekRange); + copy.seekColumnFamilies = (seekColumnFamilies == null) ? null : new HashSet<ByteSequence>(seekColumnFamilies); + copy.seekColumnFamiliesInclusive = seekColumnFamiliesInclusive; + + copy.ve = ve; + if (visibleCache != null) { + copy.visibleCache = new LRUMap(visibleCache.maxSize()); + copy.visibleCache.putAll(visibleCache); + } + + if (parsedVisibilitiesCache != null) { + copy.parsedVisibilitiesCache = new LRUMap(parsedVisibilitiesCache.maxSize()); + copy.parsedVisibilitiesCache.putAll(parsedVisibilitiesCache); + } + + return copy; + } + + @Override + public boolean hasTop() { + return keyPos >= 0 && keyPos < keys.size(); + } + + @Override + public Key getTopKey() { + return hasTop() ? keys.get(keyPos).getFirst() : null; + } + + @Override + public Value getTopValue() { + return hasTop() ? 
keys.get(keyPos).getSecond() : null; + } + + @Override + public void next() throws IOException { + // Move on to the next entry since we returned the entry at keyPos before + if (keyPos >= 0) + keyPos++; + + // If we emptied out the transformed key map then transform the next key + // set from the source. It's possible that transformation could produce keys + // that are outside of our range or are not visible to the end user, so after the + // call below we might not have added any keys to the map. Keep going until + // we either get some keys in the map or exhaust the source iterator. + while (!hasTop() && super.hasTop()) + transformKeys(); + } + + @Override + public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive) throws IOException { + seekRange = (range != null) ? new Range(range) : null; + seekColumnFamilies = columnFamilies; + seekColumnFamiliesInclusive = inclusive; + + // Seek the source iterator, but use a recalculated range that ensures + // we see all keys with the same "prefix." We need to do this since + // transforming could change the sort order and transformed keys that + // are before the range start could be inside the range after transformation. + super.seek(computeReseekRange(range), untransformColumnFamilies(columnFamilies), inclusive); + + // Range clipping could cause us to trim out all the keys we transformed. + // Keep looping until we either have some keys in the output range, or have + // exhausted the source iterator. + keyPos = -1; // "Clear" list so hasTop returns false to get us into the loop (transformKeys actually clears) + while (!hasTop() && super.hasTop()) { + // Build up a sorted list of all keys for the same prefix. When + // people ask for keys, return from this list first until it is empty + // before incrementing the source iterator. 
+ transformKeys(); + } + } + + /** + * Reads all keys matching the first key's prefix from the source + * iterator, transforms them, and sorts the resulting keys. Transformed + * keys that fall outside of our seek range or can't be seen by the + * user are excluded. + */ + protected void transformKeys() throws IOException { + keyPos = -1; + keys.clear(); + Key prefixKey = super.hasTop() ? new Key(super.getTopKey()) : null; + + while (super.hasTop()) { + Key sourceTopKey = super.getTopKey(); + + // If the source key equals our prefix key (up to the prefix), then + // we have a key that needs transformed. Otherwise, we're done. + if (sourceTopKey.equals(prefixKey, getKeyPrefix())) { + Key transformedKey = transformKey(sourceTopKey); + + // If the transformed key didn't actually change, then we need + // to make a copy since we're caching it in our keys list. + if (transformedKey == sourceTopKey) + transformedKey = new Key(sourceTopKey); + // We could check that the transformed key didn't transform anything + // in the key prefix here... + + // Transformation could have produced a key that falls outside + // of the seek range, or one that the user cannot see. Check + // these before adding it to the output list. + if (includeTransformedKey(transformedKey)) + keys.add(new Pair<Key,Value>(transformedKey, new Value(super.getTopValue()))); + } else { + break; + } + + super.next(); + } + + if (!keys.isEmpty()) { + Collections.sort(keys, keyComparator); + keyPos = 0; + } + } + + /** + * Determines whether or not to include {@code transformedKey} in the + * output. It is possible that transformation could have produced a key + * that falls outside of the seek range, a key with a visibility the user + * can't see, a key with a visibility that doesn't parse, or a key with a + * column family that wasn't fetched. We only do some checks (outside the + * range, user can see) if we're scanning. 
The range check is not done for + * major/minor compaction since seek ranges won't be in our transformed key + * space and we will never change the row so we can't produce keys that + * would fall outside the tablet anyway. + * + * @param transformedKey the key to check + * @return {@code true} if the key should be included and {@code false} if not + */ + protected boolean includeTransformedKey(Key transformedKey) { + boolean include = canSee(transformedKey); + if (scanning && seekRange != null) { + include = include && seekRange.contains(transformedKey); + } + return include; + } + + /** + * Indicates whether or not the user is able to see {@code key}. If the + * user has not supplied authorizations, or the iterator is not in the + * scan scope, then this method simply returns {@code true}. Otherwise, + * {@code key}'s column visibility is tested against the user-supplied + * authorizations, and the test result is returned. For performance, + * the test results are cached so that the same visibility is not tested + * multiple times. + * + * @param key the key to test + * @return {@code true} if the key is visible or iterator is not scanning, + * and {@code false} if not + */ + protected boolean canSee(Key key) { + // Ensure that the visibility (which could have been transformed) parses. 
+ ByteSequence visibility = key.getColumnVisibilityData(); + ColumnVisibility colVis = (ColumnVisibility) parsedVisibilitiesCache.get(visibility); + if (colVis == null) { + try { + colVis = new ColumnVisibility(visibility.toArray()); + } catch (BadArgumentException e) { + log.error("Transformation produced an invalid visibility: " + visibility); + throw e; + } + } + + Boolean visible = canSeeColumnFamily(key); + + if (!scanning || !visible || ve == null || visibleCache == null) + return visible; + + visible = (Boolean) visibleCache.get(visibility); + if (visible == null) { + try { + visible = ve.evaluate(colVis); + visibleCache.put(visibility, visible); + } catch (VisibilityParseException e) { + log.error("Parse Error", e); + visible = Boolean.FALSE; + } catch (BadArgumentException e) { + log.error("Parse Error", e); + visible = Boolean.FALSE; + } + } + + return visible; + } + + /** + * Indicates whether or not {@code key} can be seen, according to the + * fetched column families for this iterator. + * + * @param key the key whose column family is to be tested + * @return {@code true} if {@code key}'s column family is one of those fetched + * in the set passed to our {@link #seek(Range, Collection, boolean)} method + */ + protected boolean canSeeColumnFamily(Key key) { + boolean visible = true; + if (seekColumnFamilies != null) { + ByteSequence columnFamily = key.getColumnFamilyData(); + if (seekColumnFamiliesInclusive) + visible = seekColumnFamilies.contains(columnFamily); + else + visible = !seekColumnFamilies.contains(columnFamily); + } + return visible; + } + + /** + * Possibly expand {@code range} to include everything for the key prefix + * we are working with. That is, if our prefix is ROW_COLFAM, then we + * need to expand the range so we're sure to include all entries having + * the same row and column family as the start/end of the range. 
+ * + * @param range the range to expand + * @return the modified range + */ + protected Range computeReseekRange(Range range) { + Key startKey = range.getStartKey(); + boolean startKeyInclusive = range.isStartKeyInclusive(); + // If anything after the prefix is set, then clip the key so we include + // everything for the prefix. + if (isSetAfterPart(startKey, getKeyPrefix())) { + startKey = copyPartialKey(startKey, getKeyPrefix()); + startKeyInclusive = true; + } + Key endKey = range.getEndKey(); + boolean endKeyInclusive = range.isEndKeyInclusive(); + if (isSetAfterPart(endKey, getKeyPrefix())) { + endKey = endKey.followingKey(getKeyPrefix()); + endKeyInclusive = true; + } + return new Range(startKey, startKeyInclusive, endKey, endKeyInclusive); + } + + /** + * Indicates whether or not any part of {@code key} excluding + * {@code part} is set. For example, if part is ROW_COLFAM_COLQUAL, + * then this method determines whether or not the column visibility, + * timestamp, or delete flag is set on {@code key}. + * + * @param key the key to check + * @param part the part of the key that doesn't need to be checked (everything after does) + * @return {@code true} if anything after {@code part} is set on {@code key}, and {@code false} if not + */ + protected boolean isSetAfterPart(Key key, PartialKey part) { + boolean isSet = false; + if (key != null) { + // Breaks excluded on purpose. 
+ switch (part) { + case ROW: + isSet = isSet || key.getColumnFamilyData().length() > 0; + case ROW_COLFAM: + isSet = isSet || key.getColumnQualifierData().length() > 0; + case ROW_COLFAM_COLQUAL: + isSet = isSet || key.getColumnVisibilityData().length() > 0; + case ROW_COLFAM_COLQUAL_COLVIS: + isSet = isSet || key.getTimestamp() < Long.MAX_VALUE; + case ROW_COLFAM_COLQUAL_COLVIS_TIME: + isSet = isSet || key.isDeleted(); + case ROW_COLFAM_COLQUAL_COLVIS_TIME_DEL: + break; + } + } + return isSet; + } + + /** + * Creates a copy of {@code key}, copying only the parts of the key specified + * in {@code part}. For example, if {@code part} is ROW_COLFAM_COLQUAL, then + * this method would copy the row, column family, and column qualifier from + * {@code key} into a new key. + * + * @param key the key to copy + * @param part the parts of {@code key} to copy + * @return the new key containing {@code part} of {@code key} + */ + protected Key copyPartialKey(Key key, PartialKey part) { + Key keyCopy; + switch (part) { + case ROW: + keyCopy = new Key(key.getRow()); + break; + case ROW_COLFAM: + keyCopy = new Key(key.getRow(), key.getColumnFamily()); + break; + case ROW_COLFAM_COLQUAL: + keyCopy = new Key(key.getRow(), key.getColumnFamily(), key.getColumnQualifier()); + break; + case ROW_COLFAM_COLQUAL_COLVIS: + keyCopy = new Key(key.getRow(), key.getColumnFamily(), key.getColumnQualifier(), key.getColumnVisibility()); + break; + case ROW_COLFAM_COLQUAL_COLVIS_TIME: + keyCopy = new Key(key.getRow(), key.getColumnFamily(), key.getColumnQualifier(), + key.getColumnVisibility(), key.getTimestamp()); + break; + default: + throw new IllegalArgumentException("Unsupported key part: " + part); + } + return keyCopy; + } + + /** + * Make a new key with all parts (including delete flag) coming from {@code originalKey} but + * use {@code newColFam} as the column family. 
+ */ + protected Key replaceColumnFamily(Key originalKey, Text newColFam) { + byte[] row = originalKey.getRowData().toArray(); + byte[] cf = newColFam.getBytes(); + byte[] cq = originalKey.getColumnQualifierData().toArray(); + byte[] cv = originalKey.getColumnVisibilityData().toArray(); + long timestamp = originalKey.getTimestamp(); + Key newKey = new Key(row, 0, row.length, + cf, 0, newColFam.getLength(), + cq, 0, cq.length, + cv, 0, cv.length, + timestamp); + newKey.setDeleted(originalKey.isDeleted()); + return newKey; + } + + /** + * Make a new key with all parts (including delete flag) coming from {@code originalKey} but + * use {@code newColQual} as the column qualifier. + */ + protected Key replaceColumnQualifier(Key originalKey, Text newColQual) { + byte[] row = originalKey.getRowData().toArray(); + byte[] cf = originalKey.getColumnFamilyData().toArray(); + byte[] cq = newColQual.getBytes(); + byte[] cv = originalKey.getColumnVisibilityData().toArray(); + long timestamp = originalKey.getTimestamp(); + Key newKey = new Key(row, 0, row.length, + cf, 0, cf.length, + cq, 0, newColQual.getLength(), + cv, 0, cv.length, + timestamp); + newKey.setDeleted(originalKey.isDeleted()); + return newKey; + } + + /** + * Make a new key with all parts (including delete flag) coming from {@code originalKey} but + * use {@code newColVis} as the column visibility. + */ + protected Key replaceColumnVisibility(Key originalKey, Text newColVis) { + byte[] row = originalKey.getRowData().toArray(); + byte[] cf = originalKey.getColumnFamilyData().toArray(); + byte[] cq = originalKey.getColumnQualifierData().toArray(); + byte[] cv = newColVis.getBytes(); + long timestamp = originalKey.getTimestamp(); + Key newKey = new Key(row, 0, row.length, + cf, 0, cf.length, + cq, 0, cq.length, + cv, 0, newColVis.getLength(), + timestamp); + newKey.setDeleted(originalKey.isDeleted()); + return newKey; + } + + /** + * Make a new key with a column family, column qualifier, and column visibility. 
+ * Copy the rest of the parts of the key (including delete flag) from {@code originalKey}. + */ + protected Key replaceKeyParts(Key originalKey, Text newColFam, Text newColQual, Text newColVis) { + byte[] row = originalKey.getRowData().toArray(); + byte[] cf = newColFam.getBytes(); + byte[] cq = newColQual.getBytes(); + byte[] cv = newColVis.getBytes(); + long timestamp = originalKey.getTimestamp(); + Key newKey = new Key(row, 0, row.length, + cf, 0, newColFam.getLength(), + cq, 0, newColQual.getLength(), + cv, 0, newColVis.getLength(), + timestamp); + newKey.setDeleted(originalKey.isDeleted()); + return newKey; + } + + /** + * Make a new key with a column qualifier, and column visibility. Copy the rest + * of the parts of the key (including delete flag) from {@code originalKey}. + */ + protected Key replaceKeyParts(Key originalKey, Text newColQual, Text newColVis) { + byte[] row = originalKey.getRowData().toArray(); + byte[] cf = originalKey.getColumnFamilyData().toArray(); + byte[] cq = newColQual.getBytes(); + byte[] cv = newColVis.getBytes(); + long timestamp = originalKey.getTimestamp(); + Key newKey = new Key(row, 0, row.length, + cf, 0, cf.length, + cq, 0, newColQual.getLength(), + cv, 0, newColVis.getLength(), + timestamp); + newKey.setDeleted(originalKey.isDeleted()); + return newKey; + } + + /** + * Reverses the transformation applied to column families that are fetched at seek + * time. If this iterator is transforming column families, then this method + * should be overridden to reverse the transformation on the supplied collection + * of column families. This is necessary since the fetch/seek will be performed + * in the transformed space, but when passing the column family set on to the source, + * the column families need to be in the untransformed space. 
+ * + * @param columnFamilies the column families that have been fetched at seek time + * @return the untransformed column families that would transform into {@code columnFamilies} + */ + protected Collection<ByteSequence> untransformColumnFamilies(Collection<ByteSequence> columnFamilies) { + return columnFamilies; + } + + /** + * Indicates the prefix of keys that will be transformed by this iterator. + * In other words, this is the part of the key that will <i>not</i> be + * transformed by this iterator. For example, if this method returns + * ROW_COLFAM, then {@link #transformKey(Key)} may be changing the + * column qualifier, column visibility, or timestamp, but it won't be + * changing the row or column family. + * + * @return the part of the key this iterator is not transforming + */ + abstract protected PartialKey getKeyPrefix(); + + /** + * Transforms {@code originalKey}. This method must not change the row + * part of the key, and must only change the parts of the key after the + * return value of {@link #getKeyPrefix()}. Implementors must also remember + * to copy the delete flag from {@code originalKey} onto the new key. Or, + * implementors should use one of the helper methods to produce the new + * key. See any of the replaceKeyParts methods. 
+ * + * @param originalKey the key to be transformed + * @return the modified key + * @see #replaceColumnFamily(Key, Text) + * @see #replaceColumnQualifier(Key, Text) + * @see #replaceColumnVisibility(Key, Text) + * @see #replaceKeyParts(Key, Text, Text) + * @see #replaceKeyParts(Key, Text, Text, Text) + */ + abstract protected Key transformKey(Key originalKey); +} Added: accumulo/trunk/core/src/test/java/org/apache/accumulo/core/iterators/user/KeyTransformingIteratorTest.java URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/test/java/org/apache/accumulo/core/iterators/user/KeyTransformingIteratorTest.java?rev=1434762&view=auto ============================================================================== --- accumulo/trunk/core/src/test/java/org/apache/accumulo/core/iterators/user/KeyTransformingIteratorTest.java (added) +++ accumulo/trunk/core/src/test/java/org/apache/accumulo/core/iterators/user/KeyTransformingIteratorTest.java Thu Jan 17 16:28:48 2013 @@ -0,0 +1,539 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.accumulo.core.iterators.user; + +import static org.junit.Assert.*; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.SortedMap; +import java.util.TreeMap; + +import org.apache.accumulo.core.client.BatchWriter; +import org.apache.accumulo.core.client.BatchWriterConfig; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.IteratorSetting; +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.client.mock.MockInstance; +import org.apache.accumulo.core.conf.AccumuloConfiguration; +import org.apache.accumulo.core.data.ArrayByteSequence; +import org.apache.accumulo.core.data.ByteSequence; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.data.PartialKey; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.iterators.IteratorEnvironment; +import org.apache.accumulo.core.iterators.SortedKeyValueIterator; +import org.apache.accumulo.core.iterators.WrappingIterator; +import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.core.security.ColumnVisibility; +import org.apache.hadoop.io.Text; +import org.junit.Before; +import org.junit.Test; + +public class KeyTransformingIteratorTest { + private static final String TABLE_NAME = "test_table"; + private static Authorizations authorizations = new Authorizations("vis0", "vis1", "vis2", "vis3", "vis4"); + private Connector connector; + private Scanner scanner; + + @Before + public void setUpMockAccumulo() throws Exception { + MockInstance instance = new MockInstance("test"); + connector = instance.getConnector("user", "password"); + 
connector.securityOperations().changeUserAuthorizations("user", authorizations); + + if (connector.tableOperations().exists(TABLE_NAME)) + connector.tableOperations().delete(TABLE_NAME); + connector.tableOperations().create(TABLE_NAME); + BatchWriterConfig bwCfg = new BatchWriterConfig(); + bwCfg.setMaxWriteThreads(1); + + BatchWriter bw = connector.createBatchWriter(TABLE_NAME, bwCfg); + bw.addMutation(createDefaultMutation("row1")); + bw.addMutation(createDefaultMutation("row2")); + bw.addMutation(createDefaultMutation("row3")); + + bw.flush(); + bw.close(); + + scanner = connector.createScanner(TABLE_NAME, authorizations); + scanner.addScanIterator(new IteratorSetting(20, ReuseIterator.class)); + } + + private void setUpTransformIterator(Class<? extends KeyTransformingIterator> clazz) { + IteratorSetting cfg = new IteratorSetting(21, clazz); + cfg.setName("keyTransformIter"); + cfg.addOption(KeyTransformingIterator.AUTH_OPT, "vis0, vis1, vis2, vis3"); + scanner.addScanIterator(cfg); + } + + @Test + public void testIdentityScan() throws Exception { + setUpTransformIterator(IdentityKeyTransformingIterator.class); + + // This is just an identity scan, but with the "reuse" iterator that reuses + // the same key/value pair for every getTopKey/getTopValue call. The code + // will always return the final key/value if we didn't copy the original key + // in the iterator. + TreeMap<Key,Value> expected = new TreeMap<Key,Value>(); + for (int row = 1; row <= 3; ++row) { + for (int cf = 1; cf <= 3; ++cf) { + for (int cq = 1; cq <= 3; ++cq) { + for (int cv = 1; cv <= 3; ++cv) { + putExpected(expected, row, cf, cq, cv, null); + } + } + } + } + + checkExpected(expected); + } + + @Test + public void testNoRangeScan() throws Exception { + @SuppressWarnings("unchecked") + List<Class<? 
extends ReversingKeyTransformingIterator>> classes = Arrays.asList(ColFamReversingKeyTransformingIterator.class, ColQualReversingKeyTransformingIterator.class, ColVisReversingKeyTransformingIterator.class); + + // Test transforming col fam, col qual, col vis + for (Class<? extends ReversingKeyTransformingIterator> clazz : classes) { + scanner.removeScanIterator("keyTransformIter"); + setUpTransformIterator(clazz); + + // All rows with visibilities reversed + KeyTransformingIterator iter = clazz.newInstance(); + TreeMap<Key,Value> expected = new TreeMap<Key,Value>(); + for (int row = 1; row <= 3; ++row) { + for (int cf = 1; cf <= 3; ++cf) { + for (int cq = 1; cq <= 3; ++cq) { + for (int cv = 1; cv <= 3; ++cv) { + putExpected(expected, row, cf, cq, cv, iter.getKeyPrefix()); + } + } + } + } + + checkExpected(expected); + } + } + + @Test + public void testVisbilityFiltering() throws Exception { + // Should return nothing since we produced visibilities that can't be seen + setUpTransformIterator(BadVisKeyTransformingIterator.class); + checkExpected(new TreeMap<Key,Value>()); + + // Do a "reverse" on the visibility (vis1 -> vis2, vis2 -> vis3, vis3 -> vis0) + // Source data has vis1, vis2, vis3 so vis0 is a new one that is introduced. + // Make sure it shows up in the output with the default test auths which include + // vis0. 
+ scanner.removeScanIterator("keyTransformIter"); + setUpTransformIterator(ColVisReversingKeyTransformingIterator.class); + TreeMap<Key,Value> expected = new TreeMap<Key,Value>(); + for (int row = 1; row <= 3; ++row) { + for (int cf = 1; cf <= 3; ++cf) { + for (int cq = 1; cq <= 3; ++cq) { + for (int cv = 1; cv <= 3; ++cv) { + putExpected(expected, row, cf, cq, cv, PartialKey.ROW_COLFAM_COLQUAL); + } + } + } + } + checkExpected(expected); + } + + @Test + public void testRangeStart() throws Exception { + setUpTransformIterator(ColVisReversingKeyTransformingIterator.class); + scanner.setRange(new Range(new Key("row1", "cf2", "cq2", "vis1"), true, new Key("row1", "cf2", "cq3"), false)); + + TreeMap<Key,Value> expected = new TreeMap<Key,Value>(); + putExpected(expected, 1, 2, 2, 1, PartialKey.ROW_COLFAM_COLQUAL); // before the range start, but transforms in the range + putExpected(expected, 1, 2, 2, 2, PartialKey.ROW_COLFAM_COLQUAL); + + checkExpected(expected); + } + + @Test + public void testRangeEnd() throws Exception { + setUpTransformIterator(ColVisReversingKeyTransformingIterator.class); + scanner.setRange(new Range(new Key("row1", "cf2", "cq2"), true, new Key("row1", "cf2", "cq2", "vis2"), false)); + + TreeMap<Key,Value> expected = new TreeMap<Key,Value>(); + //putExpected(expected, 1, 2, 2, 1, part); // transforms vis outside range end + putExpected(expected, 1, 2, 2, 2, PartialKey.ROW_COLFAM_COLQUAL); + putExpected(expected, 1, 2, 2, 3, PartialKey.ROW_COLFAM_COLQUAL); + + checkExpected(expected); + } + + @Test + public void testPrefixRange() throws Exception { + setUpTransformIterator(ColFamReversingKeyTransformingIterator.class); + // Set a range that is before all of the untransformed data. However, + // the data with untransformed col fam cf3 will transform to cf0 and + // be inside the range. 
+ scanner.setRange(new Range(new Key("row1", "cf0"), true, new Key("row1", "cf1"), false)); + + TreeMap<Key,Value> expected = new TreeMap<Key,Value>(); + for (int cq = 1; cq <= 3; ++cq) + for (int cv = 1; cv <= 3; ++cv) + putExpected(expected, 1, 3, cq, cv, PartialKey.ROW); + checkExpected(expected); + } + + @Test + public void testPostfixRange() throws Exception { + // Set a range that's after all data and make sure we don't + // somehow return something. + setUpTransformIterator(ColFamReversingKeyTransformingIterator.class); + scanner.setRange(new Range(new Key("row4"), null)); + checkExpected(new TreeMap<Key,Value>()); + } + + @Test + public void testReplaceKeyParts() throws Exception { + KeyTransformingIterator it = new IdentityKeyTransformingIterator(); + Key originalKey = new Key("r", "cf", "cq", "cv", 42); + originalKey.setDeleted(true); + + Key newKey = it.replaceColumnFamily(originalKey, new Text("test")); + assertEquals(createDeleteKey("r","test","cq","cv",42), newKey); + + newKey = it.replaceColumnQualifier(originalKey, new Text("test")); + assertEquals(createDeleteKey("r","cf","test","cv",42), newKey); + + newKey = it.replaceColumnVisibility(originalKey, new Text("test")); + assertEquals(createDeleteKey("r","cf","cq","test",42), newKey); + + newKey = it.replaceKeyParts(originalKey, new Text("testCQ"), new Text("testCV")); + assertEquals(createDeleteKey("r","cf","testCQ","testCV",42), newKey); + + newKey = it.replaceKeyParts(originalKey, new Text("testCF"), new Text("testCQ"), new Text("testCV")); + assertEquals(createDeleteKey("r","testCF","testCQ","testCV",42), newKey); + } + + @Test + public void testFetchColumnFamilites() throws Exception { + // In this test, we are fetching column family cf2, which is in + // the transformed space. The source column family that will + // transform into cf2 is cf1, so that is the column family we + // put in the expectations. 
+ int expectedCF = 1; + setUpTransformIterator(ColFamReversingKeyTransformingIterator.class); + scanner.fetchColumnFamily(new Text("cf2")); + + TreeMap<Key,Value> expected = new TreeMap<Key,Value>(); + for (int row = 1; row <= 3; ++row) + for (int cq = 1; cq <= 3; ++cq) + for (int cv = 1; cv <= 3; ++cv) + putExpected(expected, row, expectedCF, cq, cv, PartialKey.ROW); + checkExpected(expected); + } + + @Test + public void testDeepCopy() throws Exception { + + } + + @Test + public void testCompactionScanFetchingColumnFamilies() throws Exception { + // In this test, we are fetching column family cf2, which is in + // the transformed space. The source column family that will + // transform into cf2 is cf1, so that is the column family we + // put in the expectations. + int expectedCF = 1; + setUpTransformIterator(ColFamReversingCompactionKeyTransformingIterator.class); + scanner.fetchColumnFamily(new Text("cf2")); + + TreeMap<Key,Value> expected = new TreeMap<Key,Value>(); + for (int row = 1; row <= 3; ++row) + for (int cq = 1; cq <= 3; ++cq) + for (int cv = 1; cv <= 3; ++cv) + putExpected(expected, row, expectedCF, cq, cv, PartialKey.ROW); + checkExpected(expected); + } + + @Test + public void testCompactionDoesntFilterVisibilities() throws Exception { + // In scan mode, this should return nothing since it produces visibilites + // the user can't see. In compaction mode, however, the visibilites + // should still show up. 
+ setUpTransformIterator(BadVisCompactionKeyTransformingIterator.class); + + TreeMap<Key,Value> expected = new TreeMap<Key,Value>(); + for (int rowID = 1; rowID <= 3; ++rowID) { + for (int cfID = 1; cfID <= 3; ++cfID) { + for (int cqID = 1; cqID <= 3; ++cqID) { + for (int cvID = 1; cvID <= 3; ++cvID) { + String row = "row" + rowID; + String cf = "cf" + cfID; + String cq = "cq" + cqID; + String cv = "badvis"; + long ts = 100*cfID + 10*cqID + cvID; + String val = "val" + ts; + expected.put(new Key(row, cf, cq, cv, ts), new Value(val.getBytes())); + } + } + } + } + + checkExpected(expected); + } + + private Key createDeleteKey(String row, String colFam, String colQual, String colVis, long timestamp) { + Key key = new Key(row, colFam, colQual, colVis, timestamp); + key.setDeleted(true); + return key; + } + + private void checkExpected(TreeMap<Key,Value> expectedEntries) { + for (Entry<Key,Value> entry : scanner) { + Entry<Key,Value> expected = expectedEntries.pollFirstEntry(); + Key actualKey = entry.getKey(); + Value actualValue = entry.getValue(); + + assertNotNull("Ran out of expected entries on: " + entry, expected); + assertEquals("Key mismatch", expected.getKey(), actualKey); + assertEquals("Value mismatch", expected.getValue(), actualValue); + } + + assertTrue("Scanner did not return all expected entries: " + expectedEntries, expectedEntries.isEmpty()); + } + + private static void putExpected(SortedMap<Key,Value> expected, int rowID, int cfID, int cqID, int cvID, PartialKey part) { + String row = "row" + rowID; + String cf = "cf" + cfID; + String cq = "cq" + cqID; + String cv = "vis" + cvID; + long ts = 100*cfID + 10*cqID + cvID; + String val = "val" + ts; + + if (part != null) { + switch (part) { + case ROW: + cf = transform(new Text(cf)).toString(); break; + case ROW_COLFAM: + cq = transform(new Text(cq)).toString(); break; + case ROW_COLFAM_COLQUAL: + cv = transform(new Text(cv)).toString(); break; + default: + break; + } + } + + expected.put(new Key(row, cf, 
cq, cv, ts), new Value(val.getBytes())); + } + + private static Text transform(Text val) { + String s = val.toString(); + // Reverse the order of the number at the end, and subtract one + int i = 3 - Integer.parseInt(s.substring(s.length()-1)); + StringBuilder sb = new StringBuilder(); + sb.append(s.substring(0, s.length() - 1)); + sb.append(i); + return new Text(sb.toString()); + } + + private static Mutation createDefaultMutation(String row) { + Mutation m = new Mutation(row); + for (int cfID = 1; cfID <= 3; ++cfID) { + for (int cqID = 1; cqID <= 3; ++cqID) { + for (int cvID = 1; cvID <= 3; ++cvID) { + String cf = "cf" + cfID; + String cq = "cq" + cqID; + String cv = "vis" + cvID; + long ts = 100*cfID + 10*cqID + cvID; + String val = "val" + ts; + + m.put(cf, cq, new ColumnVisibility(cv), ts, val); + } + } + } + return m; + } + + private static Key reverseKeyPart(Key originalKey, PartialKey part) { + Text row = originalKey.getRow(); + Text cf = originalKey.getColumnFamily(); + Text cq = originalKey.getColumnQualifier(); + Text cv = originalKey.getColumnVisibility(); + long ts = originalKey.getTimestamp(); + switch (part) { + case ROW: + cf = transform(cf); break; + case ROW_COLFAM: + cq = transform(cq); break; + case ROW_COLFAM_COLQUAL: + cv = transform(cv); break; + default: + break; + } + return new Key(row, cf, cq, cv, ts); + } + + public static class IdentityKeyTransformingIterator extends KeyTransformingIterator { + @Override + protected PartialKey getKeyPrefix() { + return PartialKey.ROW; + } + + @Override + protected Key transformKey(Key originalKey) { + return originalKey; + }; + } + + public static abstract class ReversingKeyTransformingIterator extends KeyTransformingIterator { + @Override + protected Key transformKey(Key originalKey) { + return reverseKeyPart(originalKey, getKeyPrefix()); + }; + } + + public static class ColFamReversingKeyTransformingIterator extends ReversingKeyTransformingIterator { + @Override + protected PartialKey getKeyPrefix() { 
+ return PartialKey.ROW; + } + + @Override + protected Collection<ByteSequence> untransformColumnFamilies(Collection<ByteSequence> columnFamilies) { + HashSet<ByteSequence> untransformed = new HashSet<ByteSequence>(); + for (ByteSequence cf : columnFamilies) + untransformed.add(untransformColumnFamily(cf)); + return untransformed; + } + + protected ByteSequence untransformColumnFamily(ByteSequence colFam) { + Text transformed = transform(new Text(colFam.toArray())); + byte[] bytes = transformed.getBytes(); + return new ArrayByteSequence(bytes, 0, transformed.getLength()); + } + } + + public static class ColFamReversingCompactionKeyTransformingIterator extends ColFamReversingKeyTransformingIterator { + @Override + public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options, IteratorEnvironment env) throws IOException { + env = new MajCIteratorEnvironmentAdapter(env); + super.init(source, options, env); + } + } + + public static class ColQualReversingKeyTransformingIterator extends ReversingKeyTransformingIterator { + @Override + protected PartialKey getKeyPrefix() { + return PartialKey.ROW_COLFAM; + } + } + + public static class ColVisReversingKeyTransformingIterator extends ReversingKeyTransformingIterator { + @Override + protected PartialKey getKeyPrefix() { + return PartialKey.ROW_COLFAM_COLQUAL; + } + } + + public static class BadVisKeyTransformingIterator extends KeyTransformingIterator { + @Override + protected PartialKey getKeyPrefix() { + return PartialKey.ROW_COLFAM_COLQUAL; + } + + @Override + protected Key transformKey(Key originalKey) { + return new Key(originalKey.getRow(), originalKey.getColumnFamily(), originalKey.getColumnQualifier(), new Text("badvis"), originalKey.getTimestamp()); + } + } + + public static class BadVisCompactionKeyTransformingIterator extends BadVisKeyTransformingIterator { + @Override + public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options, IteratorEnvironment env) throws 
IOException { + env = new MajCIteratorEnvironmentAdapter(env); + super.init(source, options, env); + } + } + + public static class ReuseIterator extends WrappingIterator { + private Key topKey = new Key(); + private Value topValue = new Value(); + + @Override + public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive) throws IOException { + super.seek(range, columnFamilies, inclusive); + loadTop(); + } + + @Override + public void next() throws IOException { + super.next(); + loadTop(); + } + + @Override + public Key getTopKey() { + return topKey; + } + + @Override + public Value getTopValue() { + return topValue; + } + + private void loadTop() { + if (hasTop()) { + topKey.set(super.getTopKey()); + topValue.set(super.getTopValue().get()); + } + } + } + + private static class MajCIteratorEnvironmentAdapter implements IteratorEnvironment { + private IteratorEnvironment delegate; + + public MajCIteratorEnvironmentAdapter(IteratorEnvironment delegate) { + this.delegate = delegate; + } + + public SortedKeyValueIterator<Key,Value> reserveMapFileReader(String mapFileName) throws IOException { + return delegate.reserveMapFileReader(mapFileName); + } + + public AccumuloConfiguration getConfig() { + return delegate.getConfig(); + } + + public IteratorScope getIteratorScope() { + return IteratorScope.majc; + } + + public boolean isFullMajorCompaction() { + return delegate.isFullMajorCompaction(); + } + + public void registerSideChannel(SortedKeyValueIterator<Key,Value> iter) { + delegate.registerSideChannel(iter); + } + } +}