Author: chirino
Date: Mon Apr 29 22:33:26 2013
New Revision: 1477387
URL: http://svn.apache.org/r1477387
Log:
Added an new ElectingLevelDBStore which handles the M/S election bits using
ZooKeeper.
Added:
activemq/trunk/activemq-leveldb-store/src/main/java/org/apache/activemq/leveldb/replicated/ReplicatedLevelDBStoreTrait.scala
activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/ElectingLevelDBStore.scala
activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/MasterElector.scala
activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/ProxyLevelDBStore.scala
activemq/trunk/activemq-leveldb-store/src/test/java/org/apache/activemq/leveldb/test/ElectingLevelDBStoreTest.java
activemq/trunk/activemq-leveldb-store/src/test/java/org/apache/activemq/leveldb/test/ReplicationTestSupport.java
Modified:
activemq/trunk/activemq-leveldb-store/pom.xml
activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala
activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala
activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala
activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/MasterLevelDBStore.scala
activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/SlaveLevelDBStore.scala
activemq/trunk/activemq-leveldb-store/src/test/java/org/apache/activemq/leveldb/test/ReplicatedLevelDBStoreTest.java
Modified: activemq/trunk/activemq-leveldb-store/pom.xml
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb-store/pom.xml?rev=1477387&r1=1477386&r2=1477387&view=diff
==============================================================================
--- activemq/trunk/activemq-leveldb-store/pom.xml (original)
+++ activemq/trunk/activemq-leveldb-store/pom.xml Mon Apr 29 22:33:26 2013
@@ -124,6 +124,18 @@
<artifactId>fabric-zookeeper</artifactId>
<version>7.2.0.redhat-024</version>
</dependency>
+ <dependency>
+ <groupId>org.osgi</groupId>
+ <artifactId>org.osgi.core</artifactId>
+ <version>4.3.1</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.osgi</groupId>
+ <artifactId>org.osgi.compendium</artifactId>
+ <version>4.3.1</version>
+ <scope>provided</scope>
+ </dependency>
<!-- For Optional Snappy Compression -->
<dependency>
Added:
activemq/trunk/activemq-leveldb-store/src/main/java/org/apache/activemq/leveldb/replicated/ReplicatedLevelDBStoreTrait.scala
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb-store/src/main/java/org/apache/activemq/leveldb/replicated/ReplicatedLevelDBStoreTrait.scala?rev=1477387&view=auto
==============================================================================
---
activemq/trunk/activemq-leveldb-store/src/main/java/org/apache/activemq/leveldb/replicated/ReplicatedLevelDBStoreTrait.scala
(added)
+++
activemq/trunk/activemq-leveldb-store/src/main/java/org/apache/activemq/leveldb/replicated/ReplicatedLevelDBStoreTrait.scala
Mon Apr 29 22:33:26 2013
@@ -0,0 +1,44 @@
+package org.apache.activemq.leveldb.replicated
+
+import scala.reflect.BeanProperty
+import java.util.UUID
+import org.apache.activemq.leveldb.LevelDBStore
+import org.apache.activemq.leveldb.util.FileSupport._
+
+/**
+ */
+trait ReplicatedLevelDBStoreTrait extends LevelDBStore {
+
+ @BeanProperty
+ var securityToken = ""
+
+ def replicaId:String = {
+ val replicaid_file = directory / "replicaid.txt"
+ if( replicaid_file.exists() ) {
+ replicaid_file.readText()
+ } else {
+ val rc = create_uuid
+ replicaid_file.getParentFile.mkdirs()
+ replicaid_file.writeText(rc)
+ rc
+ }
+ }
+
+ def create_uuid = UUID.randomUUID().toString
+
+ def storeId:String = {
+ val storeid_file = directory / "storeid.txt"
+ if( storeid_file.exists() ) {
+ storeid_file.readText()
+ } else {
+ null
+ }
+ }
+
+ def storeId_=(value:String) {
+ val storeid_file = directory / "storeid.txt"
+ storeid_file.writeText(value)
+ }
+
+
+}
Modified:
activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala?rev=1477387&r1=1477386&r2=1477387&view=diff
==============================================================================
---
activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala
(original)
+++
activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala
Mon Apr 29 22:33:26 2013
@@ -91,25 +91,39 @@ object UowCompleted extends UowState {
*
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
-case class CountDownFuture(completed:CountDownLatch=new CountDownLatch(1))
extends java.util.concurrent.Future[Object] {
- def countDown = completed.countDown()
+class CountDownFuture[T <: AnyRef]() extends java.util.concurrent.Future[T] {
+
+ private val latch:CountDownLatch=new CountDownLatch(1)
+ @volatile
+ var value:T = _
+
def cancel(mayInterruptIfRunning: Boolean) = false
def isCancelled = false
+
+ def completed = latch.getCount()==0
+ def await() = latch.await()
+ def await(p1: Long, p2: TimeUnit) = latch.await(p1, p2)
+
+ def set(v:T) = {
+ value = v
+ latch.countDown()
+ }
+
def get() = {
- completed.await()
- null
+ latch.await()
+ value
}
def get(p1: Long, p2: TimeUnit) = {
- if(completed.await(p1, p2)) {
- null
+ if(latch.await(p1, p2)) {
+ value
} else {
throw new TimeoutException
}
}
- def isDone = completed.await(0, TimeUnit.SECONDS);
+ def isDone = latch.await(0, TimeUnit.SECONDS);
}
object UowManagerConstants {
@@ -125,7 +139,7 @@ object UowManagerConstants {
import UowManagerConstants._
class DelayableUOW(val manager:DBManager) extends BaseRetained {
- val countDownFuture = CountDownFuture()
+ val countDownFuture = new CountDownFuture[AnyRef]()
var canceled = false;
val uowId:Int = manager.lastUowId.incrementAndGet()
@@ -310,7 +324,7 @@ class DelayableUOW(val manager:DBManager
val s = size
if( manager.asyncCapacityRemaining.addAndGet(-s) > 0 ) {
asyncCapacityUsed = s
- countDownFuture.countDown
+ countDownFuture.set(null)
manager.parent.blocking_executor.execute(^{
complete_listeners.foreach(_())
})
@@ -332,7 +346,7 @@ class DelayableUOW(val manager:DBManager
asyncCapacityUsed = 0
} else {
manager.uow_complete_latency.add(System.nanoTime() - disposed_at)
- countDownFuture.countDown
+ countDownFuture.set(null)
manager.parent.blocking_executor.execute(^{
complete_listeners.foreach(_())
})
Modified:
activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala?rev=1477387&r1=1477386&r2=1477387&view=diff
==============================================================================
---
activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala
(original)
+++
activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala
Mon Apr 29 22:33:26 2013
@@ -748,7 +748,9 @@ class LevelDBClient(store: LevelDBStore)
loadMap(LOG_REF_INDEX_KEY, logRefs)
loadMap(COLLECTION_META_KEY, collectionMeta)
}
-
+
+ var wal_append_position = 0L
+
def stop() = {
if( writeExecutor!=null ) {
writeExecutor.shutdown
@@ -765,6 +767,7 @@ class LevelDBClient(store: LevelDBStore)
if (log.isOpen) {
log.close
copyDirtyIndexToSnapshot
+ wal_append_position = log.appender_limit
}
if( plist!=null ) {
plist.close
Modified:
activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala?rev=1477387&r1=1477386&r2=1477387&view=diff
==============================================================================
---
activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala
(original)
+++
activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala
Mon Apr 29 22:33:26 2013
@@ -17,41 +17,24 @@
package org.apache.activemq.leveldb
-import org.apache.activemq.broker.{LockableServiceSupport, BrokerService,
BrokerServiceAware, ConnectionContext}
+import org.apache.activemq.broker.{LockableServiceSupport, BrokerServiceAware,
ConnectionContext}
import org.apache.activemq.command._
import org.apache.activemq.openwire.OpenWireFormat
import org.apache.activemq.usage.SystemUsage
import java.io.File
import java.io.IOException
import java.util.concurrent._
-import java.util.concurrent.atomic.{AtomicLong, AtomicInteger}
+import java.util.concurrent.atomic.AtomicLong
import reflect.BeanProperty
import org.apache.activemq.store._
import java.util._
import collection.mutable.ListBuffer
-import javax.management.ObjectName
import org.apache.activemq.broker.jmx.{BrokerMBeanSupport, AnnotatedMBean}
import org.apache.activemq.util._
-import org.apache.activemq.leveldb.util.{RetrySupport, FileSupport, Log}
+import org.apache.activemq.leveldb.util.{RetrySupport, Log}
import org.apache.activemq.store.PList.PListIterator
import java.lang
-import org.fusesource.hawtbuf.{UTF8Buffer, DataByteArrayOutputStream, Buffer}
-import scala.Some
-import org.apache.activemq.leveldb.CountDownFuture
-import org.apache.activemq.leveldb.XaAckRecord
-import org.apache.activemq.leveldb.DurableSubscription
-import scala.Some
-import org.apache.activemq.leveldb.CountDownFuture
-import org.apache.activemq.leveldb.XaAckRecord
-import org.apache.activemq.leveldb.DurableSubscription
-import scala.Some
-import org.apache.activemq.leveldb.CountDownFuture
-import org.apache.activemq.leveldb.XaAckRecord
-import org.apache.activemq.leveldb.DurableSubscription
-import scala.Some
-import org.apache.activemq.leveldb.CountDownFuture
-import org.apache.activemq.leveldb.XaAckRecord
-import org.apache.activemq.leveldb.DurableSubscription
+import org.fusesource.hawtbuf.{UTF8Buffer, DataByteArrayOutputStream}
object LevelDBStore extends Log {
val DEFAULT_DIRECTORY = new File("LevelDB");
@@ -64,8 +47,8 @@ object LevelDBStore extends Log {
}
})
- val DONE = new CountDownFuture();
- DONE.countDown
+ val DONE = new CountDownFuture[AnyRef]();
+ DONE.set(null)
def toIOException(e: Throwable): IOException = {
if (e.isInstanceOf[ExecutionException]) {
@@ -208,7 +191,6 @@ class LevelDBStore extends LockableServi
var snappyCompressLogs = false
def doStart: Unit = {
- import FileSupport._
snappyCompressLogs = logCompression.toLowerCase == "snappy" && Snappy !=
null
debug("starting")
@@ -583,7 +565,7 @@ class LevelDBStore extends LockableServi
lastSeq.set(db.getLastQueueEntrySeq(key))
- def doAdd(uow: DelayableUOW, message: Message, delay:Boolean):
CountDownFuture = {
+ def doAdd(uow: DelayableUOW, message: Message, delay:Boolean):
CountDownFuture[AnyRef] = {
uow.enqueue(key, lastSeq.incrementAndGet, message, delay)
}
@@ -606,7 +588,7 @@ class LevelDBStore extends LockableServi
waitOn(asyncAddQueueMessage(context, message, delay))
}
- def doRemove(uow: DelayableUOW, id: MessageId): CountDownFuture = {
+ def doRemove(uow: DelayableUOW, id: MessageId): CountDownFuture[AnyRef] = {
uow.dequeue(key, id)
}
Added:
activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/ElectingLevelDBStore.scala
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/ElectingLevelDBStore.scala?rev=1477387&view=auto
==============================================================================
---
activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/ElectingLevelDBStore.scala
(added)
+++
activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/ElectingLevelDBStore.scala
Mon Apr 29 22:33:26 2013
@@ -0,0 +1,290 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.leveldb.replicated
+
+import org.fusesource.fabric.groups._
+import org.fusesource.fabric.zookeeper.internal.ZKClient
+import org.linkedin.util.clock.Timespan
+import scala.reflect.BeanProperty
+import org.apache.activemq.util.{ServiceStopper, ServiceSupport}
+import org.apache.activemq.leveldb.{LevelDBClient, RecordLog, LevelDBStore}
+import java.net.{NetworkInterface, InetAddress}
+import org.fusesource.hawtdispatch._
+import org.apache.activemq.broker.Locker
+import org.apache.activemq.store.PersistenceAdapter
+import java.util.concurrent.CountDownLatch
+import java.util.concurrent.atomic.AtomicBoolean
+import org.apache.activemq.leveldb.util.Log
+import java.io.File
+
+object ElectingLevelDBStore extends Log {
+
+ def machine_hostname: String = {
+ import collection.JavaConversions._
+ // Get the host name of the first non loop-back interface..
+ for (interface <- NetworkInterface.getNetworkInterfaces; if
(!interface.isLoopback); inet <- interface.getInetAddresses) {
+ var address = inet.getHostAddress
+ var name = inet.getCanonicalHostName
+ if( address!= name ) {
+ return name
+ }
+ }
+ // Or else just go the simple route.
+ return InetAddress.getLocalHost.getCanonicalHostName;
+ }
+
+}
+
+/**
+ *
+ */
+class ElectingLevelDBStore extends ProxyLevelDBStore {
+ import ElectingLevelDBStore._
+
+ def proxy_target = master
+
+ @BeanProperty
+ var zkAddress = "tcp://127.0.0.1:2888"
+ @BeanProperty
+ var zkPassword:String = _
+ @BeanProperty
+ var zkPath = "/default"
+ @BeanProperty
+ var zkSessionTmeout = "2s"
+
+ var brokerName: String = _
+
+ @BeanProperty
+ var hostname: String = _
+ @BeanProperty
+ var bind = "tcp://0.0.0.0:61619"
+ @BeanProperty
+ var minReplica = 1
+ @BeanProperty
+ var securityToken = ""
+
+ var directory = LevelDBStore.DEFAULT_DIRECTORY;
+ override def setDirectory(dir: File) {
+ directory = dir
+ }
+ override def getDirectory: File = {
+ return directory
+ }
+
+ @BeanProperty
+ var logSize: Long = 1024 * 1024 * 100
+ @BeanProperty
+ var indexFactory: String = "org.fusesource.leveldbjni.JniDBFactory,
org.iq80.leveldb.impl.Iq80DBFactory"
+ @BeanProperty
+ var sync: Boolean = true
+ @BeanProperty
+ var verifyChecksums: Boolean = false
+ @BeanProperty
+ var indexMaxOpenFiles: Int = 1000
+ @BeanProperty
+ var indexBlockRestartInterval: Int = 16
+ @BeanProperty
+ var paranoidChecks: Boolean = false
+ @BeanProperty
+ var indexWriteBufferSize: Int = 1024 * 1024 * 6
+ @BeanProperty
+ var indexBlockSize: Int = 4 * 1024
+ @BeanProperty
+ var indexCompression: String = "snappy"
+ @BeanProperty
+ var logCompression: String = "none"
+ @BeanProperty
+ var indexCacheSize: Long = 1024 * 1024 * 256L
+ @BeanProperty
+ var flushDelay = 1000 * 5
+ @BeanProperty
+ var asyncBufferSize = 1024 * 1024 * 4
+ @BeanProperty
+ var monitorStats = false
+
+ def cluster_size_quorum = minReplica + 1
+
+ def cluster_size_max = (minReplica << 2) + 1
+
+ var master: MasterLevelDBStore = _
+ var slave: SlaveLevelDBStore = _
+
+ var zk_client: ZKClient = _
+ var zk_group: Group = _
+ var master_elector: MasterElector = _
+
+ var position: Long = -1L
+
+ def init() {
+
+ // Figure out our position in the store.
+ directory.mkdirs()
+ val log = new RecordLog(directory, LevelDBClient.LOG_SUFFIX)
+ log.logSize = logSize
+ log.open()
+ position = try {
+ log.current_appender.append_position
+ } finally {
+ log.close
+ }
+
+ zk_client = new ZKClient(zkAddress, Timespan.parse(zkSessionTmeout), null)
+ if( zkPassword!=null ) {
+ zk_client.setPassword(zkPassword)
+ }
+ zk_client.start
+ zk_client.waitForConnected(Timespan.parse("30s"))
+
+ val zk_group = ZooKeeperGroupFactory.create(zk_client, zkPath)
+ val master_elector = new MasterElector(this)
+ master_elector.start(zk_group)
+ master_elector.join
+
+ this.setUseLock(true)
+ this.setLocker(createDefaultLocker())
+ }
+
+ def createDefaultLocker(): Locker = new Locker {
+
+ def configure(persistenceAdapter: PersistenceAdapter) {}
+ def setFailIfLocked(failIfLocked: Boolean) {}
+ def setLockAcquireSleepInterval(lockAcquireSleepInterval: Long) {}
+ def setName(name: String) {}
+
+ def start() = {
+ master_started_latch.await()
+ }
+
+ def keepAlive(): Boolean = {
+ master_started.get()
+ }
+
+ def stop() {}
+ }
+
+
+ val master_started_latch = new CountDownLatch(1)
+ val master_started = new AtomicBoolean(false)
+
+ def start_master(func: (Int) => Unit) = {
+ assert(master==null)
+ master = create_master()
+ master.blocking_executor.execute(^{
+ master_started.set(true)
+ master.start();
+ master_started_latch.countDown()
+ func(master.getPort)
+ })
+ }
+
+ def isMaster = master_started.get() && !master_stopped.get()
+
+ val stopped_latch = new CountDownLatch(1)
+ val master_stopped = new AtomicBoolean(false)
+
+ def stop_master(func: => Unit) = {
+ assert(master!=null)
+ master.blocking_executor.execute(^{
+ master.stop();
+ master_stopped.set(true)
+ position = master.wal_append_position
+ stopped_latch.countDown()
+ func
+ })
+ }
+
+ protected def doStart() = {
+ master_started_latch.await()
+ }
+
+ protected def doStop(stopper: ServiceStopper) {
+ zk_client.close()
+ zk_client = null
+ if( master_started.get() ) {
+ stopped_latch.countDown()
+ }
+ }
+
+ def start_slave(address: String)(func: => Unit) = {
+ assert(master==null)
+ slave = create_slave()
+ slave.connect = address
+ slave.blocking_executor.execute(^{
+ slave.start();
+ func
+ })
+ }
+
+ def stop_slave(func: => Unit) = {
+ if( slave!=null ) {
+ val s = slave
+ slave = null
+ s.blocking_executor.execute(^{
+ s.stop();
+ position = s.wal_append_position
+ func
+ })
+ }
+ }
+
+ def create_slave() = {
+ val slave = new SlaveLevelDBStore();
+ configure(slave)
+ slave
+ }
+
+ def create_master() = {
+ val master = new MasterLevelDBStore
+ configure(master)
+ master.minReplica = minReplica
+ master.bind = bind
+ master
+ }
+
+ override def setBrokerName(brokerName: String): Unit = {
+ this.brokerName = brokerName
+ }
+
+ def configure(store: ReplicatedLevelDBStoreTrait) {
+ store.directory = directory
+ store.indexFactory = indexFactory
+ store.sync = sync
+ store.verifyChecksums = verifyChecksums
+ store.indexMaxOpenFiles = indexMaxOpenFiles
+ store.indexBlockRestartInterval = indexBlockRestartInterval
+ store.paranoidChecks = paranoidChecks
+ store.indexWriteBufferSize = indexWriteBufferSize
+ store.indexBlockSize = indexBlockSize
+ store.indexCompression = indexCompression
+ store.logCompression = logCompression
+ store.indexCacheSize = indexCacheSize
+ store.flushDelay = flushDelay
+ store.asyncBufferSize = asyncBufferSize
+ store.monitorStats = monitorStats
+ store.securityToken = securityToken
+ store.setBrokerName(brokerName)
+ store.setBrokerService(brokerService)
+ }
+
+ def address(port: Int) = {
+ if (hostname == null) {
+ hostname = machine_hostname
+ }
+ "tcp://" + hostname + ":" + port
+ }
+
+}
Added:
activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/MasterElector.scala
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/MasterElector.scala?rev=1477387&view=auto
==============================================================================
---
activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/MasterElector.scala
(added)
+++
activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/MasterElector.scala
Mon Apr 29 22:33:26 2013
@@ -0,0 +1,205 @@
+package org.apache.activemq.leveldb.replicated
+
+import org.fusesource.fabric.groups._
+import org.codehaus.jackson.annotate.JsonProperty
+import org.apache.activemq.leveldb.util.{Log, JsonCodec}
+
+
+class LevelDBNodeState extends NodeState {
+
+ @JsonProperty
+ var id: String = _
+
+ @JsonProperty
+ var address: String = _
+
+ @JsonProperty
+ var position: Long = -1
+
+ @JsonProperty
+ var elected: String = _
+
+ override def equals(obj: Any): Boolean = {
+ obj match {
+ case x:LevelDBNodeState =>
+ x.id == id &&
+ x.address == address &&
+ x.position == position &&
+ x.elected == elected
+ case _ => false
+ }
+ }
+
+ override
+ def toString = JsonCodec.encode(this).ascii().toString
+
+}
+
+object MasterElector extends Log
+
+/**
+ */
+class MasterElector(store: ElectingLevelDBStore) extends
ClusteredSingleton[LevelDBNodeState](classOf[LevelDBNodeState]) {
+
+ import MasterElector._
+
+ var last_state: LevelDBNodeState = _
+ var elected: String = _
+ var position: Long = -1
+ var address: String = _
+ var updating_store = false
+ var next_connect: String = _
+ var connected_address: String = _
+
+ def join: Unit = this.synchronized {
+ last_state = create_state
+ join(last_state)
+ add(changle_listener)
+ }
+
+ def elector = this
+
+ def update: Unit = elector.synchronized {
+ var next = create_state
+ if (next != last_state) {
+ last_state = next
+ update(next)
+ }
+ }
+
+ def create_state = {
+ val rc = new LevelDBNodeState
+ rc.id = store.brokerName
+ rc.elected = elected
+ rc.position = position
+ rc.address = address
+ rc
+ }
+
+ object changle_listener extends ChangeListener {
+
+ def connected = changed
+ def disconnected = changed
+
+ def changed:Unit = elector.synchronized {
+ // info(eid+" cluster state changed: "+members)
+ if (isMaster) {
+ // We are the master elector, we will choose which node will startup
the MasterLevelDBStore
+ members.get(store.brokerName) match {
+ case None =>
+ info("Not enough cluster members connected to elect a new master.")
+ case Some(members) =>
+
+ if (members.size < store.cluster_size_quorum) {
+ info("Not enough cluster members connected to elect a new
master.")
+ } else {
+
+ // If we already elected a master, lets make sure he is still
online..
+ if (elected != null) {
+ val by_eid = Map(members: _*)
+ if (by_eid.get(elected).isEmpty) {
+ info("Previously elected master is not online, staring new
election")
+ elected = null
+ }
+ }
+
+ // Do we need to elect a new master?
+ if (elected == null) {
+ // Find the member with the most updates.
+ val sortedMembers = members.filter(_._2.position >=
0).sortWith {
+ (a, b) => a._2.position > b._2.position
+ }
+ if (sortedMembers.size != members.size) {
+ info("Not enough cluster members have reported their update
positions yet.")
+ } else {
+ // We now have an election.
+ elected = sortedMembers.head._1
+ }
+ }
+ // Sort by the positions in the cluster..
+ }
+ }
+ } else {
+ // Only the master sets the elected field.
+ elected = null
+ }
+
+ val master_elected = master.map(_.elected).getOrElse(null)
+
+ // If no master is currently elected, we need to report our current
store position.
+ // Since that will be used to select the master.
+ val connect_target = if (master_elected != null) {
+ position = -1
+ members.get(store.brokerName).get.find(_._1 ==
master_elected).map(_._2.address).getOrElse(null)
+ } else {
+ // Once we are not running a master or server, report the position..
+ if( connected_address==null && address==null && !updating_store ) {
+ position = store.position
+ }
+ null
+ }
+
+ // Do we need to stop the running master?
+ if (master_elected != eid && address != null && !updating_store) {
+ info("Demoted to slave")
+ updating_store = true
+ store.stop_master {
+ elector.synchronized {
+ info("Master stopped")
+ address = null
+ changed
+ }
+ }
+ }
+
+ // Have we been promoted to being the master?
+ if (master_elected == eid && address==null && !updating_store ) {
+ info("Promoted to master")
+ updating_store = true
+ store.start_master { port =>
+ elector.synchronized {
+ updating_store = false
+ address = store.address(port)
+ info("Master started: "+address)
+ changed
+ }
+ }
+ }
+
+ // Can we become a slave?
+ if (master_elected != eid && address == null) {
+ // Did the master address change?
+ if (connect_target != connected_address) {
+
+ // Do we need to setup a new slave.
+ if (connect_target != null && !updating_store) {
+ updating_store = true
+ store.start_slave(connect_target) {
+ elector.synchronized {
+ updating_store=false
+ info("Slave started")
+ connected_address = connect_target
+ changed
+ }
+ }
+ }
+
+ // Lets stop the slave..
+ if (connect_target == null && !updating_store) {
+ updating_store = true
+ store.stop_slave {
+ elector.synchronized {
+ updating_store=false
+ info("Slave stopped")
+ connected_address = null
+ changed
+ }
+ }
+ }
+ }
+ }
+
+ update
+ }
+ }
+}
Modified:
activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/MasterLevelDBStore.scala
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/MasterLevelDBStore.scala?rev=1477387&r1=1477386&r2=1477387&view=diff
==============================================================================
---
activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/MasterLevelDBStore.scala
(original)
+++
activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/MasterLevelDBStore.scala
Mon Apr 29 22:33:26 2013
@@ -28,7 +28,6 @@ import java.io.{IOException, File}
import java.net.{InetSocketAddress, URI}
import java.util.concurrent.atomic.AtomicLong
import scala.reflect.BeanProperty
-import java.util.UUID
class PositionSync(val position:Long, count:Int) extends CountDownLatch(count)
@@ -36,7 +35,7 @@ object MasterLevelDBStore extends Log
/**
*/
-class MasterLevelDBStore extends LevelDBStore {
+class MasterLevelDBStore extends LevelDBStore with ReplicatedLevelDBStoreTrait
{
import MasterLevelDBStore._
import collection.JavaConversions._
@@ -45,24 +44,10 @@ class MasterLevelDBStore extends LevelDB
@BeanProperty
var bind = "tcp://0.0.0.0:61619"
@BeanProperty
- var securityToken = ""
- @BeanProperty
var minReplica = 1
val slaves = new ConcurrentHashMap[String,SlaveState]()
- def replicaId:String = {
- val replicaid_file = directory / "replicaid.txt"
- if( replicaid_file.exists() ) {
- replicaid_file.readText()
- } else {
- val rc = UUID.randomUUID().toString
- replicaid_file.getParentFile.mkdirs()
- replicaid_file.writeText(rc)
- rc
- }
- }
-
override def doStart = {
super.doStart
start_protocol_server
@@ -79,7 +64,6 @@ class MasterLevelDBStore extends LevelDB
override def createClient = new MasterLevelDBClient(this)
def master_client = client.asInstanceOf[MasterLevelDBClient]
-
//////////////////////////////////////
// Replication Protocol Stuff
//////////////////////////////////////
@@ -112,20 +96,6 @@ class MasterLevelDBStore extends LevelDB
transport_server.stop(NOOP)
}
-
- case class HawtCallback[T](cb:(T)=>Unit) extends Function1[T, Unit] {
- val queue = getCurrentQueue
- def apply(value:T) = {
- if( queue==null || queue.isExecuting ) {
- cb(value)
- } else {
- queue {
- cb(value)
- }
- }
- }
- }
-
class Session(transport: Transport) extends TransportHandler(transport) {
var login:Login = _
@@ -347,4 +317,6 @@ class MasterLevelDBStore extends LevelDB
}
}
+ def wal_append_position = client.wal_append_position
+
}
Added:
activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/ProxyLevelDBStore.scala
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/ProxyLevelDBStore.scala?rev=1477387&view=auto
==============================================================================
---
activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/ProxyLevelDBStore.scala
(added)
+++
activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/ProxyLevelDBStore.scala
Mon Apr 29 22:33:26 2013
@@ -0,0 +1,114 @@
+package org.apache.activemq.leveldb.replicated
+
+import org.apache.activemq.broker.{LockableServiceSupport, BrokerService,
BrokerServiceAware, ConnectionContext}
+import org.apache.activemq.command._
+import org.apache.activemq.leveldb.LevelDBStore
+import org.apache.activemq.store._
+import org.apache.activemq.usage.SystemUsage
+import java.io.File
+import java.io.IOException
+import java.util.Set
+import org.apache.activemq.util.{ServiceStopper, ServiceSupport}
+
+/**
+ */
+abstract class ProxyLevelDBStore extends LockableServiceSupport with
BrokerServiceAware with PersistenceAdapter with TransactionStore with
PListStore {
+
+ def proxy_target: LevelDBStore
+
+ def beginTransaction(context: ConnectionContext) {
+ proxy_target.beginTransaction(context)
+ }
+
+ def getLastProducerSequenceId(id: ProducerId): Long = {
+ return proxy_target.getLastProducerSequenceId(id)
+ }
+
+ def createTopicMessageStore(destination: ActiveMQTopic): TopicMessageStore =
{
+ return proxy_target.createTopicMessageStore(destination)
+ }
+
+ def setDirectory(dir: File) {
+ proxy_target.setDirectory(dir)
+ }
+
+ def checkpoint(sync: Boolean) {
+ proxy_target.checkpoint(sync)
+ }
+
+ def createTransactionStore: TransactionStore = {
+ return proxy_target.createTransactionStore
+ }
+
+ def setUsageManager(usageManager: SystemUsage) {
+ proxy_target.setUsageManager(usageManager)
+ }
+
+ def commitTransaction(context: ConnectionContext) {
+ proxy_target.commitTransaction(context)
+ }
+
+ def getLastMessageBrokerSequenceId: Long = {
+ return proxy_target.getLastMessageBrokerSequenceId
+ }
+
+ def setBrokerName(brokerName: String) {
+ proxy_target.setBrokerName(brokerName)
+ }
+
+ def rollbackTransaction(context: ConnectionContext) {
+ proxy_target.rollbackTransaction(context)
+ }
+
+ def removeTopicMessageStore(destination: ActiveMQTopic) {
+ proxy_target.removeTopicMessageStore(destination)
+ }
+
+ def getDirectory: File = {
+ return proxy_target.getDirectory
+ }
+
+ def size: Long = {
+ return proxy_target.size
+ }
+
+ def removeQueueMessageStore(destination: ActiveMQQueue) {
+ proxy_target.removeQueueMessageStore(destination)
+ }
+
+ def createQueueMessageStore(destination: ActiveMQQueue): MessageStore = {
+ return proxy_target.createQueueMessageStore(destination)
+ }
+
+ def deleteAllMessages {
+ proxy_target.deleteAllMessages
+ }
+
+ def getDestinations: Set[ActiveMQDestination] = {
+ return proxy_target.getDestinations
+ }
+
+ def rollback(txid: TransactionId) {
+ proxy_target.rollback(txid)
+ }
+
+ def recover(listener: TransactionRecoveryListener) {
+ proxy_target.recover(listener)
+ }
+
+ def prepare(txid: TransactionId) {
+ proxy_target.prepare(txid)
+ }
+
+ def commit(txid: TransactionId, wasPrepared: Boolean, preCommit: Runnable,
postCommit: Runnable) {
+ proxy_target.commit(txid, wasPrepared, preCommit, postCommit)
+ }
+
+ def getPList(name: String): PList = {
+ return proxy_target.getPList(name)
+ }
+
+ def removePList(name: String): Boolean = {
+ return proxy_target.removePList(name)
+ }
+}
\ No newline at end of file
Modified:
activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/SlaveLevelDBStore.scala
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/SlaveLevelDBStore.scala?rev=1477387&r1=1477386&r2=1477387&view=diff
==============================================================================
---
activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/SlaveLevelDBStore.scala
(original)
+++
activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/SlaveLevelDBStore.scala
Mon Apr 29 22:33:26 2013
@@ -16,7 +16,7 @@
*/
package org.apache.activemq.leveldb.replicated
-import org.apache.activemq.leveldb.{LevelDBClient, LevelDBStore}
+import org.apache.activemq.leveldb.LevelDBStore
import org.apache.activemq.util.ServiceStopper
import java.util
import org.fusesource.hawtdispatch._
@@ -29,13 +29,12 @@ import org.apache.activemq.leveldb.util.
import FileSupport._
import java.io.{IOException, RandomAccessFile, File}
import scala.reflect.BeanProperty
-import java.util.UUID
object SlaveLevelDBStore extends Log
/**
*/
-class SlaveLevelDBStore extends LevelDBStore {
+class SlaveLevelDBStore extends LevelDBStore with ReplicatedLevelDBStoreTrait {
import SlaveLevelDBStore._
import ReplicationSupport._
@@ -43,34 +42,14 @@ class SlaveLevelDBStore extends LevelDBS
@BeanProperty
var connect = "tcp://0.0.0.0:61619"
- @BeanProperty
- var securityToken = ""
val queue = createQueue("leveldb replication slave")
var replay_from = 0L
var caughtUp = false
- override def createClient = new SlaveLevelDBClient(this)
- def slave_client = client.asInstanceOf[SlaveLevelDBClient]
-
- class SlaveLevelDBClient(val store:SlaveLevelDBStore) extends
LevelDBClient(store) {
- }
-
var wal_session:Session = _
var transfer_session:Session = _
- def replicaId:String = {
- val replicaid_file = directory / "replicaid.txt"
- if( replicaid_file.exists() ) {
- replicaid_file.readText()
- } else {
- val rc = UUID.randomUUID().toString
- replicaid_file.getParentFile.mkdirs()
- replicaid_file.writeText(rc)
- rc
- }
- }
-
override def doStart() = {
client.init()
Added:
activemq/trunk/activemq-leveldb-store/src/test/java/org/apache/activemq/leveldb/test/ElectingLevelDBStoreTest.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb-store/src/test/java/org/apache/activemq/leveldb/test/ElectingLevelDBStoreTest.java?rev=1477387&view=auto
==============================================================================
---
activemq/trunk/activemq-leveldb-store/src/test/java/org/apache/activemq/leveldb/test/ElectingLevelDBStoreTest.java
(added)
+++
activemq/trunk/activemq-leveldb-store/src/test/java/org/apache/activemq/leveldb/test/ElectingLevelDBStoreTest.java
Mon Apr 29 22:33:26 2013
@@ -0,0 +1,226 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.leveldb.test;
+
+import junit.framework.TestCase;
+import org.apache.activemq.Service;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.leveldb.CountDownFuture;
+import org.apache.activemq.leveldb.LevelDBStore;
+import org.apache.activemq.leveldb.replicated.ElectingLevelDBStore;
+import org.apache.activemq.leveldb.util.FileSupport;
+import org.apache.activemq.store.MessageStore;
+import org.apache.zookeeper.server.NIOServerCnxnFactory;
+import org.apache.zookeeper.server.ZooKeeperServer;
+import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.concurrent.TimeUnit;
+import static org.apache.activemq.leveldb.test.ReplicationTestSupport.*;
+
+/**
+ */
+public class ElectingLevelDBStoreTest extends TestCase {
+ protected static final Logger LOG =
LoggerFactory.getLogger(ElectingLevelDBStoreTest.class);
+
+ NIOServerCnxnFactory connector;
+
+ static File data_dir() {
+ return new File("target/activemq-data/leveldb-elections");
+ }
+
+
+ @Override
+ protected void setUp() throws Exception {
+ FileSupport.toRichFile(data_dir()).recursiveDelete();
+
+ System.out.println("Starting ZooKeeper");
+ ZooKeeperServer zk_server = new ZooKeeperServer();
+ zk_server.setTickTime(500);
+ zk_server.setTxnLogFactory(new FileTxnSnapLog(new File(data_dir(),
"zk-log"), new File(data_dir(), "zk-data")));
+ connector = new NIOServerCnxnFactory();
+ connector.configure(new InetSocketAddress(0), 100);
+ connector.startup(zk_server);
+ System.out.println("ZooKeeper Started");
+ }
+
+ @Override
+ protected void tearDown() throws Exception {
+ if( connector!=null ) {
+ connector.shutdown();
+ connector = null;
+ }
+ }
+
+ public void testElection() throws Exception {
+
+ ArrayList<ElectingLevelDBStore> stores = new
ArrayList<ElectingLevelDBStore>();
+ ArrayList<CountDownFuture> pending_starts = new
ArrayList<CountDownFuture>();
+
+ for(String dir: new String[]{"leveldb-node1", "leveldb-node2",
"leveldb-node3"}) {
+ ElectingLevelDBStore store = createStoreNode();
+ store.setDirectory(new File(data_dir(), dir));
+ stores.add(store);
+ pending_starts.add(asyncStart(store));
+ }
+
+ // At least one of the stores should have started.
+ CountDownFuture f = waitFor(30 * 1000, pending_starts.toArray(new
CountDownFuture[pending_starts.size()]));
+ assertTrue(f!=null);
+ pending_starts.remove(f);
+
+ // The other stores should not start..
+ LOG.info("Making sure the other stores don't start");
+ Thread.sleep(5000);
+ for(CountDownFuture start: pending_starts) {
+ assertFalse(start.completed());
+ }
+
+ // Make sure only of the stores is reporting to be the master.
+ ElectingLevelDBStore master = null;
+ for(ElectingLevelDBStore store: stores) {
+ if( store.isMaster() ) {
+ assertNull(master);
+ master = store;
+ }
+ }
+ assertNotNull(master);
+
+ // We can work out who the slaves are...
+ HashSet<ElectingLevelDBStore> slaves = new
HashSet<ElectingLevelDBStore>(stores);
+ slaves.remove(master);
+
+ // Start sending messages to the master.
+ ArrayList<String> expected_list = new ArrayList<String>();
+ MessageStore ms = master.createQueueMessageStore(new
ActiveMQQueue("TEST"));
+ final int TOTAL = 500;
+ for (int i = 0; i < TOTAL; i++) {
+ if (i % ((int) (TOTAL * 0.10)) == 0) {
+ LOG.info("" + (100 * i / TOTAL) + "% done");
+ }
+
+ if( i == 250 ) {
+
+ LOG.info("Checking master state");
+ assertEquals(expected_list, getMessages(ms));
+
+ // mid way, lets kill the master..
+ LOG.info("Killing Master.");
+ master.stop();
+
+ // At least one of the remaining stores should complete
starting.
+ LOG.info("Waiting for slave takeover...");
+ f = waitFor(60 * 1000, pending_starts.toArray(new
CountDownFuture[pending_starts.size()]));
+ assertTrue(f!=null);
+ pending_starts.remove(f);
+
+ // Make sure one and only one of the slaves becomes the
master..
+ master = null;
+ for(ElectingLevelDBStore store: slaves) {
+ if( store.isMaster() ) {
+ assertNull(master);
+ master = store;
+ }
+ }
+
+ assertNotNull(master);
+ slaves.remove(master);
+
+ ms = master.createQueueMessageStore(new ActiveMQQueue("TEST"));
+ }
+
+ String msgid = "m:" + i;
+ addMessage(ms, msgid);
+ expected_list.add(msgid);
+ }
+
+ LOG.info("Checking master state");
+ assertEquals(expected_list, getMessages(ms));
+
+ master.stop();
+ for(ElectingLevelDBStore store: stores) {
+ store.stop();
+ }
+ }
+
+ private CountDownFuture waitFor(int timeout, CountDownFuture... futures)
throws InterruptedException {
+ long deadline = System.currentTimeMillis()+timeout;
+ while( true ) {
+ for (CountDownFuture f:futures) {
+ if( f.await(1, TimeUnit.MILLISECONDS) ) {
+ return f;
+ }
+ }
+ long remaining = deadline - System.currentTimeMillis();
+ if( remaining < 0 ) {
+ return null;
+ } else {
+ Thread.sleep(Math.min(remaining / 10, 100L));
+ }
+ }
+ }
+
+ private CountDownFuture asyncStart(final Service service) {
+ final CountDownFuture<Throwable> f = new CountDownFuture<Throwable>();
+ LevelDBStore.BLOCKING_EXECUTOR().execute(new Runnable() {
+ public void run() {
+ try {
+ service.start();
+ f.set(null);
+ } catch (Throwable e) {
+ e.printStackTrace();
+ f.set(e);
+ }
+ }
+ });
+ return f;
+ }
+
+ private CountDownFuture asyncStop(final Service service) {
+ final CountDownFuture<Throwable> f = new CountDownFuture<Throwable>();
+ LevelDBStore.BLOCKING_EXECUTOR().execute(new Runnable() {
+ public void run() {
+ try {
+ service.stop();
+ f.set(null);
+ } catch (Throwable e) {
+ e.printStackTrace();
+ f.set(e);
+ }
+ }
+ });
+ return f;
+ }
+
+ private ElectingLevelDBStore createStoreNode() {
+ ElectingLevelDBStore store = new ElectingLevelDBStore();
+ store.setSecurityToken("foo");
+ store.setLogSize(1023 * 200);
+ store.setMinReplica(1);
+ store.setZkAddress("localhost:" + connector.getLocalPort());
+ store.setZkPath("/broker-stores");
+ store.setBrokerName("foo");
+ store.setBind("tcp://0.0.0.0:0");
+ return store;
+ }
+
+}
Modified:
activemq/trunk/activemq-leveldb-store/src/test/java/org/apache/activemq/leveldb/test/ReplicatedLevelDBStoreTest.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb-store/src/test/java/org/apache/activemq/leveldb/test/ReplicatedLevelDBStoreTest.java?rev=1477387&r1=1477386&r2=1477387&view=diff
==============================================================================
---
activemq/trunk/activemq-leveldb-store/src/test/java/org/apache/activemq/leveldb/test/ReplicatedLevelDBStoreTest.java
(original)
+++
activemq/trunk/activemq-leveldb-store/src/test/java/org/apache/activemq/leveldb/test/ReplicatedLevelDBStoreTest.java
Mon Apr 29 22:33:26 2013
@@ -37,8 +37,8 @@ import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.LinkedList;
-import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import static org.apache.activemq.leveldb.test.ReplicationTestSupport.*;
/**
*/
@@ -61,29 +61,29 @@ public class ReplicatedLevelDBStoreTest
// Updating the store should not complete since we don't have enough
// replicas.
CountDownFuture f = asyncAddMessage(ms, "m1");
- assertFalse(f.completed().await(2, TimeUnit.SECONDS));
+ assertFalse(f.await(2, TimeUnit.SECONDS));
// Adding a slave should allow that update to complete.
SlaveLevelDBStore slave = createSlave(master, slaveDir);
slave.start();
- assertTrue(f.completed().await(2, TimeUnit.SECONDS));
+ assertTrue(f.await(2, TimeUnit.SECONDS));
// New updates should complete quickly now..
f = asyncAddMessage(ms, "m2");
- assertTrue(f.completed().await(1, TimeUnit.SECONDS));
+ assertTrue(f.await(1, TimeUnit.SECONDS));
// If the slave goes offline, then updates should once again
// not complete.
slave.stop();
f = asyncAddMessage(ms, "m3");
- assertFalse(f.completed().await(2, TimeUnit.SECONDS));
+ assertFalse(f.await(2, TimeUnit.SECONDS));
// Restart and the op should complete.
slave = createSlave(master, slaveDir);
slave.start();
- assertTrue(f.completed().await(2, TimeUnit.SECONDS));
+ assertTrue(f.await(2, TimeUnit.SECONDS));
master.stop();
slave.stop();
@@ -91,15 +91,14 @@ public class ReplicatedLevelDBStoreTest
}
private CountDownFuture asyncAddMessage(final MessageStore ms, final
String body) {
- final CountDownFuture f = new CountDownFuture(new CountDownLatch(1));
+ final CountDownFuture<Throwable> f = new CountDownFuture<Throwable>();
LevelDBStore.BLOCKING_EXECUTOR().execute(new Runnable() {
public void run() {
try {
addMessage(ms, body);
- } catch (Exception e) {
- e.printStackTrace();
- } finally {
- f.countDown();
+ f.set(null);
+ } catch (Throwable e) {
+ f.set(e);
}
}
});
@@ -114,13 +113,13 @@ public class ReplicatedLevelDBStoreTest
directories.add(new File("target/activemq-data/leveldb-node2"));
directories.add(new File("target/activemq-data/leveldb-node3"));
- for( File f: directories) {
+ for (File f : directories) {
FileSupport.toRichFile(f).recursiveDelete();
}
ArrayList<String> expected_list = new ArrayList<String>();
// We will rotate between 3 nodes the task of being the master.
- for( int j=0; j < 10; j++) {
+ for (int j = 0; j < 10; j++) {
MasterLevelDBStore master = createMaster(directories.get(0));
master.start();
@@ -132,11 +131,11 @@ public class ReplicatedLevelDBStoreTest
MessageStore ms = master.createQueueMessageStore(new
ActiveMQQueue("TEST"));
final int TOTAL = 500;
for (int i = 0; i < TOTAL; i++) {
- if ( i % ((int) (TOTAL * 0.10)) == 0) {
- LOG.info("" + (100*i/TOTAL) + "% done");
+ if (i % ((int) (TOTAL * 0.10)) == 0) {
+ LOG.info("" + (100 * i / TOTAL) + "% done");
}
- if( i == 250 ) {
+ if (i == 250) {
slave1.start();
slave2.stop();
}
@@ -149,9 +148,9 @@ public class ReplicatedLevelDBStoreTest
LOG.info("Checking master state");
assertEquals(expected_list, getMessages(ms));
- LOG.info("Stopping master: "+master.replicaId());
+ LOG.info("Stopping master: " + master.replicaId());
master.stop();
- LOG.info("Stopping slave: "+slave1.replicaId());
+ LOG.info("Stopping slave: " + slave1.replicaId());
slave1.stop();
// Rotate the dir order so that slave1 becomes the master next.
@@ -164,7 +163,7 @@ public class ReplicatedLevelDBStoreTest
slave1.setDirectory(directory);
slave1.setConnect("tcp://127.0.0.1:" + master.getPort());
slave1.setSecurityToken("foo");
- slave1.setLogSize(1023*200);
+ slave1.setLogSize(1023 * 200);
return slave1;
}
@@ -178,49 +177,5 @@ public class ReplicatedLevelDBStoreTest
return master;
}
- long id_counter = 0L;
- String payload = "";
- {
- for (int i = 0; i < 1024; i++) {
- payload += "x";
- }
- }
-
- public ActiveMQTextMessage addMessage(MessageStore ms, String body) throws
JMSException, IOException {
- ActiveMQTextMessage message = new ActiveMQTextMessage();
- message.setPersistent(true);
- message.setResponseRequired(true);
- message.setStringProperty("id", body);
- message.setText(payload);
- id_counter += 1;
- MessageId messageId = new
MessageId("ID:localhost-56913-1254499826208-0:0:1:1:" + id_counter);
- messageId.setBrokerSequenceId(id_counter);
- message.setMessageId(messageId);
- ms.addMessage(new ConnectionContext(), message);
- return message;
- }
-
- public ArrayList<String> getMessages(MessageStore ms) throws Exception {
- final ArrayList<String> rc = new ArrayList<String>();
- ms.recover(new MessageRecoveryListener() {
- public boolean recoverMessage(Message message) throws Exception {
- rc.add(((ActiveMQTextMessage)
message).getStringProperty("id"));
- return true;
- }
-
- public boolean hasSpace() {
- return true;
- }
-
- public boolean recoverMessageReference(MessageId ref) throws
Exception {
- return true;
- }
-
- public boolean isDuplicate(MessageId ref) {
- return false;
- }
- });
- return rc;
- }
}
Added:
activemq/trunk/activemq-leveldb-store/src/test/java/org/apache/activemq/leveldb/test/ReplicationTestSupport.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb-store/src/test/java/org/apache/activemq/leveldb/test/ReplicationTestSupport.java?rev=1477387&view=auto
==============================================================================
---
activemq/trunk/activemq-leveldb-store/src/test/java/org/apache/activemq/leveldb/test/ReplicationTestSupport.java
(added)
+++
activemq/trunk/activemq-leveldb-store/src/test/java/org/apache/activemq/leveldb/test/ReplicationTestSupport.java
Mon Apr 29 22:33:26 2013
@@ -0,0 +1,62 @@
+package org.apache.activemq.leveldb.test;
+
+import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.command.ActiveMQTextMessage;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.command.MessageId;
+import org.apache.activemq.store.MessageRecoveryListener;
+import org.apache.activemq.store.MessageStore;
+
+import javax.jms.JMSException;
+import java.io.IOException;
+import java.util.ArrayList;
+
+/**
+ */
+public class ReplicationTestSupport {
+
+ static long id_counter = 0L;
+ static String payload = "";
+ {
+ for (int i = 0; i < 1024; i++) {
+ payload += "x";
+ }
+ }
+
+ static public ActiveMQTextMessage addMessage(MessageStore ms, String body)
throws JMSException, IOException {
+ ActiveMQTextMessage message = new ActiveMQTextMessage();
+ message.setPersistent(true);
+ message.setResponseRequired(true);
+ message.setStringProperty("id", body);
+ message.setText(payload);
+ id_counter += 1;
+ MessageId messageId = new
MessageId("ID:localhost-56913-1254499826208-0:0:1:1:" + id_counter);
+ messageId.setBrokerSequenceId(id_counter);
+ message.setMessageId(messageId);
+ ms.addMessage(new ConnectionContext(), message);
+ return message;
+ }
+
+ static public ArrayList<String> getMessages(MessageStore ms) throws
Exception {
+ final ArrayList<String> rc = new ArrayList<String>();
+ ms.recover(new MessageRecoveryListener() {
+ public boolean recoverMessage(Message message) throws Exception {
+ rc.add(((ActiveMQTextMessage)
message).getStringProperty("id"));
+ return true;
+ }
+
+ public boolean hasSpace() {
+ return true;
+ }
+
+ public boolean recoverMessageReference(MessageId ref) throws
Exception {
+ return true;
+ }
+
+ public boolean isDuplicate(MessageId ref) {
+ return false;
+ }
+ });
+ return rc;
+ }
+}