Author: jdcryans Date: Wed Apr 7 23:41:58 2010 New Revision: 931723 URL: http://svn.apache.org/viewvc?rev=931723&view=rev Log: HBASE-2286 [Transactional Contrib] Correctly handle or avoid cases where writes occur in same millisecond (Clint Morgan via J-D)
Modified: hadoop/hbase/branches/0.20/CHANGES.txt hadoop/hbase/branches/0.20/src/contrib/transactional/src/java/org/apache/hadoop/hbase/client/tableindexed/IndexedTable.java hadoop/hbase/branches/0.20/src/contrib/transactional/src/java/org/apache/hadoop/hbase/regionserver/tableindexed/IndexedRegion.java hadoop/hbase/branches/0.20/src/contrib/transactional/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionState.java hadoop/hbase/branches/0.20/src/contrib/transactional/src/test/org/apache/hadoop/hbase/client/transactional/TestTransactions.java Modified: hadoop/hbase/branches/0.20/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.20/CHANGES.txt?rev=931723&r1=931722&r2=931723&view=diff ============================================================================== --- hadoop/hbase/branches/0.20/CHANGES.txt (original) +++ hadoop/hbase/branches/0.20/CHANGES.txt Wed Apr 7 23:41:58 2010 @@ -114,6 +114,8 @@ Release 0.20.4 - Unreleased HBASE-2252 Mapping a very big table kills region servers HBASE-2411 Findbugs target HBASE-2412 [stargate] PerformanceEvaluation + HBASE-2286 [Transactional Contrib] Correctly handle or avoid cases where + writes occur in same millisecond (Clint Morgan via J-D) NEW FEATURES HBASE-2257 [stargate] multiuser mode Modified: hadoop/hbase/branches/0.20/src/contrib/transactional/src/java/org/apache/hadoop/hbase/client/tableindexed/IndexedTable.java URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.20/src/contrib/transactional/src/java/org/apache/hadoop/hbase/client/tableindexed/IndexedTable.java?rev=931723&r1=931722&r2=931723&view=diff ============================================================================== --- hadoop/hbase/branches/0.20/src/contrib/transactional/src/java/org/apache/hadoop/hbase/client/tableindexed/IndexedTable.java (original) +++ hadoop/hbase/branches/0.20/src/contrib/transactional/src/java/org/apache/hadoop/hbase/client/tableindexed/IndexedTable.java Wed Apr 7 23:41:58 2010 @@ -175,6 +175,9 @@ public class IndexedTable extends Transa Result row = indexResult[i]; byte[] baseRow = row.getValue(INDEX_BASE_ROW_COLUMN); + if (baseRow == null) { + throw new IllegalStateException("Missing base row for indexed row: ["+Bytes.toString(row.getRow())+"]"); + } LOG.debug("next index row [" + Bytes.toString(row.getRow()) + "] -> base row [" + Bytes.toString(baseRow) + "]"); Result baseResult = null; @@ -195,7 +198,10 @@ public class IndexedTable extends Transa } if (baseResult != null) { - results.addAll(baseResult.list()); + List<KeyValue> list = baseResult.list(); + if (list != null) { + results.addAll(list); + } } result[i] = new Result(results); Modified: hadoop/hbase/branches/0.20/src/contrib/transactional/src/java/org/apache/hadoop/hbase/regionserver/tableindexed/IndexedRegion.java URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.20/src/contrib/transactional/src/java/org/apache/hadoop/hbase/regionserver/tableindexed/IndexedRegion.java?rev=931723&r1=931722&r2=931723&view=diff ============================================================================== --- hadoop/hbase/branches/0.20/src/contrib/transactional/src/java/org/apache/hadoop/hbase/regionserver/tableindexed/IndexedRegion.java (original) +++ hadoop/hbase/branches/0.20/src/contrib/transactional/src/java/org/apache/hadoop/hbase/regionserver/tableindexed/IndexedRegion.java Wed Apr 7 23:41:58 2010 @@ -39,6 +39,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HStoreKey; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.Leases; import org.apache.hadoop.hbase.client.Delete; @@ -141,10 +142,32 @@ class IndexedRegion extends Transactiona SortedMap<byte[], byte[]> oldColumnValues = convertToValueMap(oldResult); for (IndexSpecification indexSpec : indexesToUpdate) { - removeOldIndexEntry(indexSpec, put.getRow(), oldColumnValues); - updateIndex(indexSpec, put.getRow(), newColumnValues); + updateIndex(indexSpec, put, newColumnValues, oldColumnValues); } } + + // FIXME: This call takes place in an RPC, and requires an RPC. This makes for + // a likely deadlock if the number of RPCs we are trying to serve is >= the + // number of handler threads. + private void updateIndex(IndexSpecification indexSpec, Put put, + NavigableMap<byte[], byte[]> newColumnValues, + SortedMap<byte[], byte[]> oldColumnValues) throws IOException { + Delete indexDelete = makeDeleteToRemoveOldIndexEntry(indexSpec, put.getRow(), oldColumnValues); + Put indexPut = makeIndexUpdate(indexSpec, put.getRow(), newColumnValues); + + HTable indexTable = getIndexTable(indexSpec); + if (indexDelete != null && !Bytes.equals(indexDelete.getRow(), indexPut.getRow())) { + // Only do the delete if the row changed. This way we save the put after delete issues in HBASE-2256 + LOG.debug("Deleting old index row ["+Bytes.toString(indexDelete.getRow())+"]. New row is ["+Bytes.toString(indexPut.getRow())+"]."); + indexTable.delete(indexDelete); + } else if (indexDelete != null){ + LOG.debug("Skipping deleting index row ["+Bytes.toString(indexDelete.getRow())+"] because it has not changed."); + } + indexTable.put(indexPut); + } + + + /** Return the columns needed for the update. */ private NavigableSet<byte[]> getColumnsForIndexes(Collection<IndexSpecification> indexes) { @@ -157,7 +180,7 @@ class IndexedRegion extends Transactiona return neededColumns; } - private void removeOldIndexEntry(IndexSpecification indexSpec, byte[] row, + private Delete makeDeleteToRemoveOldIndexEntry(IndexSpecification indexSpec, byte[] row, SortedMap<byte[], byte[]> oldColumnValues) throws IOException { for (byte[] indexedCol : indexSpec.getIndexedColumns()) { if (!oldColumnValues.containsKey(indexedCol)) { @@ -165,7 +188,7 @@ class IndexedRegion extends Transactiona + "] not trying to remove old entry for row [" + Bytes.toString(row) + "] because col [" + Bytes.toString(indexedCol) + "] is missing"); - return; + return null; } } @@ -173,7 +196,7 @@ class IndexedRegion extends Transactiona oldColumnValues); LOG.debug("Index [" + indexSpec.getIndexId() + "] removing old entry [" + Bytes.toString(oldIndexRow) + "]"); - getIndexTable(indexSpec).delete(new Delete(oldIndexRow)); + return new Delete(oldIndexRow); } private NavigableMap<byte[], byte[]> getColumnsFromPut(Put put) { @@ -204,23 +227,23 @@ class IndexedRegion extends Transactiona return false; } - // FIXME: This call takes place in an RPC, and requires an RPC. This makes for - // a likely deadlock if the number of RPCs we are trying to serve is >= the - // number of handler threads. - private void updateIndex(IndexSpecification indexSpec, byte[] row, + private Put makeIndexUpdate(IndexSpecification indexSpec, byte[] row, SortedMap<byte[], byte[]> columnValues) throws IOException { Put indexUpdate = IndexMaintenanceUtils.createIndexUpdate(indexSpec, row, columnValues); - getIndexTable(indexSpec).put(indexUpdate); LOG.debug("Index [" + indexSpec.getIndexId() + "] adding new entry [" + Bytes.toString(indexUpdate.getRow()) + "] for row [" + Bytes.toString(row) + "]"); + return indexUpdate; + } + // FIXME we can be smarter about this and avoid the base gets and index maintenance in many cases. @Override public void delete(Delete delete, final Integer lockid, boolean writeToWAL) throws IOException { - // First remove the existing indexes. + // First look at the current (to be the old) state. + SortedMap<byte[], byte[]> oldColumnValues = null; if (!getIndexes().isEmpty()) { // Need all columns NavigableSet<byte[]> neededColumns = getColumnsForIndexes(getIndexes()); @@ -231,12 +254,7 @@ class IndexedRegion extends Transactiona } Result oldRow = super.get(get, lockid); - SortedMap<byte[], byte[]> oldColumnValues = convertToValueMap(oldRow); - - - for (IndexSpecification indexSpec : getIndexes()) { - removeOldIndexEntry(indexSpec, delete.getRow(), oldColumnValues); - } + oldColumnValues = convertToValueMap(oldRow); } super.delete(delete, lockid, writeToWAL); @@ -246,17 +264,64 @@ class IndexedRegion extends Transactiona // Rebuild index if there is still a version visible. Result currentRow = super.get(get, lockid); - if (!currentRow.isEmpty()) { SortedMap<byte[], byte[]> currentColumnValues = convertToValueMap(currentRow); - for (IndexSpecification indexSpec : getIndexes()) { - if (IndexMaintenanceUtils.doesApplyToIndex(indexSpec, currentColumnValues)) { - updateIndex(indexSpec, delete.getRow(), currentColumnValues); + + for (IndexSpecification indexSpec : getIndexes()) { + Delete indexDelete = null; + if (IndexMaintenanceUtils.doesApplyToIndex(indexSpec, oldColumnValues)) { + indexDelete = makeDeleteToRemoveOldIndexEntry(indexSpec, delete + .getRow(), oldColumnValues); + } + Put indexPut = null; + if (IndexMaintenanceUtils.doesApplyToIndex(indexSpec, + currentColumnValues)) { + indexPut = makeIndexUpdate(indexSpec, delete.getRow(), + currentColumnValues); + } + if (indexPut == null && indexDelete == null) { + continue; + } + + HTable indexTable = getIndexTable(indexSpec); + if (indexDelete != null + && (indexPut == null || !Bytes.equals(indexDelete.getRow(), + indexPut.getRow()))) { + // Only do the delete if the row changed. This way we save the put + // after delete issues in HBASE-2256 + LOG.debug("Deleting old index row [" + + Bytes.toString(indexDelete.getRow()) + "]."); + indexTable.delete(indexDelete); + } else if (indexDelete != null) { + LOG.debug("Skipping deleting index row [" + + Bytes.toString(indexDelete.getRow()) + + "] because it has not changed."); + + for (byte [] indexCol : indexSpec.getAdditionalColumns()) { + byte[][] parsed = HStoreKey.parseColumn(indexCol); + List<KeyValue> famDeletes = delete.getFamilyMap().get(parsed[0]); + if (famDeletes != null) { + for (KeyValue kv : famDeletes) { + if (Bytes.equals(indexCol, kv.getColumn())) { + LOG.debug("Need to delete this specific column: "+Bytes.toString(kv.getColumn())); + Delete columnDelete = new Delete(indexDelete.getRow()); + columnDelete.deleteColumns(indexCol); + indexTable.delete(columnDelete); + } + } + + } } } + + if (indexPut != null) { + getIndexTable(indexSpec).put(indexPut); + } } + } - } + + private SortedMap<byte[], byte[]> convertToValueMap(Result result) { SortedMap<byte[], byte[]> currentColumnValues = new TreeMap<byte[], byte[]>(Bytes.BYTES_COMPARATOR); Modified: hadoop/hbase/branches/0.20/src/contrib/transactional/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionState.java URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.20/src/contrib/transactional/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionState.java?rev=931723&r1=931722&r2=931723&view=diff ============================================================================== --- hadoop/hbase/branches/0.20/src/contrib/transactional/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionState.java (original) +++ hadoop/hbase/branches/0.20/src/contrib/transactional/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionState.java Wed Apr 7 23:41:58 2010 @@ -21,7 +21,9 @@ package org.apache.hadoop.hbase.regionse import java.io.IOException; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; +import java.util.Comparator; import java.util.HashSet; import java.util.Iterator; import java.util.LinkedList; @@ -138,17 +140,25 @@ class TransactionState { } void addWrite(final Put write) { + updateLatestTimestamp(write.getFamilyMap().values()); + puts.add(write); + } + + + // FIXME REVIEW not sure about this. Needed for log recovery? but broke other tests. + private void updateLatestTimestamp(Collection<List<KeyValue>> kvsCollection) { byte [] now = Bytes.toBytes(System.currentTimeMillis()); - // HAVE to manually set the KV timestamps - for (List<KeyValue> kvs : write.getFamilyMap().values()) { - for (KeyValue kv : kvs) { + // HAVE to manually set the KV timestamps + for (List<KeyValue> kvs : kvsCollection) { + for (KeyValue kv : kvs) { + if (kv.isLatestTimestamp()) { kv.updateLatestStamp(now); } - } - - puts.add(write); + } + } } + boolean hasWrite() { return puts.size() > 0 || deletes.size() > 0; } @@ -158,6 +168,7 @@ class TransactionState { } void addDelete(final Delete delete) { + //updateLatestTimestamp(delete.getFamilyMap().values()); deletes.add(delete); } @@ -371,13 +382,6 @@ class TransactionState { return deletes; } - /** Set deleteSet. - * @param deleteSet the deleteSet to set - */ - void setDeleteSet(List<Delete> deleteSet) { - this.deletes = deleteSet; - } - /** Get a scanner to go through the puts from this transaction. Used to weave together the local trx puts with the global state. * * @return scanner @@ -393,20 +397,56 @@ class TransactionState { */ private class PutScanner implements KeyValueScanner, InternalScanner { - private NavigableSet<KeyValue> kvSet; + private List<KeyValue> kvList; private Iterator<KeyValue> iterator; private boolean didHasNext = false; private KeyValue next = null; PutScanner() { - kvSet = new TreeSet<KeyValue>(KeyValue.COMPARATOR); + kvList = new ArrayList<KeyValue>(); + for (Put put : puts) { for (List<KeyValue> putKVs : put.getFamilyMap().values()) { - kvSet.addAll(putKVs); + kvList.addAll(putKVs); + } + } + + Collections.sort(kvList, new Comparator<KeyValue>() { + + /** We want to honor the order of the puts in the case where multiple have the same timestamp. + * + * @param o1 + * @param o2 + * @return + */ + public int compare(KeyValue o1, KeyValue o2) { + int result = KeyValue.COMPARATOR.compare(o1, o2); + if (result != 0) { + return result; + } + if (o1 == o2) { + return 0; + } + int put1Number = getPutNumber(o1); + int put2Number = getPutNumber(o2); + return put2Number - put1Number; + } + }); + + iterator = kvList.iterator(); + } + + private int getPutNumber(KeyValue kv) { + for (int i=0; i < puts.size(); i++) { + for (List<KeyValue> putKVs : puts.get(i).getFamilyMap().values()) { + for (KeyValue putKV : putKVs) + if (putKV == kv) { + return i; + } } } - iterator = kvSet.iterator(); + throw new IllegalStateException("Can not fine put KV in puts"); } public void close() { @@ -424,8 +464,18 @@ class TransactionState { return next; } + private void iteratorFrom(KeyValue key) { + iterator = kvList.iterator(); + while (iterator.hasNext()) { + KeyValue next = iterator.next(); + if (KeyValue.COMPARATOR.compare(next, key) >= 0) { + break; + } + } + } + public boolean seek(KeyValue key) { - iterator = kvSet.headSet(key).iterator(); + iteratorFrom(key); getNext(); return next != null; Modified: hadoop/hbase/branches/0.20/src/contrib/transactional/src/test/org/apache/hadoop/hbase/client/transactional/TestTransactions.java URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.20/src/contrib/transactional/src/test/org/apache/hadoop/hbase/client/transactional/TestTransactions.java?rev=931723&r1=931722&r2=931723&view=diff ============================================================================== --- hadoop/hbase/branches/0.20/src/contrib/transactional/src/test/org/apache/hadoop/hbase/client/transactional/TestTransactions.java (original) +++ hadoop/hbase/branches/0.20/src/contrib/transactional/src/test/org/apache/hadoop/hbase/client/transactional/TestTransactions.java Wed Apr 7 23:41:58 2010 @@ -128,6 +128,25 @@ public class TestTransactions extends HB Assert.assertEquals(newValue, Bytes.toInt(row1_A.value())); } + public void testGetAfterPutPut() throws IOException { + TransactionState transactionState = transactionManager.beginTransaction(); + + int originalValue = Bytes.toInt(table.get(transactionState, + new Get(ROW1).addColumn(COL_A)).value()); + int newValue = originalValue + 1; + + table.put(transactionState, new Put(ROW1).add(FAMILY, QUAL_A, Bytes + .toBytes(newValue))); + + newValue = newValue + 1; + + table.put(transactionState, new Put(ROW1).add(FAMILY, QUAL_A, Bytes + .toBytes(newValue))); + + Result row1_A = table.get(transactionState, new Get(ROW1).addColumn(COL_A)); + Assert.assertEquals(newValue, Bytes.toInt(row1_A.value())); + } + public void testScanAfterUpdatePut() throws IOException { TransactionState transactionState = transactionManager.beginTransaction(); @@ -174,14 +193,10 @@ public class TestTransactions extends HB public void testPutPutScan() throws IOException { TransactionState transactionState = transactionManager.beginTransaction(); + int row2Value = 199; table.put(transactionState, new Put(ROW2).add(FAMILY, QUAL_A, Bytes .toBytes(row2Value))); - try { - Thread.sleep(500); - } catch (InterruptedException ex) { - // just ignore - } row2Value = 299; table.put(transactionState, new Put(ROW2).add(FAMILY, QUAL_A, Bytes @@ -198,8 +213,18 @@ public class TestTransactions extends HB Assert.assertNotNull(result); Assert.assertEquals(Bytes.toString(ROW2), Bytes.toString(result.getRow())); Assert.assertEquals(row2Value, Bytes.toInt(result.value())); + + // TODO commit and verifty that we see second put. + } + + public void testPutPutScanOverAndOver() throws IOException { + // Do this test many times to try and hit two puts in the same millisecond + for (int i=0 ; i < 100; i++) { + testPutPutScan(); + } } + // Read from ROW1,COL_A and put it in ROW2_COLA and ROW3_COLA private TransactionState makeTransaction1() throws IOException { TransactionState transactionState = transactionManager.beginTransaction();