Author: chirino
Date: Fri Dec 31 01:21:29 2010
New Revision: 1054038
URL: http://svn.apache.org/viewvc?rev=1054038&view=rev
Log:
The store-import and store-export commands are now working for the bdb and jdbm2 stores.
Added:
activemq/activemq-apollo/trunk/apollo-cli/src/main/scala/org/apache/activemq/apollo/cli/commands/StoreImport.scala
- copied, changed from r1054037,
activemq/activemq-apollo/trunk/apollo-cli/src/main/scala/org/apache/activemq/apollo/cli/commands/StoreExport.scala
Modified:
activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBClient.scala
activemq/activemq-apollo/trunk/apollo-cli/src/main/resources/META-INF/services/org.apache.activemq.apollo/commands.index
activemq/activemq-apollo/trunk/apollo-cli/src/main/scala/org/apache/activemq/apollo/cli/commands/Stop.scala
activemq/activemq-apollo/trunk/apollo-cli/src/main/scala/org/apache/activemq/apollo/cli/commands/StoreExport.scala
activemq/activemq-apollo/trunk/apollo-jdbm2/src/main/scala/org/apache/activemq/apollo/broker/store/jdbm2/JDBM2Client.scala
activemq/activemq-apollo/trunk/apollo-jdbm2/src/main/scala/org/apache/activemq/apollo/broker/store/jdbm2/JDBM2Store.scala
Modified:
activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBClient.scala
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBClient.scala?rev=1054038&r1=1054037&r2=1054038&view=diff
==============================================================================
---
activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBClient.scala
(original)
+++
activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBClient.scala
Fri Dec 31 01:21:29 2010
@@ -451,7 +451,7 @@ class BDBClient(store: BDBStore) extends
import PBSupport._
streams.using_queue_stream { queue_stream=>
- foreach(queue_stream, QueuePB.FACTORY) { pb=>
+ foreach[QueuePB.Buffer](queue_stream, QueuePB.FACTORY) { pb=>
val record:QueueRecord = pb
queues_db.put(tx, record.key, record)
with_entries_db(record.key) { entriesdb=>
@@ -460,14 +460,14 @@ class BDBClient(store: BDBStore) extends
}
streams.using_message_stream { message_stream=>
- foreach(message_stream, MessagePB.FACTORY) { pb=>
+ foreach[MessagePB.Buffer](message_stream, MessagePB.FACTORY) { pb=>
val record:MessageRecord = pb
messages_db.put(tx, record.key, record)
}
}
streams.using_queue_entry_stream { queue_entry_stream=>
- foreach(queue_entry_stream, QueueEntryPB.FACTORY) { pb=>
+ foreach[QueueEntryPB.Buffer](queue_entry_stream,
QueueEntryPB.FACTORY) { pb=>
val record:QueueEntryRecord = pb
with_entries_db(record.queue_key) { entries_db=>
Modified:
activemq/activemq-apollo/trunk/apollo-cli/src/main/resources/META-INF/services/org.apache.activemq.apollo/commands.index
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-cli/src/main/resources/META-INF/services/org.apache.activemq.apollo/commands.index?rev=1054038&r1=1054037&r2=1054038&view=diff
==============================================================================
---
activemq/activemq-apollo/trunk/apollo-cli/src/main/resources/META-INF/services/org.apache.activemq.apollo/commands.index
(original)
+++
activemq/activemq-apollo/trunk/apollo-cli/src/main/resources/META-INF/services/org.apache.activemq.apollo/commands.index
Fri Dec 31 01:21:29 2010
@@ -22,3 +22,5 @@ org.apache.activemq.apollo.cli.commands.
org.apache.activemq.apollo.cli.commands.Run
org.apache.activemq.apollo.cli.commands.Encrypt
org.apache.activemq.apollo.cli.commands.Decrypt
+org.apache.activemq.apollo.cli.commands.StoreExport
+org.apache.activemq.apollo.cli.commands.StoreImport
Modified:
activemq/activemq-apollo/trunk/apollo-cli/src/main/scala/org/apache/activemq/apollo/cli/commands/Stop.scala
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-cli/src/main/scala/org/apache/activemq/apollo/cli/commands/Stop.scala?rev=1054038&r1=1054037&r2=1054038&view=diff
==============================================================================
---
activemq/activemq-apollo/trunk/apollo-cli/src/main/scala/org/apache/activemq/apollo/cli/commands/Stop.scala
(original)
+++
activemq/activemq-apollo/trunk/apollo-cli/src/main/scala/org/apache/activemq/apollo/cli/commands/Stop.scala
Fri Dec 31 01:21:29 2010
@@ -56,11 +56,7 @@ class Stop extends Action with Logging {
error("Configuration file'%s' does not exist.\n\nTry creating a broker
instance using the 'apollo create' command.".format(conf));
}
- val store = new FileConfigStore
- store.file = conf
- store.start
- val config = store.load(true)
-
+ val config = new FileConfigStore(conf).load(true)
val web_admin = config.web_admin.getOrElse(new WebAdminDTO)
if( web_admin.enabled.getOrElse(true) ) {
Modified:
activemq/activemq-apollo/trunk/apollo-cli/src/main/scala/org/apache/activemq/apollo/cli/commands/StoreExport.scala
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-cli/src/main/scala/org/apache/activemq/apollo/cli/commands/StoreExport.scala?rev=1054038&r1=1054037&r2=1054038&view=diff
==============================================================================
---
activemq/activemq-apollo/trunk/apollo-cli/src/main/scala/org/apache/activemq/apollo/cli/commands/StoreExport.scala
(original)
+++
activemq/activemq-apollo/trunk/apollo-cli/src/main/scala/org/apache/activemq/apollo/cli/commands/StoreExport.scala
Fri Dec 31 01:21:29 2010
@@ -19,17 +19,13 @@ package org.apache.activemq.apollo.cli.c
import org.apache.felix.gogo.commands.{Action, Option => option, Argument =>
argument, Command => command}
import org.osgi.service.command.CommandSession
import org.apache.activemq.apollo.util.FileSupport._
-import org.apache.activemq.apollo.util.OptionSupport._
-import org.apache.commons.codec.binary.Base64
-import java.net.{HttpURLConnection, URL}
-import org.apache.activemq.apollo.broker.{VirtualHost, FileConfigStore}
-import org.apache.activemq.apollo.dto.{VirtualHostDTO, WebAdminDTO}
-import org.apache.activemq.apollo.util.{ServiceControl, Log,
DirectBufferPoolFactory, Logging}
-import java.util.zip.{ZipEntry, ZipOutputStream, ZipFile}
+import org.apache.activemq.apollo.broker.FileConfigStore
+import org.apache.activemq.apollo.dto.VirtualHostDTO
+import org.apache.activemq.apollo.util._
+import java.util.zip.{ZipEntry, ZipOutputStream}
import org.apache.activemq.apollo.broker.store.{StreamManager, StoreFactory}
import java.io.{OutputStream, FileOutputStream, File}
-
-object StoreExport extends Log
+import scala.util.continuations._
/**
* The apollo stop command
@@ -37,6 +33,8 @@ object StoreExport extends Log
@command(scope="apollo", name = "store-export", description = "exports the
contents of a broker message store")
class StoreExport extends Action {
+ object StoreExport extends Log
+
@option(name = "--conf", description = "The Apollo configuration file.")
var conf: File = _
@@ -61,10 +59,7 @@ class StoreExport extends Action {
error("Configuration file'%s' does not exist.\n\nTry creating a broker
instance using the 'apollo create' command.".format(conf));
}
- val config_store = new FileConfigStore
- config_store.file = conf
- config_store.start
- val config = config_store.load(true)
+ val config = new FileConfigStore(conf).load(true)
val hosts =
collection.JavaConversions.asScalaIterable(config.virtual_hosts).toArray
val vho:Option[VirtualHostDTO] = if( host==null ) {
@@ -83,14 +78,14 @@ class StoreExport extends Action {
error("Could not create the store.")
}
- store.configure(config.store, LoggingReporter(StoreExport))
+ store.configure(vh.store, LoggingReporter(StoreExport))
ServiceControl.start(store, "store startup")
using( new ZipOutputStream(new FileOutputStream(dest))) { out=>
out.setMethod(ZipEntry.DEFLATED)
out.setLevel(9)
- store.export_pb(new StreamManager[OutputStream]() {
+ val manager = new StreamManager[OutputStream]() {
def entry(name:String, func: (OutputStream) => Unit) = {
out.putNextEntry(new ZipEntry(name));
func(out)
@@ -99,7 +94,11 @@ class StoreExport extends Action {
def using_queue_stream(func: (OutputStream) => Unit) =
entry("queues.dat", func)
def using_queue_entry_stream(func: (OutputStream) => Unit) =
entry("queue_entries.dat", func)
def using_message_stream(func: (OutputStream) => Unit) =
entry("messages.dat", func)
- })
+ }
+ reset {
+ val rc = store.export_pb(manager)
+ rc.failure_option.foreach(error _)
+ }
}
ServiceControl.stop(store, "store stop");
Copied:
activemq/activemq-apollo/trunk/apollo-cli/src/main/scala/org/apache/activemq/apollo/cli/commands/StoreImport.scala
(from r1054037,
activemq/activemq-apollo/trunk/apollo-cli/src/main/scala/org/apache/activemq/apollo/cli/commands/StoreExport.scala)
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-cli/src/main/scala/org/apache/activemq/apollo/cli/commands/StoreImport.scala?p2=activemq/activemq-apollo/trunk/apollo-cli/src/main/scala/org/apache/activemq/apollo/cli/commands/StoreImport.scala&p1=activemq/activemq-apollo/trunk/apollo-cli/src/main/scala/org/apache/activemq/apollo/cli/commands/StoreExport.scala&r1=1054037&r2=1054038&rev=1054038&view=diff
==============================================================================
---
activemq/activemq-apollo/trunk/apollo-cli/src/main/scala/org/apache/activemq/apollo/cli/commands/StoreExport.scala
(original)
+++
activemq/activemq-apollo/trunk/apollo-cli/src/main/scala/org/apache/activemq/apollo/cli/commands/StoreImport.scala
Fri Dec 31 01:21:29 2010
@@ -19,23 +19,22 @@ package org.apache.activemq.apollo.cli.c
import org.apache.felix.gogo.commands.{Action, Option => option, Argument =>
argument, Command => command}
import org.osgi.service.command.CommandSession
import org.apache.activemq.apollo.util.FileSupport._
-import org.apache.activemq.apollo.util.OptionSupport._
-import org.apache.commons.codec.binary.Base64
-import java.net.{HttpURLConnection, URL}
-import org.apache.activemq.apollo.broker.{VirtualHost, FileConfigStore}
-import org.apache.activemq.apollo.dto.{VirtualHostDTO, WebAdminDTO}
-import org.apache.activemq.apollo.util.{ServiceControl, Log,
DirectBufferPoolFactory, Logging}
-import java.util.zip.{ZipEntry, ZipOutputStream, ZipFile}
+import org.apache.activemq.apollo.broker.FileConfigStore
+import org.apache.activemq.apollo.dto.VirtualHostDTO
+import org.apache.activemq.apollo.util._
import org.apache.activemq.apollo.broker.store.{StreamManager, StoreFactory}
-import java.io.{OutputStream, FileOutputStream, File}
+import scala.util.continuations._
+import java.util.zip.{ZipFile, ZipEntry, ZipOutputStream}
+import java.io.{InputStream, OutputStream, FileOutputStream, File}
-object StoreExport extends Log
/**
* The apollo stop command
*/
-@command(scope="apollo", name = "store-export", description = "exports the
contents of a broker message store")
-class StoreExport extends Action {
+@command(scope="apollo", name = "store-import", description = "imports a
previously exported message store")
+class StoreImport extends Action {
+
+ object StoreImport extends Log
@option(name = "--conf", description = "The Apollo configuration file.")
var conf: File = _
@@ -61,10 +60,7 @@ class StoreExport extends Action {
error("Configuration file'%s' does not exist.\n\nTry creating a broker
instance using the 'apollo create' command.".format(conf));
}
- val config_store = new FileConfigStore
- config_store.file = conf
- config_store.start
- val config = config_store.load(true)
+ val config = new FileConfigStore(conf).load(true)
val hosts =
collection.JavaConversions.asScalaIterable(config.virtual_hosts).toArray
val vho:Option[VirtualHostDTO] = if( host==null ) {
@@ -83,23 +79,31 @@ class StoreExport extends Action {
error("Could not create the store.")
}
- store.configure(config.store, LoggingReporter(StoreExport))
+ store.configure(vh.store, LoggingReporter(StoreImport))
ServiceControl.start(store, "store startup")
-
- using( new ZipOutputStream(new FileOutputStream(dest))) { out=>
- out.setMethod(ZipEntry.DEFLATED)
- out.setLevel(9)
- store.export_pb(new StreamManager[OutputStream]() {
- def entry(name:String, func: (OutputStream) => Unit) = {
- out.putNextEntry(new ZipEntry(name));
- func(out)
- out.closeEntry();
+ val zip = new ZipFile(dest)
+ try {
+ val manager = new StreamManager[InputStream]() {
+ def entry(name:String, func: (InputStream) => Unit) = {
+ val entry = zip.getEntry(name)
+ if(entry == null) {
+ error("Invalid data file, zip entry not found: "+name);
+ }
+ using(zip.getInputStream(entry)) { is=>
+ func(is)
+ }
}
- def using_queue_stream(func: (OutputStream) => Unit) =
entry("queues.dat", func)
- def using_queue_entry_stream(func: (OutputStream) => Unit) =
entry("queue_entries.dat", func)
- def using_message_stream(func: (OutputStream) => Unit) =
entry("messages.dat", func)
- })
+ def using_queue_stream(func: (InputStream) => Unit) =
entry("queues.dat", func)
+ def using_queue_entry_stream(func: (InputStream) => Unit) =
entry("queue_entries.dat", func)
+ def using_message_stream(func: (InputStream) => Unit) =
entry("messages.dat", func)
+ }
+ reset {
+ val rc = store.import_pb(manager)
+ rc.failure_option.foreach(error _)
+ }
+ } finally {
+ zip.close
}
ServiceControl.stop(store, "store stop");
Modified:
activemq/activemq-apollo/trunk/apollo-jdbm2/src/main/scala/org/apache/activemq/apollo/broker/store/jdbm2/JDBM2Client.scala
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-jdbm2/src/main/scala/org/apache/activemq/apollo/broker/store/jdbm2/JDBM2Client.scala?rev=1054038&r1=1054037&r2=1054038&view=diff
==============================================================================
---
activemq/activemq-apollo/trunk/apollo-jdbm2/src/main/scala/org/apache/activemq/apollo/broker/store/jdbm2/JDBM2Client.scala
(original)
+++
activemq/activemq-apollo/trunk/apollo-jdbm2/src/main/scala/org/apache/activemq/apollo/broker/store/jdbm2/JDBM2Client.scala
Fri Dec 31 01:21:29 2010
@@ -459,11 +459,11 @@ class JDBM2Client(store: JDBM2Store) {
transaction {
- def foreach[Buffer] (stream:InputStream,
fact:PBMessageFactory[_,_])(func: (Buffer)=>Unit):Unit = {
+ def foreach[B] (stream:InputStream, fact:PBMessageFactory[_,_])(func:
(B)=>Unit):Unit = {
var done = false
do {
try {
- func(fact.parseFramed(stream).asInstanceOf[Buffer])
+ func(fact.parseFramed(stream).asInstanceOf[B])
} catch {
case x:EOFException =>
done = true
@@ -475,7 +475,7 @@ class JDBM2Client(store: JDBM2Store) {
import PBSupport._
streams.using_queue_stream { queue_stream=>
- foreach(queue_stream, QueuePB.FACTORY) { pb=>
+ foreach[QueuePB.Buffer](queue_stream, QueuePB.FACTORY) { pb =>
val record:QueueRecord = pb
queues_db.put(record.key, record)
check_flush(1, 10000)
@@ -485,7 +485,7 @@ class JDBM2Client(store: JDBM2Store) {
recman.commit
streams.using_message_stream { message_stream=>
- foreach(message_stream, MessagePB.FACTORY) { pb=>
+ foreach[MessagePB.Buffer](message_stream, MessagePB.FACTORY) { pb=>
val record:MessageRecord = pb
messages_db.put(record.key, record)
check_flush(record.size, 1024*124*10)
@@ -495,7 +495,7 @@ class JDBM2Client(store: JDBM2Store) {
recman.commit
streams.using_queue_entry_stream { queue_entry_stream=>
- foreach(queue_entry_stream, QueueEntryPB.FACTORY) { pb=>
+ foreach[QueueEntryPB.Buffer](queue_entry_stream,
QueueEntryPB.FACTORY) { pb=>
val record:QueueEntryRecord = pb
entries_db.insert((record.queue_key, record.entry_seq), record,
true)
add_message_reference(record.message_key)
Modified:
activemq/activemq-apollo/trunk/apollo-jdbm2/src/main/scala/org/apache/activemq/apollo/broker/store/jdbm2/JDBM2Store.scala
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-jdbm2/src/main/scala/org/apache/activemq/apollo/broker/store/jdbm2/JDBM2Store.scala?rev=1054038&r1=1054037&r2=1054038&view=diff
==============================================================================
---
activemq/activemq-apollo/trunk/apollo-jdbm2/src/main/scala/org/apache/activemq/apollo/broker/store/jdbm2/JDBM2Store.scala
(original)
+++
activemq/activemq-apollo/trunk/apollo-jdbm2/src/main/scala/org/apache/activemq/apollo/broker/store/jdbm2/JDBM2Store.scala
Fri Dec 31 01:21:29 2010
@@ -71,7 +71,6 @@ class JDBM2Store extends DelayingStoreSu
var next_msg_key = new AtomicLong(1)
var executor:ExecutorService = _
- var read_executor:ExecutorService = _
var config:JDBM2StoreDTO = defaultConfig
val client = new JDBM2Client(this)