(2014/05/22 20:23), Satoshi Kobayashi wrote:
> - Topology change is notified
>    - JSON-RPC/WebSocket
> 
> Signed-off-by: Satoshi Kobayashi <[email protected]>
> ---
>   ryu/app/ws_topology.py |  105 ++++++++++++++++++++++++++++++++++++++++++++++++
>   ryu/app/wsgi.py        |   34 +++++++++++++++-
>   2 files changed, 138 insertions(+), 1 deletions(-)
>   create mode 100644 ryu/app/ws_topology.py
> 
> diff --git a/ryu/app/ws_topology.py b/ryu/app/ws_topology.py
> new file mode 100644
> index 0000000..4ca0f6d
> --- /dev/null
> +++ b/ryu/app/ws_topology.py
> @@ -0,0 +1,105 @@
> +# Copyright (C) 2014 Stratosphere Inc.
> +#
> +# Licensed under the Apache License, Version 2.0 (the "License");
> +# you may not use this file except in compliance with the License.
> +# You may obtain a copy of the License at
> +#
> +#    http://www.apache.org/licenses/LICENSE-2.0
> +#
> +# Unless required by applicable law or agreed to in writing, software
> +# distributed under the License is distributed on an "AS IS" BASIS,
> +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
> +# implied.
> +# See the License for the specific language governing permissions and
> +# limitations under the License.
> +
> +"""
> +Usage example
> +
> +1. Run this application:
> +$ ryu-manager --verbose --observe-links ryu.app.ws_topology
> +
> +2. Connect to this application by WebSocket (use your favorite client):
> +$ wscat -c ws://localhost:8080/v1.0/topology/ws
> +
> +3. Join switches (use your favorite method):
> +$ sudo mn --controller=remote --topo linear,2
> +
> +4. Topology change is notified:
> +< {"params": [{"ports": [{"hw_addr": "56:c7:08:12:bb:36", "name": "s1-eth1", "port_no": "00000001", "dpid": "0000000000000001"}, {"hw_addr": "de:b9:49:24:74:3f", "name": "s1-eth2", "port_no": "00000002", "dpid": "0000000000000001"}], "dpid": "0000000000000001"}], "jsonrpc": "2.0", "method": "event_switch_enter", "id": 1}
> +> {"id": 1, "jsonrpc": "2.0", "result": ""}
> +
> +< {"params": [{"ports": [{"hw_addr": "56:c7:08:12:bb:36", "name": "s1-eth1", "port_no": "00000001", "dpid": "0000000000000001"}, {"hw_addr": "de:b9:49:24:74:3f", "name": "s1-eth2", "port_no": "00000002", "dpid": "0000000000000001"}], "dpid": "0000000000000001"}], "jsonrpc": "2.0", "method": "event_switch_leave", "id": 2}
> +> {"id": 2, "jsonrpc": "2.0", "result": ""}
> +...
> +"""
> +
> +from tinyrpc.exc import InvalidReplyError
> +
> +from ryu.app.wsgi import (
> +    ControllerBase,
> +    WSGIApplication,
> +    websocket,
> +    WebSocketRPCClient
> +)
> +from ryu.base import app_manager
> +from ryu.topology import event, switches
> +from ryu.controller.handler import set_ev_cls
> +
> +
> +class WebSocketTopology(app_manager.RyuApp):
> +    _CONTEXTS = {
> +        'wsgi': WSGIApplication,
> +        'switches': switches.Switches,
> +    }
> +
> +    def __init__(self, *args, **kwargs):
> +        super(WebSocketTopology, self).__init__(*args, **kwargs)
> +
> +        self.rpc_clients = []
> +
> +        wsgi = kwargs['wsgi']
> +        wsgi.register(TopologyController, {'app': self})
> +
> +    @set_ev_cls(event.EventSwitchEnter)
> +    def _event_switch_enter_handler(self, ev):
> +        msg = ev.switch.to_dict()
> +        self._rpc_broadcall('event_switch_enter', msg)
> +
> +    @set_ev_cls(event.EventSwitchLeave)
> +    def _event_switch_leave_handler(self, ev):
> +        msg = ev.switch.to_dict()
> +        self._rpc_broadcall('event_switch_leave', msg)
> +
> +    @set_ev_cls(event.EventLinkAdd)
> +    def _event_link_add_handler(self, ev):
> +        msg = ev.link.to_dict()
> +        self._rpc_broadcall('event_link_add', msg)
> +
> +    @set_ev_cls(event.EventLinkDelete)
> +    def _event_link_delete_handler(self, ev):
> +        msg = ev.link.to_dict()
> +        self._rpc_broadcall('event_link_delete', msg)
> +
> +    def _rpc_broadcall(self, func_name, msg):
> +        for rpc_client in self.rpc_clients:
> +            # NOTE: Although broadcasting is desired,
> +            #       RPCClient#get_proxy(one_way=True) does not work well
> +            rpc_server = rpc_client.get_proxy()
> +            try:
> +                getattr(rpc_server, func_name)(msg)
> +            except InvalidReplyError as e:
> +                self.logger.error(e)
> +
> +
> +class TopologyController(ControllerBase):
> +
> +    def __init__(self, req, link, data, **config):
> +        super(TopologyController, self).__init__(req, link, data, **config)
> +        self.app = data['app']
> +
> +    @websocket('topology', '/v1.0/topology/ws')
> +    def _websocket_handler(self, ws):
> +        rpc_client = WebSocketRPCClient(ws)
> +        self.app.rpc_clients.append(rpc_client)
> +        rpc_client.serve_forever()
> diff --git a/ryu/app/wsgi.py b/ryu/app/wsgi.py
> index 6870731..94e67f6 100644
> --- a/ryu/app/wsgi.py
> +++ b/ryu/app/wsgi.py
> @@ -27,7 +27,8 @@ from tinyrpc.server import RPCServer
>   from tinyrpc.dispatch import RPCDispatcher
>   from tinyrpc.dispatch import public as rpc_public
>   from tinyrpc.protocols.jsonrpc import JSONRPCProtocol
> -from tinyrpc.transports import ServerTransport
> +from tinyrpc.transports import ServerTransport, ClientTransport
> +from tinyrpc.client import RPCClient
>   
>   CONF = cfg.CONF
>   CONF.register_cli_opts([
> @@ -159,6 +160,37 @@ class WebSocketRPCServer(RPCServer):
>           hub.spawn(func, *args, **kwargs)
>   
>   
> +class WebSocketClientTransport(ClientTransport):
> +
> +    def __init__(self, ws, queue):
> +        self.ws = ws
> +        self.queue = queue
> +
> +    def send_message(self, message, expect_reply=True):
> +        self.ws.send(unicode(message))
> +
> +        if expect_reply:
> +            return self.queue.get()
> +
> +
> +class WebSocketRPCClient(RPCClient):
> +
> +    def __init__(self, ws):
> +        self.ws = ws
> +        self.queue = hub.Queue()
> +        super(WebSocketRPCClient, self).__init__(
> +            JSONRPCProtocol(),
> +            WebSocketClientTransport(ws, self.queue),
> +        )
> +
> +    def serve_forever(self):
> +        while True:
> +            msg = self.ws.wait()
> +            if msg is None:
> +                break
> +            self.queue.put(msg)
> +
> +
>   class wsgify_hack(webob.dec.wsgify):
>       def __call__(self, environ, start_response):
>           self.kwargs['start_response'] = start_response
> 

Looks good to me.


------------------------------------------------------------------------------
"Accelerate Dev Cycles with Automated Cross-Browser Testing - For FREE
Instantly run your Selenium tests across 300+ browser/OS combos.
Get unparalleled scalability from the best Selenium testing platform available
Simple to use. Nothing to install. Get started now for free."
http://p.sf.net/sfu/SauceLabs
_______________________________________________
Ryu-devel mailing list
[email protected]
https://lists.sourceforge.net/lists/listinfo/ryu-devel

Reply via email to