Author: tabish
Date: Wed Sep 14 21:15:39 2011
New Revision: 1170849
URL: http://svn.apache.org/viewvc?rev=1170849&view=rev
Log:
Some updates and changes to support some work on
https://issues.apache.org/jira/browse/AMQ-3467
Enhance the ListIndex to improve performance of the remove and put operations.
Put is now a real put: it will update the element with the given key if it
exists in the list; otherwise it will add it to the end. Also adds the ability
for a single key/value pair to span more than one page when needed; multiple
elements will still reside on one page whenever possible.
Modified:
activemq/trunk/kahadb/src/main/java/org/apache/kahadb/index/ListIndex.java
activemq/trunk/kahadb/src/main/java/org/apache/kahadb/index/ListNode.java
activemq/trunk/kahadb/src/test/java/org/apache/kahadb/index/ListIndexTest.java
Modified:
activemq/trunk/kahadb/src/main/java/org/apache/kahadb/index/ListIndex.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/kahadb/src/main/java/org/apache/kahadb/index/ListIndex.java?rev=1170849&r1=1170848&r2=1170849&view=diff
==============================================================================
--- activemq/trunk/kahadb/src/main/java/org/apache/kahadb/index/ListIndex.java
(original)
+++ activemq/trunk/kahadb/src/main/java/org/apache/kahadb/index/ListIndex.java
Wed Sep 14 21:15:39 2011
@@ -24,6 +24,7 @@ import java.util.concurrent.atomic.Atomi
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.kahadb.index.ListNode.ListIterator;
import org.apache.kahadb.page.Page;
import org.apache.kahadb.page.PageFile;
import org.apache.kahadb.page.Transaction;
@@ -103,6 +104,11 @@ public class ListIndex<Key,Value> implem
synchronized public boolean containsKey(Transaction tx, Key key) throws
IOException {
assertLoaded();
+
+ if (size.get() == 0) {
+ return false;
+ }
+
for (Iterator<Map.Entry<Key,Value>> iterator = iterator(tx);
iterator.hasNext(); ) {
Map.Entry<Key,Value> candidate = iterator.next();
if (key.equals(candidate.getKey())) {
@@ -112,11 +118,17 @@ public class ListIndex<Key,Value> implem
return false;
}
+ private ListNode<Key, Value> lastGetNodeCache = null;
+ private Map.Entry<Key, Value> lastGetEntryCache = null;
+
+ @SuppressWarnings({ "rawtypes", "unchecked" })
synchronized public Value get(Transaction tx, Key key) throws IOException {
assertLoaded();
for (Iterator<Map.Entry<Key,Value>> iterator = iterator(tx);
iterator.hasNext(); ) {
Map.Entry<Key,Value> candidate = iterator.next();
if (key.equals(candidate.getKey())) {
+ this.lastGetNodeCache = ((ListIterator) iterator).getCurrent();
+ this.lastGetEntryCache = candidate;
return candidate.getValue();
}
}
@@ -124,10 +136,52 @@ public class ListIndex<Key,Value> implem
}
/**
- * appends to the list
- * @return null
+ * Update the value of the item with the given key in the list if it
exists, otherwise
+ * it appends the value to the end of the list.
+ *
+ * @return the old value contained in the list if one exists or null.
*/
+ @SuppressWarnings({ "rawtypes" })
synchronized public Value put(Transaction tx, Key key, Value value) throws
IOException {
+
+ Value oldValue = null;
+
+ if (lastGetNodeCache != null) {
+
+ if(lastGetEntryCache.getKey().equals(key)) {
+ oldValue = lastGetEntryCache.setValue(value);
+ lastGetEntryCache.setValue(value);
+ lastGetNodeCache.storeUpdate(tx);
+ return oldValue;
+ }
+
+ // This searches from the last location of a call to get for the
element to replace
+ // all the way to the end of the ListIndex.
+ Iterator<Map.Entry<Key, Value>> iterator =
lastGetNodeCache.iterator(tx);
+ while (iterator.hasNext()) {
+ Map.Entry<Key, Value> entry = iterator.next();
+ if (entry.getKey().equals(key)) {
+ oldValue = entry.setValue(value);
+ ((ListIterator) iterator).getCurrent().storeUpdate(tx);
+ return oldValue;
+ }
+ }
+ }
+
+ // Not found because the cache wasn't set or it's not at the end of the
list so we
+ // start from the beginning and go to the cached location or the end,
then we do
+ // an add if it's not found.
+ Iterator<Map.Entry<Key, Value>> iterator = iterator(tx);
+ while (iterator.hasNext() && ((ListIterator) iterator).getCurrent() !=
lastGetNodeCache) {
+ Map.Entry<Key, Value> entry = iterator.next();
+ if (entry.getKey().equals(key)) {
+ oldValue = entry.setValue(value);
+ ((ListIterator) iterator).getCurrent().storeUpdate(tx);
+ return oldValue;
+ }
+ }
+
+ // Not found so add it last.
return add(tx, key, value);
}
@@ -145,15 +199,40 @@ public class ListIndex<Key,Value> implem
return null;
}
+ @SuppressWarnings("rawtypes")
synchronized public Value remove(Transaction tx, Key key) throws
IOException {
assertLoaded();
- for (Iterator<Map.Entry<Key,Value>> iterator = iterator(tx);
iterator.hasNext(); ) {
- Map.Entry<Key,Value> candidate = iterator.next();
- if (key.equals(candidate.getKey())) {
+
+ if (size.get() == 0) {
+ return null;
+ }
+
+ if (lastGetNodeCache != null) {
+
+ // This searches from the last location of a call to get for the
element to remove
+ // all the way to the end of the ListIndex.
+ Iterator<Map.Entry<Key, Value>> iterator =
lastGetNodeCache.iterator(tx);
+ while (iterator.hasNext()) {
+ Map.Entry<Key, Value> entry = iterator.next();
+ if (entry.getKey().equals(key)) {
+ iterator.remove();
+ return entry.getValue();
+ }
+ }
+ }
+
+ // Not found because the cache wasn't set or it's not at the end of the
list so we
+ // start from the beginning and go to the cached location or the end
to find the
+ // element to remove.
+ Iterator<Map.Entry<Key, Value>> iterator = iterator(tx);
+ while (iterator.hasNext() && ((ListIterator) iterator).getCurrent() !=
lastGetNodeCache) {
+ Map.Entry<Key, Value> entry = iterator.next();
+ if (entry.getKey().equals(key)) {
iterator.remove();
- return candidate.getValue();
+ return entry.getValue();
}
}
+
return null;
}
@@ -227,6 +306,7 @@ public class ListIndex<Key,Value> implem
public void storeNode(Transaction tx, ListNode<Key,Value> node, boolean
overflow) throws IOException {
tx.store(node.getPage(), marshaller, overflow);
+ flushCache();
}
public PageFile getPageFile() {
@@ -270,4 +350,9 @@ public class ListIndex<Key,Value> implem
public long size() {
return size.get();
}
+
+ private void flushCache() {
+ this.lastGetEntryCache = null;
+ this.lastGetNodeCache = null;
+ }
}
Modified:
activemq/trunk/kahadb/src/main/java/org/apache/kahadb/index/ListNode.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/kahadb/src/main/java/org/apache/kahadb/index/ListNode.java?rev=1170849&r1=1170848&r2=1170849&view=diff
==============================================================================
--- activemq/trunk/kahadb/src/main/java/org/apache/kahadb/index/ListNode.java
(original)
+++ activemq/trunk/kahadb/src/main/java/org/apache/kahadb/index/ListNode.java
Wed Sep 14 21:15:39 2011
@@ -57,7 +57,7 @@ public final class ListNode<Key,Value> {
static final class KeyValueEntry<Key, Value> extends
LinkedNode<KeyValueEntry<Key, Value>> implements Entry<Key, Value>
{
private final Key key;
- private final Value value;
+ private Value value;
public KeyValueEntry(Key key, Value value) {
this.key = key;
@@ -73,7 +73,9 @@ public final class ListNode<Key,Value> {
}
public Value setValue(Value value) {
- throw new UnsupportedOperationException();
+ Value oldValue = this.value;
+ this.value = value;
+ return oldValue;
}
@Override
@@ -121,7 +123,7 @@ public final class ListNode<Key,Value> {
}
}
- private final class ListIterator implements Iterator<Entry<Key, Value>> {
+ final class ListIterator implements Iterator<Entry<Key, Value>> {
private final Transaction tx;
private final ListIndex<Key,Value> targetList;
@@ -220,6 +222,10 @@ public final class ListNode<Key,Value> {
throw e;
}
}
+
+ ListNode<Key, Value> getCurrent() {
+ return this.currentNode;
+ }
}
/**
@@ -285,9 +291,28 @@ public final class ListNode<Key,Value> {
return null;
}
+ public void storeUpdate(Transaction tx) throws IOException {
+ try {
+ if (this.entries.size() == 1) {
+ getContainingList().storeNode(tx, this, true);
+ } else {
+ getContainingList().storeNode(tx, this, false);
+ }
+ } catch ( Transaction.PageOverflowIOException e ) {
+ split(tx, ADD_FIRST);
+ }
+ }
+
private void store(Transaction tx, boolean addFirst) throws IOException {
try {
- getContainingList().storeNode(tx, this, false);
+ // When we split to a node of one element we can span multiple
+ // pages for that entry, otherwise we keep the entries on one
+ // page to avoid fragmented reads and segment the list traversal.
+ if (this.entries.size() == 1) {
+ getContainingList().storeNode(tx, this, true);
+ } else {
+ getContainingList().storeNode(tx, this, false);
+ }
} catch ( Transaction.PageOverflowIOException e ) {
// If we get an overflow
split(tx, addFirst);
@@ -295,7 +320,11 @@ public final class ListNode<Key,Value> {
}
private void store(Transaction tx) throws IOException {
- getContainingList().storeNode(tx, this, false);
+ if (this.entries.size() == 1) {
+ getContainingList().storeNode(tx, this, true);
+ } else {
+ getContainingList().storeNode(tx, this, false);
+ }
}
private void split(Transaction tx, boolean isAddFirst) throws IOException {
@@ -311,7 +340,7 @@ public final class ListNode<Key,Value> {
getContainingList().setTailPageId(extension.getPageId());
}
extension.store(tx, isAddFirst);
- store(tx);
+ store(tx, true);
}
// called after a split
Modified:
activemq/trunk/kahadb/src/test/java/org/apache/kahadb/index/ListIndexTest.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/kahadb/src/test/java/org/apache/kahadb/index/ListIndexTest.java?rev=1170849&r1=1170848&r2=1170849&view=diff
==============================================================================
---
activemq/trunk/kahadb/src/test/java/org/apache/kahadb/index/ListIndexTest.java
(original)
+++
activemq/trunk/kahadb/src/test/java/org/apache/kahadb/index/ListIndexTest.java
Wed Sep 14 21:15:39 2011
@@ -16,12 +16,24 @@
*/
package org.apache.kahadb.index;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
import java.text.NumberFormat;
+import java.util.HashSet;
import java.util.Iterator;
+import java.util.LinkedList;
import java.util.Map;
import java.util.Random;
+
+import org.apache.kahadb.page.PageFile;
import org.apache.kahadb.util.LongMarshaller;
import org.apache.kahadb.util.StringMarshaller;
+import org.apache.kahadb.util.VariableMarshaller;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -84,6 +96,38 @@ public class ListIndexTest extends Index
tx.commit();
}
+ public void testPut() throws Exception {
+ createPageFileAndIndex(100);
+
+ ListIndex<String, Long> listIndex = ((ListIndex<String, Long>)
this.index);
+ this.index.load(tx);
+ tx.commit();
+
+ int count = 30;
+ tx = pf.tx();
+ doInsert(count);
+ tx.commit();
+ assertEquals("correct size", count, listIndex.size());
+
+ tx = pf.tx();
+ Long value = listIndex.get(tx, key(10));
+ assertNotNull(value);
+ listIndex.put(tx, key(10), Long.valueOf(1024));
+ tx.commit();
+
+ tx = pf.tx();
+ value = listIndex.get(tx, key(10));
+ assertEquals(1024L, value.longValue());
+ assertTrue(listIndex.size() == 30);
+ tx.commit();
+
+ tx = pf.tx();
+ value = listIndex.put(tx, key(31), Long.valueOf(2048));
+ assertNull(value);
+ assertTrue(listIndex.size() == 31);
+ tx.commit();
+ }
+
public void testAddFirst() throws Exception {
createPageFileAndIndex(100);
@@ -273,7 +317,7 @@ public class ListIndexTest extends Index
final int COUNT = 50000;
long start = System.currentTimeMillis();
for (int i = 0; i < COUNT; i++) {
- listIndex.put(tx, key(i), (long) i);
+ listIndex.add(tx, key(i), (long) i);
tx.commit();
}
LOG.info("Time to add " + COUNT + ": " + (System.currentTimeMillis() -
start) + " mills");
@@ -295,9 +339,85 @@ public class ListIndexTest extends Index
LOG.info("Page free count: " +
listIndex.getPageFile().getFreePageCount());
}
+ private int getMessageSize(int min, int max) {
+ return min + (int)(Math.random() * ((max - min) + 1));
+ }
+
+ public void testLargeValueOverflow() throws Exception {
+ pf = new PageFile(directory, getClass().getName());
+ pf.setPageSize(4*1024);
+ pf.setEnablePageCaching(false);
+ pf.setWriteBatchSize(1);
+ pf.load();
+ tx = pf.tx();
+ long id = tx.allocate().getPageId();
+
+ ListIndex<Long, String> test = new ListIndex<Long, String>(pf, id);
+ test.setKeyMarshaller(LongMarshaller.INSTANCE);
+ test.setValueMarshaller(StringMarshaller.INSTANCE);
+ test.load(tx);
+ tx.commit();
+
+ final long NUM_ADDITIONS = 32L;
+
+ LinkedList<Long> expected = new LinkedList<Long>();
+
+ tx = pf.tx();
+ for (long i = 0; i < NUM_ADDITIONS; ++i) {
+ final int stringSize = getMessageSize(1, 4096);
+ String val = new String(new byte[stringSize]);
+ expected.add(Long.valueOf(stringSize));
+ test.add(tx, i, val);
+ }
+ tx.commit();
+
+ tx = pf.tx();
+ for (long i = 0; i < NUM_ADDITIONS; i++) {
+ String s = test.get(tx, i);
+ assertEquals("string length did not match expected",
expected.get((int)i), Long.valueOf(s.length()));
+ }
+ tx.commit();
+
+ expected.clear();
+
+ tx = pf.tx();
+ for (long i = 0; i < NUM_ADDITIONS; ++i) {
+ final int stringSize = getMessageSize(1, 4096);
+ String val = new String(new byte[stringSize]);
+ expected.add(Long.valueOf(stringSize));
+ test.addFirst(tx, i+NUM_ADDITIONS, val);
+ }
+ tx.commit();
+
+ tx = pf.tx();
+ for (long i = 0; i < NUM_ADDITIONS; i++) {
+ String s = test.get(tx, i+NUM_ADDITIONS);
+ assertEquals("string length did not match expected",
expected.get((int)i), Long.valueOf(s.length()));
+ }
+ tx.commit();
+
+ expected.clear();
+
+ tx = pf.tx();
+ for (long i = 0; i < NUM_ADDITIONS; ++i) {
+ final int stringSize = getMessageSize(1, 4096);
+ String val = new String(new byte[stringSize]);
+ expected.add(Long.valueOf(stringSize));
+ test.put(tx, i, val);
+ }
+ tx.commit();
+
+ tx = pf.tx();
+ for (long i = 0; i < NUM_ADDITIONS; i++) {
+ String s = test.get(tx, i);
+ assertEquals("string length did not match expected",
expected.get((int)i), Long.valueOf(s.length()));
+ }
+ tx.commit();
+ }
+
void doInsertReverse(int count) throws Exception {
for (int i = count - 1; i >= 0; i--) {
- ((ListIndex) index).addFirst(tx, key(i), (long) i);
+ ((ListIndex<String, Long>) index).addFirst(tx, key(i), (long) i);
tx.commit();
}
}
@@ -306,4 +426,35 @@ public class ListIndexTest extends Index
protected String key(int i) {
return "key:" + nf.format(i);
}
+
+ static class HashSetStringMarshaller extends
VariableMarshaller<HashSet<String>> {
+ final static HashSetStringMarshaller INSTANCE = new
HashSetStringMarshaller();
+
+ public void writePayload(HashSet<String> object, DataOutput dataOut)
throws IOException {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ ObjectOutputStream oout = new ObjectOutputStream(baos);
+ oout.writeObject(object);
+ oout.flush();
+ oout.close();
+ byte[] data = baos.toByteArray();
+ dataOut.writeInt(data.length);
+ dataOut.write(data);
+ }
+
+ @SuppressWarnings("unchecked")
+ public HashSet<String> readPayload(DataInput dataIn) throws
IOException {
+ int dataLen = dataIn.readInt();
+ byte[] data = new byte[dataLen];
+ dataIn.readFully(data);
+ ByteArrayInputStream bais = new ByteArrayInputStream(data);
+ ObjectInputStream oin = new ObjectInputStream(bais);
+ try {
+ return (HashSet<String>) oin.readObject();
+ } catch (ClassNotFoundException cfe) {
+ IOException ioe = new IOException("Failed to read
HashSet<String>: " + cfe);
+ ioe.initCause(cfe);
+ throw ioe;
+ }
+ }
+ }
}