Author: kturner
Date: Thu Jan 17 16:28:48 2013
New Revision: 1434762

URL: http://svn.apache.org/viewvc?rev=1434762&view=rev
Log:
ACCUMULO-956 checkin of patch from Brain Loss

Added:
    
accumulo/trunk/core/src/main/java/org/apache/accumulo/core/iterators/user/KeyTransformingIterator.java
    
accumulo/trunk/core/src/test/java/org/apache/accumulo/core/iterators/user/KeyTransformingIteratorTest.java

Added: 
accumulo/trunk/core/src/main/java/org/apache/accumulo/core/iterators/user/KeyTransformingIterator.java
URL: 
http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/iterators/user/KeyTransformingIterator.java?rev=1434762&view=auto
==============================================================================
--- 
accumulo/trunk/core/src/main/java/org/apache/accumulo/core/iterators/user/KeyTransformingIterator.java
 (added)
+++ 
accumulo/trunk/core/src/main/java/org/apache/accumulo/core/iterators/user/KeyTransformingIterator.java
 Thu Jan 17 16:28:48 2013
@@ -0,0 +1,599 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.iterators.user;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.Map;
+
+import org.apache.accumulo.core.data.ByteSequence;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.PartialKey;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iterators.IteratorEnvironment;
+import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
+import org.apache.accumulo.core.iterators.OptionDescriber;
+import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+import org.apache.accumulo.core.iterators.WrappingIterator;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.security.ColumnVisibility;
+import org.apache.accumulo.core.security.VisibilityEvaluator;
+import org.apache.accumulo.core.security.VisibilityParseException;
+import org.apache.accumulo.core.util.BadArgumentException;
+import org.apache.accumulo.core.util.Pair;
+import org.apache.commons.collections.map.LRUMap;
+import org.apache.hadoop.io.Text;
+import org.apache.log4j.Logger;
+
+/**
+ * The KeyTransformingIterator allows portions of a key (except for the row)
+ * to be transformed.  This iterator handles the details that come with 
modifying
+ * keys (i.e., that the sort order could change).  In order to do so, however,
+ * the iterator must put all keys sharing the same prefix in memory.  Prefix
+ * is defined as the parts of the key that are not modified by this iterator.
+ * That is, if the iterator modifies column qualifier and timestamp, then the
+ * prefix is row and column family.  In that case, the iterator must load all
+ * column qualifiers for each row/column family pair into memory.  Given this
+ * constraint, care must be taken by users of this iterator to ensure it is
+ * not run in such a way that will overrun memory in a tablet server.
+ * <p>
+ * If the implementing iterator is transforming column families, then it
+ * must also override {@link #untransformColumnFamilies(Collection)} to handle
+ * the case when column families are fetched at scan time.  The fetched column
+ * families will/must be in the transformed space, and the untransformed column
+ * families need to be passed to this iterator's source.  If it is not possible
+ * to write a reverse transformation (e.g., the column family transformation
+ * depends on the row value or something like that), then the iterator must
+ * not fetch specific column families (or only fetch column families that are
+ * known to not transform at all).
+ * <p>
+ * If the implementing iterator is transforming column visibilities, then
+ * users must be careful NOT to fetch column qualifiers from the scanner.
+ * The reason for this is due to ACCUMULO-??? (insert issue number).
+ * <p>
+ * If the implementing iterator is transforming column visibilities, then the
+ * user should be sure to supply authorizations via the {@link #AUTH_OPT}
+ * iterator option (note that this is only necessary for scan scope iterators).
+ * The supplied authorizations should be in the transformed space, but the
+ * authorizations supplied to the scanner should be in the untransformed
+ * space.  That is, if the iterator transforms A to 1, B to 2, C to 3, etc,
+ * then the auths supplied when the scanner is constructed should be A,B,C,...
+ * and the auths supplied to the iterator should be 1,2,3,...  The reason
+ * for this is that the scanner performs security filtering before this
+ * iterator is called, so the authorizations need to be in the original
+ * untransformed space.  Since the iterator can transform visibilities, it is
+ * possible that it could produce visibilities that the user cannot see,
+ * so the transformed keys must be tested to ensure the user is allowed to 
view 
+ * them.  Note that this test is not necessary when the iterator is not used
+ * in the scan scope since no security filtering is performed during major and
+ * minor compactions.  It should also be noted that this iterator implements 
the
+ * security filtering rather than relying on a follow-on iterator to do it so
+ * that we ensure the test is performed.
+ */
+abstract public class KeyTransformingIterator extends WrappingIterator 
implements OptionDescriber {
+  public static final String AUTH_OPT = "authorizations";
+  protected Logger log = Logger.getLogger(getClass());
+  
+  protected ArrayList<Pair<Key,Value>> keys = new ArrayList<Pair<Key,Value>>();
+  protected int keyPos = -1;
+  protected boolean scanning;
+  protected Range seekRange;
+  protected Collection<ByteSequence> seekColumnFamilies;
+  protected boolean seekColumnFamiliesInclusive;
+
+  private VisibilityEvaluator ve = null;
+  private LRUMap visibleCache = null;
+  private LRUMap parsedVisibilitiesCache = null;
+  
+  private static Comparator<Pair<Key,Value>> keyComparator = new 
Comparator<Pair<Key,Value>>() {    
+    @Override
+    public int compare(Pair<Key,Value> o1, Pair<Key,Value> o2) {
+      return o1.getFirst().compareTo(o2.getFirst());
+    }
+  };
+  
+  public KeyTransformingIterator() {
+  }
+  
+  @Override
+  public void init(SortedKeyValueIterator<Key,Value> source, 
Map<String,String> options, IteratorEnvironment env) throws IOException {
+    super.init(source, options, env);
+    scanning = IteratorScope.scan.equals(env.getIteratorScope());
+    if (scanning) {
+      String auths = options.get(AUTH_OPT);
+      if (auths != null && !auths.isEmpty()) {
+        ve = new VisibilityEvaluator(new Authorizations(auths.split(",")));
+        visibleCache = new LRUMap(100);
+      }
+    }
+    
+    parsedVisibilitiesCache = new LRUMap(100);
+  }
+  
+  @Override
+  public IteratorOptions describeOptions() {
+    String desc = "This iterator allows keys to be transformed.";
+    String authDesc = "Comma-separated list of user's scan authorizations.  " +
+               "If excluded or empty, then no visibility check is performed on 
transformed keys.";
+    return new IteratorOptions(getClass().getSimpleName(), desc, 
Collections.singletonMap(AUTH_OPT, authDesc), null);
+  }
+  
+  @Override
+  public boolean validateOptions(Map<String,String> options) {
+    return true;
+  }
+  
+  @Override
+  public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) {
+    KeyTransformingIterator copy;
+    
+    try {
+      copy = getClass().newInstance();
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+
+    copy.setSource(getSource().deepCopy(env));
+    
+    copy.scanning = scanning;
+    copy.keyPos = keyPos;
+    copy.keys.addAll(keys);
+    copy.seekRange = (seekRange == null) ? null : new Range(seekRange);
+    copy.seekColumnFamilies = (seekColumnFamilies == null) ? null : new 
HashSet<ByteSequence>(seekColumnFamilies);
+    copy.seekColumnFamiliesInclusive = seekColumnFamiliesInclusive;
+
+    copy.ve = ve;
+    if (visibleCache != null) {
+      copy.visibleCache = new LRUMap(visibleCache.maxSize());
+      copy.visibleCache.putAll(visibleCache);
+    }
+    
+    if (parsedVisibilitiesCache != null) {
+      copy.parsedVisibilitiesCache = new 
LRUMap(parsedVisibilitiesCache.maxSize());
+      copy.parsedVisibilitiesCache.putAll(parsedVisibilitiesCache);
+    }
+    
+    return copy;
+  }
+  
+  @Override
+  public boolean hasTop() {
+    return keyPos >= 0 && keyPos < keys.size();
+  }
+  
+  @Override
+  public Key getTopKey() {
+    return hasTop() ? keys.get(keyPos).getFirst() : null;
+  }
+  
+  @Override
+  public Value getTopValue() {
+    return hasTop() ? keys.get(keyPos).getSecond() : null;
+  }
+  
+  @Override
+  public void next() throws IOException {
+    // Move on to the next entry since we returned the entry at keyPos before
+    if (keyPos >= 0)
+      keyPos++;
+
+    // If we emptied out the transformed key map then transform the next key 
+    // set from the source.  It’s possible that transformation could produce 
keys
+    // that are outside of our range or are not visible to the end user, so 
after the
+    // call below we might not have added any keys to the map.  Keep going 
until
+    // we either get some keys in the map or exhaust the source iterator.
+    while (!hasTop() && super.hasTop())
+      transformKeys();  
+  }
+  
+  @Override
+  public void seek(Range range, Collection<ByteSequence> columnFamilies, 
boolean inclusive) throws IOException {
+    seekRange = (range != null) ? new Range(range) : null;
+    seekColumnFamilies = columnFamilies;
+    seekColumnFamiliesInclusive = inclusive;
+
+    // Seek the source iterator, but use a recalculated range that ensures
+    // we see all keys with the same "prefix."  We need to do this since
+    // transforming could change the sort order and transformed keys that
+    // are before the range start could be inside the range after 
transformation.
+    super.seek(computeReseekRange(range), 
untransformColumnFamilies(columnFamilies), inclusive);
+
+    // Range clipping could cause us to trim out all the keys we transformed.
+    // Keep looping until we either have some keys in the output range, or have
+    // exhausted the source iterator.
+    keyPos = -1;  // “Clear” list so hasTop returns false to get us into 
the loop (transformKeys actually clears)
+    while (!hasTop() && super.hasTop()) {
+      // Build up a sorted list of all keys for the same prefix.  When
+      // people ask for keys, return from this list first until it is empty
+      // before incrementing the source iterator.
+      transformKeys();
+    }
+  }
+  
+  /**
+   * Reads all keys matching the first key's prefix from the source
+   * iterator, transforms them, and sorts the resulting keys.  Transformed
+   * keys that fall outside of our seek range or can't be seen by the
+   * user are excluded.
+   */
+  protected void transformKeys() throws IOException {
+    keyPos = -1;
+    keys.clear();
+    Key prefixKey = super.hasTop() ? new Key(super.getTopKey()) : null;
+
+    while (super.hasTop()) {
+      Key sourceTopKey = super.getTopKey();
+
+      // If the source key equals our prefix key (up to the prefix), then
+      // we have a key that needs transformed.  Otherwise, we're done.
+      if (sourceTopKey.equals(prefixKey, getKeyPrefix())) {
+        Key transformedKey = transformKey(sourceTopKey);
+        
+        // If the transformed key didn't actually change, then we need
+        // to make a copy since we're caching it in our keys list.
+        if (transformedKey == sourceTopKey)
+          transformedKey = new Key(sourceTopKey);
+        // We could check that the transformed key didn't transform anything
+        // in the key prefix here...
+
+        // Transformation could have produced a key that falls outside
+        // of the seek range, or one that the user cannot see.  Check
+        // these before adding it to the output list.
+        if (includeTransformedKey(transformedKey))
+          keys.add(new Pair<Key,Value>(transformedKey, new 
Value(super.getTopValue())));
+      } else {
+        break;
+      }
+
+      super.next();
+    }
+
+    if (!keys.isEmpty()) {
+      Collections.sort(keys, keyComparator);
+      keyPos = 0;
+    }
+  }
+  
+  /**
+   * Determines whether or not to include {@code transformedKey} in the
+   * output.  It is possible that transformation could have produced a key
+   * that falls outside of the seek range, a key with a visibility the user
+   * can't see, a key with a visibility that doesn't parse, or a key with a
+   * column family that wasn't fetched.  We only do some checks (outside the
+   * range, user can see) if we're scanning.  The range check is not done for
+   * major/minor compaction since seek ranges won't be in our transformed key
+   * space and we will never change the row so we can't produce keys that
+   * would fall outside the tablet anyway.  
+   * 
+   * @param transformedKey the key to check
+   * @return {@code true} if the key should be included and {@code false} if 
not
+   */
+  protected boolean includeTransformedKey(Key transformedKey) {
+    boolean include = canSee(transformedKey);
+    if (scanning && seekRange != null) {
+      include = include && seekRange.contains(transformedKey);
+    }
+    return include;
+  }
+  
+  /**
+   * Indicates whether or not the user is able to see {@code key}.  If the
+   * user has not supplied authorizations, or the iterator is not in the
+   * scan scope, then this method simply returns {@code true}.  Otherwise,
+   * {@code key}'s column visibility is tested against the user-supplied
+   * authorizations, and the test result is returned.  For performance,
+   * the test results are cached so that the same visibility is not tested
+   * multiple times.
+   * 
+   * @param key the key to test 
+   * @return {@code true} if the key is visible or iterator is not scanning,
+   *         and {@code false} if not
+   */
+  protected boolean canSee(Key key) {
+    // Ensure that the visibility (which could have been transformed) parses.
+    ByteSequence visibility = key.getColumnVisibilityData();
+    ColumnVisibility colVis = (ColumnVisibility) 
parsedVisibilitiesCache.get(visibility);
+    if (colVis == null) {
+      try {
+        colVis = new ColumnVisibility(visibility.toArray());
+      } catch (BadArgumentException e) {
+        log.error("Transformation produced an invalid visibility: " + 
visibility);
+        throw e;
+      }
+    }
+    
+    Boolean visible = canSeeColumnFamily(key);
+    
+    if (!scanning || !visible || ve == null || visibleCache == null)
+      return visible;
+
+    visible = (Boolean) visibleCache.get(visibility);
+    if (visible == null) {
+      try {
+        visible = ve.evaluate(colVis);
+        visibleCache.put(visibility, visible);
+      } catch (VisibilityParseException e) {
+        log.error("Parse Error", e);
+        visible = Boolean.FALSE;
+      } catch (BadArgumentException e) {
+        log.error("Parse Error", e);
+        visible = Boolean.FALSE;
+      }
+    }
+
+    return visible;
+  }
+  
+  /**
+   * Indicates whether or not {@code key} can be seen, according to the
+   * fetched column families for this iterator.
+   * 
+   * @param key the key whose column family is to be tested
+   * @return {@code true} if {@code key}'s column family is one of those 
fetched
+   *         in the set passed to our {@link #seek(Range, Collection, 
boolean)} method
+   */
+  protected boolean canSeeColumnFamily(Key key) {
+    boolean visible = true;
+    if (seekColumnFamilies != null) {
+      ByteSequence columnFamily = key.getColumnFamilyData();
+      if (seekColumnFamiliesInclusive)
+        visible = seekColumnFamilies.contains(columnFamily);
+      else
+        visible = !seekColumnFamilies.contains(columnFamily);
+    }
+    return visible;
+  }
+
+  /**
+   * Possibly expand {@code range} to include everything for the key prefix
+   * we are working with.  That is, if our prefix is ROW_COLFAM, then we
+   * need to expand the range so we're sure to include all entries having
+   * the same row and column family as the start/end of the range.
+   * 
+   * @param range the range to expand
+   * @return the modified range
+   */
+  protected Range computeReseekRange(Range range) {
+    Key startKey = range.getStartKey();
+    boolean startKeyInclusive = range.isStartKeyInclusive();
+    // If anything after the prefix is set, then clip the key so we include
+    // everything for the prefix.
+    if (isSetAfterPart(startKey, getKeyPrefix())) {
+      startKey = copyPartialKey(startKey, getKeyPrefix());
+      startKeyInclusive = true;
+    }
+    Key endKey = range.getEndKey();
+    boolean endKeyInclusive = range.isEndKeyInclusive();
+    if (isSetAfterPart(endKey, getKeyPrefix())) {
+      endKey = endKey.followingKey(getKeyPrefix());
+      endKeyInclusive = true;
+    }
+    return new Range(startKey, startKeyInclusive, endKey, endKeyInclusive);
+  }
+  
+  /**
+   * Indicates whether or not any part of {@code key} excluding
+   * {@code part} is set.  For example, if part is ROW_COLFAM_COLQUAL,
+   * then this method determines whether or not the column visibility,
+   * timestamp, or delete flag is set on {@code key}.
+   * 
+   * @param key the key to check
+   * @param part the part of the key that doesn't need to be checked 
(everything after does)
+   * @return {@code true} if anything after {@code part} is set on {@code 
key}, and {@code false} if not
+   */
+  protected boolean isSetAfterPart(Key key, PartialKey part) {
+    boolean isSet = false;
+    if (key != null) {        
+      // Breaks excluded on purpose.
+      switch (part) {
+        case ROW:
+          isSet = isSet || key.getColumnFamilyData().length() > 0;
+        case ROW_COLFAM:
+          isSet = isSet || key.getColumnQualifierData().length() > 0;
+        case ROW_COLFAM_COLQUAL:
+          isSet = isSet || key.getColumnVisibilityData().length() > 0;
+        case ROW_COLFAM_COLQUAL_COLVIS:
+          isSet = isSet || key.getTimestamp() < Long.MAX_VALUE;
+        case ROW_COLFAM_COLQUAL_COLVIS_TIME:
+          isSet = isSet || key.isDeleted();
+        case ROW_COLFAM_COLQUAL_COLVIS_TIME_DEL:
+          break;
+      }
+    }
+    return isSet;
+  }
+  
+  /**
+   * Creates a copy of {@code key}, copying only the parts of the key specified
+   * in {@code part}.  For example, if {@code part} is ROW_COLFAM_COLQUAL, then
+   * this method would copy the row, column family, and column qualifier from
+   * {@code key} into a new key.
+   * 
+   * @param key the key to copy
+   * @param part the parts of {@code key} to copy
+   * @return the new key containing {@code part} of {@code key}
+   */
+  protected Key copyPartialKey(Key key, PartialKey part) {
+    Key keyCopy;
+    switch (part) {
+      case ROW:
+        keyCopy = new Key(key.getRow());
+        break;
+      case ROW_COLFAM:
+        keyCopy = new Key(key.getRow(), key.getColumnFamily());
+        break;
+      case ROW_COLFAM_COLQUAL:
+        keyCopy = new Key(key.getRow(), key.getColumnFamily(), 
key.getColumnQualifier());
+        break;
+      case ROW_COLFAM_COLQUAL_COLVIS:
+        keyCopy = new Key(key.getRow(), key.getColumnFamily(), 
key.getColumnQualifier(), key.getColumnVisibility());
+        break;
+      case ROW_COLFAM_COLQUAL_COLVIS_TIME:
+        keyCopy = new Key(key.getRow(), key.getColumnFamily(), 
key.getColumnQualifier(),
+          key.getColumnVisibility(), key.getTimestamp());
+        break;
+      default:
+        throw new IllegalArgumentException("Unsupported key part: " + part);
+    }
+    return keyCopy;
+  }
+  
+  /**
+   * Make a new key with all parts (including delete flag) coming from {@code 
originalKey} but
+   * use {@code newColFam} as the column family.
+   */
+  protected Key replaceColumnFamily(Key originalKey, Text newColFam) {
+    byte[] row = originalKey.getRowData().toArray();
+    byte[] cf = newColFam.getBytes();
+    byte[] cq = originalKey.getColumnQualifierData().toArray();
+    byte[] cv = originalKey.getColumnVisibilityData().toArray();
+    long timestamp = originalKey.getTimestamp();
+    Key newKey = new Key(row, 0, row.length,
+        cf, 0, newColFam.getLength(), 
+        cq, 0, cq.length, 
+        cv, 0, cv.length,
+        timestamp);
+    newKey.setDeleted(originalKey.isDeleted());
+    return newKey;
+  }
+  
+  /**
+   * Make a new key with all parts (including delete flag) coming from {@code 
originalKey} but
+   * use {@code newColQual} as the column qualifier.
+   */
+  protected Key replaceColumnQualifier(Key originalKey, Text newColQual) {
+    byte[] row = originalKey.getRowData().toArray();
+    byte[] cf = originalKey.getColumnFamilyData().toArray();
+    byte[] cq = newColQual.getBytes();
+    byte[] cv = originalKey.getColumnVisibilityData().toArray();
+    long timestamp = originalKey.getTimestamp();
+    Key newKey = new Key(row, 0, row.length,
+        cf, 0, cf.length, 
+        cq, 0, newColQual.getLength(), 
+        cv, 0, cv.length,
+        timestamp);
+    newKey.setDeleted(originalKey.isDeleted());
+    return newKey;
+  }
+  
+  /**
+   * Make a new key with all parts (including delete flag) coming from {@code 
originalKey} but
+   * use {@code newColVis} as the column visibility.
+   */
+  protected Key replaceColumnVisibility(Key originalKey, Text newColVis) {
+    byte[] row = originalKey.getRowData().toArray();
+    byte[] cf = originalKey.getColumnFamilyData().toArray();
+    byte[] cq = originalKey.getColumnQualifierData().toArray();
+    byte[] cv = newColVis.getBytes();
+    long timestamp = originalKey.getTimestamp();
+    Key newKey = new Key(row, 0, row.length,
+        cf, 0, cf.length, 
+        cq, 0, cq.length, 
+        cv, 0, newColVis.getLength(),
+        timestamp);
+    newKey.setDeleted(originalKey.isDeleted());
+    return newKey;
+  }
+  
+  /**
+   * Make a new key with a column family, column qualifier, and column 
visibility.  
+   * Copy the rest of the parts of the key (including delete flag) from {@code 
originalKey}.
+   */
+  protected Key replaceKeyParts(Key originalKey, Text newColFam, Text 
newColQual, Text newColVis) {
+    byte[] row = originalKey.getRowData().toArray();
+    byte[] cf = newColFam.getBytes();
+    byte[] cq = newColQual.getBytes();
+    byte[] cv = newColVis.getBytes();
+    long timestamp = originalKey.getTimestamp();
+    Key newKey = new Key(row, 0, row.length,
+        cf, 0, newColFam.getLength(), 
+        cq, 0, newColQual.getLength(), 
+        cv, 0, newColVis.getLength(),
+        timestamp);
+    newKey.setDeleted(originalKey.isDeleted());
+    return newKey;
+  }
+  
+  /**
+   * Make a new key with a column qualifier, and column visibility.  Copy the 
rest
+   * of the parts of the key (including delete flag) from {@code originalKey}.
+   */
+  protected Key replaceKeyParts(Key originalKey, Text newColQual, Text 
newColVis) {
+    byte[] row = originalKey.getRowData().toArray();
+    byte[] cf = originalKey.getColumnFamilyData().toArray();
+    byte[] cq = newColQual.getBytes();
+    byte[] cv = newColVis.getBytes();
+    long timestamp = originalKey.getTimestamp();
+    Key newKey = new Key(row, 0, row.length,
+        cf, 0, cf.length, 
+        cq, 0, newColQual.getLength(), 
+        cv, 0, newColVis.getLength(),
+        timestamp);
+    newKey.setDeleted(originalKey.isDeleted());
+    return newKey;
+  }
+  
+  /**
+   * Reverses the transformation applied to column families that are fetched 
at seek
+   * time.  If this iterator is transforming column families, then this method
+   * should be overridden to reverse the transformation on the supplied 
collection
+   * of column families.  This is necessary since the fetch/seek will be 
performed
+   * in the transformed space, but when passing the column family set on to 
the source,
+   * the column families need to be in the untransformed space. 
+   * 
+   * @param columnFamilies the column families that have been fetched at seek 
time
+   * @return the untransformed column families that would transform info 
{@code columnFamilies}
+   */
+  protected Collection<ByteSequence> 
untransformColumnFamilies(Collection<ByteSequence> columnFamilies) {
+    return columnFamilies;
+  }
+  
+  /**
+   * Indicates the prefix of keys that will be transformed by this iterator.
+   * In other words, this is the part of the key that will <i>not</i> be
+   * transformed by this iterator.  For example, if this method returns
+   * ROW_COLFAM, then {@link #transformKey(Key)} may be changing the
+   * column qualifier, column visibility, or timestamp, but it won't be
+   * changing the row or column family.
+   *  
+   * @return the part of the key this iterator is not transforming
+   */
+  abstract protected PartialKey getKeyPrefix();
+  
+  /**
+   * Transforms {@code originalKey}.  This method must not change the row
+   * part of the key, and must only change the parts of the key after the
+   * return value of {@link #getKeyPrefix()}. Implementors must also remember
+   * to copy the delete flag from {@code originalKey} onto the new key.  Or,
+   * implementors should use one of the helper methods to produce the new
+   * key.  See any of the replaceKeyParts methods.
+   * 
+   * @param originalKey the key to be transformed
+   * @return the modified key
+   * @see #replaceColumnFamily(Key, Text)
+   * @see #replaceColumnQualifier(Key, Text)
+   * @see #replaceColumnVisibility(Key, Text)
+   * @see #replaceKeyParts(Key, Text, Text)
+   * @see #replaceKeyParts(Key, Text, Text, Text)
+   */
+  abstract protected Key transformKey(Key originalKey);
+}

