Updated Branches:
  refs/heads/javelin 1b9164139 -> 6fd6b38b4

Add AsyncMethod support


Project: http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/commit/6fd6b38b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/tree/6fd6b38b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/diff/6fd6b38b

Branch: refs/heads/javelin
Commit: 6fd6b38b43300fab4bdbf3f19bb1ad30422eb308
Parents: 1b91641
Author: Kelven Yang <[email protected]>
Authored: Thu Dec 6 15:14:18 2012 -0800
Committer: Kelven Yang <[email protected]>
Committed: Thu Dec 6 15:14:45 2012 -0800

----------------------------------------------------------------------
 .../messaging/AsyncCallbackDispatcher.java         |   70 +++++++++++++++
 .../framework/messaging/AsyncCallbackDriver.java   |   23 +++++
 .../framework/messaging/AsyncCallbackHandler.java  |   23 +++++
 .../messaging/AsyncCompletionCallback.java         |   66 ++++++++++++++
 .../framework/messaging/ComponentContainer.java    |   23 -----
 .../framework/messaging/ComponentEndpoint.java     |   65 -------------
 .../framework/messaging/EventBusEndpoint.java      |   60 ++++++++++++
 .../framework/messaging/EventDispatcher.java       |   32 +++++++-
 .../messaging/InplaceAsyncCallbackDriver.java      |   27 ++++++
 .../framework/messaging/RpcClientCallImpl.java     |    5 +-
 .../framework/messaging/RpcProvider.java           |   13 ++-
 .../framework/messaging/RpcProviderImpl.java       |    4 +-
 .../framework/messaging/RpcServerCallImpl.java     |    2 +-
 .../framework/messaging/RpcServiceDispatcher.java  |   32 +++++++-
 .../framework/messaging/AsyncSampleCallee.java     |   41 +++++++++
 .../framework/messaging/AsyncSampleCaller.java     |   38 ++++++++
 .../framework/messaging/SampleComponent.java       |   15 +++-
 .../cloudstack/framework/messaging/TestVolume.java |    5 +
 18 files changed, 443 insertions(+), 101 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/6fd6b38b/framework/ipc/src/org/apache/cloudstack/framework/messaging/AsyncCallbackDispatcher.java
----------------------------------------------------------------------
diff --git 
a/framework/ipc/src/org/apache/cloudstack/framework/messaging/AsyncCallbackDispatcher.java
 
