Author: stack Date: Wed Apr 16 20:21:11 2008 New Revision: 648940 URL: http://svn.apache.org/viewvc?rev=648940&view=rev Log: HBASE-532 Odd interaction between HRegion.get, HRegion.deleteAll and compactions. Backport to branch.
Modified: hadoop/hbase/branches/0.1/CHANGES.txt hadoop/hbase/branches/0.1/src/java/org/apache/hadoop/hbase/HAbstractScanner.java hadoop/hbase/branches/0.1/src/java/org/apache/hadoop/hbase/HStore.java hadoop/hbase/branches/0.1/src/test/org/apache/hadoop/hbase/TestHMemcache.java Modified: hadoop/hbase/branches/0.1/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.1/CHANGES.txt?rev=648940&r1=648939&r2=648940&view=diff ============================================================================== --- hadoop/hbase/branches/0.1/CHANGES.txt (original) +++ hadoop/hbase/branches/0.1/CHANGES.txt Wed Apr 16 20:21:11 2008 @@ -14,6 +14,7 @@ HBASE-500 Regionserver stuck on exit HBASE-582 HBase 554 forgot to clear results on each iteration caused by a filter (Clint Morgan via Stack) + HBASE-532 Odd interaction between HRegion.get, HRegion.deleteAll and compactions IMPROVEMENTS HBASE-559 MR example job to count table rows Modified: hadoop/hbase/branches/0.1/src/java/org/apache/hadoop/hbase/HAbstractScanner.java URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.1/src/java/org/apache/hadoop/hbase/HAbstractScanner.java?rev=648940&r1=648939&r2=648940&view=diff ============================================================================== --- hadoop/hbase/branches/0.1/src/java/org/apache/hadoop/hbase/HAbstractScanner.java (original) +++ hadoop/hbase/branches/0.1/src/java/org/apache/hadoop/hbase/HAbstractScanner.java Wed Apr 16 20:21:11 2008 @@ -33,7 +33,6 @@ /** * Abstract base class that implements the HScannerInterface. 
- * Used by the concrete HMemcacheScanner and HStoreScanners */ public abstract class HAbstractScanner implements HInternalScannerInterface { final Log LOG = LogFactory.getLog(this.getClass().getName()); @@ -111,11 +110,6 @@ protected boolean scannerClosed = false; // True when scanning is done - // Keys retrieved from the sources - protected HStoreKey keys[]; - // Values that correspond to those keys - protected byte [][] vals; - protected long timestamp; // The timestamp to match entries against private boolean wildcardMatch; private boolean multipleMatchers; @@ -145,18 +139,16 @@ } /** - * For a particular column i, find all the matchers defined for the column. + * For a particular column, find all the matchers defined for the column. * Compare the column family and column key using the matchers. The first one * that matches returns true. If no matchers are successful, return false. * - * @param i index into the keys array - * @return true - if any of the matchers for the column match the column family - * and the column key. - * + * @param column Column to test + * @return true if any of the matchers for the column match the column family + * and the column key. * @throws IOException */ - boolean columnMatch(int i) throws IOException { - Text column = keys[i].getColumn(); + protected boolean columnMatch(final Text column) throws IOException { Vector<ColumnMatcher> matchers = okCols.get(HStoreKey.extractFamily(column)); if(matchers == null) { @@ -170,18 +162,6 @@ return false; } - /** - * If the user didn't want to start scanning at the first row, this method - * seeks to the requested row. 
- */ - abstract boolean findFirstRow(int i, Text firstRow) throws IOException; - - /** The concrete implementations provide a mechanism to find the next set of values */ - abstract boolean getNext(int i) throws IOException; - - /** Mechanism used by concrete implementation to shut down a particular scanner */ - abstract void closeSubScanner(int i); - /** [EMAIL PROTECTED] */ public boolean isWildcardScanner() { return this.wildcardMatch; @@ -192,89 +172,11 @@ return this.multipleMatchers; } - /** - * Get the next set of values for this scanner. - * - * @param key The key that matched - * @param results All the results for <code>key</code> - * @return true if a match was found - * @throws IOException - * - * @see org.apache.hadoop.hbase.HScannerInterface#next(org.apache.hadoop.hbase.HStoreKey, java.util.SortedMap) - */ - public boolean next(HStoreKey key, SortedMap<Text, byte []> results) - throws IOException { - if (scannerClosed) { - return false; - } - // Find the next row label (and timestamp) - Text chosenRow = null; - long chosenTimestamp = -1; - for(int i = 0; i < keys.length; i++) { - if((keys[i] != null) - && (columnMatch(i)) - && (keys[i].getTimestamp() <= this.timestamp) - && ((chosenRow == null) - || (keys[i].getRow().compareTo(chosenRow) < 0) - || ((keys[i].getRow().compareTo(chosenRow) == 0) - && (keys[i].getTimestamp() > chosenTimestamp)))) { - chosenRow = new Text(keys[i].getRow()); - chosenTimestamp = keys[i].getTimestamp(); - } - } + public abstract boolean next(HStoreKey key, SortedMap<Text, byte []> results) + throws IOException; - // Grab all the values that match this row/timestamp - boolean insertedItem = false; - if(chosenRow != null) { - key.setRow(chosenRow); - key.setVersion(chosenTimestamp); - key.setColumn(new Text("")); - - for(int i = 0; i < keys.length; i++) { - // Fetch the data - while((keys[i] != null) - && (keys[i].getRow().compareTo(chosenRow) == 0)) { - - // If we are doing a wild card match or there are multiple matchers - // 
per column, we need to scan all the older versions of this row - // to pick up the rest of the family members - - if(!wildcardMatch - && !multipleMatchers - && (keys[i].getTimestamp() != chosenTimestamp)) { - break; - } - - if(columnMatch(i)) { - // We only want the first result for any specific family member - if(!results.containsKey(keys[i].getColumn())) { - results.put(new Text(keys[i].getColumn()), vals[i]); - insertedItem = true; - } - } - - if(!getNext(i)) { - closeSubScanner(i); - } - } - - // Advance the current scanner beyond the chosen row, to - // a valid timestamp, so we're ready next time. - - while((keys[i] != null) - && ((keys[i].getRow().compareTo(chosenRow) <= 0) - || (keys[i].getTimestamp() > this.timestamp) - || (! columnMatch(i)))) { - getNext(i); - } - } - } - return insertedItem; - } - - /** [EMAIL PROTECTED] */ public Iterator<Entry<HStoreKey, SortedMap<Text, byte[]>>> iterator() { throw new UnsupportedOperationException("Unimplemented serverside. " + "next(HStoreKey, StortedMap(...) 
is more efficient"); } -} \ No newline at end of file +} Modified: hadoop/hbase/branches/0.1/src/java/org/apache/hadoop/hbase/HStore.java URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.1/src/java/org/apache/hadoop/hbase/HStore.java?rev=648940&r1=648939&r2=648940&view=diff ============================================================================== --- hadoop/hbase/branches/0.1/src/java/org/apache/hadoop/hbase/HStore.java (original) +++ hadoop/hbase/branches/0.1/src/java/org/apache/hadoop/hbase/HStore.java Wed Apr 16 20:21:11 2008 @@ -21,12 +21,15 @@ import java.io.IOException; import java.io.UnsupportedEncodingException; +import java.rmi.UnexpectedException; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.SortedMap; import java.util.TreeMap; import java.util.Map.Entry; @@ -39,8 +42,8 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; -import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.filter.RowFilterInterface; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; @@ -69,77 +72,105 @@ static final Log LOG = LogFactory.getLog(HStore.class); /** - * The Memcache holds in-memory modifications to the HRegion. This is really a - * wrapper around a TreeMap that helps us when staging the Memcache out to disk. + * The Memcache holds in-memory modifications to the HRegion. + * Keeps a current map. When asked to flush the map, current map is moved + * to snapshot and is cleared. We continue to serve edits out of new map + * and backing snapshot until flusher reports in that the flush succeeded. At + * this point we let the snapshot go. 
*/ static class Memcache { - // Note that since these structures are always accessed with a lock held, - // no additional synchronization is required. + // so no additional synchronization is required. - @SuppressWarnings("hiding") - private final SortedMap<HStoreKey, byte[]> memcache = - Collections.synchronizedSortedMap(new TreeMap<HStoreKey, byte []>()); - - volatile SortedMap<HStoreKey, byte[]> snapshot; - - @SuppressWarnings("hiding") - private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); + // The currently active sorted map of edits. + private volatile SortedMap<HStoreKey, byte[]> mc = + createSynchronizedSortedMap(); + + // Snapshot of memcache. Made for flusher. + private volatile SortedMap<HStoreKey, byte[]> snapshot = + createSynchronizedSortedMap(); - /** - * Constructor + private final ReentrantReadWriteLock mc_lock = new ReentrantReadWriteLock(); + + /* + * Utility method. + * @return sycnhronized sorted map of HStoreKey to byte arrays. */ - Memcache() { - snapshot = - Collections.synchronizedSortedMap(new TreeMap<HStoreKey, byte []>()); + private static SortedMap<HStoreKey, byte[]> createSynchronizedSortedMap() { + return Collections.synchronizedSortedMap(new TreeMap<HStoreKey, byte []>()); } /** * Creates a snapshot of the current Memcache */ - void snapshot() { - this.lock.writeLock().lock(); + SortedMap<HStoreKey, byte[]> snapshot() { + this.mc_lock.writeLock().lock(); try { - synchronized (memcache) { - if (memcache.size() != 0) { - snapshot.putAll(memcache); - memcache.clear(); - } + // If snapshot has entries, then flusher failed or didn't call cleanup. + if (this.snapshot.size() > 0) { + LOG.debug("Returning existing snapshot. Either the snapshot was run " + + "by the region -- normal operation but to be fixed -- or there is " + + "another ongoing flush or did we fail last attempt?"); + return this.snapshot; + } + // We used to synchronize on the memcache here but we're inside a + // write lock so removed it. 
Comment is left in case removal was a + // mistake. St.Ack + if (this.mc.size() != 0) { + this.snapshot = this.mc; + this.mc = createSynchronizedSortedMap(); } + return this.snapshot; } finally { - this.lock.writeLock().unlock(); + this.mc_lock.writeLock().unlock(); } } - - /** - * @return memcache snapshot - */ - SortedMap<HStoreKey, byte[]> getSnapshot() { - this.lock.writeLock().lock(); - try { - SortedMap<HStoreKey, byte[]> currentSnapshot = snapshot; - snapshot = - Collections.synchronizedSortedMap(new TreeMap<HStoreKey, byte []>()); - - return currentSnapshot; - } finally { - this.lock.writeLock().unlock(); - } - } + /** + * Return the current snapshot. + * @return Return snapshot. + * @see {@link #snapshot()} + * @see {@link #clearSnapshot(SortedMap)} + */ + SortedMap<HStoreKey, byte[]> getSnapshot() { + return this.snapshot; + } + + /** + * The passed snapshot was successfully persisted; it can be let go. + * @param ss The snapshot to clean out. + * @throws UnexpectedException + * @see {@link #snapshot()} + */ + void clearSnapshot(final SortedMap<HStoreKey, byte []> ss) + throws UnexpectedException { + this.mc_lock.writeLock().lock(); + try { + if (this.snapshot != ss) { + throw new UnexpectedException("Current snapshot is " + + this.snapshot + ", was passed " + ss); + } + // OK. Passed in snapshot is same as current snapshot. If non-empty, + // create a new snapshot and let the old one go. + if (ss.size() != 0) { + this.snapshot = createSynchronizedSortedMap(); + } + } finally { + this.mc_lock.writeLock().unlock(); + } + } /** - * Store a value.
+ * Write an update * @param key * @param value */ void add(final HStoreKey key, final byte[] value) { - this.lock.readLock().lock(); + this.mc_lock.readLock().lock(); try { - memcache.put(key, value); - + mc.put(key, value); } finally { - this.lock.readLock().unlock(); + this.mc_lock.readLock().unlock(); } } @@ -150,22 +181,83 @@ * @return An array of byte arrays ordered by timestamp. */ List<byte[]> get(final HStoreKey key, final int numVersions) { - this.lock.readLock().lock(); + this.mc_lock.readLock().lock(); try { List<byte []> results; - synchronized (memcache) { - results = internalGet(memcache, key, numVersions); + // The synchronizations here are because internalGet iterates + synchronized (this.mc) { + results = internalGet(this.mc, key, numVersions); } synchronized (snapshot) { results.addAll(results.size(), internalGet(snapshot, key, numVersions - results.size())); } return results; - } finally { - this.lock.readLock().unlock(); + this.mc_lock.readLock().unlock(); } } + + + /** + * @param a + * @param b + * @return Return lowest of a or b or null if both a and b are null + */ + @SuppressWarnings("unchecked") + private WritableComparable getLowest(final WritableComparable a, + final WritableComparable b) { + if (a == null) { + return b; + } + if (b == null) { + return a; + } + return a.compareTo(b) <= 0? a: b; + } + + /** + * @param row Find the row that comes after this one. + * @return Next row or null if none found + */ + Text getNextRow(final Text row) { + this.mc_lock.readLock().lock(); + try { + return (Text)getLowest(getNextRow(row, this.mc), + getNextRow(row, this.snapshot)); + } finally { + this.mc_lock.readLock().unlock(); + } + } + + /* + * @param row Find row that follows this one. + * @param map Map to look in for a row beyond <code>row</code>. + * This method synchronizes on passed map while iterating it. + * @return Next row or null if none found. 
+ */ + private Text getNextRow(final Text row, + final SortedMap<HStoreKey, byte []> map) { + Text result = null; + // Synchronize on the map to make the tailMap making 'safe'. + synchronized (map) { + // Make an HSK with maximum timestamp so we get past most of the current + // rows cell entries. + HStoreKey hsk = new HStoreKey(row, HConstants.LATEST_TIMESTAMP); + SortedMap<HStoreKey, byte []> tailMap = map.tailMap(hsk); + // Iterate until we fall into the next row; i.e. move off current row + for (Map.Entry<HStoreKey, byte []> es: tailMap.entrySet()) { + HStoreKey itKey = es.getKey(); + if (itKey.getRow().compareTo(row) <= 0) { + continue; + } + // Note: Not suppressing deletes. + result = itKey.getRow(); + break; + } + } + return result; + } /** * Return all the available columns for the given key. The key indicates a @@ -178,17 +270,17 @@ void getFull(HStoreKey key, Map<Text, Long> deletes, SortedMap<Text, byte[]> results) { - this.lock.readLock().lock(); + this.mc_lock.readLock().lock(); try { - synchronized (memcache) { - internalGetFull(memcache, key, deletes, results); + synchronized (mc) { + internalGetFull(mc, key, deletes, results); } synchronized (snapshot) { internalGetFull(snapshot, key, deletes, results); } } finally { - this.lock.readLock().unlock(); + this.mc_lock.readLock().unlock(); } } @@ -224,20 +316,23 @@ /** * Find the key that matches <i>row</i> exactly, or the one that immediately * preceeds it. + * @param row Row to look for. 
+ * @param candidateKeys Map of candidate keys (Accumulation over lots of + * lookup over stores and memcaches) */ void getRowKeyAtOrBefore(final Text row, SortedMap<HStoreKey, Long> candidateKeys) { - this.lock.readLock().lock(); + this.mc_lock.readLock().lock(); try { - synchronized (memcache) { - internalGetRowKeyAtOrBefore(memcache, row, candidateKeys); + synchronized (mc) { + internalGetRowKeyAtOrBefore(mc, row, candidateKeys); } synchronized (snapshot) { internalGetRowKeyAtOrBefore(snapshot, row, candidateKeys); } } finally { - this.lock.readLock().unlock(); + this.mc_lock.readLock().unlock(); } } @@ -396,11 +491,11 @@ * @throws IOException */ List<HStoreKey> getKeys(final HStoreKey origin, final int versions) { - this.lock.readLock().lock(); + this.mc_lock.readLock().lock(); try { List<HStoreKey> results; - synchronized (memcache) { - results = internalGetKeys(this.memcache, origin, versions); + synchronized (mc) { + results = internalGetKeys(this.mc, origin, versions); } synchronized (snapshot) { results.addAll(results.size(), internalGetKeys(snapshot, origin, @@ -410,7 +505,7 @@ return results; } finally { - this.lock.readLock().unlock(); + this.mc_lock.readLock().unlock(); } } @@ -470,31 +565,20 @@ * the cell has been deleted. */ boolean isDeleted(final HStoreKey key) { - return HLogEdit.isDeleted(this.memcache.get(key)); + return HLogEdit.isDeleted(this.mc.get(key)); } /** * @return a scanner over the keys in the Memcache */ HInternalScannerInterface getScanner(long timestamp, - Text targetCols[], Text firstRow) throws IOException { - - // Here we rely on ReentrantReadWriteLock's ability to acquire multiple - // locks by the same thread and to be able to downgrade a write lock to - // a read lock. 
We need to hold a lock throughout this method, but only - // need the write lock while creating the memcache snapshot - - this.lock.writeLock().lock(); // hold write lock during memcache snapshot - snapshot(); // snapshot memcache - this.lock.readLock().lock(); // acquire read lock - this.lock.writeLock().unlock(); // downgrade to read lock + Text targetCols[], Text firstRow) + throws IOException { + this.mc_lock.readLock().lock(); try { - // Prevent a cache flush while we are constructing the scanner - return new MemcacheScanner(timestamp, targetCols, firstRow); - } finally { - this.lock.readLock().unlock(); + this.mc_lock.readLock().unlock(); } } @@ -503,115 +587,73 @@ // It lets the caller scan the contents of the Memcache. ////////////////////////////////////////////////////////////////////////////// - class MemcacheScanner extends HAbstractScanner { - SortedMap<HStoreKey, byte []> backingMap; - Iterator<HStoreKey> keyIterator; - - @SuppressWarnings("unchecked") + private class MemcacheScanner extends HAbstractScanner { + private Text currentRow; + private Set<Text> columns = null; + MemcacheScanner(final long timestamp, final Text targetCols[], - final Text firstRow) throws IOException { - + final Text firstRow) + throws IOException { + // Call to super will create ColumnMatchers and whether this is a regex + // scanner or not. Will also save away timestamp. Also sorts rows. 
super(timestamp, targetCols); - try { - this.backingMap = new TreeMap<HStoreKey, byte[]>(); - this.backingMap.putAll(snapshot); - this.keys = new HStoreKey[1]; - this.vals = new byte[1][]; - - // Generate list of iterators - - HStoreKey firstKey = new HStoreKey(firstRow); - if (firstRow != null && firstRow.getLength() != 0) { - keyIterator = - backingMap.tailMap(firstKey).keySet().iterator(); - - } else { - keyIterator = backingMap.keySet().iterator(); - } - - while (getNext(0)) { - if (!findFirstRow(0, firstRow)) { - continue; - } - if (columnMatch(0)) { - break; - } - } - } catch (RuntimeException ex) { - LOG.error("error initializing Memcache scanner: ", ex); - close(); - IOException e = new IOException("error initializing Memcache scanner"); - e.initCause(ex); - throw e; - - } catch(IOException ex) { - LOG.error("error initializing Memcache scanner: ", ex); - close(); - throw ex; - } - } - - /** - * The user didn't want to start scanning at the first row. This method - * seeks to the requested row. - * - * @param i which iterator to advance - * @param firstRow seek to this row - * @return true if this is the first row - */ - @Override - boolean findFirstRow(int i, Text firstRow) { - return firstRow.getLength() == 0 || - keys[i].getRow().compareTo(firstRow) >= 0; - } - - /** - * Get the next value from the specified iterator. - * - * @param i Which iterator to fetch next value from - * @return true if there is more data available - */ - @Override - boolean getNext(int i) { - boolean result = false; - while (true) { - if (!keyIterator.hasNext()) { - closeSubScanner(i); - break; - } - // Check key is < than passed timestamp for this scanner. 
- HStoreKey hsk = keyIterator.next(); - if (hsk == null) { - throw new NullPointerException("Unexpected null key"); - } - if (hsk.getTimestamp() <= this.timestamp) { - this.keys[i] = hsk; - this.vals[i] = backingMap.get(keys[i]); - result = true; - break; + this.currentRow = firstRow; + this.columns = null; + if (!isWildcardScanner()) { + this.columns = new HashSet<Text>(); + for (int i = 0; i < targetCols.length; i++) { + this.columns.add(targetCols[i]); } } - return result; - } - - /** Shut down an individual map iterator. */ - @Override - void closeSubScanner(int i) { - keyIterator = null; - keys[i] = null; - vals[i] = null; - backingMap = null; } - /** Shut down map iterators */ - public void close() { - if (!scannerClosed) { - if(keyIterator != null) { - closeSubScanner(0); - } - scannerClosed = true; - } - } + public boolean next(HStoreKey key, SortedMap<Text, byte []> results) + throws IOException { + if (this.scannerClosed) { + return false; + } + Map<Text, Long> deletes = new HashMap<Text, Long>(); + // Catch all row results in here. These results are then filtered to + // ensure they match column name regexes, or if none, added to results. + SortedMap<Text, byte []> rowResults = new TreeMap<Text, byte[]>(); + if (results.size() > 0) { + results.clear(); + } + while (results.size() <= 0 && + (this.currentRow = getNextRow(this.currentRow)) != null) { + if (deletes.size() > 0) { + deletes.clear(); + } + if (rowResults.size() > 0) { + rowResults.clear(); + } + key.setRow(this.currentRow); + key.setVersion(this.timestamp); + getFull(key, deletes, rowResults); + for (Map.Entry<Text, byte[]> e: rowResults.entrySet()) { + Text column = e.getKey(); + byte [] c = e.getValue(); + if (isWildcardScanner()) { + // Check the results match. We only check columns, not timestamps. + // We presume that timestamps have been handled properly when we + // called getFull.
+ if (!columnMatch(column)) { + continue; + } + } else if (!this.columns.contains(column)) { + // Don't include columns not asked for. + continue; + } + results.put(column, c); + } + } + return results.size() > 0; + } + public void close() { + if (!scannerClosed) { + scannerClosed = true; + } + } } } @@ -1118,15 +1160,15 @@ ////////////////////////////////////////////////////////////////////////////// // Flush changes to disk ////////////////////////////////////////////////////////////////////////////// - /** - * Prior to doing a cache flush, we need to snapshot the memcache. Locking is - * handled by the memcache. + * Prior to doing a cache flush, we need to snapshot the memcache. + * TODO: This method is ugly. Why let client of HStore run snapshots. How + * do we know they'll be cleaned up? */ void snapshotMemcache() { this.memcache.snapshot(); } - + /** * Write out a brand-new set of items to the disk. * @@ -1141,17 +1183,24 @@ * @throws IOException */ void flushCache(final long logCacheFlushId) throws IOException { - internalFlushCache(memcache.getSnapshot(), logCacheFlushId); + SortedMap<HStoreKey, byte []> cache = this.memcache.snapshot(); + internalFlushCache(cache, logCacheFlushId); + // If an exception happens flushing, we let it out without clearing + // the memcache snapshot. The old snapshot will be returned when we say + // 'snapshot', the next time flush comes around. + this.memcache.clearSnapshot(cache); } private void internalFlushCache(SortedMap<HStoreKey, byte []> cache, long logCacheFlushId) throws IOException { + // TODO: We can fail in the below block before we complete adding this + // flush to list of store files. Add cleanup of anything put on filesystem + // if we fail. synchronized(flushLock) { // A. 
Write the Maps out to the disk HStoreFile flushedFile = new HStoreFile(conf, fs, basedir, info.getEncodedName(), family.getFamilyName(), -1L, null); - String name = flushedFile.toString(); MapFile.Writer out = flushedFile.getWriter(this.fs, this.compression, this.bloomFilter); @@ -1198,7 +1247,7 @@ flushedFile.getReader(this.fs, this.bloomFilter)); this.storefiles.put(flushid, flushedFile); if(LOG.isDebugEnabled()) { - LOG.debug("Added " + name + " with " + entries + + LOG.debug("Added " + flushedFile.toString() + " with " + entries + " entries, sequence id " + logCacheFlushId + ", and size " + StringUtils.humanReadableInt(flushedFile.length()) + " for " + this.storeName); @@ -1919,7 +1968,7 @@ } // finally, check the memcache - memcache.getRowKeyAtOrBefore(row, candidateKeys); + this.memcache.getRowKeyAtOrBefore(row, candidateKeys); // return the best key from candidateKeys if (!candidateKeys.isEmpty()) { @@ -2290,10 +2339,14 @@ } /** - * A scanner that iterates through the HStore files + * A scanner that iterates through HStore files */ private class StoreFileScanner extends HAbstractScanner { - @SuppressWarnings("hiding") + // Keys retrieved from the sources + private HStoreKey keys[]; + // Values that correspond to those keys + private byte [][] vals; + private MapFile.Reader[] readers; StoreFileScanner(long timestamp, Text[] targetCols, Text firstRow) @@ -2335,6 +2388,99 @@ throw e; } } + + /** + * For a particular column i, find all the matchers defined for the column. + * Compare the column family and column key using the matchers. The first one + * that matches returns true. If no matchers are successful, return false. + * + * @param i index into the keys array + * @return true if any of the matchers for the column match the column family + * and the column key. 
+ * @throws IOException + */ + boolean columnMatch(int i) throws IOException { + return columnMatch(keys[i].getColumn()); + } + + /** + * @param key The key that matched + * @param results All the results for <code>key</code> + * @return true if a match was found + * @throws IOException + * + * @see org.apache.hadoop.hbase.regionserver.InternalScanner#next(org.apache.hadoop.hbase.HStoreKey, java.util.SortedMap) + */ + @Override + public boolean next(HStoreKey key, SortedMap<Text, byte []> results) + throws IOException { + if (scannerClosed) { + return false; + } + // Find the next row label (and timestamp) + Text chosenRow = null; + long chosenTimestamp = -1; + for(int i = 0; i < keys.length; i++) { + if((keys[i] != null) + && (columnMatch(i)) + && (keys[i].getTimestamp() <= this.timestamp) + && ((chosenRow == null) + || (keys[i].getRow().compareTo(chosenRow) < 0) + || ((keys[i].getRow().compareTo(chosenRow) == 0) + && (keys[i].getTimestamp() > chosenTimestamp)))) { + chosenRow = new Text(keys[i].getRow()); + chosenTimestamp = keys[i].getTimestamp(); + } + } + + // Grab all the values that match this row/timestamp + boolean insertedItem = false; + if(chosenRow != null) { + key.setRow(chosenRow); + key.setVersion(chosenTimestamp); + key.setColumn(new Text("")); + + for(int i = 0; i < keys.length; i++) { + // Fetch the data + while((keys[i] != null) + && (keys[i].getRow().compareTo(chosenRow) == 0)) { + + // If we are doing a wild card match or there are multiple matchers + // per column, we need to scan all the older versions of this row + // to pick up the rest of the family members + + if(!isWildcardScanner() + && !isMultipleMatchScanner() + && (keys[i].getTimestamp() != chosenTimestamp)) { + break; + } + + if(columnMatch(i)) { + // We only want the first result for any specific family member + if(!results.containsKey(keys[i].getColumn())) { + results.put(new Text(keys[i].getColumn()), vals[i]); + insertedItem = true; + } + } + + if(!getNext(i)) { + 
closeSubScanner(i); + } + } + + // Advance the current scanner beyond the chosen row, to + // a valid timestamp, so we're ready next time. + + while((keys[i] != null) + && ((keys[i].getRow().compareTo(chosenRow) <= 0) + || (keys[i].getTimestamp() > this.timestamp) + || (! columnMatch(i)))) { + getNext(i); + } + } + } + return insertedItem; + } /** * The user didn't want to start scanning at the first row. This method @@ -2344,7 +2490,6 @@ * @param firstRow - seek to this row * @return - true if this is the first row or if the row was not found */ - @Override boolean findFirstRow(int i, Text firstRow) throws IOException { ImmutableBytesWritable ibw = new ImmutableBytesWritable(); HStoreKey firstKey @@ -2367,7 +2512,6 @@ * @param i - which reader to fetch next value from * @return - true if there is more data available */ - @Override boolean getNext(int i) throws IOException { boolean result = false; ImmutableBytesWritable ibw = new ImmutableBytesWritable(); @@ -2386,7 +2530,6 @@ } /** Close down the indicated reader. 
*/ - @Override void closeSubScanner(int i) { try { if(readers[i] != null) { Modified: hadoop/hbase/branches/0.1/src/test/org/apache/hadoop/hbase/TestHMemcache.java URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.1/src/test/org/apache/hadoop/hbase/TestHMemcache.java?rev=648940&r1=648939&r2=648940&view=diff ============================================================================== --- hadoop/hbase/branches/0.1/src/test/org/apache/hadoop/hbase/TestHMemcache.java (original) +++ hadoop/hbase/branches/0.1/src/test/org/apache/hadoop/hbase/TestHMemcache.java Wed Apr 16 20:21:11 2008 @@ -21,6 +21,7 @@ import java.io.IOException; import java.io.UnsupportedEncodingException; +import java.rmi.UnexpectedException; import java.util.Map; import java.util.TreeMap; import java.util.SortedMap; @@ -72,13 +73,14 @@ } } - private void runSnapshot(final HStore.Memcache hmc) { + private void runSnapshot(final HStore.Memcache hmc) + throws UnexpectedException { // Save off old state. - int oldHistorySize = hmc.snapshot.size(); - hmc.snapshot(); + int oldHistorySize = hmc.getSnapshot().size(); + SortedMap<HStoreKey, byte[]> ss = hmc.snapshot(); // Make some assertions about what just happened. 
- assertTrue("History size has not increased", - oldHistorySize < hmc.snapshot.size()); + assertTrue("History size has not increased", oldHistorySize < ss.size()); + hmc.clearSnapshot(ss); } /** @@ -91,9 +93,8 @@ for (int i = 0; i < snapshotCount; i++) { addRows(this.hmemcache); runSnapshot(this.hmemcache); - this.hmemcache.getSnapshot(); - assertEquals("History not being cleared", 0, - this.hmemcache.snapshot.size()); + SortedMap<HStoreKey, byte[]> ss = this.hmemcache.getSnapshot(); + assertEquals("History not being cleared", 0, ss.size()); } } @@ -128,7 +129,41 @@ isExpectedRow(i, all); } } - + + /** Test getNextRow from memcache + * @throws UnsupportedEncodingException + */ + public void testGetNextRow() throws UnsupportedEncodingException { + addRows(this.hmemcache); + Text closestToEmpty = this.hmemcache.getNextRow(HConstants.EMPTY_TEXT); + assertEquals(closestToEmpty, getRowName(0)); + for (int i = 0; i < ROW_COUNT; i++) { + Text nr = this.hmemcache.getNextRow(getRowName(i)); + if (i + 1 == ROW_COUNT) { + assertEquals(nr, null); + } else { + assertEquals(nr, getRowName(i + 1)); + } + } + } + + /** Test getClosest from memcache + * @throws UnsupportedEncodingException + */ + public void testGetClosest() throws UnsupportedEncodingException { + addRows(this.hmemcache); + Text closestToEmpty = this.hmemcache.getNextRow(HConstants.EMPTY_TEXT); + assertEquals(closestToEmpty, getRowName(0)); + for (int i = 0; i < ROW_COUNT; i++) { + Text nr = this.hmemcache.getNextRow(getRowName(i)); + if (i + 1 == ROW_COUNT) { + assertEquals(nr, null); + } else { + assertEquals(nr, getRowName(i + 1)); + } + } + } + /** * Test memcache scanner * @throws IOException