Author: chirino
Date: Thu Jan 26 22:46:12 2012
New Revision: 1236424
URL: http://svn.apache.org/viewvc?rev=1236424&view=rev
Log:
- Added ':' as a valid destination part value.
- Temp destinations now check ownership against the a connection's session_id
instead of the connection id (the session id is value assigned by the protocol).
- Updated the openwire protocol handler temp destination implementation to use
the broker's temp destination features so that it can enforce security and the
temp dest lifecycle.
Modified:
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/DestinationParser.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/AnyProtocol.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/ProtocolFactory.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/security/SecurityContext.scala
activemq/activemq-apollo/trunk/apollo-openwire/pom.xml
activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/DestinationAdvisoryRouterListener.scala
activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/DestinationConverter.scala
activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/path/PathParser.scala
activemq/activemq-apollo/trunk/apollo-website/src/documentation/user-manual.md
Modified:
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala?rev=1236424&r1=1236423&r2=1236424&view=diff
==============================================================================
---
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala
(original)
+++
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala
Thu Jan 26 22:46:12 2012
@@ -348,12 +348,12 @@ class Broker() extends BaseService with
def schedule_virtualhost_maintenance:Unit = dispatch_queue.after(1,
TimeUnit.SECONDS) {
if( service_state.is_started ) {
- val active_connections = connections.keySet
+ val active_sessions = connections.values.flatMap(_.session_id).toSet
virtual_hosts.values.foreach { host=>
host.dispatch_queue {
if(host.service_state.is_started) {
- host.router.remove_temp_destinations(active_connections)
+ host.router.remove_temp_destinations(active_sessions)
}
}
}
Modified:
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala?rev=1236424&r1=1236423&r2=1236424&view=diff
==============================================================================
---
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala
(original)
+++
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala
Thu Jan 26 22:46:12 2012
@@ -96,6 +96,8 @@ class BrokerConnection(var connector: Co
var protocol_handler: ProtocolHandler = null;
+ def session_id = Option(protocol_handler).flatMap(_.session_id)
+
override def toString = "id: "+id.toString
protected override def _start(on_completed:Runnable) = {
Modified:
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/DestinationParser.scala
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/DestinationParser.scala?rev=1236424&r1=1236423&r2=1236424&view=diff
==============================================================================
---
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/DestinationParser.scala
(original)
+++
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/DestinationParser.scala
Thu Jan 26 22:46:12 2012
@@ -28,6 +28,7 @@ import scala.Array
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
class DestinationParser extends PathParser {
+ import PathParser._
var queue_prefix = "queue:"
var topic_prefix = "topic:"
Modified:
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala?rev=1236424&r1=1236423&r2=1236424&view=diff
==============================================================================
---
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala
(original)
+++
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala
Thu Jan 26 22:46:12 2012
@@ -213,7 +213,7 @@ class LocalRouter(val virtual_host:Virtu
None
} else {
try {
- Some((destination.path.get(1), destination.path.get(2).toLong))
+ Some((destination.path.get(1), destination.path.get(2)))
} catch {
case _ => None
}
@@ -287,7 +287,7 @@ class LocalRouter(val virtual_host:Virtu
for(dest <- get_destination_matches(path)) {
if( is_temp(destination) ) {
val owner = temp_owner(destination).get
- for( connection <- security.connection_id) {
+ for( connection <- security.session_id) {
if( (virtual_host.broker.id, connection) != owner ) {
return Some("Not authorized to destroy the temp %s '%s'.
Principals=%s".format(dest.resource_kind.id, dest.id, security.principal_dump))
}
@@ -313,7 +313,7 @@ class LocalRouter(val virtual_host:Virtu
if( is_temp(destination) ) {
temp_owner(destination) match {
case Some(owner) =>
- for( connection <- security.connection_id) {
+ for( connection <- security.session_id) {
if( (virtual_host.broker.id, connection) != owner ) {
return Some("Not authorized to receive from the temporary
destination. Principals=%s".format(security.principal_dump))
}
@@ -911,7 +911,7 @@ class LocalRouter(val virtual_host:Virtu
}
- def remove_temp_destinations(active_connections:scala.collection.Set[Long])
= {
+ def
remove_temp_destinations(active_connections:scala.collection.Set[String]) = {
virtual_host.dispatch_queue.assertExecuting()
val min_create_time = virtual_host.broker.now - 1000;
Modified:
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala?rev=1236424&r1=1236423&r2=1236424&view=diff
==============================================================================
---
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
(original)
+++
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
Thu Jan 26 22:46:12 2012
@@ -48,7 +48,7 @@ trait Router extends Service {
def apply_update(on_completed:Runnable):Unit
- def
remove_temp_destinations(active_connections:scala.collection.Set[Long]):Unit
+ def
remove_temp_destinations(active_connections:scala.collection.Set[String]):Unit
}
/**
Modified:
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/AnyProtocol.scala
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/AnyProtocol.scala?rev=1236424&r1=1236423&r2=1236424&view=diff
==============================================================================
---
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/AnyProtocol.scala
(original)
+++
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/AnyProtocol.scala
Thu Jan 26 22:46:12 2012
@@ -154,6 +154,8 @@ class AnyProtocolHandler extends Protoco
var discriminated = false
+ def session_id = None
+
override def on_transport_command(command: AnyRef) = {
if (!command.isInstanceOf[ProtocolDetected]) {
Modified:
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/ProtocolFactory.scala
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/ProtocolFactory.scala?rev=1236424&r1=1236423&r2=1236424&view=diff
==============================================================================
---
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/ProtocolFactory.scala
(original)
+++
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/ProtocolFactory.scala
Thu Jan 26 22:46:12 2012
@@ -67,6 +67,8 @@ trait ProtocolHandler {
def protocol:String
+ def session_id:Option[String]
+
var connection:BrokerConnection = null;
def set_connection(brokerConnection:BrokerConnection) = {
Modified:
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/security/SecurityContext.scala
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/security/SecurityContext.scala?rev=1236424&r1=1236423&r2=1236424&view=diff
==============================================================================
---
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/security/SecurityContext.scala
(original)
+++
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/security/SecurityContext.scala
Thu Jan 26 22:46:12 2012
@@ -39,7 +39,7 @@ class SecurityContext {
var local_address:SocketAddress = _
var remote_address:SocketAddress = _
var login_context:LoginContext = _
- var connection_id:Option[Long] = None
+ var session_id:Option[String] = None
def credential_dump = {
var rc = List[String]()
Modified: activemq/activemq-apollo/trunk/apollo-openwire/pom.xml
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/pom.xml?rev=1236424&r1=1236423&r2=1236424&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/pom.xml (original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/pom.xml Thu Jan 26 22:46:12
2012
@@ -60,33 +60,47 @@
<!-- so we can test against a persisentce store -->
<dependency>
<groupId>org.apache.activemq</groupId>
- <artifactId>apollo-leveldb</artifactId>
+ <artifactId>apollo-broker</artifactId>
<version>1.1-SNAPSHOT</version>
+ <type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
- <artifactId>apollo-bdb</artifactId>
+ <artifactId>apollo-util</artifactId>
<version>1.1-SNAPSHOT</version>
+ <type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
- <artifactId>apollo-broker</artifactId>
+ <artifactId>apollo-bdb</artifactId>
<version>1.1-SNAPSHOT</version>
- <type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
- <artifactId>apollo-util</artifactId>
+ <artifactId>apollo-leveldb</artifactId>
<version>1.1-SNAPSHOT</version>
- <type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
+ <groupId>org.eclipse.jetty.aggregate</groupId>
+ <artifactId>jetty-all-server</artifactId>
+ <version>${jetty-version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.activemq</groupId>
+ <artifactId>apollo-web</artifactId>
+ <version>1.1-SNAPSHOT</version>
+ <scope>test</scope>
+ </dependency>
+
+
+ <dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
Modified:
activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/DestinationAdvisoryRouterListener.scala
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/DestinationAdvisoryRouterListener.scala?rev=1236424&r1=1236423&r2=1236424&view=diff
==============================================================================
---
activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/DestinationAdvisoryRouterListener.scala
(original)
+++
activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/DestinationAdvisoryRouterListener.scala
Thu Jan 26 22:46:12 2012
@@ -175,7 +175,7 @@ class DestinationAdvisoryRouterListener(
def send(delivery:Delivery): Unit = {
val message = delivery.message.asInstanceOf[OpenwireMessage].message
- val dest: Array[DestinationDTO] =
to_destination_dto(message.getDestination)
+ val dest: Array[DestinationDTO] =
to_destination_dto(message.getDestination,null)
val key = dest.toList
val route = producerRoutes.get(key) match {
Modified:
activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/DestinationConverter.scala
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/DestinationConverter.scala?rev=1236424&r1=1236423&r2=1236424&view=diff
==============================================================================
---
activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/DestinationConverter.scala
(original)
+++
activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/DestinationConverter.scala
Thu Jan 26 22:46:12 2012
@@ -16,11 +16,12 @@
*/
package org.apache.activemq.apollo.openwire
-import command._
-import org.apache.activemq.apollo.util.path.Path
import java.util.regex.{Matcher, Pattern}
import org.apache.activemq.apollo.dto.{TopicDestinationDTO,
QueueDestinationDTO, DestinationDTO}
-import org.apache.activemq.apollo.broker.{DestinationParser, LocalRouter}
+import org.apache.activemq.apollo.broker.DestinationParser
+import org.fusesource.hawtbuf.Buffer.utf8
+import org.apache.activemq.apollo.openwire.command._
+import org.fusesource.hawtbuf._
/**
* <p>
@@ -35,35 +36,32 @@ object DestinationConverter {
OPENWIRE_PARSER.any_child_wildcard = "*"
OPENWIRE_PARSER.any_descendant_wildcard = ">"
- def to_destination_dto(domain: String, parts:Array[String]): DestinationDTO
= domain match {
- case "queue" => new QueueDestinationDTO(parts)
- case "topic" => new TopicDestinationDTO(parts)
- case _ => throw new Exception("Uknown destination domain: " + domain);
- }
-
- def to_destination_dto(domain: String, path: Path): Array[DestinationDTO] = {
- Array(to_destination_dto(domain, OPENWIRE_PARSER.path_parts(path)))
- }
+ // = Pattern.compile("[ a-zA-Z0-9\\_\\-\\%\\~]")
+
+ def to_destination_dto(dest: ActiveMQDestination,
handler:OpenwireProtocolHandler): Array[DestinationDTO] = {
- def to_destination_dto(dest: ActiveMQDestination): Array[DestinationDTO] = {
if( !dest.isComposite ) {
import ActiveMQDestination._
- val physicalName = dest.getPhysicalName.replaceAll(Pattern.quote(":"),
Matcher.quoteReplacement("%58"))
-
- var path = OPENWIRE_PARSER.decode_path(physicalName)
- dest.getDestinationType match {
+ var name = dest.getPhysicalName
+ Array(dest.getDestinationType match {
case QUEUE_TYPE =>
- to_destination_dto("queue", path)
+ var path_parts =
OPENWIRE_PARSER.parts(name).map(sanitize_destination_part(_))
+ new QueueDestinationDTO(path_parts)
case TOPIC_TYPE =>
- to_destination_dto("topic", path)
+ var path_parts =
OPENWIRE_PARSER.parts(name).map(sanitize_destination_part(_))
+ new TopicDestinationDTO(path_parts)
case TEMP_QUEUE_TYPE =>
- to_destination_dto("queue", Path("ActiveMQ", "Temp") + path)
+ val (connectionid, rest)= name.splitAt(name.lastIndexOf(':'))
+ val real_path = ("temp" :: handler.broker.id ::
sanitize_destination_part(connectionid) ::
sanitize_destination_part(rest.substring(1)) :: Nil).toArray
+ new QueueDestinationDTO( real_path ).temp(true)
case TEMP_TOPIC_TYPE =>
- to_destination_dto("topic", Path("ActiveMQ", "Temp") + path)
- }
+ val (connectionid, rest)= name.splitAt(name.lastIndexOf(':'))
+ val real_path = ("temp" :: handler.broker.id ::
sanitize_destination_part(connectionid) ::
sanitize_destination_part(rest.substring(1)) :: Nil).toArray
+ new TopicDestinationDTO( real_path ).temp(true)
+ })
} else {
dest.getCompositeDestinations.map { c =>
- to_destination_dto(c)(0)
+ to_destination_dto(c, handler)(0)
}
}
}
@@ -72,26 +70,21 @@ object DestinationConverter {
import collection.JavaConversions._
val rc = dest.map { dest =>
- var temp = false // dest.temp_owner != null
- val name = OPENWIRE_PARSER.encode_path(asScalaBuffer(dest.path).toList
match {
- case "ActiveMQ" :: "Temp" :: rest =>
- temp = true
- rest
- case rest =>
- rest
- }).replaceAll(Pattern.quote("%58"), Matcher.quoteReplacement(":"))
+ val temp = dest.path.headOption == Some("temp")
dest match {
case dest:QueueDestinationDTO =>
if( temp ) {
- new ActiveMQTempQueue(name)
+ new
ActiveMQTempQueue(dest.path.toList.drop(2).map(unsanitize_destination_part(_)).mkString(":"))
} else {
+ val name =
OPENWIRE_PARSER.encode_path(asScalaBuffer(dest.path).toList.map(unsanitize_destination_part(_)))
new ActiveMQQueue(name)
}
case dest:TopicDestinationDTO =>
if( temp ) {
- new ActiveMQTempTopic(name)
+ new
ActiveMQTempTopic(dest.path.toList.drop(2).map(unsanitize_destination_part(_)).mkString(":"))
} else {
+ val name =
OPENWIRE_PARSER.encode_path(asScalaBuffer(dest.path).toList.map(unsanitize_destination_part(_)))
new ActiveMQTopic(name)
}
}
Modified:
activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala?rev=1236424&r1=1236423&r2=1236424&view=diff
==============================================================================
---
activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
(original)
+++
activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
Thu Jan 26 22:46:12 2012
@@ -36,11 +36,10 @@ import codec.OpenWireFormat
import command._
import
org.apache.activemq.apollo.openwire.dto.{OpenwireConnectionStatusDTO,OpenwireDTO}
import org.apache.activemq.apollo.dto.{AcceptingConnectorDTO,
TopicDestinationDTO, DurableSubscriptionDestinationDTO, DestinationDTO}
-import org.apache.activemq.apollo.openwire.DestinationConverter._
import org.apache.activemq.apollo.broker._
import protocol._
import security.SecurityContext
-
+import DestinationConverter._
object OpenwireProtocolHandler extends Log {
def unit:Unit = {}
@@ -97,7 +96,6 @@ class OpenwireProtocolHandler extends Pr
private def queue = connection.dispatch_queue
- var session_id: AsciiBuffer = _
var wire_format: OpenWireFormat = _
var login: Option[AsciiBuffer] = None
var passcode: Option[AsciiBuffer] = None
@@ -111,6 +109,9 @@ class OpenwireProtocolHandler extends Pr
var current_command: Object = _
var codec:OpenwireCodec = _
+ var temp_destination_map = HashMap[ActiveMQDestination, DestinationDTO]()
+
+ def session_id = security_context.session_id
override def create_connection_status = {
var rc = new OpenwireConnectionStatusDTO
@@ -179,7 +180,6 @@ class OpenwireProtocolHandler extends Pr
}
override def on_transport_connected():Unit = {
- security_context.connection_id = Some(connection.id)
security_context.local_address = connection.transport.getLocalAddress
security_context.remote_address = connection.transport.getRemoteAddress
@@ -421,11 +421,12 @@ class OpenwireProtocolHandler extends Pr
def on_connection_info(info: ConnectionInfo) = {
val id = info.getConnectionId()
- if (!all_connections.contains(id)) {
+ if (connection_context==null) {
new ConnectionContext(info).attach
security_context.user = info.getUserName
security_context.password = info.getPassword
+ security_context.session_id =
Some(sanitize_destination_part(info.getConnectionId.toString))
reset {
if( host.authenticator!=null && host.authorizer!=null ) {
@@ -458,7 +459,7 @@ class OpenwireProtocolHandler extends Pr
def on_session_info(info: SessionInfo) = {
val id = info.getSessionId();
if (!all_sessions.contains(id)) {
- val parent = all_connections.get(id.getParentId()).getOrElse(die("Cannot
add a session to a connection that had not been registered."))
+ val parent = get_context(id.getParentId())
new SessionContext(parent, info).attach
}
ack(info);
@@ -484,7 +485,7 @@ class OpenwireProtocolHandler extends Pr
}
def on_destination_info(info:DestinationInfo) = {
- val destinations = to_destination_dto(info.getDestination)
+ val destinations = to_destination_dto(info.getDestination, this)
// if( info.getDestination.isTemporary ) {
// destinations.foreach(_.temp_owner = connection.id)
// }
@@ -506,7 +507,7 @@ class OpenwireProtocolHandler extends Pr
def on_remove_info(info: RemoveInfo) = {
info.getObjectId match {
- case id: ConnectionId => all_connections.get(id).foreach(_.dettach)
+ case id: ConnectionId => Option(connection_context).foreach(_.dettach)
case id: SessionId => all_sessions.get(id).foreach(_.dettach)
case id: ProducerId => all_producers.get(id).foreach(_.dettach)
case id: ConsumerId => all_consumers.get(id).foreach(_.dettach )
@@ -516,8 +517,15 @@ class OpenwireProtocolHandler extends Pr
ack(info)
}
+ def get_context(id:ConnectionId) = {
+ if(connection_context!=null && connection_context.info.getConnectionId ==
id)
+ connection_context
+ else
+ die("Cannot add a session to a connection that had not been registered.")
+ }
+
def on_transaction_info(info:TransactionInfo) = {
- val parent =
all_connections.get(info.getConnectionId()).getOrElse(die("Cannot add a session
to a connection that had not been registered."))
+ val parent = get_context(info.getConnectionId())
val id = info.getTransactionId
info.getType match {
case TransactionInfo.BEGIN =>
@@ -591,7 +599,7 @@ class OpenwireProtocolHandler extends Pr
def perform_send(msg:ActiveMQMessage, uow:StoreUOW=null): Unit = {
- val destiantion = to_destination_dto(msg.getDestination)
+ val destiantion = to_destination_dto(msg.getDestination, this)
val key = destiantion.toList
producerRoutes.get(key) match {
case null =>
@@ -685,8 +693,8 @@ class OpenwireProtocolHandler extends Pr
// host.createQueue(destination);
// return ack(info);
// }
-
- val all_connections = new HashMap[ConnectionId, ConnectionContext]();
+ var connection_context:ConnectionContext= null
+
val all_sessions = new HashMap[SessionId, SessionContext]();
val all_producers = new HashMap[ProducerId, ProducerContext]();
val all_consumers = new HashMap[ConsumerId, ConsumerContext]();
@@ -701,15 +709,18 @@ class OpenwireProtocolHandler extends Pr
def default_session_id = new SessionId(info.getConnectionId(), -1)
def attach = {
+ if( connection_context!=null ) {
+ die("Only one logic connection is supported.")
+ }
// create the default session.
new SessionContext(this, new SessionInfo(default_session_id)).attach
- all_connections.put(info.getConnectionId, this)
+ connection_context = this
}
def dettach = {
sessions.values.toArray.foreach(_.dettach)
transactions.values.toArray.foreach(_.dettach)
- all_connections.remove(info.getConnectionId)
+ connection_context = null
}
}
@@ -811,7 +822,7 @@ class OpenwireProtocolHandler extends Pr
def attach = {
if( info.getDestination == null ) fail("destination was not set")
- destination = to_destination_dto(info.getDestination)
+ destination = to_destination_dto(info.getDestination,
OpenwireProtocolHandler.this)
// if they are temp dests.. attach our owner id so that we don't
// get rejected.
Modified:
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala?rev=1236424&r1=1236423&r2=1236424&view=diff
==============================================================================
---
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
(original)
+++
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
Thu Jan 26 22:46:12 2012
@@ -55,6 +55,7 @@ object BufferSupport {
}
import BufferSupport._
+import PathParser._
object StompProtocolHandler extends Log {
@@ -566,7 +567,6 @@ class StompProtocolHandler extends Proto
val security_context = new SecurityContext
var waiting_on: ()=>String = WAITING_ON_CLIENT_REQUEST
var config:StompDTO = _
- var session_id:String = _
var protocol_filters = List[ProtocolFilter]()
@@ -575,6 +575,8 @@ class StompProtocolHandler extends Proto
var codec:StompCodec = _
+ def session_id = security_context.session_id
+
implicit def toDestinationDTO(value:AsciiBuffer):Array[DestinationDTO] = {
val rc = destination_parser.decode_multi_destination(value.toString)
if( rc==null ) {
@@ -584,7 +586,7 @@ class StompProtocolHandler extends Proto
if( dest.temp() ) {
temp_destination_map.getOrElseUpdate(dest, {
import scala.collection.JavaConversions._
- val real_path= ("temp" :: broker.id :: connection.id.toString ::
dest.path.toList).toArray
+ val real_path= ("temp" :: broker.id :: session_id.get ::
dest.path.toList).toArray
dest match {
case dest:QueueDestinationDTO => new QueueDestinationDTO(
real_path ).temp(true)
case dest:TopicDestinationDTO => new TopicDestinationDTO(
real_path ).temp(true)
@@ -833,7 +835,6 @@ class StompProtocolHandler extends Proto
case _ => None
}
- security_context.connection_id = Some(connection.id)
security_context.local_address = connection.transport.getLocalAddress
security_context.remote_address = connection.transport.getRemoteAddress
security_context.user = get(headers, LOGIN).map(decode_header
_).getOrElse(null)
@@ -892,9 +893,7 @@ class StompProtocolHandler extends Proto
var connected_headers = ListBuffer((VERSION, protocol_version))
connected_headers +=
SERVER->encode_header("apache-apollo/"+Broker.version)
- val v = encode_header("%s-%x-".format(this.host.config.id,
this.host.session_counter.incrementAndGet))
- session_id = v.toString
- connected_headers += SESSION->v
+ connected_headers += SESSION->encode_header(session_id.get)
val outbound_heart_beat_header =
ascii("%d,%d".format(outbound_heartbeat,inbound_heartbeat))
connected_headers += HEART_BEAT->outbound_heart_beat_header
@@ -930,6 +929,7 @@ class StompProtocolHandler extends Proto
noop
} else {
this.host=host
+ security_context.session_id =
Some("%s-%x-".format(sanitize_destination_part(this.host.config.id),
this.host.session_counter.incrementAndGet))
connection_log = host.connection_log
if( host.authenticator!=null && host.authorizer!=null ) {
suspend_read("authenticating and authorizing connect")
@@ -1060,7 +1060,7 @@ class StompProtocolHandler extends Proto
// Do we need to add the message id?
if( get( headers, MESSAGE_ID) == None ) {
message_id_counter += 1
- rc ::= (MESSAGE_ID -> ascii(session_id+message_id_counter))
+ rc ::= (MESSAGE_ID -> ascii(session_id.get+message_id_counter))
}
if( config.add_timestamp_header!=null ) {
Modified:
activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/path/PathParser.scala
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/path/PathParser.scala?rev=1236424&r1=1236423&r2=1236424&view=diff
==============================================================================
---
activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/path/PathParser.scala
(original)
+++
activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/path/PathParser.scala
Thu Jan 26 22:46:12 2012
@@ -20,8 +20,8 @@ import java.util.LinkedList
import java.util.regex._
import collection.JavaConversions._
import org.apache.activemq.apollo.util.path.PathParser.PartFilter
-import util.matching.Regex
import collection.mutable.ListBuffer
+import org.fusesource.hawtbuf.{DataByteArrayOutputStream, AsciiBuffer}
/**
* Holds the delimiters used to parse paths.
@@ -108,6 +108,44 @@ object PathParser {
}
}
+
+ def sanitize_destination_part(value:String) = {
+ val buffer = new AsciiBuffer(value)
+ val rc = new StringBuffer(value.length())
+ var i = 0
+ val l = buffer.length
+ while( i < l ) {
+ val c = buffer.data(i)
+ if(
+ ('a' <= c && c <= 'z') ||
+ ('A' <= c && c <= 'Z') ||
+ ('0' <= c && c <= '9') ||
+ c=='_' || c=='-' || c=='~' || c==':'
+ ) {
+ rc.append(c.toChar)
+ } else {
+ rc.append("%%%02x".format(c))
+ }
+ i += 1
+ }
+ rc.toString
+ }
+
+ def unsanitize_destination_part(value:String):String = {
+ val rc = new DataByteArrayOutputStream
+ var pos = value
+ while( pos.length() > 0 ) {
+ if( pos.startsWith("%") && pos.length()> 3 ) {
+ val dec = pos.substring(1,3)
+ rc.writeByte(Integer.parseInt(dec, 16))
+ pos = pos.substring(3);
+ } else {
+ rc.writeByte(pos.charAt(0))
+ pos = pos.substring(1)
+ }
+ }
+ rc.toBuffer.utf8().toString
+ }
}
class PathParser {
@@ -117,7 +155,7 @@ class PathParser {
var regex_wildcard_start = "{"
var regex_wildcard_end = "}"
var path_separator = "."
- var part_pattern = Pattern.compile("[ a-zA-Z0-9\\_\\-\\%\\~]+")
+ var part_pattern = Pattern.compile("[ a-zA-Z0-9\\_\\-\\%\\~\\:]+")
def copy(other:PathParser) = {
any_descendant_wildcard = other.any_descendant_wildcard
Modified:
activemq/activemq-apollo/trunk/apollo-website/src/documentation/user-manual.md
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-website/src/documentation/user-manual.md?rev=1236424&r1=1236423&r2=1236424&view=diff
==============================================================================
---
activemq/activemq-apollo/trunk/apollo-website/src/documentation/user-manual.md
(original)
+++
activemq/activemq-apollo/trunk/apollo-website/src/documentation/user-manual.md
Thu Jan 26 22:46:12 2012
@@ -1569,6 +1569,7 @@ header in the STOMP `SUBSCRIBE` frame to
### Destination Name Restrictions
Destination names are restricted to using the characters `a-z`, `A-Z`, `0-9`,
-`_`, `-` `%`, `~`, ' ', or `.` in addition to composite separator `,` and the
wild
-card `*`.
+`_`, `-` `%`, `~`, `:`, ' ', or `.` in addition to composite separator `,` and
the wild
+card `*`. Any other characters must be UTF-8 and then URL encoded if you wish
to
+preserve their significance.