osmith has submitted this change. ( https://gerrit.osmocom.org/c/osmo-s1gw/+/37036?usp=email )
Change subject: Initial SCTP proxy (server/client) implementation ...................................................................... Initial SCTP proxy (server/client) implementation Change-Id: Ia317f58f7dcbec42930165fdcd42d0ddd23e289c --- A .gitignore A config/sys.config A rebar.config A rebar.lock A src/osmo_s1gw.app.src A src/osmo_s1gw_app.erl A src/sctp_client.erl A src/sctp_proxy.erl A src/sctp_server.erl 9 files changed, 480 insertions(+), 0 deletions(-) Approvals: osmith: Looks good to me, approved; Verified diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..83aa88b --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +_build +*.beam diff --git a/config/sys.config b/config/sys.config new file mode 100644 index 0000000..072f183 --- /dev/null +++ b/config/sys.config @@ -0,0 +1,8 @@ +%% -*- erlang -*- + +[{kernel, + [{logger_level, info}, + {logger, + [{handler, default, logger_std_h, + #{ level => debug, formatter => {logger_formatter, + #{ template => ["[", level, "] ", pid, " ", file, ":", line, " ", msg, "\n"] }}}}]}]}]. diff --git a/rebar.config b/rebar.config new file mode 100644 index 0000000..7d7b6f2 --- /dev/null +++ b/rebar.config @@ -0,0 +1,15 @@ +%% -*- erlang -*- + +{erl_opts, [debug_info, {parse_transform}]}. + +{minimum_otp_vsn, "25.2.3"}. + +{deps, []}. + +{xref_checks, [undefined_function_calls, undefined_functions, + deprecated_function_calls, deprecated_functions]}. + +{dialyzer, [ + {plt_extra_apps, [kernel, stdlib, erts, tools, inets, compiler]}, + {warnings, [no_improper_lists]} +]}. diff --git a/rebar.lock b/rebar.lock new file mode 100644 index 0000000..57afcca --- /dev/null +++ b/rebar.lock @@ -0,0 +1 @@ +[]. diff --git a/src/osmo_s1gw.app.src b/src/osmo_s1gw.app.src new file mode 100644 index 0000000..bc2a8d7 --- /dev/null +++ b/src/osmo_s1gw.app.src @@ -0,0 +1,14 @@ +%-*- mode: erlang -*- + +{application, osmo_s1gw, [ + {description, "Osmocom S1AP Gateway"}, + {vsn, "1"}, + {registered, []}, + {applications, [ + kernel, + stdlib + ]}, + {modules, []}, + {mod, {osmo_s1gw_app, []}}, + {env, []} +]}. diff --git a/src/osmo_s1gw_app.erl b/src/osmo_s1gw_app.erl new file mode 100644 index 0000000..596b3e8 --- /dev/null +++ b/src/osmo_s1gw_app.erl @@ -0,0 +1,11 @@ +-module(osmo_s1gw_app). +-behaviour(application). + +-export([start/2, stop/1]). + +start(_StartType, _StartArgs) -> + sctp_server:start_link(). + %% TODO: osmo_s1gw_sup:start_link(). + +stop(_State) -> + ok. diff --git a/src/sctp_client.erl b/src/sctp_client.erl new file mode 100644 index 0000000..67c7069 --- /dev/null +++ b/src/sctp_client.erl @@ -0,0 +1,42 @@ +-module(sctp_client). + +-export([connect/0, + connect/1, + connect/2, + send_data/2, + disconnect/1]). + +-include_lib("kernel/include/inet.hrl"). +-include_lib("kernel/include/inet_sctp.hrl"). + +-define(S1AP_PORT, 36412). +-define(S1AP_PPID, 18). +-define(SCTP_STREAM, 0). + +%% ------------------------------------------------------------------ +%% public API +%% ------------------------------------------------------------------ + +connect() -> + connect(localhost). + +connect(Host) -> + connect(Host, ?S1AP_PORT). + +connect(Host, Port) -> + {ok, Sock} = gen_sctp:open([{type, seqpacket}, + {active, true}]), + gen_sctp:connect_init(Sock, Host, Port, []), + {ok, Sock}. + + +send_data({Sock, Aid}, Data) -> + gen_sctp:send(Sock, #sctp_sndrcvinfo{stream = ?SCTP_STREAM, + ppid = ?S1AP_PPID, + assoc_id = Aid}, Data). + + +disconnect({Sock, Aid}) -> + gen_sctp:eof(Sock, #sctp_assoc_change{assoc_id = Aid}). + +%% vim:set ts=4 sw=4 et: diff --git a/src/sctp_proxy.erl b/src/sctp_proxy.erl new file mode 100644 index 0000000..e09720c --- /dev/null +++ b/src/sctp_proxy.erl @@ -0,0 +1,177 @@ +-module(sctp_proxy). +-behaviour(gen_statem). + +-export([init/1, + callback_mode/0, + connecting/3, + connected/3, + code_change/4, + terminate/3]). +-export([start_link/3, + send_data/2, + shutdown/1]). + +-include_lib("kernel/include/inet.hrl"). +-include_lib("kernel/include/inet_sctp.hrl"). + +%% ------------------------------------------------------------------ +%% public API +%% ------------------------------------------------------------------ + +start_link(Aid, MmeAddr, MmePort) -> + gen_statem:start_link(?MODULE, [Aid, MmeAddr, MmePort], []). + + +send_data(Pid, Data) -> + gen_statem:cast(Pid, {send_data, Data}). + + +shutdown(Pid) -> + gen_statem:stop(Pid). + + +%% ------------------------------------------------------------------ +%% gen_statem API +%% ------------------------------------------------------------------ + +init([Aid, MmeAddr, MmePort]) -> + {ok, connecting, + #{enb_aid => Aid, + mme_addr => MmeAddr, + mmr_port => MmePort, + tx_queue => []}}. + + +callback_mode() -> + [state_functions, state_enter]. + + +%% CONNECTING state +connecting(enter, OldState, + #{mme_addr := MmeAddr, mmr_port := MmePort} = S) -> + logger:info("State change: ~p -> ~p", [OldState, ?FUNCTION_NAME]), + %% Initiate connection establishment with the MME + {ok, Sock} = sctp_client:connect(MmeAddr, MmePort), + {keep_state, S#{sock => Sock}, + [{state_timeout, 2_000_000, conn_est_timeout}]}; + +%% Handle connection establishment timeout +connecting(state_timeout, conn_est_timeout, _S) -> + {stop, {shutdown, conn_est_timeout}}; + +%% Handle an eNB -> MME data forwarding request (queue) +connecting(cast, {send_data, Data}, + #{tx_queue := Pending} = S) -> + {keep_state, S#{tx_queue := [Data | Pending]}}; + +%% Handle an #sctp_assoc_change event (connection state) +connecting(info, {sctp, _Socket, MmeAddr, MmePort, + {[], #sctp_assoc_change{state = ConnState, + assoc_id = Aid}}}, S) -> + case ConnState of + comm_up -> + logger:notice("MME connection (id=~p, ~p:~p) established", + [Aid, MmeAddr, MmePort]), + {next_state, connected, S#{mme_aid => Aid}}; + _ -> + logger:notice("MME connection establishment failed: ~p", [ConnState]), + {stop, {shutdown, conn_est_fail}} + end; + +%% Catch-all for other kinds of SCTP events +connecting(info, {sctp, _Socket, MmeAddr, MmePort, + {AncData, Data}}, S) -> + logger:debug("Unhandled SCTP event (~p:~p): ~p, ~p", + [MmeAddr, MmePort, AncData, Data]), + {keep_state, S}; + +connecting(Event, EventData, S) -> + logger:error("Unexpected event ~p: ~p", [Event, EventData]), + {keep_state, S}. + + +%% CONNECTED state +connected(enter, OldState, S) -> + logger:info("State change: ~p -> ~p", [OldState, ?FUNCTION_NAME]), + %% Send pending eNB -> MME messages (if any) + ok = sctp_send_pending(S), + {keep_state, maps:remove(tx_queue, S)}; + +%% Handle an eNB -> MME data forwarding request (forward) +connected(cast, {send_data, Data}, S) -> + ok = sctp_send(S, Data), + {keep_state, S}; + +%% Handle an #sctp_assoc_change event (MME connection state) +connected(info, {sctp, _Socket, MmeAddr, MmePort, + {[], #sctp_assoc_change{state = ConnState, + assoc_id = Aid}}}, S) -> + case ConnState of + comm_up -> + logger:notice("MME connection (id=~p, ~p:~p) is already established?!?", + [Aid, MmeAddr, MmePort]), + {keep_state, S}; + _ -> + logger:notice("MME connection state: ~p", [ConnState]), + {stop, {shutdown, conn_fail}} + end; + +%% Handle an #sctp_sndrcvinfo event (MME -> eNB data) +connected(info, {sctp, _Socket, MmeAddr, MmePort, + {[#sctp_sndrcvinfo{assoc_id = Aid}], Data}}, S) -> + logger:info("MME connection (id=~p, ~p:~p) Rx ~p", + [Aid, MmeAddr, MmePort, Data]), + sctp_server:send_data(maps:get(enb_aid, S), Data), + {keep_state, S}; + +%% Catch-all for other kinds of SCTP events +connected(info, {sctp, _Socket, MmeAddr, MmePort, + {AncData, Data}}, S) -> + logger:debug("Unhandled SCTP event (~p:~p): ~p, ~p", + [MmeAddr, MmePort, AncData, Data]), + {keep_state, S}; + +%% Catch-all handler for this state +connected(Event, EventData, S) -> + logger:error("Unexpected event ~p: ~p", [Event, EventData]), + {keep_state, S}. + + + +code_change(_Vsn, State, S, _Extra) -> + {ok, State, S}. + + +terminate(Reason, State, S) -> + logger:notice("Terminating in state ~p, reason ~p", [State, Reason]), + case S of + #{sock := Sock, mme_aid := Aid} -> + sctp_client:disconnect({Sock, Aid}), + gen_sctp:close(Sock); + #{sock := Sock} -> + gen_sctp:close(Sock); + _ -> ok + end. + + +%% ------------------------------------------------------------------ +%% private API +%% ------------------------------------------------------------------ + +%% Send a single message to the MME +sctp_send(#{sock := Sock, mme_aid := Aid}, Data) -> + sctp_client:send_data({Sock, Aid}, Data). + + +%% Send pending messages to the MME +sctp_send_pending(#{tx_queue := Pending} = S) -> + sctp_send_pending(S, lists:reverse(Pending)). + +sctp_send_pending(S, [Data | Pending]) -> + sctp_send(S, Data), + sctp_send_pending(S, Pending); + +sctp_send_pending(_S, []) -> + ok. + +%% vim:set ts=4 sw=4 et: diff --git a/src/sctp_server.erl b/src/sctp_server.erl new file mode 100644 index 0000000..883991a --- /dev/null +++ b/src/sctp_server.erl @@ -0,0 +1,202 @@ +-module(sctp_server). +-behaviour(gen_server). + +-export([init/1, + handle_info/2, + handle_call/3, + handle_cast/2, + terminate/2]). +-export([start_link/0, + start_link/2]). +-export([send_data/2]). + +-include_lib("kernel/include/inet.hrl"). +-include_lib("kernel/include/inet_sctp.hrl"). + +-define(S1AP_PORT, 36412). +-define(S1AP_PPID, 18). +-define(SCTP_STREAM, 0). + +-record(server_state, {sock, clients}). +-record(client_state, {addr_port, pid}). + +%% ------------------------------------------------------------------ +%% public API +%% ------------------------------------------------------------------ + +start_link() -> + start_link({127,0,0,1}, ?S1AP_PORT). + +start_link(BindAddr, BindPort) -> + gen_server:start_link({local, ?MODULE}, ?MODULE, + [BindAddr, BindPort], []). + + +send_data(Aid, Data) -> + gen_server:cast(?MODULE, {send_data, Aid, Data}). + + +%% ------------------------------------------------------------------ +%% gen_server API +%% ------------------------------------------------------------------ + +init([BindAddr, BindPort]) -> + process_flag(trap_exit, true), + {ok, Sock} = gen_sctp:open([{ip, BindAddr}, + {port, BindPort}, + {type, seqpacket}, + {reuseaddr, true}, + {active, true}]), + logger:info("SCTP server listening on ~w:~w", [BindAddr, BindPort]), + ok = gen_sctp:listen(Sock, true), + {ok, #server_state{sock = Sock, + clients = dict:new()}}. + + +handle_call(Info, _From, State) -> + error_logger:error_report(["unknown handle_call()", + {module, ?MODULE}, {info, Info}, {state, State}]), + {reply, error, not_implemented}. + + +handle_cast({send_data, Aid, Data}, State) -> + gen_sctp:send(State#server_state.sock, + #sctp_sndrcvinfo{stream = ?SCTP_STREAM, + ppid = ?S1AP_PPID, + assoc_id = Aid}, Data), + {noreply, State}; + +handle_cast(Info, State) -> + error_logger:error_report(["unknown handle_cast()", + {module, ?MODULE}, {info, Info}, {state, State}]), + {noreply, State}. + + +%% Handle SCTP events coming from gen_sctp module +handle_info({sctp, _Socket, FromAddr, FromPort, {AncData, Data}}, State) -> + NewState = sctp_recv(State, {FromAddr, FromPort, AncData, Data}), + {noreply, NewState}; + +%% Handle termination events of the child processes +handle_info({'EXIT', Pid, Reason}, + #server_state{sock = Sock, clients = Clients} = State) -> + logger:debug("Child process ~p terminated with reason ~p", [Pid, Reason]), + case client_find(State, Pid) of + {ok, {Aid, _Client}} -> + %% gracefully close the eNB connection + gen_sctp:eof(Sock, #sctp_assoc_change{assoc_id = Aid}), + {noreply, State#server_state{clients = dict:erase(Aid, Clients)}}; + error -> + {noreply, State} + end; + +%% Catch-all for unknown messages +handle_info(Info, State) -> + error_logger:error_report(["unknown handle_info()", + {module, ?MODULE}, {info, Info}, {state, State}]), + {noreply, State}. + + +terminate(shutdown, State) -> + logger:notice("Terminating ~p", [?MODULE]), + close_conns(State), + gen_sctp:close(State#server_state.sock), + ok. + +%% ------------------------------------------------------------------ +%% private API +%% ------------------------------------------------------------------ + +%% Handle an #sctp_assoc_change event (connection state) +sctp_recv(State, {FromAddr, FromPort, [], + #sctp_assoc_change{state = ConnState, + assoc_id = Aid}}) -> + case ConnState of + comm_up -> + logger:notice("Connection (id=~p, ~p:~p) established", [Aid, FromAddr, FromPort]), + Clients = client_add(State#server_state.clients, Aid, FromAddr, FromPort); + shutdown_comp -> + logger:notice("Connection (id=~p, ~p:~p) closed", [Aid, FromAddr, FromPort]), + Clients = client_del(State#server_state.clients, Aid); + comm_lost -> + logger:notice("Connection (id=~p, ~p:~p) lost", [Aid, FromAddr, FromPort]), + Clients = client_del(State#server_state.clients, Aid); + _ -> + logger:notice("Connection (id=~p, ~p:~p) state ~p", + [Aid, FromAddr, FromPort, ConnState]), + Clients = State#server_state.clients + end, + State#server_state{clients = Clients}; + +%% Handle an #sctp_sndrcvinfo event (incoming data) +sctp_recv(State, {FromAddr, FromPort, + [#sctp_sndrcvinfo{assoc_id = Aid}], Data}) -> + logger:info("Connection (id=~p, ~p:~p) Rx ~p", [Aid, FromAddr, FromPort, Data]), + case dict:find(Aid, State#server_state.clients) of + {ok, #client_state{pid = Pid}} -> + sctp_proxy:send_data(Pid, Data); + error -> + logger:error("Connection (id=~p, ~p:~p) is not known to us?!?", + [Aid, FromAddr, FromPort]) + end, + State; + +%% Catch-all for other kinds of SCTP events +sctp_recv(State, {FromAddr, FromPort, AncData, Data}) -> + logger:debug("Unhandled SCTP event (~p:~p): ~p, ~p", + [FromAddr, FromPort, AncData, Data]), + State. + + +%% Add a new client to the list, spawning a proxy process +client_add(Clients, Aid, FromAddr, FromPort) -> + {ok, Pid} = sctp_proxy:start_link(Aid, {127,0,1,1}, ?S1AP_PORT), %% XXX! + NewClient = #client_state{addr_port = {FromAddr, FromPort}, pid = Pid}, + dict:store(Aid, NewClient, Clients). + + +%% Delete an existing client from the list, stopping the proxy process +client_del(Clients, Aid) -> + case dict:find(Aid, Clients) of + {ok, Client} -> + sctp_proxy:shutdown(Client#client_state.pid), + dict:erase(Aid, Clients); + error -> + Clients + end. + + +%% Find a client by process ID +client_find(#server_state{clients = Clients}, Pid) -> + client_find(dict:to_list(Clients), Pid); + +client_find([{Aid, Client} | Clients], Pid) -> + case Client of + #client_state{pid = Pid} -> + {ok, {Aid, Client}}; + _ -> + client_find(Clients, Pid) + end; + +client_find([], _Pid) -> + error. + + +%% Gracefully terminate client connections +close_conns(#server_state{sock = Sock, clients = Clients}) -> + close_conns(Sock, dict:to_list(Clients)). + +close_conns(Sock, [{Aid, Client} | Clients]) -> + {FromAddr, FromPort} = Client#client_state.addr_port, + logger:notice("Terminating connection (id=~p, ~p:~p)", [Aid, FromAddr, FromPort]), + %% request to terminate an MME connection + sctp_proxy:shutdown(Client#client_state.pid), + %% gracefully close an eNB connection + gen_sctp:eof(Sock, #sctp_assoc_change{assoc_id = Aid}), + %% ... and so for the remaining clients + close_conns(Sock, Clients); + +close_conns(_Sock, []) -> + ok. + +%% vim:set ts=4 sw=4 et: -- To view, visit https://gerrit.osmocom.org/c/osmo-s1gw/+/37036?usp=email To unsubscribe, or for help writing mail filters, visit https://gerrit.osmocom.org/settings Gerrit-Project: osmo-s1gw Gerrit-Branch: master Gerrit-Change-Id: Ia317f58f7dcbec42930165fdcd42d0ddd23e289c Gerrit-Change-Number: 37036 Gerrit-PatchSet: 1 Gerrit-Owner: osmith <osm...@sysmocom.de> Gerrit-Reviewer: osmith <osm...@sysmocom.de> Gerrit-CC: fixeria <vyanits...@sysmocom.de> Gerrit-MessageType: merged