Added: 
accumulo/trunk/core/src/test/java/org/apache/accumulo/core/iterators/user/KeyTransformingIteratorTest.java
URL: 
http://svn.apache.org/viewvc/accumulo/trunk/core/src/test/java/org/apache/accumulo/core/iterators/user/KeyTransformingIteratorTest.java?rev=1434762&view=auto
==============================================================================
--- 
accumulo/trunk/core/src/test/java/org/apache/accumulo/core/iterators/user/KeyTransformingIteratorTest.java
 (added)
+++ 
accumulo/trunk/core/src/test/java/org/apache/accumulo/core/iterators/user/KeyTransformingIteratorTest.java
 Thu Jan 17 16:28:48 2013
@@ -0,0 +1,539 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.iterators.user;
+
+import static org.junit.Assert.*;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.SortedMap;
+import java.util.TreeMap;
+
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.mock.MockInstance;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.data.ArrayByteSequence;
+import org.apache.accumulo.core.data.ByteSequence;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.PartialKey;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iterators.IteratorEnvironment;
+import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+import org.apache.accumulo.core.iterators.WrappingIterator;
+import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.security.ColumnVisibility;
+import org.apache.hadoop.io.Text;
+import org.junit.Before;
+import org.junit.Test;
+
+public class KeyTransformingIteratorTest {
+  private static final String TABLE_NAME = "test_table";
+  private static Authorizations authorizations = new Authorizations("vis0", 
"vis1", "vis2", "vis3", "vis4");
+  private Connector connector;
+  private Scanner scanner;
+  
+  @Before
+  public void setUpMockAccumulo() throws Exception {
+    MockInstance instance = new MockInstance("test");
+    connector = instance.getConnector("user", "password");
+    connector.securityOperations().changeUserAuthorizations("user", 
authorizations);
+    
+    if (connector.tableOperations().exists(TABLE_NAME))
+      connector.tableOperations().delete(TABLE_NAME);
+    connector.tableOperations().create(TABLE_NAME);
+    BatchWriterConfig bwCfg = new BatchWriterConfig();
+    bwCfg.setMaxWriteThreads(1);
+    
+    BatchWriter bw = connector.createBatchWriter(TABLE_NAME, bwCfg);
+    bw.addMutation(createDefaultMutation("row1"));
+    bw.addMutation(createDefaultMutation("row2"));
+    bw.addMutation(createDefaultMutation("row3"));
+    
+    bw.flush();
+    bw.close();
+    
+    scanner = connector.createScanner(TABLE_NAME, authorizations);
+    scanner.addScanIterator(new IteratorSetting(20, ReuseIterator.class));
+  }
+  
+  private void setUpTransformIterator(Class<? extends KeyTransformingIterator> 
clazz) {
+    IteratorSetting cfg = new IteratorSetting(21, clazz);
+    cfg.setName("keyTransformIter");
+    cfg.addOption(KeyTransformingIterator.AUTH_OPT, "vis0, vis1, vis2, vis3");
+    scanner.addScanIterator(cfg);
+  }
+  
+  @Test
+  public void testIdentityScan() throws Exception {
+    setUpTransformIterator(IdentityKeyTransformingIterator.class); 
+    
+    // This is just an identity scan, but with the "reuse" iterator that reuses
+    // the same key/value pair for every getTopKey/getTopValue call.  The code
+    // will always return the final key/value if we didn't copy the original 
key
+    // in the iterator.
+    TreeMap<Key,Value> expected = new TreeMap<Key,Value>();
+    for (int row = 1; row <= 3; ++row) {
+      for (int cf = 1; cf <= 3; ++cf) {
+        for (int cq = 1; cq <= 3; ++cq) {
+          for (int cv = 1; cv <= 3; ++cv) {
+            putExpected(expected, row, cf, cq, cv, null);
+          }
+        }
+      }
+    }
+    
+    checkExpected(expected);
+  }
+  
+  @Test
+  public void testNoRangeScan() throws Exception {
+    @SuppressWarnings("unchecked")
+    List<Class<? extends ReversingKeyTransformingIterator>> classes = 
Arrays.asList(ColFamReversingKeyTransformingIterator.class, 
ColQualReversingKeyTransformingIterator.class, 
ColVisReversingKeyTransformingIterator.class);
+    
+    // Test transforming col fam, col qual, col vis
+    for (Class<? extends ReversingKeyTransformingIterator> clazz : classes) {
+      scanner.removeScanIterator("keyTransformIter");
+      setUpTransformIterator(clazz);
+      
+      // All rows with visibilities reversed
+      KeyTransformingIterator iter = clazz.newInstance();
+      TreeMap<Key,Value> expected = new TreeMap<Key,Value>();
+      for (int row = 1; row <= 3; ++row) {
+        for (int cf = 1; cf <= 3; ++cf) {
+          for (int cq = 1; cq <= 3; ++cq) {
+            for (int cv = 1; cv <= 3; ++cv) {
+              putExpected(expected, row, cf, cq, cv, iter.getKeyPrefix());
+            }
+          }
+        }
+      }
+      
+      checkExpected(expected);    
+    }
+  }
+  
+  @Test
+  public void testVisbilityFiltering() throws Exception {
+    // Should return nothing since we produced visibilities that can't be seen
+    setUpTransformIterator(BadVisKeyTransformingIterator.class);     
+    checkExpected(new TreeMap<Key,Value>());
+    
+    // Do a "reverse" on the visibility (vis1 -> vis2, vis2 -> vis3, vis3 -> 
vis0)
+    // Source data has vis1, vis2, vis3 so vis0 is a new one that is 
introduced.
+    // Make sure it shows up in the output with the default test auths which 
include
+    // vis0.
+    scanner.removeScanIterator("keyTransformIter");
+    setUpTransformIterator(ColVisReversingKeyTransformingIterator.class);
+    TreeMap<Key,Value> expected = new TreeMap<Key,Value>();
+    for (int row = 1; row <= 3; ++row) {
+      for (int cf = 1; cf <= 3; ++cf) {
+        for (int cq = 1; cq <= 3; ++cq) {
+          for (int cv = 1; cv <= 3; ++cv) {
+            putExpected(expected, row, cf, cq, cv, 
PartialKey.ROW_COLFAM_COLQUAL);
+          }
+        }
+      }
+    }    
+    checkExpected(expected);
+  }
+  
+  @Test
+  public void testRangeStart() throws Exception {
+    setUpTransformIterator(ColVisReversingKeyTransformingIterator.class); 
+    scanner.setRange(new Range(new Key("row1", "cf2", "cq2", "vis1"), true, 
new Key("row1", "cf2", "cq3"), false));
+    
+    TreeMap<Key,Value> expected = new TreeMap<Key,Value>();
+    putExpected(expected, 1, 2, 2, 1, PartialKey.ROW_COLFAM_COLQUAL); // 
before the range start, but transforms in the range
+    putExpected(expected, 1, 2, 2, 2, PartialKey.ROW_COLFAM_COLQUAL);
+    
+    checkExpected(expected);
+  }
+  
+  @Test
+  public void testRangeEnd() throws Exception {
+    setUpTransformIterator(ColVisReversingKeyTransformingIterator.class); 
+    scanner.setRange(new Range(new Key("row1", "cf2", "cq2"), true, new 
Key("row1", "cf2", "cq2", "vis2"), false));
+    
+    TreeMap<Key,Value> expected = new TreeMap<Key,Value>();
+    //putExpected(expected, 1, 2, 2, 1, part); // transforms vis outside range 
end
+    putExpected(expected, 1, 2, 2, 2, PartialKey.ROW_COLFAM_COLQUAL);
+    putExpected(expected, 1, 2, 2, 3, PartialKey.ROW_COLFAM_COLQUAL);
+    
+    checkExpected(expected);
+  }
+  
+  @Test
+  public void testPrefixRange() throws Exception {
+    setUpTransformIterator(ColFamReversingKeyTransformingIterator.class);
+    // Set a range that is before all of the untransformed data.  However,
+    // the data with untransformed col fam cf3 will transform to cf0 and
+    // be inside the range.
+    scanner.setRange(new Range(new Key("row1", "cf0"), true, new Key("row1", 
"cf1"), false));
+    
+    TreeMap<Key,Value> expected = new TreeMap<Key,Value>();
+    for (int cq = 1; cq <= 3; ++cq)
+      for (int cv = 1; cv <= 3; ++cv)
+        putExpected(expected, 1, 3, cq, cv, PartialKey.ROW);
+    checkExpected(expected);
+  }
+  
+  @Test
+  public void testPostfixRange() throws Exception {
+    // Set a range that's after all data and make sure we don't
+    // somehow return something.
+    setUpTransformIterator(ColFamReversingKeyTransformingIterator.class);
+    scanner.setRange(new Range(new Key("row4"), null));
+    checkExpected(new TreeMap<Key,Value>());
+  }
+  
+  @Test
+  public void testReplaceKeyParts() throws Exception {
+    KeyTransformingIterator it = new IdentityKeyTransformingIterator();
+    Key originalKey = new Key("r", "cf", "cq", "cv", 42);
+    originalKey.setDeleted(true);
+    
+    Key newKey = it.replaceColumnFamily(originalKey, new Text("test"));
+    assertEquals(createDeleteKey("r","test","cq","cv",42), newKey);
+    
+    newKey = it.replaceColumnQualifier(originalKey, new Text("test"));
+    assertEquals(createDeleteKey("r","cf","test","cv",42), newKey);
+    
+    newKey = it.replaceColumnVisibility(originalKey, new Text("test"));
+    assertEquals(createDeleteKey("r","cf","cq","test",42), newKey);
+    
+    newKey = it.replaceKeyParts(originalKey, new Text("testCQ"), new 
Text("testCV"));
+    assertEquals(createDeleteKey("r","cf","testCQ","testCV",42), newKey);
+    
+    newKey = it.replaceKeyParts(originalKey, new Text("testCF"), new 
Text("testCQ"), new Text("testCV"));
+    assertEquals(createDeleteKey("r","testCF","testCQ","testCV",42), newKey);
+  }
+  
+  @Test
+  public void testFetchColumnFamilites() throws Exception {
+    // In this test, we are fetching column family cf2, which is in
+    // the transformed space.  The source column family that will
+    // transform into cf2 is cf1, so that is the column family we
+    // put in the expectations.
+    int expectedCF = 1;
+    setUpTransformIterator(ColFamReversingKeyTransformingIterator.class);
+    scanner.fetchColumnFamily(new Text("cf2"));
+    
+    TreeMap<Key,Value> expected = new TreeMap<Key,Value>();
+    for (int row = 1; row <= 3; ++row)
+      for (int cq = 1; cq <= 3; ++cq)
+        for (int cv = 1; cv <= 3; ++cv)
+          putExpected(expected, row, expectedCF, cq, cv, PartialKey.ROW);
+    checkExpected(expected);
+  }
+  
+  @Test
+  public void testDeepCopy() throws Exception {
+    
+  }
+  
+  @Test
+  public void testCompactionScanFetchingColumnFamilies() throws Exception {
+    // In this test, we are fetching column family cf2, which is in
+    // the transformed space.  The source column family that will
+    // transform into cf2 is cf1, so that is the column family we
+    // put in the expectations.
+    int expectedCF = 1;
+    
setUpTransformIterator(ColFamReversingCompactionKeyTransformingIterator.class);
+    scanner.fetchColumnFamily(new Text("cf2"));
+    
+    TreeMap<Key,Value> expected = new TreeMap<Key,Value>();
+    for (int row = 1; row <= 3; ++row)
+      for (int cq = 1; cq <= 3; ++cq)
+        for (int cv = 1; cv <= 3; ++cv)
+          putExpected(expected, row, expectedCF, cq, cv, PartialKey.ROW);
+    checkExpected(expected);
+  }  
+  
+  @Test
+  public void testCompactionDoesntFilterVisibilities() throws Exception {
+    // In scan mode, this should return nothing since it produces visibilites
+    // the user can't see.  In compaction mode, however, the visibilites
+    // should still show up.
+    setUpTransformIterator(BadVisCompactionKeyTransformingIterator.class);     
+
+    TreeMap<Key,Value> expected = new TreeMap<Key,Value>();
+    for (int rowID = 1; rowID <= 3; ++rowID) {
+      for (int cfID = 1; cfID <= 3; ++cfID) {
+        for (int cqID = 1; cqID <= 3; ++cqID) {
+          for (int cvID = 1; cvID <= 3; ++cvID) {
+            String row = "row" + rowID;
+            String cf = "cf" + cfID;
+            String cq = "cq" + cqID;
+            String cv = "badvis";
+            long ts = 100*cfID + 10*cqID + cvID;
+            String val = "val" + ts;
+            expected.put(new Key(row, cf, cq, cv, ts), new 
Value(val.getBytes()));
+          }
+        }
+      }
+    }
+    
+    checkExpected(expected);    
+  }
+  
+  private Key createDeleteKey(String row, String colFam, String colQual, 
String colVis, long timestamp) {
+    Key key = new Key(row, colFam, colQual, colVis, timestamp);
+    key.setDeleted(true);
+    return key;
+  }
+  
+  private void checkExpected(TreeMap<Key,Value> expectedEntries) {
+    for (Entry<Key,Value> entry : scanner) {
+      Entry<Key,Value> expected = expectedEntries.pollFirstEntry();
+      Key actualKey = entry.getKey();
+      Value actualValue = entry.getValue();
+      
+      assertNotNull("Ran out of expected entries on: " + entry, expected);
+      assertEquals("Key mismatch", expected.getKey(), actualKey);
+      assertEquals("Value mismatch", expected.getValue(), actualValue);
+    }
+    
+    assertTrue("Scanner did not return all expected entries: " + 
expectedEntries, expectedEntries.isEmpty());
+  }
+  
+  private static void putExpected(SortedMap<Key,Value> expected, int rowID, 
int cfID, int cqID, int cvID, PartialKey part) {
+    String row = "row" + rowID;
+    String cf = "cf" + cfID;
+    String cq = "cq" + cqID;
+    String cv = "vis" + cvID;
+    long ts = 100*cfID + 10*cqID + cvID;
+    String val = "val" + ts;
+    
+    if (part != null) {
+      switch (part) {
+        case ROW:
+          cf = transform(new Text(cf)).toString(); break;
+        case ROW_COLFAM:
+          cq = transform(new Text(cq)).toString(); break;
+        case ROW_COLFAM_COLQUAL:
+          cv = transform(new Text(cv)).toString(); break;
+        default:
+          break;
+      }
+    }
+    
+    expected.put(new Key(row, cf, cq, cv, ts), new Value(val.getBytes()));
+  }
+  
+  private static Text transform(Text val) {
+    String s = val.toString();
+    // Reverse the order of the number at the end, and subtract one
+    int i = 3 - Integer.parseInt(s.substring(s.length()-1));
+    StringBuilder sb = new StringBuilder();
+    sb.append(s.substring(0, s.length() - 1));
+    sb.append(i);
+    return new Text(sb.toString());
+  }
+  
+  private static Mutation createDefaultMutation(String row) {
+    Mutation m = new Mutation(row);
+    for (int cfID = 1; cfID <= 3; ++cfID) {
+      for (int cqID = 1; cqID <= 3; ++cqID) {
+        for (int cvID = 1; cvID <= 3; ++cvID) {
+          String cf = "cf" + cfID;
+          String cq = "cq" + cqID;
+          String cv = "vis" + cvID;
+          long ts = 100*cfID + 10*cqID + cvID;
+          String val = "val" + ts;
+          
+          m.put(cf, cq, new ColumnVisibility(cv), ts, val);
+        }
+      }
+    }
+    return m;
+  }
+  
+  private static Key reverseKeyPart(Key originalKey, PartialKey part) {
+    Text row = originalKey.getRow();
+    Text cf = originalKey.getColumnFamily();
+    Text cq = originalKey.getColumnQualifier();
+    Text cv = originalKey.getColumnVisibility();
+    long ts = originalKey.getTimestamp();
+    switch (part) {
+      case ROW:
+        cf = transform(cf); break;
+      case ROW_COLFAM:
+        cq = transform(cq); break;
+      case ROW_COLFAM_COLQUAL:
+        cv = transform(cv); break;
+      default:
+        break;
+    }
+    return new Key(row, cf, cq, cv, ts);
+  }
+  
+  public static class IdentityKeyTransformingIterator extends 
KeyTransformingIterator {
+    @Override
+    protected PartialKey getKeyPrefix() {
+      return PartialKey.ROW;
+    }
+    
+    @Override
+    protected Key transformKey(Key originalKey) {
+      return originalKey;
+    };
+  }
+  
+  public static abstract class ReversingKeyTransformingIterator extends 
KeyTransformingIterator {
+    @Override
+    protected Key transformKey(Key originalKey) {
+      return reverseKeyPart(originalKey, getKeyPrefix());
+    };
+  }
+  
+  public static class ColFamReversingKeyTransformingIterator extends 
ReversingKeyTransformingIterator {
+    @Override
+    protected PartialKey getKeyPrefix() {
+      return PartialKey.ROW;
+    }
+    
+    @Override
+    protected Collection<ByteSequence> 
untransformColumnFamilies(Collection<ByteSequence> columnFamilies) {
+      HashSet<ByteSequence> untransformed = new HashSet<ByteSequence>();
+      for (ByteSequence cf : columnFamilies) 
+        untransformed.add(untransformColumnFamily(cf));
+      return untransformed;
+    }
+    
+    protected ByteSequence untransformColumnFamily(ByteSequence colFam) {
+      Text transformed = transform(new Text(colFam.toArray()));
+      byte[] bytes = transformed.getBytes();
+      return new ArrayByteSequence(bytes, 0, transformed.getLength());
+    }
+  }
+  
+  public static class ColFamReversingCompactionKeyTransformingIterator extends 
ColFamReversingKeyTransformingIterator {
+    @Override
+    public void init(SortedKeyValueIterator<Key,Value> source, 
Map<String,String> options, IteratorEnvironment env) throws IOException {
+      env = new MajCIteratorEnvironmentAdapter(env);
+      super.init(source, options, env);
+    }
+  }
+  
+  public static class ColQualReversingKeyTransformingIterator extends 
ReversingKeyTransformingIterator {
+    @Override
+    protected PartialKey getKeyPrefix() {
+      return PartialKey.ROW_COLFAM;
+    }
+  }
+  
+  public static class ColVisReversingKeyTransformingIterator extends 
ReversingKeyTransformingIterator {
+    @Override
+    protected PartialKey getKeyPrefix() {
+      return PartialKey.ROW_COLFAM_COLQUAL;
+    }
+  }
+  
+  public static class BadVisKeyTransformingIterator extends 
KeyTransformingIterator {
+    @Override
+    protected PartialKey getKeyPrefix() {
+      return PartialKey.ROW_COLFAM_COLQUAL;
+    }
+    
+    @Override
+    protected Key transformKey(Key originalKey) {
+      return new Key(originalKey.getRow(), originalKey.getColumnFamily(), 
originalKey.getColumnQualifier(), new Text("badvis"), 
originalKey.getTimestamp());
+    }
+  }
+  
+  public static class BadVisCompactionKeyTransformingIterator extends 
BadVisKeyTransformingIterator {
+    @Override
+    public void init(SortedKeyValueIterator<Key,Value> source, 
Map<String,String> options, IteratorEnvironment env) throws IOException {
+      env = new MajCIteratorEnvironmentAdapter(env);
+      super.init(source, options, env);
+    }
+  }
+  
+  public static class ReuseIterator extends WrappingIterator {
+    private Key topKey = new Key();
+    private Value topValue = new Value();
+    
+    @Override
+    public void seek(Range range, Collection<ByteSequence> columnFamilies, 
boolean inclusive) throws IOException {
+      super.seek(range, columnFamilies, inclusive);
+      loadTop();
+    }
+    
+    @Override
+    public void next() throws IOException {
+      super.next();
+      loadTop();
+    }
+    
+    @Override
+    public Key getTopKey() {
+      return topKey;
+    }
+    
+    @Override
+    public Value getTopValue() {
+      return topValue;
+    }
+    
+    private void loadTop() {
+      if (hasTop()) {
+        topKey.set(super.getTopKey());
+        topValue.set(super.getTopValue().get());
+      }
+    }
+  }
+  
+  private static class MajCIteratorEnvironmentAdapter implements 
IteratorEnvironment {
+    private IteratorEnvironment delegate;
+    
+    public MajCIteratorEnvironmentAdapter(IteratorEnvironment delegate) {
+      this.delegate = delegate;
+    }
+
+    public SortedKeyValueIterator<Key,Value> reserveMapFileReader(String 
mapFileName) throws IOException {
+      return delegate.reserveMapFileReader(mapFileName);
+    }
+
+    public AccumuloConfiguration getConfig() {
+      return delegate.getConfig();
+    }
+
+    public IteratorScope getIteratorScope() {
+      return IteratorScope.majc;
+    }
+
+    public boolean isFullMajorCompaction() {
+      return delegate.isFullMajorCompaction();
+    }
+
+    public void registerSideChannel(SortedKeyValueIterator<Key,Value> iter) {
+      delegate.registerSideChannel(iter);
+    }
+  }
+}


Reply via email to