osmith has uploaded this change for review. ( 
https://gerrit.osmocom.org/c/osmo-s1gw/+/37036?usp=email )


Change subject: Initial SCTP proxy (server/client) implementation
......................................................................

Initial SCTP proxy (server/client) implementation

Change-Id: Ia317f58f7dcbec42930165fdcd42d0ddd23e289c
---
A .gitignore
A config/sys.config
A rebar.config
A rebar.lock
A src/osmo_s1gw.app.src
A src/osmo_s1gw_app.erl
A src/sctp_client.erl
A src/sctp_proxy.erl
A src/sctp_server.erl
9 files changed, 480 insertions(+), 0 deletions(-)



  git pull ssh://gerrit.osmocom.org:29418/osmo-s1gw refs/changes/36/37036/1

diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..83aa88b
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,2 @@
+_build
+*.beam
diff --git a/config/sys.config b/config/sys.config
new file mode 100644
index 0000000..072f183
--- /dev/null
+++ b/config/sys.config
@@ -0,0 +1,8 @@
+%% -*- erlang -*-
+
+%% Kernel logger setup: one logger_std_h handler whose formatter prints
+%% "[level] pid file:line msg", one event per line.
+%% NOTE: the primary logger_level (info) is checked before any handler
+%% sees an event, so the handler level 'debug' below only takes effect
+%% once logger_level is lowered to debug as well.
+[{kernel,
+  [{logger_level, info},
+   {logger,
+    [{handler, default, logger_std_h,
+      #{ level => debug, formatter => {logger_formatter,
+        #{ template => ["[", level, "] ", pid, " ", file, ":", line, " ", msg, "\n"] }}}}]}]}].
diff --git a/rebar.config b/rebar.config
new file mode 100644
index 0000000..7d7b6f2
--- /dev/null
+++ b/rebar.config
@@ -0,0 +1,15 @@
+%% -*- erlang -*-
+
+%% Compiler options.  Parse transforms are given as
+%% {parse_transform, Module}; none is needed by this project.
+{erl_opts, [debug_info]}.
+
+{minimum_otp_vsn, "25.2.3"}.
+
+%% No third-party dependencies
+{deps, []}.
+
+{xref_checks, [undefined_function_calls, undefined_functions,
+              deprecated_function_calls, deprecated_functions]}.
+
+{dialyzer, [
+       {plt_extra_apps, [kernel, stdlib, erts, tools, inets, compiler]},
+       {warnings, [no_improper_lists]}
+]}.
diff --git a/rebar.lock b/rebar.lock
new file mode 100644
index 0000000..57afcca
--- /dev/null
+++ b/rebar.lock
@@ -0,0 +1 @@
+[].
diff --git a/src/osmo_s1gw.app.src b/src/osmo_s1gw.app.src
new file mode 100644
index 0000000..bc2a8d7
--- /dev/null
+++ b/src/osmo_s1gw.app.src
@@ -0,0 +1,14 @@
+%-*- mode: erlang -*-
+
+%% Application resource file for the Osmocom S1AP Gateway.
+{application, osmo_s1gw, [
+       {description, "Osmocom S1AP Gateway"},
+       {vsn, "1"},
+       %% sctp_server registers itself locally under its module name
+       {registered, [sctp_server]},
+       {applications, [
+                       kernel,
+                       stdlib
+                       ]},
+       {modules, []},
+       %% application callback module and its start arguments
+       {mod, {osmo_s1gw_app, []}},
+       {env, []}
+]}.
diff --git a/src/osmo_s1gw_app.erl b/src/osmo_s1gw_app.erl
new file mode 100644
index 0000000..596b3e8
--- /dev/null
+++ b/src/osmo_s1gw_app.erl
@@ -0,0 +1,11 @@
+-module(osmo_s1gw_app).
+-behaviour(application).
+
+-export([start/2, stop/1]).
+
+%% Application start callback.  The start type and arguments are ignored;
+%% the SCTP server is started (and linked) directly and its start result
+%% is returned as is.
+%% TODO: start via osmo_s1gw_sup:start_link() instead.
+start(_Type, _Args) ->
+       sctp_server:start_link().
+
+%% Application stop callback: nothing to clean up here, always returns ok.
+stop(_) ->
+       ok.
diff --git a/src/sctp_client.erl b/src/sctp_client.erl
new file mode 100644
index 0000000..67c7069
--- /dev/null
+++ b/src/sctp_client.erl
@@ -0,0 +1,42 @@
+-module(sctp_client).
+
+-export([connect/0,
+         connect/1,
+         connect/2,
+         send_data/2,
+         disconnect/1]).
+
+-include_lib("kernel/include/inet.hrl").
+-include_lib("kernel/include/inet_sctp.hrl").
+
+-define(S1AP_PORT, 36412).
+-define(S1AP_PPID, 18).
+-define(SCTP_STREAM, 0).
+
+%% ------------------------------------------------------------------
+%% public API
+%% ------------------------------------------------------------------
+
+%% Initiate an SCTP association towards Host:Port.
+%% Host defaults to localhost, Port to ?S1AP_PORT.
+%%
+%% Opens a seqpacket SCTP socket in active mode and requests association
+%% establishment without waiting for it to complete: the outcome arrives
+%% later as an #sctp_assoc_change{} event on the socket.
+%%
+%% Returns {ok, Sock}, or {error, Reason} if the association could not
+%% even be initiated (the socket is closed in that case).
+connect() ->
+    connect(localhost).
+
+connect(Host) ->
+    connect(Host, ?S1AP_PORT).
+
+connect(Host, Port) ->
+    {ok, Sock} = gen_sctp:open([{type, seqpacket},
+                                {active, true}]),
+    case gen_sctp:connect_init(Sock, Host, Port, []) of
+        ok ->
+            {ok, Sock};
+        {error, _Reason} = Error ->
+            %% do not leak the socket if the request was rejected
+            gen_sctp:close(Sock),
+            Error
+    end.
+
+
+%% Send Payload over association AssocId of Socket, using stream
+%% ?SCTP_STREAM and payload protocol id ?S1AP_PPID.
+%% Returns the result of gen_sctp:send/3.
+send_data({Socket, AssocId}, Payload) ->
+    SndRcvInfo = #sctp_sndrcvinfo{assoc_id = AssocId,
+                                  stream = ?SCTP_STREAM,
+                                  ppid = ?S1AP_PPID},
+    gen_sctp:send(Socket, SndRcvInfo, Payload).
+
+
+%% Request a graceful shutdown (EOF) of association AssocId on Socket.
+%% Returns the result of gen_sctp:eof/2.
+disconnect({Socket, AssocId}) ->
+    Assoc = #sctp_assoc_change{assoc_id = AssocId},
+    gen_sctp:eof(Socket, Assoc).
+
+%% vim:set ts=4 sw=4 et:
diff --git a/src/sctp_proxy.erl b/src/sctp_proxy.erl
new file mode 100644
index 0000000..e09720c
--- /dev/null
+++ b/src/sctp_proxy.erl
@@ -0,0 +1,177 @@
+-module(sctp_proxy).
+-behaviour(gen_statem).
+
+-export([init/1,
+         callback_mode/0,
+         connecting/3,
+         connected/3,
+         code_change/4,
+         terminate/3]).
+-export([start_link/3,
+         send_data/2,
+         shutdown/1]).
+
+-include_lib("kernel/include/inet.hrl").
+-include_lib("kernel/include/inet_sctp.hrl").
+
+%% ------------------------------------------------------------------
+%% public API
+%% ------------------------------------------------------------------
+
+%% Start a proxy process linked to the caller.  Aid is stored as the eNB
+%% association id; MmeAddr/MmePort are the MME endpoint to connect to.
+start_link(Aid, MmeAddr, MmePort) ->
+    InitArgs = [Aid, MmeAddr, MmePort],
+    gen_statem:start_link(?MODULE, InitArgs, []).
+
+
+%% Asynchronously hand eNB -> MME Data over to the proxy process Pid.
+send_data(Pid, Data) ->
+    Request = {send_data, Data},
+    gen_statem:cast(Pid, Request).
+
+
+%% Stop the proxy process with reason 'normal', waiting (without a
+%% time limit) until it has terminated.
+shutdown(ProxyPid) ->
+    gen_statem:stop(ProxyPid, normal, infinity).
+
+
+%% ------------------------------------------------------------------
+%% gen_statem API
+%% ------------------------------------------------------------------
+
+%% gen_statem init callback.  Starts in the 'connecting' state with a map
+%% as state data: the eNB association id, the MME address/port, and the
+%% queue of eNB -> MME messages awaiting the MME connection (newest first).
+init([Aid, MmeAddr, MmePort]) ->
+    {ok, connecting,
+     #{enb_aid => Aid,
+       mme_addr => MmeAddr,
+       mme_port => MmePort,
+       tx_queue => []}}.
+
+
+%% One callback function per state, plus 'enter' calls on state entry.
+callback_mode() ->
+    [state_functions, state_enter].
+
+
+%% CONNECTING state: waiting for the MME association to come up.
+connecting(enter, OldState,
+           #{mme_addr := MmeAddr, mme_port := MmePort} = S) ->
+    logger:info("State change: ~p -> ~p", [OldState, ?FUNCTION_NAME]),
+    %% Initiate connection establishment with the MME
+    {ok, Sock} = sctp_client:connect(MmeAddr, MmePort),
+    %% state_timeout is expressed in milliseconds: give up after 2 seconds
+    {keep_state, S#{sock => Sock},
+     [{state_timeout, 2_000, conn_est_timeout}]};
+
+%% Handle connection establishment timeout
+connecting(state_timeout, conn_est_timeout, _S) ->
+    {stop, {shutdown, conn_est_timeout}};
+
+%% Handle an eNB -> MME data forwarding request (queue)
+connecting(cast, {send_data, Data},
+           #{tx_queue := Pending} = S) ->
+    %% prepend: the queue is reversed again when it gets flushed
+    {keep_state, S#{tx_queue := [Data | Pending]}};
+
+%% Handle an #sctp_assoc_change event (connection state)
+connecting(info, {sctp, _Socket, MmeAddr, MmePort,
+                  {[], #sctp_assoc_change{state = ConnState,
+                                          assoc_id = Aid}}}, S) ->
+    case ConnState of
+        comm_up ->
+            logger:notice("MME connection (id=~p, ~p:~p) established",
+                          [Aid, MmeAddr, MmePort]),
+            %% remember the MME-side association id for sending/closing
+            {next_state, connected, S#{mme_aid => Aid}};
+        _ ->
+            logger:notice("MME connection establishment failed: ~p",
+                          [ConnState]),
+            {stop, {shutdown, conn_est_fail}}
+    end;
+
+%% Catch-all for other kinds of SCTP events
+connecting(info, {sctp, _Socket, MmeAddr, MmePort,
+                  {AncData, Data}}, S) ->
+    logger:debug("Unhandled SCTP event (~p:~p): ~p, ~p",
+                 [MmeAddr, MmePort, AncData, Data]),
+    {keep_state, S};
+
+%% Catch-all handler for this state
+connecting(Event, EventData, S) ->
+    logger:error("Unexpected event ~p: ~p", [Event, EventData]),
+    {keep_state, S}.
+
+
+%% CONNECTED state: the MME association is up, traffic is relayed in
+%% both directions.
+connected(enter, OldState, S) ->
+    logger:info("State change: ~p -> ~p", [OldState, ?FUNCTION_NAME]),
+    %% Send pending eNB -> MME messages (if any)
+    ok = sctp_send_pending(S),
+    %% the queue is dropped: from now on data is forwarded directly
+    {keep_state, maps:remove(tx_queue, S)};
+
+%% Handle an eNB -> MME data forwarding request (forward)
+connected(cast, {send_data, Data}, S) ->
+    ok = sctp_send(S, Data),
+    {keep_state, S};
+
+%% Handle an #sctp_assoc_change event (MME connection state)
+connected(info, {sctp, _Socket, MmeAddr, MmePort,
+                 {[], #sctp_assoc_change{state = ConnState,
+                                         assoc_id = Aid}}}, S) ->
+    case ConnState of
+        comm_up ->
+            logger:notice("MME connection (id=~p, ~p:~p) is already established?!?",
+                          [Aid, MmeAddr, MmePort]),
+            {keep_state, S};
+        _ ->
+            %% any other association state terminates the proxy
+            logger:notice("MME connection state: ~p", [ConnState]),
+            {stop, {shutdown, conn_fail}}
+    end;
+
+%% Handle an #sctp_sndrcvinfo event (MME -> eNB data)
+connected(info, {sctp, _Socket, MmeAddr, MmePort,
+                 {[#sctp_sndrcvinfo{assoc_id = Aid}], Data}}, S) ->
+    logger:info("MME connection (id=~p, ~p:~p) Rx ~p",
+                [Aid, MmeAddr, MmePort, Data]),
+    %% relay to the eNB through the server owning the eNB-side socket
+    sctp_server:send_data(maps:get(enb_aid, S), Data),
+    {keep_state, S};
+
+%% Catch-all for other kinds of SCTP events
+connected(info, {sctp, _Socket, MmeAddr, MmePort,
+                 {AncData, Data}}, S) ->
+    logger:debug("Unhandled SCTP event (~p:~p): ~p, ~p",
+                 [MmeAddr, MmePort, AncData, Data]),
+    {keep_state, S};
+
+%% Catch-all handler for this state
+connected(Event, EventData, S) ->
+    logger:error("Unexpected event ~p: ~p", [Event, EventData]),
+    {keep_state, S}.
+
+
+
+%% Code upgrade callback: state name and state data are carried over
+%% unchanged.
+code_change(_OldVsn, StateName, Data, _Extra) ->
+    {ok, StateName, Data}.
+
+
+%% Termination callback: log the reason, then release the MME-side
+%% socket (if one was opened).
+terminate(Reason, State, S) ->
+    logger:notice("Terminating in state ~p, reason ~p", [State, Reason]),
+    close_mme_conn(S).
+
+
+%% Association known: request a graceful shutdown before closing
+close_mme_conn(#{sock := Sock, mme_aid := Aid}) ->
+    sctp_client:disconnect({Sock, Aid}),
+    gen_sctp:close(Sock);
+
+%% Socket opened, but no association was established: just close it
+close_mme_conn(#{sock := Sock}) ->
+    gen_sctp:close(Sock);
+
+%% No socket at all: nothing to do
+close_mme_conn(_S) ->
+    ok.
+
+
+%% ------------------------------------------------------------------
+%% private API
+%% ------------------------------------------------------------------
+
+%% Send one message to the MME over the established association.
+%% Requires both 'sock' and 'mme_aid' to be present in the state data.
+sctp_send(#{sock := Socket, mme_aid := AssocId}, Payload) ->
+    Conn = {Socket, AssocId},
+    sctp_client:send_data(Conn, Payload).
+
+
+%% Flush the queued eNB -> MME messages to the MME, oldest first.
+%% The queue holds the newest message at its head, hence the reversal.
+%% Results of the individual sends are not checked; always returns ok.
+sctp_send_pending(#{tx_queue := Queue} = S) ->
+    lists:foreach(fun(Data) -> sctp_send(S, Data) end,
+                  lists:reverse(Queue)).
+
+%% vim:set ts=4 sw=4 et:
diff --git a/src/sctp_server.erl b/src/sctp_server.erl
new file mode 100644
index 0000000..883991a
--- /dev/null
+++ b/src/sctp_server.erl
@@ -0,0 +1,202 @@
+-module(sctp_server).
+-behaviour(gen_server).
+
+-export([init/1,
+         handle_info/2,
+         handle_call/3,
+         handle_cast/2,
+         terminate/2]).
+-export([start_link/0,
+         start_link/2]).
+-export([send_data/2]).
+
+-include_lib("kernel/include/inet.hrl").
+-include_lib("kernel/include/inet_sctp.hrl").
+
+-define(S1AP_PORT, 36412).
+-define(S1AP_PPID, 18).
+-define(SCTP_STREAM, 0).
+
+-record(server_state, {sock, clients}).
+-record(client_state, {addr_port, pid}).
+
+%% ------------------------------------------------------------------
+%% public API
+%% ------------------------------------------------------------------
+
+%% Start the server bound to 127.0.0.1 and the default S1AP port.
+start_link() ->
+    start_link({127,0,0,1}, ?S1AP_PORT).
+
+%% Start the server bound to BindAddr:BindPort, linked to the caller and
+%% registered locally under the module name.
+start_link(BindAddr, BindPort) ->
+    ServerName = {local, ?MODULE},
+    InitArgs = [BindAddr, BindPort],
+    gen_server:start_link(ServerName, ?MODULE, InitArgs, []).
+
+
+%% Ask the registered server to send Data on association Aid.
+%% Asynchronous: returns immediately, the send result is not reported.
+send_data(Aid, Data) ->
+    Request = {send_data, Aid, Data},
+    gen_server:cast(?MODULE, Request).
+
+
+%% ------------------------------------------------------------------
+%% gen_server API
+%% ------------------------------------------------------------------
+
+%% gen_server init callback: open the listening SCTP socket and start
+%% with an empty client table (a dict keyed by association id).
+init([BindAddr, BindPort]) ->
+    %% exits of linked proxy processes must arrive as 'EXIT' messages
+    process_flag(trap_exit, true),
+    SockOpts = [{ip, BindAddr},
+                {port, BindPort},
+                {type, seqpacket},
+                {reuseaddr, true},
+                {active, true}],
+    {ok, Sock} = gen_sctp:open(SockOpts),
+    logger:info("SCTP server listening on ~w:~w", [BindAddr, BindPort]),
+    ok = gen_sctp:listen(Sock, true),
+    InitState = #server_state{sock = Sock,
+                              clients = dict:new()},
+    {ok, InitState}.
+
+
+%% No synchronous requests are supported: log the request and reply
+%% {error, not_implemented}.  The server state is passed through
+%% unchanged (the third element of the reply tuple is the new state).
+handle_call(Info, _From, State) ->
+    error_logger:error_report(["unknown handle_call()",
+                               {module, ?MODULE}, {info, Info},
+                               {state, State}]),
+    {reply, {error, not_implemented}, State}.
+
+
+%% Send Data to the eNB on association Aid (requested by a proxy via
+%% send_data/2).  A failed send is logged; the server keeps running.
+handle_cast({send_data, Aid, Data}, State) ->
+    SndRcvInfo = #sctp_sndrcvinfo{stream = ?SCTP_STREAM,
+                                  ppid = ?S1AP_PPID,
+                                  assoc_id = Aid},
+    case gen_sctp:send(State#server_state.sock, SndRcvInfo, Data) of
+        ok ->
+            ok;
+        {error, Reason} ->
+            logger:error("Connection (id=~p) Tx failed: ~p", [Aid, Reason])
+    end,
+    {noreply, State};
+
+%% Catch-all for unknown casts
+handle_cast(Info, State) ->
+    error_logger:error_report(["unknown handle_cast()",
+                               {module, ?MODULE}, {info, Info},
+                               {state, State}]),
+    {noreply, State}.
+
+
+%% SCTP events delivered by the active-mode socket: dispatch to
+%% sctp_recv/2, which returns the updated server state
+handle_info({sctp, _Socket, FromAddr, FromPort, {AncData, Data}}, State) ->
+    {noreply, sctp_recv(State, {FromAddr, FromPort, AncData, Data})};
+
+%% Exit of a linked process: if it is the proxy of a known client,
+%% close that client's eNB connection and forget the client
+handle_info({'EXIT', Pid, Reason}, #server_state{} = State) ->
+    logger:debug("Child process ~p terminated with reason ~p", [Pid, Reason]),
+    case client_find(State, Pid) of
+        error ->
+            {noreply, State};
+        {ok, {Aid, _Client}} ->
+            #server_state{sock = Sock, clients = Clients} = State,
+            %% gracefully close the eNB connection
+            gen_sctp:eof(Sock, #sctp_assoc_change{assoc_id = Aid}),
+            Remaining = dict:erase(Aid, Clients),
+            {noreply, State#server_state{clients = Remaining}}
+    end;
+
+%% Anything else is unexpected: report it and carry on
+handle_info(Info, State) ->
+    error_logger:error_report(["unknown handle_info()",
+                               {module, ?MODULE}, {info, Info},
+                               {state, State}]),
+    {noreply, State}.
+
+
+%% Termination callback: stop all proxies, close the eNB connections and
+%% the listening socket.  Runs for every termination reason, so that
+%% cleanup is not skipped (and no function_clause error is raised) when
+%% the server stops for a reason other than 'shutdown'.
+terminate(_Reason, State) ->
+    logger:notice("Terminating ~p", [?MODULE]),
+    close_conns(State),
+    gen_sctp:close(State#server_state.sock),
+    ok.
+
+%% ------------------------------------------------------------------
+%% private API
+%% ------------------------------------------------------------------
+
+%% Association state change: a new association gets a proxy process,
+%% a closed or lost one has its proxy stopped.  Returns the server state
+%% with the updated client table.
+sctp_recv(State, {FromAddr, FromPort, [],
+                  #sctp_assoc_change{state = ConnState,
+                                     assoc_id = Aid}}) ->
+    Clients =
+        case ConnState of
+            comm_up ->
+                logger:notice("Connection (id=~p, ~p:~p) established",
+                              [Aid, FromAddr, FromPort]),
+                client_add(State#server_state.clients, Aid, FromAddr, FromPort);
+            shutdown_comp ->
+                logger:notice("Connection (id=~p, ~p:~p) closed",
+                              [Aid, FromAddr, FromPort]),
+                client_del(State#server_state.clients, Aid);
+            comm_lost ->
+                logger:notice("Connection (id=~p, ~p:~p) lost",
+                              [Aid, FromAddr, FromPort]),
+                client_del(State#server_state.clients, Aid);
+            _ ->
+                %% other states are only logged, the table stays as is
+                logger:notice("Connection (id=~p, ~p:~p) state ~p",
+                              [Aid, FromAddr, FromPort, ConnState]),
+                State#server_state.clients
+        end,
+    State#server_state{clients = Clients};
+
+%% Incoming data: hand it over to the proxy process serving this
+%% association.  The server state is returned unchanged.
+sctp_recv(State, {FromAddr, FromPort,
+                  [#sctp_sndrcvinfo{assoc_id = Aid}], Data}) ->
+    logger:info("Connection (id=~p, ~p:~p) Rx ~p",
+                [Aid, FromAddr, FromPort, Data]),
+    case dict:find(Aid, State#server_state.clients) of
+        {ok, #client_state{pid = ProxyPid}} ->
+            sctp_proxy:send_data(ProxyPid, Data);
+        error ->
+            logger:error("Connection (id=~p, ~p:~p) is not known to us?!?",
+                         [Aid, FromAddr, FromPort])
+    end,
+    State;
+
+%% Any other SCTP event is only logged at debug level
+sctp_recv(State, {FromAddr, FromPort, AncData, Data}) ->
+    logger:debug("Unhandled SCTP event (~p:~p): ~p, ~p",
+                 [FromAddr, FromPort, AncData, Data]),
+    State.
+
+
+%% Register a new client under its association id, spawning (and linking
+%% to) the proxy process that serves it.  Returns the updated table.
+client_add(Clients, Aid, FromAddr, FromPort) ->
+    %% XXX: the MME address is hard-coded here
+    {ok, ProxyPid} = sctp_proxy:start_link(Aid, {127,0,1,1}, ?S1AP_PORT),
+    dict:store(Aid,
+               #client_state{addr_port = {FromAddr, FromPort},
+                             pid = ProxyPid},
+               Clients).
+
+
+%% Remove the client with association id Aid, stopping its proxy process
+%% first.  An unknown Aid leaves the table unchanged.
+client_del(Clients, Aid) ->
+    case dict:find(Aid, Clients) of
+        error ->
+            Clients;
+        {ok, #client_state{pid = ProxyPid}} ->
+            sctp_proxy:shutdown(ProxyPid),
+            dict:erase(Aid, Clients)
+    end.
+
+
+%% Look up a client by the pid of its proxy process.  The first argument
+%% is either the server state or a list of {Aid, Client} pairs.
+%% Returns {ok, {Aid, Client}} for the first match, 'error' if none.
+client_find(#server_state{clients = Clients}, Pid) ->
+    client_find(dict:to_list(Clients), Pid);
+
+client_find(Entries, Pid) when is_list(Entries) ->
+    Matches = [Entry || {_Aid, #client_state{pid = P}} = Entry <- Entries,
+                        P =:= Pid],
+    case Matches of
+        [First | _] -> {ok, First};
+        [] -> error
+    end.
+
+
+%% Tear down every known client: stop its proxy process (and with it
+%% the MME connection), then gracefully close the eNB association.
+%% Always returns ok.
+close_conns(#server_state{sock = Sock, clients = Clients}) ->
+    close_conns(Sock, dict:to_list(Clients)).
+
+close_conns(Sock, Clients) ->
+    lists:foreach(
+      fun({Aid, Client}) ->
+              {FromAddr, FromPort} = Client#client_state.addr_port,
+              logger:notice("Terminating connection (id=~p, ~p:~p)",
+                            [Aid, FromAddr, FromPort]),
+              %% request to terminate an MME connection
+              sctp_proxy:shutdown(Client#client_state.pid),
+              %% gracefully close an eNB connection
+              gen_sctp:eof(Sock, #sctp_assoc_change{assoc_id = Aid})
+      end, Clients).
+
+%% vim:set ts=4 sw=4 et:

--
To view, visit https://gerrit.osmocom.org/c/osmo-s1gw/+/37036?usp=email
To unsubscribe, or for help writing mail filters, visit 
https://gerrit.osmocom.org/settings

Gerrit-Project: osmo-s1gw
Gerrit-Branch: master
Gerrit-Change-Id: Ia317f58f7dcbec42930165fdcd42d0ddd23e289c
Gerrit-Change-Number: 37036
Gerrit-PatchSet: 1
Gerrit-Owner: osmith <osm...@sysmocom.de>
Gerrit-CC: fixeria <vyanits...@sysmocom.de>
Gerrit-MessageType: newchange

Reply via email to