Author: chirino
Date: Mon Sep 22 13:27:24 2008
New Revision: 697976
URL: http://svn.apache.org/viewvc?rev=697976&view=rev
Log:
Fixed issue in message recovery where the indexes were inconsistent due to a
message getting assigned 2 sequence ids.
Added some improved toString() methods to aid during debugging.
The Btree was not properly linking leaf nodes together on a split in some
situations.
Modified:
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/index/BTreeNode.java
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/Journal.java
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/Location.java
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/MessageDatabase.java
activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/store/perf/KahaBulkLoadingTest.java
Modified:
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/index/BTreeNode.java
URL:
http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/index/BTreeNode.java?rev=697976&r1=697975&r2=697976&view=diff
==============================================================================
---
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/index/BTreeNode.java
(original)
+++
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/index/BTreeNode.java
Mon Sep 22 13:27:24 2008
@@ -420,6 +420,7 @@
} else {
rNode.setLeafData(rightKeys, rightValues);
lNode.setLeafData(leftKeys, leftValues);
+ lNode.setNext(rNode.getPageId());
}
Key[] v = createKeyArray(1);
@@ -550,7 +551,7 @@
public Map.Entry<Key,Value> getLast(Transaction tx) throws IOException {
BTreeNode<Key, Value> node = this;
while( node.isBranch() ) {
- node = node.getChild(tx, children.length-1);
+ node = node.getChild(tx, node.children.length-1);
}
if( node.values.length>0 ) {
int idx = node.values.length-1;
Modified:
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/Journal.java
URL:
http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/Journal.java?rev=697976&r1=697975&r2=697976&view=diff
==============================================================================
---
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/Journal.java
(original)
+++
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/Journal.java
Mon Sep 22 13:27:24 2008
@@ -375,10 +375,13 @@
dataFile.unlink();
if (archiveDataLogs) {
dataFile.move(getDirectoryArchive());
- LOG.info("moved data file " + dataFile + " to " +
getDirectoryArchive());
+ LOG.debug("moved data file " + dataFile + " to " +
getDirectoryArchive());
} else {
- boolean result = dataFile.delete();
- LOG.info("discarding data file " + dataFile + (result ?
"successful " : "failed"));
+ if ( dataFile.delete() ) {
+ LOG.debug("Discarded data file " + dataFile);
+ } else {
+ LOG.warn("Failed to discard data file " + dataFile.getFile());
+ }
}
}
Modified:
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/Location.java
URL:
http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/Location.java?rev=697976&r1=697975&r2=697976&view=diff
==============================================================================
---
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/Location.java
(original)
+++
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/Location.java
Mon Sep 22 13:27:24 2008
@@ -104,9 +104,7 @@
}
public String toString() {
- String result = "offset = " + offset + ", file = " + dataFileId + ",
size = " + size + ", type = "
- + type;
- return result;
+ return dataFileId+":"+offset;
}
public void writeExternal(DataOutput dos) throws IOException {
Modified:
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/MessageDatabase.java
URL:
http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/MessageDatabase.java?rev=697976&r1=697975&r2=697976&view=diff
==============================================================================
---
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/MessageDatabase.java
(original)
+++
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/MessageDatabase.java
Mon Sep 22 13:27:24 2008
@@ -30,6 +30,7 @@
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
+import java.util.Map;
import java.util.TreeMap;
import java.util.Map.Entry;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -150,8 +151,8 @@
protected File directory;
protected boolean recovering;
protected Thread checkpointThread;
- protected boolean syncWrites;
- int checkpointInterval = 1000;
+ protected boolean syncWrites=true;
+ int checkpointInterval = 5*1000;
int cleanupInterval = 30*1000;
protected AtomicBoolean started = new AtomicBoolean();
@@ -590,21 +591,28 @@
// Add the message.
long id = sd.nextMessageId++;
- sd.orderIndex.put(tx, id, new MessageKeys(command.getMessageId(),
location));
- sd.locationIndex.put(tx, location, id);
- sd.messageIdIndex.put(tx, command.getMessageId(), id);
+ Long previous = sd.locationIndex.put(tx, location, id);
+ if( previous == null ) {
+ sd.messageIdIndex.put(tx, command.getMessageId(), id);
+ sd.orderIndex.put(tx, id, new MessageKeys(command.getMessageId(),
location));
+ } else {
+ // restore the previous value.. Looks like this was a redo of a
previously
+            // added message. We don't want to assign it a new id as the
other indexes would
+ // be wrong..
+ sd.locationIndex.put(tx, location, previous);
+ }
+
}
private void updateIndex(Transaction tx, KahaRemoveMessageCommand command,
Location ackLocation) throws IOException {
StoredDestination sd = getStoredDestination(command.getDestination(),
tx);
if (!command.hasSubscriptionKey()) {
+
// In the queue case we just remove the message from the index..
Long sequenceId = sd.messageIdIndex.remove(tx,
command.getMessageId());
if (sequenceId != null) {
MessageKeys keys = sd.orderIndex.remove(tx, sequenceId);
- if( keys!=null ) {
- sd.locationIndex.remove(tx, keys.location);
- }
+ sd.locationIndex.remove(tx, keys.location);
}
} else {
// In the topic case we need remove the message once it's been
acked
@@ -698,6 +706,7 @@
// Find empty journal files to remove.
final HashSet<Integer> inUseFiles = new HashSet<Integer>();
for (StoredDestination sd : storedDestinations.values()) {
+
// Use a visitor to cut down the number of pages that we load
sd.locationIndex.visit(tx, new BTreeVisitor<Location, Long>() {
int last=-1;
@@ -739,7 +748,6 @@
LOG.debug("Checkpoint done.");
}
-
// /////////////////////////////////////////////////////////////////
// StoredDestination related implementation methods.
// /////////////////////////////////////////////////////////////////
@@ -761,6 +769,11 @@
this.messageId=messageId;
this.location=location;
}
+
+ @Override
+ public String toString() {
+ return "["+messageId+","+location+"]";
+ }
}
static protected class MessageKeysMarshaller implements
Marshaller<MessageKeys> {
Modified:
activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/store/perf/KahaBulkLoadingTest.java
URL:
http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/store/perf/KahaBulkLoadingTest.java?rev=697976&r1=697975&r2=697976&view=diff
==============================================================================
---
activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/store/perf/KahaBulkLoadingTest.java
(original)
+++
activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/store/perf/KahaBulkLoadingTest.java
Mon Sep 22 13:27:24 2008
@@ -19,10 +19,13 @@
import java.io.File;
import java.io.IOException;
import java.net.URISyntaxException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.BytesMessage;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
+import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
@@ -48,15 +51,14 @@
public class KahaBulkLoadingTest extends JmsTestSupport {
private static final Log LOG =
LogFactory.getLog(KahaBulkLoadingTest.class);
-
- protected int messageSize = 1024 * 64;
- protected int produceCount = 10000;
+
+ protected int messageSize = 1024 * 4;
protected BrokerService createBroker() throws Exception {
BrokerService broker = new BrokerService();
KahaDBPersistenceAdaptor kaha = new KahaDBPersistenceAdaptor();
kaha.setDirectory(new File("target/activemq-data/kahadb"));
- kaha.deleteAllMessages();
+ // kaha.deleteAllMessages();
broker.setPersistenceAdapter(kaha);
broker.addConnector("tcp://localhost:0");
return broker;
@@ -69,40 +71,75 @@
}
public void testQueueSendThenAddConsumer() throws Exception {
- ProgressPrinter printer = new ProgressPrinter(produceCount, 20);
-
+ long start;
+ long end;
ActiveMQDestination destination = new ActiveMQQueue("TEST");
connection.setUseCompression(false);
connection.getPrefetchPolicy().setAll(10);
connection.start();
- Session session = connection.createSession(false,
Session.DUPS_OK_ACKNOWLEDGE);
- MessageProducer producer = session.createProducer(destination);
- producer.setDeliveryMode(DeliveryMode.PERSISTENT);
- LOG.info("Sending " + produceCount + " messages that are " +
(messageSize / 1024.0) + "k large, for a total of " + (produceCount *
messageSize / (1024.0 * 1024.0))
- + " megs of data.");
- // Send a message to the broker.
- long start = System.currentTimeMillis();
- for (int i = 0; i < produceCount; i++) {
- printer.increment();
- BytesMessage msg = session.createBytesMessage();
- msg.writeBytes(new byte[messageSize]);
- producer.send(msg);
+ Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+
+ LOG.info("Receiving messages that are in the queue");
+ MessageConsumer consumer = session.createConsumer(destination);
+ BytesMessage msg = (BytesMessage)consumer.receive(2000);
+ int consumed = 0;
+ if( msg!=null ) {
+ consumed++;
}
- long end1 = System.currentTimeMillis();
+ while (true) {
+ int counter = 0;
+ if (msg == null) {
+ break;
+ }
+ end = start = System.currentTimeMillis();
+ int size = 0;
+ while ((end - start) < 5000) {
+ msg = (BytesMessage)consumer.receive(5000);
+ if (msg == null) {
+ break;
+ }
+ counter++;
+ consumed++;
+ end = System.currentTimeMillis();
+ size += msg.getBodyLength();
+ }
+ LOG.info("Consumed: " + (counter * 1000.0 / (end - start)) + " " +
" messages/sec, " + (1.0 * size / (1024.0 * 1024.0)) * ((1000.0 / (end -
start))) + " megs/sec ");
+ }
+ consumer.close();
+ LOG.info("Consumed " + consumed + " messages from the queue.");
- LOG.info("Produced messages/sec: " + (produceCount * 1000.0 / (end1 -
start)));
+ MessageProducer producer = session.createProducer(destination);
+ producer.setDeliveryMode(DeliveryMode.PERSISTENT);
- printer = new ProgressPrinter(produceCount, 10);
+ LOG.info("Sending messages that are " + (messageSize / 1024.0) + "k
large");
+ // Send a message to the broker.
start = System.currentTimeMillis();
- MessageConsumer consumer = session.createConsumer(destination);
- for (int i = 0; i < produceCount; i++) {
- printer.increment();
- assertNotNull("Getting message: " + i, consumer.receive(20000));
+
+ final AtomicBoolean stop = new AtomicBoolean();
+ Runtime.getRuntime().addShutdownHook(new Thread() {
+ @Override
+ public void run() {
+ stop.set(true);
+ }
+ });
+
+ int produced = 0;
+ while (!stop.get()) {
+ end = start = System.currentTimeMillis();
+ int produceCount = 0;
+ while ((end - start) < 5000 && !stop.get()) {
+ BytesMessage bm = session.createBytesMessage();
+ bm.writeBytes(new byte[messageSize]);
+ producer.send(bm);
+ produceCount++;
+ produced++;
+ end = System.currentTimeMillis();
+ }
+ LOG.info("Produced: " + (produceCount * 1000.0 / (end - start)) +
" messages/sec, " + (1.0 * produceCount * messageSize / (1024.0 * 1024.0)) *
((1000.0 / (end - start))) + " megs/sec");
}
- end1 = System.currentTimeMillis();
- LOG.info("Consumed messages/sec: " + (produceCount * 1000.0 / (end1 -
start)));
+ LOG.info("Prodcued " + produced + " messages to the queue.");
}