Author: chirino
Date: Thu Dec 30 19:29:37 2010
New Revision: 1053982
URL: http://svn.apache.org/viewvc?rev=1053982&view=rev
Log:
Refactored the common protobuf classes into the broker module.
Added:
activemq/activemq-apollo/trunk/apollo-broker/src/main/proto/
activemq/activemq-apollo/trunk/apollo-broker/src/main/proto/data.proto
- copied, changed from r1053959,
activemq/activemq-apollo/trunk/apollo-bdb/src/main/proto/data.proto
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/PBSupport.scala
Removed:
activemq/activemq-apollo/trunk/apollo-bdb/src/main/proto/data.proto
activemq/activemq-apollo/trunk/apollo-cassandra/src/main/proto/data.proto
activemq/activemq-apollo/trunk/apollo-jdbm2/src/main/proto/data.proto
Modified:
activemq/activemq-apollo/trunk/apollo-bdb/pom.xml
activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBClient.scala
activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/HelperTrait.scala
activemq/activemq-apollo/trunk/apollo-broker/pom.xml
activemq/activemq-apollo/trunk/apollo-cassandra/pom.xml
activemq/activemq-apollo/trunk/apollo-cassandra/src/main/scala/org/apache/activemq/apollo/broker/store/cassandra/CassandraClient.scala
activemq/activemq-apollo/trunk/apollo-jdbm2/pom.xml
activemq/activemq-apollo/trunk/apollo-jdbm2/src/main/scala/org/apache/activemq/apollo/broker/store/jdbm2/JDBM2Client.scala
Modified: activemq/activemq-apollo/trunk/apollo-bdb/pom.xml
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-bdb/pom.xml?rev=1053982&r1=1053981&r2=1053982&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-bdb/pom.xml (original)
+++ activemq/activemq-apollo/trunk/apollo-bdb/pom.xml Thu Dec 30 19:29:37 2010
@@ -45,18 +45,7 @@
<artifactId>je</artifactId>
<version>${bdb-version}</version>
</dependency>
-
- <dependency>
- <groupId>org.fusesource.hawtbuf</groupId>
- <artifactId>hawtbuf-proto</artifactId>
- <version>${hawtbuf-version}</version>
- </dependency>
- <dependency>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-api</artifactId>
- <version>${slf4j-version}</version>
- </dependency>
<!-- Since we implement a jade template to display the BDB status -->
<dependency>
@@ -130,22 +119,6 @@
<plugins>
<plugin>
- <groupId>org.fusesource.hawtbuf</groupId>
- <artifactId>hawtbuf-proto</artifactId>
- <version>${hawtbuf-version}</version>
- <configuration>
- <type>alt</type>
- </configuration>
- <executions>
- <execution>
- <goals>
- <goal>compile</goal>
- </goals>
- </execution>
- </executions>
- </plugin>
-
- <plugin>
<groupId>org.fusesource.scalate</groupId>
<artifactId>maven-scalate-plugin</artifactId>
<version>${scalate-version}</version>
Modified:
activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBClient.scala
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBClient.scala?rev=1053982&r1=1053981&r2=1053982&view=diff
==============================================================================
---
activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBClient.scala
(original)
+++
activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBClient.scala
Thu Dec 30 19:29:37 2010
@@ -280,7 +280,7 @@ class BDBClient(store: BDBStore) extends
def getQueue(queue_key: Long): Option[QueueRecord] = {
with_ctx { ctx=>
import ctx._
- queues_db.get(tx, to_DatabaseEntry(queue_key)).map( x=>
to_QueueRecord(x) )
+ queues_db.get(tx, to_database_entry(queue_key)).map( x=>
to_queue_record(x) )
}
}
@@ -329,7 +329,7 @@ class BDBClient(store: BDBStore) extends
import ctx._
with_entries_db(queue_key) { entries_db=>
- entries_db.cursor_from(tx, to_DatabaseEntry(firstSeq)) { (key, value)
=>
+ entries_db.cursor_from(tx, to_database_entry(firstSeq)) { (key, value)
=>
val entry_seq:Long = key
val entry:QueueEntryRecord = value
rc += entry
@@ -349,7 +349,7 @@ class BDBClient(store: BDBStore) extends
requests.flatMap { case (message_key, callback)=>
val record = metric_load_from_index_counter.time {
- messages_db.get(tx, to_DatabaseEntry(message_key)).map (
to_MessageRecord _ )
+ messages_db.get(tx, to_database_entry(message_key)).map (
to_message_record _ )
}
record match {
case None =>
Modified:
activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/HelperTrait.scala
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/HelperTrait.scala?rev=1053982&r1=1053981&r2=1053982&view=diff
==============================================================================
---
activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/HelperTrait.scala
(original)
+++
activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/HelperTrait.scala
Thu Dec 30 19:29:37 2010
@@ -16,85 +16,33 @@
*/
package org.apache.activemq.apollo.broker.store.bdb
-import model._
-import org.apache.activemq.apollo.broker.store.{MessageRecord, QueueRecord,
QueueEntryRecord}
import java.util.Comparator
import java.nio.ByteBuffer
import com.sleepycat.je._
import java.io.Serializable
+import org.apache.activemq.apollo.broker.store.{PBSupport, MessageRecord,
QueueRecord, QueueEntryRecord}
+import PBSupport._
object HelperTrait {
- implicit def to_MessageRecord(entry: DatabaseEntry): MessageRecord = {
- val pb = MessagePB.FACTORY.parseUnframed(entry.getData)
- val rc = new MessageRecord
- rc.key = pb.getMessageKey
- rc.protocol = pb.getProtocol
- rc.size = pb.getSize
- rc.buffer = pb.getValue
- rc.expiration = pb.getExpiration
- rc
- }
-
- implicit def to_DatabaseEntry(v: MessageRecord): DatabaseEntry = {
- val pb = new MessagePB.Bean
- pb.setMessageKey(v.key)
- pb.setProtocol(v.protocol)
- pb.setSize(v.size)
- pb.setValue(v.buffer)
- pb.setExpiration(v.expiration)
- new DatabaseEntry(pb.freeze.toUnframedByteArray)
- }
-
- implicit def to_QueueEntryRecord(entry: DatabaseEntry): QueueEntryRecord = {
- val pb = QueueEntryPB.FACTORY.parseUnframed(entry.getData)
- val rc = new QueueEntryRecord
- rc.queue_key = pb.getQueueKey
- rc.entry_seq = pb.getQueueSeq
- rc.message_key = pb.getMessageKey
- rc.attachment = pb.getAttachment
- rc.size = pb.getSize
- rc.redeliveries = pb.getRedeliveries.toShort
- rc
- }
+ implicit def to_message_record(entry: DatabaseEntry): MessageRecord =
entry.getData
+ implicit def to_database_entry(v: MessageRecord): DatabaseEntry = new
DatabaseEntry(v)
- implicit def to_DatabaseEntry(v: QueueEntryRecord): DatabaseEntry = {
- val pb = new QueueEntryPB.Bean
- pb.setQueueKey(v.queue_key)
- pb.setQueueSeq(v.entry_seq)
- pb.setMessageKey(v.message_key)
- pb.setAttachment(v.attachment)
- pb.setSize(v.size)
- pb.setRedeliveries(v.redeliveries)
- new DatabaseEntry(pb.freeze.toUnframedByteArray)
- }
+ implicit def to_queue_entry_record(entry: DatabaseEntry): QueueEntryRecord =
entry.getData
+ implicit def to_database_entry(v: QueueEntryRecord): DatabaseEntry = new
DatabaseEntry(v)
- implicit def to_QueueRecord(entry: DatabaseEntry): QueueRecord = {
- val pb = QueuePB.FACTORY.parseUnframed(entry.getData)
- val rc = new QueueRecord
- rc.key = pb.getKey
- rc.binding_data = pb.getBindingData
- rc.binding_kind = pb.getBindingKind
- rc
- }
-
- implicit def to_DatabaseEntry(v: QueueRecord): DatabaseEntry = {
- val pb = new QueuePB.Bean
- pb.setKey(v.key)
- pb.setBindingData(v.binding_data)
- pb.setBindingKind(v.binding_kind)
- new DatabaseEntry(pb.freeze.toUnframedByteArray)
- }
+ implicit def to_queue_record(entry: DatabaseEntry): QueueRecord =
entry.getData
+ implicit def to_database_entry(v: QueueRecord): DatabaseEntry = new
DatabaseEntry(v)
implicit def to_bytes(l:Long):Array[Byte] = ByteBuffer.wrap(new
Array[Byte](8)).putLong(l).array()
implicit def to_long(bytes:Array[Byte]):Long =
ByteBuffer.wrap(bytes).getLong()
- implicit def to_DatabaseEntry(l:Long):DatabaseEntry = new
DatabaseEntry(to_bytes(l))
+ implicit def to_database_entry(l:Long):DatabaseEntry = new
DatabaseEntry(to_bytes(l))
implicit def to_long(value:DatabaseEntry):Long = to_long(value.getData)
implicit def to_bytes(l:Int):Array[Byte] = ByteBuffer.wrap(new
Array[Byte](4)).putInt(l).array()
implicit def to_int(bytes:Array[Byte]):Int = ByteBuffer.wrap(bytes).getInt()
- implicit def to_DatabaseEntry(l:Int):DatabaseEntry = new
DatabaseEntry(to_bytes(l))
+ implicit def to_database_entry(l:Int):DatabaseEntry = new
DatabaseEntry(to_bytes(l))
implicit def to_int(value:DatabaseEntry):Int = to_int(value.getData)
@@ -168,7 +116,8 @@ object HelperTrait {
}
}
}
- implicit def DatabaseWrapper(x: Database) = new RichDatabase(x)
+
+ implicit def to_rich_database(x: Database) = new RichDatabase(x)
def entries_db_name(queue_key: Long): String = "entries-" + queue_key
Modified: activemq/activemq-apollo/trunk/apollo-broker/pom.xml
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/pom.xml?rev=1053982&r1=1053981&r2=1053982&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/pom.xml (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/pom.xml Thu Dec 30 19:29:37
2010
@@ -67,6 +67,12 @@
<version>${jasypt-version}</version>
</dependency>
+ <dependency>
+ <groupId>org.fusesource.hawtbuf</groupId>
+ <artifactId>hawtbuf-proto</artifactId>
+ <version>${hawtbuf-version}</version>
+ </dependency>
+
<!-- Scala Support -->
<dependency>
@@ -134,6 +140,22 @@
</executions>
</plugin>
+ <plugin>
+ <groupId>org.fusesource.hawtbuf</groupId>
+ <artifactId>hawtbuf-proto</artifactId>
+ <version>${hawtbuf-version}</version>
+ <configuration>
+ <type>alt</type>
+ </configuration>
+ <executions>
+ <execution>
+ <goals>
+ <goal>compile</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+
</plugins>
</build>
Copied: activemq/activemq-apollo/trunk/apollo-broker/src/main/proto/data.proto
(from r1053959,
activemq/activemq-apollo/trunk/apollo-bdb/src/main/proto/data.proto)
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/proto/data.proto?p2=activemq/activemq-apollo/trunk/apollo-broker/src/main/proto/data.proto&p1=activemq/activemq-apollo/trunk/apollo-bdb/src/main/proto/data.proto&r1=1053959&r2=1053982&rev=1053982&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-bdb/src/main/proto/data.proto
(original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/proto/data.proto Thu
Dec 30 19:29:37 2010
@@ -14,7 +14,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
//
-package org.apache.activemq.apollo.broker.store.bdb.model;
+package org.apache.activemq.apollo.broker.store;
option java_multiple_files = true;
Added:
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/PBSupport.scala
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/PBSupport.scala?rev=1053982&view=auto
==============================================================================
---
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/PBSupport.scala
(added)
+++
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/PBSupport.scala
Thu Dec 30 19:29:37 2010
@@ -0,0 +1,108 @@
+package org.apache.activemq.apollo.broker.store
+
+import java.io.{OutputStream, InputStream}
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * <p>
+ * </p>
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+object PBSupport {
+
+ implicit def to_pb(v: MessageRecord):MessagePB.Buffer = {
+ val pb = new MessagePB.Bean
+ pb.setMessageKey(v.key)
+ pb.setProtocol(v.protocol)
+ pb.setSize(v.size)
+ pb.setValue(v.buffer)
+ pb.setExpiration(v.expiration)
+ pb.freeze
+ }
+
+ implicit def from_pb(pb: MessagePB.Getter):MessageRecord = {
+ val rc = new MessageRecord
+ rc.key = pb.getMessageKey
+ rc.protocol = pb.getProtocol
+ rc.size = pb.getSize
+ rc.buffer = pb.getValue
+ rc.expiration = pb.getExpiration
+ rc
+ }
+
+ def encode_message_record(out: OutputStream, v: MessageRecord) =
to_pb(v).writeUnframed(out)
+ def decode_message_record(in: InputStream):MessageRecord =
MessagePB.FACTORY.parseUnframed(in)
+
+ implicit def encode_message_record(v: MessageRecord) =
to_pb(v).toUnframedByteArray
+ implicit def decode_message_record(data: Array[Byte]):MessageRecord =
MessagePB.FACTORY.parseUnframed(data)
+
+
+
+ implicit def to_pb(v: QueueRecord):QueuePB.Buffer = {
+ val pb = new QueuePB.Bean
+ pb.setKey(v.key)
+ pb.setBindingData(v.binding_data)
+ pb.setBindingKind(v.binding_kind)
+ pb.freeze
+ }
+
+ implicit def from_pb(pb: QueuePB.Getter):QueueRecord = {
+ val rc = new QueueRecord
+ rc.key = pb.getKey
+ rc.binding_data = pb.getBindingData
+ rc.binding_kind = pb.getBindingKind
+ rc
+ }
+
+ def encode_queue_record(out: OutputStream, v: QueueRecord) =
to_pb(v).writeUnframed(out)
+ def decode_queue_record(in: InputStream):QueueRecord =
QueuePB.FACTORY.parseUnframed(in)
+
+ implicit def encode_queue_record(v: QueueRecord) =
to_pb(v).toUnframedByteArray
+ implicit def decode_queue_record(data: Array[Byte]):QueueRecord =
QueuePB.FACTORY.parseUnframed(data)
+
+
+ implicit def to_pb(v: QueueEntryRecord):QueueEntryPB.Buffer = {
+ val pb = new QueueEntryPB.Bean
+ pb.setQueueKey(v.queue_key)
+ pb.setQueueSeq(v.entry_seq)
+ pb.setMessageKey(v.message_key)
+ pb.setAttachment(v.attachment)
+ pb.setSize(v.size)
+ pb.setRedeliveries(v.redeliveries)
+ pb.freeze
+ }
+
+ implicit def from_pb(pb: QueueEntryPB.Getter):QueueEntryRecord = {
+ val rc = new QueueEntryRecord
+ rc.queue_key = pb.getQueueKey
+ rc.entry_seq = pb.getQueueSeq
+ rc.message_key = pb.getMessageKey
+ rc.attachment = pb.getAttachment
+ rc.size = pb.getSize
+ rc.redeliveries = pb.getRedeliveries.toShort
+ rc
+ }
+
+ def encode_queue_entry_record(out: OutputStream, v: QueueEntryRecord) =
to_pb(v).writeUnframed(out)
+ def decode_queue_entry_record(in: InputStream):QueueEntryRecord =
QueueEntryPB.FACTORY.parseUnframed(in)
+
+ implicit def encode_queue_entry_record(v: QueueEntryRecord) =
to_pb(v).toUnframedByteArray
+ implicit def decode_queue_entry_record(data: Array[Byte]):QueueEntryRecord =
QueueEntryPB.FACTORY.parseUnframed(data)
+
+}
\ No newline at end of file
Modified: activemq/activemq-apollo/trunk/apollo-cassandra/pom.xml
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-cassandra/pom.xml?rev=1053982&r1=1053981&r2=1053982&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-cassandra/pom.xml (original)
+++ activemq/activemq-apollo/trunk/apollo-cassandra/pom.xml Thu Dec 30 19:29:37
2010
@@ -53,18 +53,6 @@
<version>${cascal-version}</version>
</dependency>
- <dependency>
- <groupId>org.fusesource.hawtbuf</groupId>
- <artifactId>hawtbuf-proto</artifactId>
- <version>${hawtbuf-version}</version>
- </dependency>
- <dependency>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-api</artifactId>
- <version>${slf4j-version}</version>
- </dependency>
-
-
<!-- Scala Support -->
<dependency>
<groupId>org.scala-lang</groupId>
@@ -170,22 +158,6 @@
<build>
<plugins>
- <plugin>
- <groupId>org.fusesource.hawtbuf</groupId>
- <artifactId>hawtbuf-proto</artifactId>
- <version>${hawtbuf-version}</version>
- <configuration>
- <type>alt</type>
- </configuration>
- <executions>
- <execution>
- <goals>
- <goal>compile</goal>
- </goals>
- </execution>
- </executions>
- </plugin>
-
<!-- Tests are failing on windows, need to investigate -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
Modified:
activemq/activemq-apollo/trunk/apollo-cassandra/src/main/scala/org/apache/activemq/apollo/broker/store/cassandra/CassandraClient.scala
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-cassandra/src/main/scala/org/apache/activemq/apollo/broker/store/cassandra/CassandraClient.scala?rev=1053982&r1=1053981&r2=1053982&view=diff
==============================================================================
---
activemq/activemq-apollo/trunk/apollo-cassandra/src/main/scala/org/apache/activemq/apollo/broker/store/cassandra/CassandraClient.scala
(original)
+++
activemq/activemq-apollo/trunk/apollo-cassandra/src/main/scala/org/apache/activemq/apollo/broker/store/cassandra/CassandraClient.scala
Thu Dec 30 19:29:37 2010
@@ -18,11 +18,10 @@ package org.apache.activemq.apollo.broke
import com.shorrockin.cascal.session._
import com.shorrockin.cascal.utils.Conversions._
-import java.util.{HashMap}
-import org.fusesource.hawtbuf.AsciiBuffer._
-import org.fusesource.hawtbuf.{AsciiBuffer, DataByteArrayInputStream,
DataByteArrayOutputStream, Buffer}
+import org.fusesource.hawtbuf.Buffer
import org.apache.activemq.apollo.broker.store._
import collection.mutable.ListBuffer
+import org.apache.activemq.apollo.broker.store.PBSupport._
/**
*
@@ -54,64 +53,6 @@ class CassandraClient() {
}
}
- def decodeMessageRecord(v: Array[Byte]): MessageRecord = {
- import PBMessageRecord._
- val pb = PBMessageRecord.FACTORY.parseUnframed(v)
- val rc = new MessageRecord
- rc.protocol = pb.getProtocol
- rc.size = pb.getSize
- rc.buffer = pb.getValue
- rc.expiration = pb.getExpiration
- rc
- }
-
- def encodeMessageRecord(v: MessageRecord): Array[Byte] = {
- val pb = new PBMessageRecord.Bean
- pb.setProtocol(v.protocol)
- pb.setSize(v.size)
- pb.setValue(v.buffer)
- pb.setExpiration(v.expiration)
- pb.freeze.toUnframedByteArray
- }
-
- implicit def decodeQueueEntryRecord(v: Array[Byte]): QueueEntryRecord = {
- import PBQueueEntryRecord._
- val pb = PBQueueEntryRecord.FACTORY.parseUnframed(v)
- val rc = new QueueEntryRecord
- rc.message_key = pb.getMessageKey
- rc.attachment = pb.getAttachment
- rc.size = pb.getSize
- rc.redeliveries = pb.getRedeliveries.toShort
- rc
- }
-
- implicit def encodeQueueEntryRecord(v: QueueEntryRecord): Array[Byte] = {
- val pb = new PBQueueEntryRecord.Bean
- pb.setMessageKey(v.message_key)
- pb.setAttachment(v.attachment)
- pb.setSize(v.size)
- pb.setRedeliveries(v.redeliveries)
- pb.freeze.toUnframedByteArray
- }
-
- implicit def decodeQueueRecord(v: Array[Byte]): QueueRecord = {
- import PBQueueRecord._
- val pb = PBQueueRecord.FACTORY.parseUnframed(v)
- val rc = new QueueRecord
- rc.key = pb.getKey
- rc.binding_kind = pb.getBindingKind
- rc.binding_data = pb.getBindingData
- rc
- }
-
- implicit def encodeQueueRecord(v: QueueRecord): Array[Byte] = {
- val pb = new PBQueueRecord.Bean
- pb.setKey(v.key)
- pb.setBindingKind(v.binding_kind)
- pb.setBindingData(v.binding_data)
- pb.freeze.toUnframedByteArray
- }
-
def purge() = {
withSession {
session =>
@@ -176,7 +117,7 @@ class CassandraClient() {
case (msg, action) =>
var rc =
if (action.messageRecord != null) {
- operations ::= Insert( schema.message_data \ (msg,
encodeMessageRecord(action.messageRecord) ) )
+ operations ::= Insert( schema.message_data \ (msg,
action.messageRecord ) )
}
action.enqueues.foreach {
queueEntry =>
@@ -201,8 +142,7 @@ class CassandraClient() {
session =>
session.get(schema.message_data \ id) match {
case Some(x) =>
- val rc: MessageRecord = decodeMessageRecord(x.value)
- rc.key = id
+ val rc: MessageRecord = x.value
Some(rc)
case None =>
None
@@ -246,8 +186,6 @@ class CassandraClient() {
session =>
session.list(schema.entries \ queue_key, RangePredicate(firstSeq,
lastSeq)).map { x=>
val rc:QueueEntryRecord = x.value
- rc.queue_key = queue_key
- rc.entry_seq = x.name
rc
}
}
Modified: activemq/activemq-apollo/trunk/apollo-jdbm2/pom.xml
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-jdbm2/pom.xml?rev=1053982&r1=1053981&r2=1053982&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-jdbm2/pom.xml (original)
+++ activemq/activemq-apollo/trunk/apollo-jdbm2/pom.xml Thu Dec 30 19:29:37 2010
@@ -45,18 +45,6 @@
<artifactId>jdbm</artifactId>
<version>${jdbm-version}</version>
</dependency>
-
- <dependency>
- <groupId>org.fusesource.hawtbuf</groupId>
- <artifactId>hawtbuf-proto</artifactId>
- <version>${hawtbuf-version}</version>
- </dependency>
-
- <dependency>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-api</artifactId>
- <version>${slf4j-version}</version>
- </dependency>
<!-- Since we implement a jade template to display the JDBM status -->
<dependency>
@@ -130,22 +118,6 @@
<plugins>
<plugin>
- <groupId>org.fusesource.hawtbuf</groupId>
- <artifactId>hawtbuf-proto</artifactId>
- <version>${hawtbuf-version}</version>
- <configuration>
- <type>alt</type>
- </configuration>
- <executions>
- <execution>
- <goals>
- <goal>compile</goal>
- </goals>
- </execution>
- </executions>
- </plugin>
-
- <plugin>
<groupId>org.fusesource.scalate</groupId>
<artifactId>maven-scalate-plugin</artifactId>
<version>${scalate-version}</version>
Modified:
activemq/activemq-apollo/trunk/apollo-jdbm2/src/main/scala/org/apache/activemq/apollo/broker/store/jdbm2/JDBM2Client.scala
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-jdbm2/src/main/scala/org/apache/activemq/apollo/broker/store/jdbm2/JDBM2Client.scala?rev=1053982&r1=1053981&r2=1053982&view=diff
==============================================================================
---
activemq/activemq-apollo/trunk/apollo-jdbm2/src/main/scala/org/apache/activemq/apollo/broker/store/jdbm2/JDBM2Client.scala
(original)
+++
activemq/activemq-apollo/trunk/apollo-jdbm2/src/main/scala/org/apache/activemq/apollo/broker/store/jdbm2/JDBM2Client.scala
Thu Dec 30 19:29:37 2010
@@ -20,7 +20,6 @@ import dto.JDBM2StoreDTO
import java.{lang=>jl}
import java.{util=>ju}
-import java.util.concurrent.atomic.AtomicInteger
import collection.mutable.ListBuffer
import org.apache.activemq.apollo.broker.store._
import org.apache.activemq.apollo.util._
@@ -28,80 +27,25 @@ import jdbm._
import btree.BTree
import htree.HTree
import java.util.Comparator
-import model._
import java.io.Serializable
import jdbm.helper._
+import PBSupport._
object JDBM2Client extends Log {
object MessageRecordSerializer extends Serializer[MessageRecord] {
-
- def serialize(out: SerializerOutput, v: MessageRecord) = {
- val pb = new MessagePB.Bean
- pb.setMessageKey(v.key)
- pb.setProtocol(v.protocol)
- pb.setSize(v.size)
- pb.setValue(v.buffer)
- pb.setExpiration(v.expiration)
- pb.freeze.writeUnframed(out)
- }
-
- def deserialize(in: SerializerInput) = {
- val pb = MessagePB.FACTORY.parseUnframed(in)
- val rc = new MessageRecord
- rc.key = pb.getMessageKey
- rc.protocol = pb.getProtocol
- rc.size = pb.getSize
- rc.buffer = pb.getValue
- rc.expiration = pb.getExpiration
- rc
- }
+ def serialize(out: SerializerOutput, v: MessageRecord) =
encode_message_record(out, v)
+ def deserialize(in: SerializerInput) = decode_message_record(in)
}
object QueueRecordSerializer extends Serializer[QueueRecord] {
-
- def serialize(out: SerializerOutput, v: QueueRecord) = {
- val pb = new QueuePB.Bean
- pb.setKey(v.key)
- pb.setBindingData(v.binding_data)
- pb.setBindingKind(v.binding_kind)
- pb.freeze.writeUnframed(out)
- }
-
- def deserialize(in: SerializerInput) = {
- val pb = QueuePB.FACTORY.parseUnframed(in)
- val rc = new QueueRecord
- rc.key = pb.getKey
- rc.binding_data = pb.getBindingData
- rc.binding_kind = pb.getBindingKind
- rc
- }
+ def serialize(out: SerializerOutput, v: QueueRecord) =
encode_queue_record(out, v)
+ def deserialize(in: SerializerInput) = decode_queue_record(in)
}
object QueueEntryRecordSerializer extends Serializer[QueueEntryRecord] {
-
- def serialize(out: SerializerOutput, v: QueueEntryRecord) = {
- val pb = new QueueEntryPB.Bean
- pb.setQueueKey(v.queue_key)
- pb.setQueueSeq(v.entry_seq)
- pb.setMessageKey(v.message_key)
- pb.setAttachment(v.attachment)
- pb.setSize(v.size)
- pb.setRedeliveries(v.redeliveries)
- pb.freeze.writeUnframed(out)
- }
-
- def deserialize(in: SerializerInput) = {
- val pb = QueueEntryPB.FACTORY.parseUnframed(in)
- val rc = new QueueEntryRecord
- rc.queue_key = pb.getQueueKey
- rc.entry_seq = pb.getQueueSeq
- rc.message_key = pb.getMessageKey
- rc.attachment = pb.getAttachment
- rc.size = pb.getSize
- rc.redeliveries = pb.getRedeliveries.toShort
- rc
- }
+ def serialize(out: SerializerOutput, v: QueueEntryRecord) =
encode_queue_entry_record(out, v)
+ def deserialize(in: SerializerInput) = decode_queue_entry_record(in)
}
object QueueEntryKeySerializer extends Serializer[(Long,Long)] {