Modified: activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/MqttSession.java URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/MqttSession.java?rev=1461905&r1=1461904&r2=1461905&view=diff ============================================================================== --- activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/MqttSession.java (original) +++ activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/MqttSession.java Wed Mar 27 23:58:58 2013 @@ -24,8 +24,7 @@ import org.apache.activemq.apollo.broker import org.apache.activemq.apollo.filter.FilterException; import org.apache.activemq.apollo.mqtt.MqttSessionManager.HostState; import org.apache.activemq.apollo.mqtt.MqttSessionManager.SessionState; -import org.apache.activemq.apollo.util.LRUCache; -import org.apache.activemq.apollo.util.LongCounter; +import org.apache.activemq.apollo.util.*; import org.apache.activemq.apollo.util.path.Path$; import org.apache.activemq.apollo.util.path.PathMap; import org.apache.activemq.apollo.util.path.PathParser; @@ -37,7 +36,6 @@ import org.fusesource.mqtt.client.Topic; import org.fusesource.mqtt.codec.*; import scala.Option; import scala.Tuple2; -import scala.runtime.BoxedUnit; import java.net.ProtocolException; import java.util.*; @@ -45,6 +43,8 @@ import java.util.*; import static org.fusesource.hawtdispatch.Dispatch.NOOP; import static org.fusesource.hawtdispatch.Dispatch.createQueue; +import static org.apache.activemq.apollo.mqtt.MqttProtocolHandler.*; + /** * An MqttSession can be switch from one connection/protocol handler to another, * but it will only be associated with one at a time. 
An MqttSession tracks @@ -54,14 +54,6 @@ import static org.fusesource.hawtdispatc */ public class MqttSession { - public static final ScalaSupport.Logger log = new ScalaSupport.Logger(MqttProtocolHandler$.MODULE$); - - public static <T> T received(T value) { - log.trace("received: %s", value); - return value; - } - - public final HostState host_state; public final UTF8Buffer client_id; public final SessionState session_state; @@ -83,7 +75,7 @@ public class MqttSession { boolean publish_body = false; public VirtualHost host() { - return host_state.host(); + return host_state.host; } public void connect(final MqttProtocolHandler next) { @@ -142,23 +134,23 @@ public class MqttSession { public void attach() { queue.assertExecuting(); final MqttProtocolHandler h = handler; - clean_session = h.connect_message().cleanSession(); - security_context = h.security_context(); - h.command_handler_$eq(ScalaSupport.toScala(new UnitFn1<Object>() { + clean_session = h.connect_message.cleanSession(); + security_context = h.security_context; + h.command_handler = new UnitFn1<Object>() { @Override public void call(Object v1) { on_transport_command(v1); } - })); + }; destination_parser = h.destination_parser(); - mqtt_consumer().consumer_sink.downstream_$eq(ScalaSupport.some(h.sink_manager().open())); + mqtt_consumer().consumer_sink.downstream_$eq(Scala2Java.some(h.sink_manager.open())); final Task ack_connect = new Task() { @Override public void run() { queue.assertExecuting(); - connect_message = h.connect_message(); + connect_message = h.connect_message; CONNACK connack = new CONNACK(); connack.code(CONNACK.Code.CONNECTION_ACCEPTED); send(connack); @@ -167,10 +159,10 @@ public class MqttSession { if (!clean_session) { // Setup the previous subscriptions.. 
- session_state.strategy().create(host().store(), client_id); - if (!session_state.subscriptions().isEmpty()) { + session_state.strategy.create(host().store(), client_id); + if (!session_state.subscriptions.isEmpty()) { h._suspend_read("subscribing"); - ArrayList<Topic> topics = ScalaSupport.map(session_state.subscriptions().values(), new Fn1<Tuple2<Topic, BindAddress>, Topic>() { + ArrayList<Topic> topics = Scala2Java.map(session_state.subscriptions.values(), new Fn1<Tuple2<Topic, BindAddress>, Topic>() { @Override public Topic apply(Tuple2<Topic, BindAddress> v1) { return v1._1(); @@ -189,10 +181,10 @@ public class MqttSession { } else { // do we need to clear the received ids? // durable_session_state.received_message_ids.clear() - session_state.subscriptions().clear(); - if (session_state.durable_sub() != null) { - final DestinationAddress[] addresses = new DestinationAddress[]{session_state.durable_sub()}; - session_state.durable_sub_$eq(null); + session_state.subscriptions.clear(); + if (session_state.durable_sub != null) { + final DestinationAddress[] addresses = new DestinationAddress[]{session_state.durable_sub}; + session_state.durable_sub = null; host().dispatch_queue().execute(new Task() { @Override public void run() { @@ -201,7 +193,7 @@ public class MqttSession { }); } - session_state.strategy().destroy(new Task() { + session_state.strategy.destroy(new Task() { @Override public void run() { ack_connect.run(); @@ -237,10 +229,10 @@ public class MqttSession { }); mqtt_consumer().addresses.clear(); } - session_state.subscriptions().clear(); + session_state.subscriptions.clear(); } else { - if (session_state.durable_sub() != null) { - final BindAddress[] addresses = new BindAddress[]{session_state.durable_sub()}; + if (session_state.durable_sub != null) { + final BindAddress[] addresses = new BindAddress[]{session_state.durable_sub}; host().dispatch_queue().execute(new Runnable() { @Override public void run() { @@ -248,27 +240,27 @@ public class 
MqttSession { } }); mqtt_consumer().addresses.clear(); - session_state.durable_sub_$eq(null); + session_state.durable_sub = null; } } for (Request request : in_flight_publishes.values()) { - if (request.ack() != null) { - request.ack().apply( - request.delivered() ? Delivered$.MODULE$ : Undelivered$.MODULE$ + if (request.ack != null) { + request.ack.apply( + request.delivered ? Delivered$.MODULE$ : Undelivered$.MODULE$ ); } } in_flight_publishes.clear(); - handler.sink_manager().close(mqtt_consumer().consumer_sink.downstream().get(), ScalaSupport.<Request>noopFn1()); - mqtt_consumer().consumer_sink.downstream_$eq(ScalaSupport.<Sink<Request>>none()); + handler.sink_manager.close(mqtt_consumer().consumer_sink.downstream().get(), Scala2Java.<Request>noopFn1()); + mqtt_consumer().consumer_sink.downstream_$eq(Scala2Java.<Sink<Request>>none()); handler.on_transport_disconnected(); } public SimpleAddress decode_destination(UTF8Buffer value) { - SimpleAddress rc = destination_parser.decode_single_destination(value.toString(), ScalaSupport.toScala(new Fn1<String, SimpleAddress>() { + SimpleAddress rc = destination_parser.decode_single_destination(value.toString(), Scala2Java.toScala(new Fn1<String, SimpleAddress>() { public SimpleAddress apply(String name) { return new SimpleAddress("topic", destination_parser.decode_path(name)); } @@ -291,15 +283,15 @@ public class MqttSession { public void send(MessageSupport.Message message) { queue.assertExecuting(); - handler.connection_sink().offer(new Request((short) 0, message, null)); + handler.connection_sink.offer(new Request((short) 0, message, null)); } public void publish_completed(short id) { queue.assertExecuting(); Request request = in_flight_publishes.remove(id); if (request != null) { - if (request.ack() != null) { - request.ack().apply(Consumed$.MODULE$); + if (request.ack != null) { + request.ack.apply(Consumed$.MODULE$); } } else { // It's possible that on a reconnect, we get an ACK @@ -329,8 +321,8 @@ public class 
MqttSession { final PUBREL ack = received(new PUBREL().decode(command)); // TODO: perhaps persist the processed list.. otherwise // we can't filter out dups after a broker restart. - session_state.received_message_ids().remove(ack.messageId()); - session_state.strategy().update(new Task() { + session_state.received_message_ids.remove(ack.messageId()); + session_state.strategy.update(new Task() { @Override public void run() { send(new PUBCOMP().messageId(ack.messageId())); @@ -437,12 +429,12 @@ public class MqttSession { @Override public int send_buffer_size() { - return handler.codec().getReadBufferSize(); + return handler.codec.getReadBufferSize(); } @Override public Option<BrokerConnection> connection() { - return ScalaSupport.some(handler.connection()); + return Scala2Java.some(handler.connection()); } @Override @@ -453,14 +445,14 @@ public class MqttSession { public void on_mqtt_publish(final PUBLISH publish) { - if ((publish.qos() == QoS.EXACTLY_ONCE) && session_state.received_message_ids().contains(publish.messageId())) { + if ((publish.qos() == QoS.EXACTLY_ONCE) && session_state.received_message_ids.contains(publish.messageId())) { PUBREC response = new PUBREC(); response.messageId(publish.messageId()); send(response); return; } - handler.messages_received().incrementAndGet(); + handler.messages_received.incrementAndGet(); queue.assertExecuting(); MqttProducerRoute route = producerRoutes.get(publish.topicName()); @@ -526,8 +518,8 @@ public class MqttSession { @Override public void run() { // TODO: perhaps persist the processed list.. 
- session_state.received_message_ids().add(publish.messageId()); - session_state.strategy().update(new Task() { + session_state.received_message_ids.add(publish.messageId()); + session_state.strategy.update(new Task() { @Override public void run() { PUBREC response = new PUBREC(); @@ -555,7 +547,7 @@ public class MqttSession { delivery.message_$eq(new RawMessage(publish.payload())); delivery.persistent_$eq(publish.qos().ordinal() > 0); delivery.size_$eq(publish.payload().length); - delivery.ack_$eq(ScalaSupport.toScala(ack)); + delivery.ack_$eq(Scala2Java.toScala(ack)); if (publish.retain()) { if (delivery.size() == 0) { delivery.retain_$eq(RetainRemove$.MODULE$); @@ -599,7 +591,7 @@ public class MqttSession { @Override public Option<BrokerConnection> connection() { - return handler != null ? ScalaSupport.some(handler.connection()) : ScalaSupport.<BrokerConnection>none(); + return handler != null ? Scala2Java.some(handler.connection()) : Scala2Java.<BrokerConnection>none(); } @Override @@ -629,7 +621,7 @@ public class MqttSession { delivery.retain_$eq(RetainSet$.MODULE$); } } - delivery.ack_$eq(ScalaSupport.toScala(new UnitFn2<DeliveryResult, StoreUOW>() { + delivery.ack_$eq(Scala2Java.toScala(new UnitFn2<DeliveryResult, StoreUOW>() { @Override public void call(DeliveryResult x, StoreUOW y) { host().dispatch_queue().execute(new Task() { @@ -641,7 +633,7 @@ public class MqttSession { complete_close.run(); } })); - handler.messages_received().incrementAndGet(); + handler.messages_received.incrementAndGet(); prodcuer.offer(delivery); } } @@ -664,7 +656,7 @@ public class MqttSession { queue.execute(new Task() { @Override public void run() { - session_state.strategy().update(new Task() { + session_state.strategy.update(new Task() { @Override public void run() { SUBACK suback = new SUBACK(); @@ -688,11 +680,11 @@ public class MqttSession { } public void subscribe(Collection<Topic> topics, final Task on_subscribed) { - final ArrayList<BindAddress> addresses = 
ScalaSupport.map(topics, new Fn1<Topic, BindAddress>() { + final ArrayList<BindAddress> addresses = Scala2Java.map(topics, new Fn1<Topic, BindAddress>() { @Override public BindAddress apply(Topic topic) { BindAddress address = decode_destination(topic.name()); - session_state.subscriptions().put(topic.name(), new Tuple2<Topic, BindAddress>(topic, address)); + session_state.subscriptions.put(topic.name(), new Tuple2<Topic, BindAddress>(topic, address)); mqtt_consumer().addresses.put(address, topic.qos()); if (PathParser.containsWildCards(address.path())) { mqtt_consumer().wildcards.put(address.path(), topic.qos()); @@ -702,12 +694,12 @@ public class MqttSession { }); - handler.subscription_count_$eq(mqtt_consumer().addresses.size()); + handler.subscription_count = mqtt_consumer().addresses.size(); if (!clean_session) { Set<BindAddress> bindAddressSet = mqtt_consumer().addresses.keySet(); SubscriptionAddress durable_sub = new SubscriptionAddress(Path$.MODULE$.create(client_id.toString()), null, bindAddressSet.toArray(new BindAddress[bindAddressSet.size()])); - session_state.durable_sub_$eq(durable_sub); + session_state.durable_sub = durable_sub; addresses.clear(); addresses.add(durable_sub); } @@ -716,7 +708,7 @@ public class MqttSession { @Override public void run() { for (BindAddress address : addresses) { - host().router().bind(new BindAddress[]{address}, mqtt_consumer(), security_context, ScalaSupport.<Option<String>>noopFn1()); + host().router().bind(new BindAddress[]{address}, mqtt_consumer(), security_context, Scala2Java.<Option<String>>noopFn1()); } on_subscribed.run(); } @@ -725,10 +717,10 @@ public class MqttSession { public void on_mqtt_unsubscribe(final UNSUBSCRIBE unsubscribe) { - ArrayList<BindAddress> addressesList = ScalaSupport.flatMap(Arrays.asList(unsubscribe.topics()), new Fn1<UTF8Buffer, Option<BindAddress>>() { + ArrayList<BindAddress> addressesList = Scala2Java.flatMap(Arrays.asList(unsubscribe.topics()), new Fn1<UTF8Buffer, 
Option<BindAddress>>() { @Override public Option<BindAddress> apply(UTF8Buffer topicName) { - Tuple2<Topic, BindAddress> removed = session_state.subscriptions().remove(topicName); + Tuple2<Topic, BindAddress> removed = session_state.subscriptions.remove(topicName); if (removed != null) { Topic topic = removed._1(); BindAddress address = removed._2(); @@ -736,20 +728,20 @@ public class MqttSession { if (PathParser.containsWildCards(address.path())) { mqtt_consumer().wildcards.remove(address.path(), topic.qos()); } - return ScalaSupport.some(address); + return Scala2Java.some(address); } else { - return ScalaSupport.none(); + return Scala2Java.none(); } } }); final BindAddress[] addresses = addressesList.toArray(new BindAddress[addressesList.size()]); - handler.subscription_count_$eq(mqtt_consumer().addresses.size()); + handler.subscription_count = mqtt_consumer().addresses.size(); if (!clean_session) { Set<BindAddress> bindAddressSet = mqtt_consumer().addresses.keySet(); - session_state.durable_sub_$eq(new SubscriptionAddress(Path$.MODULE$.create(client_id.toString()), null, bindAddressSet.toArray(new BindAddress[bindAddressSet.size()]))); + session_state.durable_sub = new SubscriptionAddress(Path$.MODULE$.create(client_id.toString()), null, bindAddressSet.toArray(new BindAddress[bindAddressSet.size()])); } host().dispatch_queue().execute(new Task() { @@ -759,16 +751,16 @@ public class MqttSession { host().router().unbind(addresses, mqtt_consumer(), false, security_context); } else { if (mqtt_consumer().addresses.isEmpty()) { - host().router().unbind(new BindAddress[]{session_state.durable_sub()}, mqtt_consumer(), true, security_context); - session_state.durable_sub_$eq(null); + host().router().unbind(new BindAddress[]{session_state.durable_sub}, mqtt_consumer(), true, security_context); + session_state.durable_sub = null; } else { - host().router().bind(new BindAddress[]{session_state.durable_sub()}, mqtt_consumer(), security_context, 
ScalaSupport.<Option<String>>noopFn1()); + host().router().bind(new BindAddress[]{session_state.durable_sub}, mqtt_consumer(), security_context, Scala2Java.<Option<String>>noopFn1()); } } queue.execute(new Task() { @Override public void run() { - session_state.strategy().update(new Task() { + session_state.strategy.update(new Task() { @Override public void run() { UNSUBACK ack = new UNSUBACK(); @@ -847,7 +839,7 @@ public class MqttSession { MutableSink<Request> consumer_sink = new MutableSink<Request>(); { - consumer_sink.downstream_$eq(ScalaSupport.<Sink<Request>>none()); + consumer_sink.downstream_$eq(Scala2Java.<Sink<Request>>none()); } public LongCounter next_seq_id = new LongCounter(0); @@ -862,7 +854,7 @@ public class MqttSession { (value & 0x7FFF)); // the lower 15 bits come for the original seq id. } - CreditWindowFilter<Tuple2<Session<Delivery>, Delivery>> credit_window_filter = new CreditWindowFilter<Tuple2<Session<Delivery>, Delivery>>(consumer_sink.flatMap(ScalaSupport.toScala(new Fn1<Tuple2<Session<Delivery>, Delivery>, Option<Request>>() { + CreditWindowFilter<Tuple2<Session<Delivery>, Delivery>> credit_window_filter = new CreditWindowFilter<Tuple2<Session<Delivery>, Delivery>>(consumer_sink.flatMap(Scala2Java.toScala(new Fn1<Tuple2<Session<Delivery>, Delivery>, Option<Request>>() { public Option<Request> apply(Tuple2<Session<Delivery>, Delivery> event) { queue.assertExecuting(); Session<Delivery> session = event._1(); @@ -875,12 +867,12 @@ public class MqttSession { QoS qos = addresses.get(topic); if (qos == null) { - qos = ScalaSupport.<QoS>head(wildcards.get(topic.path())); + qos = Scala2Java.<QoS>head(wildcards.get(topic.path())); } if (qos == null) { acked(delivery, Consumed$.MODULE$); - return ScalaSupport.none(); + return Scala2Java.none(); } else { PUBLISH publish = new PUBLISH(); publish.topicName(new UTF8Buffer(destination_parser.encode_destination(delivery.sender().head()))); @@ -902,7 +894,7 @@ public class MqttSession { } } - 
handler.messages_sent().incrementAndGet(); + handler.messages_sent.incrementAndGet(); UnitFn1<DeliveryResult> ack = new UnitFn1<DeliveryResult>() { @Override @@ -925,7 +917,7 @@ public class MqttSession { // A reconnecting client could have acked before // we get dispatched by the durable sub. - if (prev.message() == null) { + if (prev.message == null) { in_flight_publishes.remove(id); acked(delivery, Consumed$.MODULE$); } else { @@ -935,18 +927,18 @@ public class MqttSession { handler.async_die("Client not acking regularly.", null); } } - return ScalaSupport.some(request); + return Scala2Java.some(request); } else { // This callback gets executed once the message // sent to the transport. publish.qos(QoS.AT_MOST_ONCE); - return ScalaSupport.some(new Request((short) 0, publish, ack)); + return Scala2Java.some(new Request((short) 0, publish, ack)); } } } - })), SessionDeliverySizer$.MODULE$); + })), SessionDeliverySizer.INSTANCE); public void acked(Delivery delivery, DeliveryResult result) { @@ -958,7 +950,7 @@ public class MqttSession { } { - credit_window_filter.credit(handler.codec().getWriteBufferSize() * 2, 1); + credit_window_filter.credit(handler.codec.getWriteBufferSize() * 2, 1); } SessionSinkMux<Delivery> session_manager = new SessionSinkMux<Delivery>(credit_window_filter, queue, Delivery$.MODULE$, Integer.MAX_VALUE / 2, receive_buffer_size()) { @@ -984,7 +976,7 @@ public class MqttSession { @Override public Option<BrokerConnection> connection() { - return handler != null ? ScalaSupport.some(handler.connection()) : ScalaSupport.<BrokerConnection>none(); + return handler != null ? 
Scala2Java.some(handler.connection()) : Scala2Java.<BrokerConnection>none(); } @Override @@ -1051,7 +1043,7 @@ public class MqttSession { } public void dispose() { - session_manager.close(downstream(), ScalaSupport.toScala(new UnitFn1<Delivery>() { + session_manager.close(downstream(), Scala2Java.toScala(new UnitFn1<Delivery>() { @Override public void call(Delivery delivery) { // We have been closed so we have to nak any deliveries.
Added: activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/MqttSessionManager.java URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/MqttSessionManager.java?rev=1461905&view=auto ============================================================================== --- activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/MqttSessionManager.java (added) +++ activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/MqttSessionManager.java Wed Mar 27 23:58:58 2013 @@ -0,0 +1,293 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.activemq.apollo.mqtt; + +import org.apache.activemq.apollo.broker.BindAddress; +import org.apache.activemq.apollo.broker.SimpleAddress; +import org.apache.activemq.apollo.broker.SubscriptionAddress; +import org.apache.activemq.apollo.broker.VirtualHost; +import org.apache.activemq.apollo.broker.store.Store; +import org.apache.activemq.apollo.broker.store.StoreUOW; +import org.apache.activemq.apollo.util.Fn0; +import org.apache.activemq.apollo.util.Scala2Java; +import org.apache.activemq.apollo.util.UnitFn0; +import org.apache.activemq.apollo.util.UnitFn1; +import org.fusesource.hawtbuf.AsciiBuffer; +import org.fusesource.hawtbuf.Buffer; +import org.fusesource.hawtbuf.UTF8Buffer; +import org.fusesource.hawtbuf.proto.InvalidProtocolBufferException; +import org.fusesource.hawtdispatch.Dispatch; +import org.fusesource.hawtdispatch.DispatchQueue; +import org.fusesource.hawtdispatch.Task; +import org.fusesource.mqtt.client.QoS; +import org.fusesource.mqtt.client.Topic; +import scala.Tuple2; +import scala.collection.Seq; + +import java.util.HashMap; +import java.util.HashSet; + +import static org.fusesource.hawtdispatch.Dispatch.createQueue; + +/** + * Tracks active sessions so that we can ensure that a given + * session id is only associated with one connection + * at a time. If a client tries to establish a 2nd + * connection, the first one will be closed before the session + * is switched to the new connection. 
+ * + * @author <a href="http://hiramchirino.com">Hiram Chirino</a> + */ +public class MqttSessionManager { + + public static final Scala2Java.Logger log = MqttProtocolHandler.log; + + static DispatchQueue queue = createQueue("session manager"); + + interface StorageStrategy { + void update(Task cb); + void destroy(Task cb); + void create(Store store, UTF8Buffer client_id); + } + + static class SessionState { + SubscriptionAddress durable_sub = null; + java.util.HashMap<UTF8Buffer, Tuple2<Topic, BindAddress>> subscriptions = new java.util.HashMap<UTF8Buffer, Tuple2<Topic, BindAddress>>(); + HashSet<Short> received_message_ids = new HashSet<Short>(); + StorageStrategy strategy = new NoopStrategy(); + + class NoopStrategy implements StorageStrategy { + + @Override + public void create(Store store, UTF8Buffer client_id) { + if (store != null) { + strategy = new StoreStrategy(store, client_id); + } + } + + @Override + public void update(Task cb) { + cb.run(); + } + + @Override + public void destroy(Task cb) { + cb.run(); + } + } + + class StoreStrategy implements StorageStrategy { + + public final Store store; + public final UTF8Buffer client_id; + public final UTF8Buffer session_key; + + public StoreStrategy(Store store, UTF8Buffer client_id) { + this.store = store; + this.client_id = client_id; + this.session_key = new UTF8Buffer("mqtt:" + client_id); + } + + @Override + public void create(Store store, UTF8Buffer client_id) { + } + + @Override + public void update(final Task cb) { + StoreUOW uow = store.create_uow(); + SessionPB.Bean session_pb = new SessionPB.Bean(); + session_pb.setClientId(client_id); + for (Short id : received_message_ids) { + session_pb.addReceivedMessageIds(id.intValue()); + } + for (Tuple2<Topic, BindAddress> entry : subscriptions.values()) { + Topic topic = entry._1(); + BindAddress address = entry._2(); + TopicPB.Bean topic_pb = new TopicPB.Bean(); + topic_pb.setName(topic.name()); + topic_pb.setQos(topic.qos().ordinal()); + 
topic_pb.setAddress(new UTF8Buffer(address.toString())); + session_pb.addSubscriptions(topic_pb); + } + uow.put(session_key, session_pb.freeze().toUnframedBuffer()); + + final DispatchQueue current = Dispatch.getCurrentQueue(); + uow.on_complete(Scala2Java.toScala(new UnitFn0() { + @Override + public void call() { + current.execute(new Task() { + @Override + public void run() { + cb.run(); + } + }); + } + })); + uow.release(); + } + + @Override + public void destroy(final Task cb) { + StoreUOW uow = store.create_uow(); + uow.put(session_key, null); + final DispatchQueue current = Dispatch.getCurrentQueue(); + uow.on_complete(Scala2Java.toScala(new UnitFn0() { + @Override + public void call() { + current.execute(new Task() { + @Override + public void run() { + strategy = new NoopStrategy(); + cb.run(); + } + }); + } + })); + uow.release(); + } + + } + } + + + static public class HostState { + + public final VirtualHost host; + public final HashMap<UTF8Buffer, SessionState> session_states = new HashMap<UTF8Buffer, SessionState>(); + public final HashMap<UTF8Buffer, MqttSession> sessions = new HashMap<UTF8Buffer, MqttSession>(); + public boolean loaded = false; + + public HostState(VirtualHost host) { + this.host = host; + } + + public void on_load(final Task func) { + if (loaded) { + func.run(); + } else { + if (host.store() != null) { + // We load all the persisted session's from the host's store when we are first accessed. 
+ queue.suspend(); + host.store().get_prefixed_map_entries(new AsciiBuffer("mqtt:"), Scala2Java.toScala(new UnitFn1<Seq<Tuple2<Buffer, Buffer>>>() { + @Override + public void call(final Seq<Tuple2<Buffer, Buffer>> entries) { + queue.resume(); + queue.execute(new Task() { + @Override + public void run() { + for (Tuple2<Buffer, Buffer> entry : Scala2Java.toIterable(entries)) { + try { + Buffer value = entry._2(); + SessionPB.Buffer session_pb = SessionPB.FACTORY.parseUnframed(value); + SessionState session_state = new SessionState(); + session_state.strategy.create(host.store(), session_pb.getClientId()); + if (session_pb.hasReceivedMessageIds()) { + for (Integer i : session_pb.getReceivedMessageIdsList()) { + session_state.received_message_ids.add(i.shortValue()); + } + } + if (session_pb.hasSubscriptions()) { + for (TopicPB.Getter sub : session_pb.getSubscriptionsList()) { + SimpleAddress address = SimpleAddress.apply(sub.getAddress().toString()); + Topic topic = new Topic(sub.getName(), QoS.values()[sub.getQos()]); + session_state.subscriptions.put(sub.getName(), new Tuple2<Topic, BindAddress>(topic, address)); + + } + } + session_states.put(session_pb.getClientId(), session_state); + } catch (InvalidProtocolBufferException e) { + log.warn(e, "Could not load a stored MQTT session"); + } + } + loaded = true; + func.run(); + } + }); + } + })); + + } else { + loaded = true; + func.run(); + } + } + } + } + + static public void attach(final VirtualHost host, final UTF8Buffer client_id, final MqttProtocolHandler handler) { + queue.execute(new Task() { + @Override + public void run() { + final HostState host_state = host.plugin_state( + Scala2Java.toScala(new Fn0<HostState>(){ + @Override + public HostState apply() { + return new HostState(host); + } + }), + HostState.class); + host_state.on_load(new Task() { + @Override + public void run() { + MqttSession assignment = host_state.sessions.get(client_id); + if (assignment != null) { + assignment.connect(handler); + } else 
{ + SessionState state; + if (handler.connect_message.cleanSession()) { + state = host_state.session_states.remove(client_id); + if (state == null) { + state = new SessionState(); + } + } else { + state = host_state.session_states.get(client_id); + if (state == null) { + state = new SessionState(); + host_state.session_states.put(client_id, state); + } + } + assignment = new MqttSession(host_state, client_id, state); + assignment.connect(handler); + host_state.sessions.put(client_id, assignment); + } + } + }); + } + }); + } + + static public void disconnect(final HostState host_state, final UTF8Buffer client_id, final MqttProtocolHandler handler) { + queue.execute(new Task() { + @Override + public void run() { + MqttSession assignment = host_state.sessions.get(client_id); + if (assignment != null) { + assignment.disconnect(handler); + } + } + }); + } + + static public void remove(final HostState host_state, final UTF8Buffer client_id) { + queue.execute(new Task() { + @Override + public void run() { + host_state.sessions.remove(client_id); + } + }); + } +} Added: activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/Request.java URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/Request.java?rev=1461905&view=auto ============================================================================== --- activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/Request.java (added) +++ activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/Request.java Wed Mar 27 23:58:58 2013 @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.apollo.mqtt; + +import org.apache.activemq.apollo.broker.DeliveryResult; +import org.apache.activemq.apollo.util.UnitFn1; +import org.fusesource.mqtt.codec.MQTTFrame; +import org.fusesource.mqtt.codec.MessageSupport; + +/** + */ +public class Request { + + public final short id; + public final MessageSupport.Message message; + public final UnitFn1<DeliveryResult> ack; + + MQTTFrame frame; + boolean delivered; + + public Request(short id, MessageSupport.Message message, UnitFn1<DeliveryResult> ack) { + this.id = id; + this.message = message; + this.ack = ack; + frame = message==null ? 
null : message.encode(); + } +} \ No newline at end of file Added: activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/SessionDeliverySizer.java URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/SessionDeliverySizer.java?rev=1461905&view=auto ============================================================================== --- activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/SessionDeliverySizer.java (added) +++ activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/SessionDeliverySizer.java Wed Mar 27 23:58:58 2013 @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.activemq.apollo.mqtt; + +import org.apache.activemq.apollo.broker.Delivery; +import org.apache.activemq.apollo.broker.Delivery$; +import org.apache.activemq.apollo.broker.Session; +import org.apache.activemq.apollo.broker.Sizer; +import scala.Tuple2; + +/** + */ +class SessionDeliverySizer implements Sizer<Tuple2<Session<Delivery>, Delivery>> { + + public static final SessionDeliverySizer INSTANCE = new SessionDeliverySizer(); + + @Override + public int size(Tuple2<Session<Delivery>, Delivery> value) { + return Delivery$.MODULE$.size(value._2()); + } +} Copied: activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/Scala2Java.java (from r1461839, activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/ScalaSupport.java) URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/Scala2Java.java?p2=activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/Scala2Java.java&p1=activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/ScalaSupport.java&r1=1461839&r2=1461905&rev=1461905&view=diff ============================================================================== --- activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/ScalaSupport.java (original) +++ activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/Scala2Java.java Wed Mar 27 23:58:58 2013 @@ -14,15 +14,18 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ -package org.apache.activemq.apollo.mqtt; +package org.apache.activemq.apollo.util; -import org.apache.activemq.apollo.util.Log; -import scala.Function1; -import scala.Function2; -import scala.Option; +import org.apache.activemq.apollo.util.*; +import org.fusesource.hawtbuf.Buffer; +import scala.*; +import scala.collection.JavaConversions$; +import scala.collection.Seq; import scala.collection.immutable.List; import scala.runtime.BoxedUnit; +import java.lang.Boolean; +import java.lang.Long; import java.util.ArrayList; import java.util.Collection; import java.util.Iterator; @@ -30,8 +33,31 @@ import java.util.Iterator; /** * @author <a href="http://hiramchirino.com">Hiram Chirino</a> */ -public class ScalaSupport { - private static ScalaSupportHelper$ helper = ScalaSupportHelper$.MODULE$; +public class Scala2Java { + private static Scala2JavaHelper$ helper = Scala2JavaHelper$.MODULE$; + + + public static int get(Integer value, int defaultValue) { + if( value!=null ) { + return value.intValue(); + } else { + return defaultValue; + } + } + public static long get(Long value, long defaultValue) { + if( value!=null ) { + return value.longValue(); + } else { + return defaultValue; + } + } + public static boolean get(Boolean value, boolean defaultValue) { + if( value!=null ) { + return value.booleanValue(); + } else { + return defaultValue; + } + } static final Function1<Object,BoxedUnit> NOOP_FN1 = helper.toScala(new UnitFn1<Object>() { @Override @@ -39,10 +65,20 @@ public class ScalaSupport { } }); + public static String toString(Object o) { + return o == null ? 
null : o.toString(); + } + public static <T1> Function1<T1, BoxedUnit> noopFn1() { return (Function1<T1, BoxedUnit>) NOOP_FN1; } + public static <R> Function0<R> toScala(Fn0<R> func) { + if( func == null ) { + return null; + } + return helper.toScala(func); + } public static <T1, R> Function1<T1, R> toScala(Fn1<T1, R> func) { if( func == null ) { @@ -98,6 +134,9 @@ public class ScalaSupport { return helper.toList(s); } + public static <T> Iterable<T> toIterable(Seq<T> entries) { + return JavaConversions$.MODULE$.asJavaIterable(entries); + } public static class Logger { final Log log; Copied: activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/Scala2JavaHelper.scala (from r1461839, activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/ScalaSupport.scala) URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/Scala2JavaHelper.scala?p2=activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/Scala2JavaHelper.scala&p1=activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/ScalaSupport.scala&r1=1461839&r2=1461905&rev=1461905&view=diff ============================================================================== --- activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/ScalaSupport.scala (original) +++ activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/Scala2JavaHelper.scala Wed Mar 27 23:58:58 2013 @@ -14,18 +14,30 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ -package org.apache.activemq.apollo.mqtt +package org.apache.activemq.apollo.util -import scala.Function1 import org.apache.activemq.apollo.util.Log +import scala.Function1 import scala.runtime.BoxedUnit +abstract class Fn0[+R] { + def apply(): R +} + +class UnitFn0 extends Fn0[BoxedUnit] { + def call() = {} + def apply() = { + call() + BoxedUnit.UNIT; + } +} + abstract class Fn1[-T1,+R] { def apply(v1: T1): R } -class UnitFn1[-T1] extends Fn1[T1, BoxedUnit] { - def call(v1: T1) = {} +abstract class UnitFn1[-T1] extends Fn1[T1, BoxedUnit] { + def call(v1: T1) def apply(v1: T1) = { call(v1) BoxedUnit.UNIT; @@ -51,7 +63,8 @@ class UnitFn2[-T1,-T2] extends Fn2[T1,T2 * Time: 3:27 PM * To change this template use File | Settings | File Templates. */ -object ScalaSupportHelper { +object Scala2JavaHelper { + def toScala[R](func:Fn0[R]):Function0[R] = () => { func.apply() } def toScala[T1,R](func:Fn1[T1,R]):Function1[T1,R] = (v1:T1) => { func.apply(v1) } def toScala[T1,T2,R](func:Fn2[T1,T2,R]):Function2[T1,T2,R] = (v1:T1, v2:T2) => { func.apply(v1,v2) } def none[T]:Option[T] = None