b/framework/ipc/src/org/apache/cloudstack/framework/messaging/AsyncCallbackDispatcher.java
new file mode 100644
index 0000000..80b9d91
--- /dev/null
+++ 
b/framework/ipc/src/org/apache/cloudstack/framework/messaging/AsyncCallbackDispatcher.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cloudstack.framework.messaging;
+
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.HashMap;
+import java.util.Map;
+
+public class AsyncCallbackDispatcher {
+       private static Map<Class<?>, Method> s_handlerCache = new 
HashMap<Class<?>, Method>();
+       
+       public static boolean dispatch(Object target, AsyncCompletionCallback 
callback) {
+               assert(callback != null);
+               assert(target != null);
+               
+               Method handler = resolveHandler(target.getClass(), 
callback.getOperationName());
+               if(handler == null)
+                       return false;
+               
+               try {
+                       handler.invoke(target, callback);
+               } catch (IllegalArgumentException e) {
+                       throw new RuntimeException("IllegalArgumentException 
when invoking RPC callback for command: " + callback.getOperationName());
+               } catch (IllegalAccessException e) {
+                       throw new RuntimeException("IllegalAccessException when 
invoking RPC callback for command: " + callback.getOperationName());
+               } catch (InvocationTargetException e) {
+                       throw new RuntimeException("InvocationTargetException 
when invoking RPC callback for command: " + callback.getOperationName());
+               }
+               
+               return true;
+       }
+       
+       public static Method resolveHandler(Class<?> handlerClz, String 
operationName) {
+               synchronized(s_handlerCache) {
+                       Method handler = s_handlerCache.get(handlerClz);
+                       if(handler != null)
+                               return handler;
+                       
+                       for(Method method : handlerClz.getMethods()) {
+                               AsyncCallbackHandler annotation = 
method.getAnnotation(AsyncCallbackHandler.class);
+                               if(annotation != null) {
+                                       
if(annotation.operationName().equals(operationName)) {
+                                               s_handlerCache.put(handlerClz, 
method);
+                                               return method;
+                                       }
+                               }
+                       }
+               }
+               
+               return null;
+       }
+}

http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/6fd6b38b/framework/ipc/src/org/apache/cloudstack/framework/messaging/AsyncCallbackDriver.java
----------------------------------------------------------------------
diff --git 
a/framework/ipc/src/org/apache/cloudstack/framework/messaging/AsyncCallbackDriver.java
 
b/framework/ipc/src/org/apache/cloudstack/framework/messaging/AsyncCallbackDriver.java
new file mode 100644
index 0000000..ac9543c
--- /dev/null
+++ 
b/framework/ipc/src/org/apache/cloudstack/framework/messaging/AsyncCallbackDriver.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cloudstack.framework.messaging;
+
+public interface AsyncCallbackDriver {
+       public void performCompletionCallback(AsyncCompletionCallback callback);
+}

http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/6fd6b38b/framework/ipc/src/org/apache/cloudstack/framework/messaging/AsyncCallbackHandler.java
----------------------------------------------------------------------
diff --git 
a/framework/ipc/src/org/apache/cloudstack/framework/messaging/AsyncCallbackHandler.java
 
b/framework/ipc/src/org/apache/cloudstack/framework/messaging/AsyncCallbackHandler.java
new file mode 100644
index 0000000..59cd75e
--- /dev/null
+++ 
b/framework/ipc/src/org/apache/cloudstack/framework/messaging/AsyncCallbackHandler.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cloudstack.framework.messaging;
+
+public @interface AsyncCallbackHandler {
+       String operationName();
+}

http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/6fd6b38b/framework/ipc/src/org/apache/cloudstack/framework/messaging/AsyncCompletionCallback.java
----------------------------------------------------------------------
diff --git 
a/framework/ipc/src/org/apache/cloudstack/framework/messaging/AsyncCompletionCallback.java
 
b/framework/ipc/src/org/apache/cloudstack/framework/messaging/AsyncCompletionCallback.java
new file mode 100644
index 0000000..d0bd47b
--- /dev/null
+++ 
b/framework/ipc/src/org/apache/cloudstack/framework/messaging/AsyncCompletionCallback.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cloudstack.framework.messaging;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class AsyncCompletionCallback { 
+       private Map<String, Object> _contextMap = new HashMap<String, Object>();
+       private String _operationName;
+       private Object _targetObject;
+       
+       public AsyncCompletionCallback(Object target) {
+               _targetObject = target;
+       }
+       
+       public AsyncCompletionCallback setContextParam(String key, Object 
param) {
+               // ???
+               return this;
+       }
+       
+       public AsyncCompletionCallback attachDriver(AsyncCallbackDriver driver) 
{
+               // ???
+               return this;
+       }
+       
+       public AsyncCompletionCallback setOperationName(String name) {
+               _operationName = name;
+               return this;
+       }
+       
+       public String getOperationName() {
+               return _operationName;
+       }
+       
+       public <T> T getContextParam(String key) {
+               // ???
+               return null;
+       }
+       
+       public void complete(Object resultObject) {
+               ///
+       }
+       
+       public <T> T getResult() {
+               
+               // ???
+               return null;
+       }
+}

http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/6fd6b38b/framework/ipc/src/org/apache/cloudstack/framework/messaging/ComponentContainer.java
----------------------------------------------------------------------
diff --git 
a/framework/ipc/src/org/apache/cloudstack/framework/messaging/ComponentContainer.java
 
b/framework/ipc/src/org/apache/cloudstack/framework/messaging/ComponentContainer.java
deleted file mode 100644
index c5828ae..0000000
--- 
a/framework/ipc/src/org/apache/cloudstack/framework/messaging/ComponentContainer.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.cloudstack.framework.messaging;
-
-public interface ComponentContainer {
-       ComponentEndpoint wire(ComponentEndpoint endpoint, String 
predefinedAddress);
-}

http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/6fd6b38b/framework/ipc/src/org/apache/cloudstack/framework/messaging/ComponentEndpoint.java
----------------------------------------------------------------------
diff --git 
a/framework/ipc/src/org/apache/cloudstack/framework/messaging/ComponentEndpoint.java
 
b/framework/ipc/src/org/apache/cloudstack/framework/messaging/ComponentEndpoint.java
deleted file mode 100644
index 8c9b7cd..0000000
--- 
a/framework/ipc/src/org/apache/cloudstack/framework/messaging/ComponentEndpoint.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.cloudstack.framework.messaging;
-
-import org.apache.log4j.Logger;
-
-public class ComponentEndpoint implements RpcServiceEndpoint, Subscriber {
-    private static final Logger s_logger = 
Logger.getLogger(ComponentEndpoint.class);
-       
-       private TransportEndpoint transportEndpoint;
-       private RpcProvider rpcProvider;
-       
-       public ComponentEndpoint() {
-       }
-       
-       public TransportEndpoint getTransportEndpoint() {
-               return transportEndpoint;
-       }
-
-       public void setTransportEndpoint(TransportEndpoint transportEndpoint) {
-               this.transportEndpoint = transportEndpoint;
-       }
-
-       public RpcProvider getRpcProvider() {
-               return rpcProvider;
-       }
-
-       public void setRpcProvider(RpcProvider rpcProvider) {
-               this.rpcProvider = rpcProvider;
-       }
-       
-       public void initialize() {
-               rpcProvider.registerRpcServiceEndpoint(this);
-       }
-
-       @Override
-       public boolean onCallReceive(RpcServerCall call) {
-               return RpcServiceDispatcher.dispatch(this, call);
-       }
-       
-       @Override
-       public void onPublishEvent(String subject, String senderAddress, Object 
args) {
-               try {
-                       EventDispatcher.dispatch(this, subject, senderAddress, 
args);
-               } catch(RuntimeException e) {
-                       s_logger.error("Unhandled exception", e);
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/6fd6b38b/framework/ipc/src/org/apache/cloudstack/framework/messaging/EventBusEndpoint.java
----------------------------------------------------------------------
diff --git 
a/framework/ipc/src/org/apache/cloudstack/framework/messaging/EventBusEndpoint.java
 
b/framework/ipc/src/org/apache/cloudstack/framework/messaging/EventBusEndpoint.java
new file mode 100644
index 0000000..b51fb6d
--- /dev/null
+++ 
b/framework/ipc/src/org/apache/cloudstack/framework/messaging/EventBusEndpoint.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cloudstack.framework.messaging;
+
+public class EventBusEndpoint {
+       private EventBus _eventBus;
+       private String _sender;
+       private PublishScope _scope;
+       
+       public EventBusEndpoint(EventBus eventBus, String sender, PublishScope 
scope) {
+               _eventBus = eventBus;
+               _sender = sender;
+               _scope = scope;
+       }
+       
+       public EventBusEndpoint setEventBus(EventBus eventBus) {
+               _eventBus = eventBus;
+               return this;
+       }
+       
+       public EventBusEndpoint setScope(PublishScope scope) {
+               _scope = scope;
+               return this;
+       }
+       
+       public PublishScope getScope() {
+               return _scope;
+       }
+       
+       public EventBusEndpoint setSender(String sender) {
+               _sender = sender;
+               return this;
+       }
+       
+       public String getSender() {
+               return _sender;
+       }
+       
+       public void Publish(String subject, Object args) {
+               assert(_eventBus != null);
+               _eventBus.publish(_sender, subject, _scope, args);
+       }
+}

http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/6fd6b38b/framework/ipc/src/org/apache/cloudstack/framework/messaging/EventDispatcher.java
----------------------------------------------------------------------
diff --git 
a/framework/ipc/src/org/apache/cloudstack/framework/messaging/EventDispatcher.java
 
b/framework/ipc/src/org/apache/cloudstack/framework/messaging/EventDispatcher.java
index ec2afb4..debc993 100644
--- 
a/framework/ipc/src/org/apache/cloudstack/framework/messaging/EventDispatcher.java
+++ 
b/framework/ipc/src/org/apache/cloudstack/framework/messaging/EventDispatcher.java
@@ -23,9 +23,39 @@ import java.lang.reflect.Method;
 import java.util.HashMap;
 import java.util.Map;
 
-public class EventDispatcher {
+public class EventDispatcher implements Subscriber {
        private static Map<Class<?>, Method> s_handlerCache = new 
HashMap<Class<?>, Method>();
        
+       private static Map<Object, EventDispatcher> s_targetMap = new 
HashMap<Object, EventDispatcher>();
+       private Object _targetObject;
+       
+       public EventDispatcher(Object targetObject) {
+               _targetObject = targetObject;
+       }
+       
+       @Override
+       public void onPublishEvent(String senderAddress, String subject, Object 
args) {
+               dispatch(_targetObject, subject, senderAddress, args);
+       }
+       
+       public static EventDispatcher getDispatcher(Object targetObject) {
+               EventDispatcher dispatcher;
+               synchronized(s_targetMap) {
+                       dispatcher = s_targetMap.get(targetObject);
+                       if(dispatcher == null) {
+                               dispatcher = new EventDispatcher(targetObject);
+                               s_targetMap.put(targetObject, dispatcher);
+                       }
+               }
+               return dispatcher;
+       }
+       
+       public static void removeDispatcher(Object targetObject) {
+               synchronized(s_targetMap) {
+                       s_targetMap.remove(targetObject);
+               }
+       }
+       
        public static boolean dispatch(Object target, String subject, String 
senderAddress, Object args) {
                assert(subject != null);
                assert(target != null);

http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/6fd6b38b/framework/ipc/src/org/apache/cloudstack/framework/messaging/InplaceAsyncCallbackDriver.java
----------------------------------------------------------------------
diff --git 
a/framework/ipc/src/org/apache/cloudstack/framework/messaging/InplaceAsyncCallbackDriver.java
 
b/framework/ipc/src/org/apache/cloudstack/framework/messaging/InplaceAsyncCallbackDriver.java
new file mode 100644
index 0000000..3cd5e6c
--- /dev/null
+++ 
b/framework/ipc/src/org/apache/cloudstack/framework/messaging/InplaceAsyncCallbackDriver.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cloudstack.framework.messaging;
+
+public class InplaceAsyncCallbackDriver implements AsyncCallbackDriver {
+
+       @Override
+       public void performCompletionCallback(AsyncCompletionCallback callback) 
{
+               // TODO Auto-generated method stub
+       }
+}

http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/6fd6b38b/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcClientCallImpl.java
----------------------------------------------------------------------
diff --git 
a/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcClientCallImpl.java
 
b/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcClientCallImpl.java
index ba8dd14..90c56d6 100644
--- 
a/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcClientCallImpl.java
+++ 
b/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcClientCallImpl.java
@@ -86,9 +86,10 @@ public class RpcClientCallImpl implements RpcClientCall {
                return this;
        }
 
+       @SuppressWarnings("unchecked")
        @Override
-       public Object getContextParam(String key) {
-               return _contextParams.get(key);
+       public <T> T getContextParam(String key) {
+               return (T)_contextParams.get(key);
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/6fd6b38b/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcProvider.java
----------------------------------------------------------------------
diff --git 
a/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcProvider.java 
b/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcProvider.java
index b6d58cf..73d55b2 100644
--- 
a/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcProvider.java
+++ 
b/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcProvider.java
@@ -24,14 +24,17 @@ public interface RpcProvider extends TransportMultiplexier {
        void setMessageSerializer(MessageSerializer messageSerializer);
        MessageSerializer getMessageSerializer();
        
-       void registerRpcServiceEndpoint(RpcServiceEndpoint rpcEndpoint);
-       void unregisteRpcServiceEndpoint(RpcServiceEndpoint rpcEndpoint);
-
-       RpcClientCall newCall(TransportEndpoint sourceEndpoint, RpcAddressable 
targetAddress);
-       RpcClientCall newCall(TransportEndpoint sourceEndpoint, String 
targetAddress);
+       void registerRpcServiceEndpoint(String serviceAddress, 
RpcServiceEndpoint rpcEndpoint);
+       void unregisteRpcServiceEndpoint(RpcAddressable serviceAddress, 
RpcServiceEndpoint rpcEndpoint);
 
        RpcClientCall newCall(String targetAddress);
        RpcClientCall newCall(RpcAddressable targetAddress);
+       
+       //
+       // low-level public API
+       //
+       RpcClientCall newCall(TransportEndpoint sourceEndpoint, RpcAddressable 
targetAddress);
+       RpcClientCall newCall(TransportEndpoint sourceEndpoint, String 
targetAddress);
 
        void registerCall(RpcClientCall call);
        void cancelCall(RpcClientCall call);

http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/6fd6b38b/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcProviderImpl.java
----------------------------------------------------------------------
diff --git 
a/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcProviderImpl.java
 
b/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcProviderImpl.java
index b3d8a4e..cae85ad 100644
--- 
a/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcProviderImpl.java
+++ 
b/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcProviderImpl.java
@@ -69,14 +69,14 @@ public class RpcProviderImpl implements RpcProvider {
        }
 
        @Override
-       public void registerRpcServiceEndpoint(RpcServiceEndpoint rpcEndpoint) {
+       public void registerRpcServiceEndpoint(String serviceAddress, 
RpcServiceEndpoint rpcEndpoint) {
                synchronized(_serviceEndpoints) {
                        _serviceEndpoints.add(rpcEndpoint);
                }
        }
 
        @Override
-       public void unregisteRpcServiceEndpoint(RpcServiceEndpoint rpcEndpoint) 
{
+       public void unregisteRpcServiceEndpoint(RpcAddressable serviceAddress, 
RpcServiceEndpoint rpcEndpoint) {
                synchronized(_serviceEndpoints) {
                        _serviceEndpoints.remove(rpcEndpoint);
                }

http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/6fd6b38b/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcServerCallImpl.java
----------------------------------------------------------------------
diff --git 
a/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcServerCallImpl.java
 
b/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcServerCallImpl.java
index 75f521f..613669a 100644
--- 
a/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcServerCallImpl.java
+++ 
b/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcServerCallImpl.java
@@ -42,7 +42,7 @@ public class RpcServerCallImpl implements RpcServerCall {
        }
 
        @Override
-       public Object getCommandArgument() {
+       public <T> T getCommandArgument() {
                if(_requestPdu.getSerializedCommandArg() == null)
                        return null;
                

http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/6fd6b38b/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcServiceDispatcher.java
----------------------------------------------------------------------
diff --git 
a/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcServiceDispatcher.java
 
b/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcServiceDispatcher.java
index 1f1d1b9..207fb06 100644
--- 
a/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcServiceDispatcher.java
+++ 
b/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcServiceDispatcher.java
@@ -23,9 +23,34 @@ import java.lang.reflect.Method;
 import java.util.HashMap;
 import java.util.Map;
 
-public class RpcServiceDispatcher {
+public class RpcServiceDispatcher implements RpcServiceEndpoint {
 
        private static Map<Class<?>, Method> s_handlerCache = new 
HashMap<Class<?>, Method>();
+
+       private static Map<Object, RpcServiceDispatcher> s_targetMap = new 
HashMap<Object, RpcServiceDispatcher>();
+       private Object _targetObject;
+       
+       public RpcServiceDispatcher(Object targetObject) {
+               _targetObject = targetObject;
+       }
+       
+       public static RpcServiceDispatcher getDispatcher(Object targetObject) {
+               RpcServiceDispatcher dispatcher;
+               synchronized(s_targetMap) {
+                       dispatcher = s_targetMap.get(targetObject);
+                       if(dispatcher == null) {
+                               dispatcher = new 
RpcServiceDispatcher(targetObject);
+                               s_targetMap.put(targetObject, dispatcher);
+                       }
+               }
+               return dispatcher;
+       }
+       
+       public static void removeDispatcher(Object targetObject) {
+               synchronized(s_targetMap) {
+                       s_targetMap.remove(targetObject);
+               }
+       }
        
        public static boolean dispatch(Object target, RpcServerCall 
serviceCall) {
                assert(serviceCall != null);
@@ -67,4 +92,9 @@ public class RpcServiceDispatcher {
                
                return null;
        }
+
+       @Override
+       public boolean onCallReceive(RpcServerCall call) {
+               return dispatch(_targetObject, call);
+       }
 }

http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/6fd6b38b/framework/ipc/test/org/apache/cloudstack/framework/messaging/AsyncSampleCallee.java
----------------------------------------------------------------------
diff --git 
a/framework/ipc/test/org/apache/cloudstack/framework/messaging/AsyncSampleCallee.java
 
b/framework/ipc/test/org/apache/cloudstack/framework/messaging/AsyncSampleCallee.java
new file mode 100644
index 0000000..a52804b
--- /dev/null
+++ 
b/framework/ipc/test/org/apache/cloudstack/framework/messaging/AsyncSampleCallee.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cloudstack.framework.messaging;
+
+public class AsyncSampleCallee {
+       AsyncSampleCallee _driver;
+
+       public TestVolume createVolume(Object realParam, 
AsyncCompletionCallback callback) {
+               _driver.createVolume(realParam,
+                               new AsyncCompletionCallback(this)
+                                       
.setOperationName("volume.driver.create")
+                                       .setContextParam("dsCompletion", 
callback)
+               );
+               
+               return null;
+       }
+       
+       @AsyncCallbackHandler(operationName="volume.driver.create")
+       public void onDriverCreateVolumeCallback(AsyncCompletionCallback 
driverCompletion) {
+               AsyncCompletionCallback dsCompletionCallback = 
driverCompletion.getContextParam("dsCompletion");
+               
+               String str = driverCompletion.getResult();
+               dsCompletionCallback.complete(str);
+       }
+}

http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/6fd6b38b/framework/ipc/test/org/apache/cloudstack/framework/messaging/AsyncSampleCaller.java
----------------------------------------------------------------------
diff --git 
a/framework/ipc/test/org/apache/cloudstack/framework/messaging/AsyncSampleCaller.java
 
b/framework/ipc/test/org/apache/cloudstack/framework/messaging/AsyncSampleCaller.java
new file mode 100644
index 0000000..d1cd4da
--- /dev/null
+++ 
b/framework/ipc/test/org/apache/cloudstack/framework/messaging/AsyncSampleCaller.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cloudstack.framework.messaging;
+
+public class AsyncSampleCaller {
+       AsyncSampleCallee _ds;
+       
+       public void MethodThatWillCallAsyncMethod() {
+               TestVolume vol = new TestVolume();
+               
+               _ds.createVolume(vol,
+                       new 
AsyncCompletionCallback(this).setContextParam("vol", vol));
+       }
+       
+       @AsyncCallbackHandler(operationName="volume.create")
+       public void onCreateVolumeCallback(AsyncCompletionCallback callback) {
+               TestVolume contextVol = callback.getContextParam("vol");
+               
+               TestVolume result = callback.getResult();
+               // do something
+       }
+}

http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/6fd6b38b/framework/ipc/test/org/apache/cloudstack/framework/messaging/SampleComponent.java
----------------------------------------------------------------------
diff --git 
a/framework/ipc/test/org/apache/cloudstack/framework/messaging/SampleComponent.java
 
b/framework/ipc/test/org/apache/cloudstack/framework/messaging/SampleComponent.java
index cd76729..db1f749 100644
--- 
a/framework/ipc/test/org/apache/cloudstack/framework/messaging/SampleComponent.java
+++ 
b/framework/ipc/test/org/apache/cloudstack/framework/messaging/SampleComponent.java
@@ -18,11 +18,24 @@
  */
 package org.apache.cloudstack.framework.messaging;
 
-public class SampleComponent extends ComponentEndpoint {
+public class SampleComponent {
 
+       RpcProvider _rpcProvider;
+       EventBus _eventBus;
+       
        public SampleComponent() {
        }
        
+       public void init() {
+               
+               _rpcProvider.registerRpcServiceEndpoint("AgentManager", 
+                       RpcServiceDispatcher.getDispatcher(this));
+               
+               // subscribe to all network events (for example)
+               _eventBus.subscribe("network", 
+                       EventDispatcher.getDispatcher(this));
+       }
+       
        @RpcServiceHandler(command="StartCommand")
        void onStartCommand(RpcServerCall call) {
                call.completeCall("Call response");

http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/6fd6b38b/framework/ipc/test/org/apache/cloudstack/framework/messaging/TestVolume.java
----------------------------------------------------------------------
diff --git 
a/framework/ipc/test/org/apache/cloudstack/framework/messaging/TestVolume.java 
b/framework/ipc/test/org/apache/cloudstack/framework/messaging/TestVolume.java
new file mode 100644
index 0000000..3001ee5
--- /dev/null
+++ 
b/framework/ipc/test/org/apache/cloudstack/framework/messaging/TestVolume.java
@@ -0,0 +1,5 @@
+package org.apache.cloudstack.framework.messaging;
+
+public class TestVolume {
+
+}

Reply via email to