laforge has submitted this change. ( https://gerrit.osmocom.org/c/pysim/+/36972?usp=email )
( 5 is the latest approved patch-set. No files were changed between the latest approved patch-set and the submitted one. )Change subject: esim.es2p: Split generic part of HTTP/REST API from ES2+ ...................................................................... esim.es2p: Split generic part of HTTP/REST API from ES2+ This way we can reuse it for other eSIM RSP HTTP interfaces like ES9+, ES11, ... Change-Id: I468041da40a88875e8df15b04d3ad508e06f16f7 --- M pySim/esim/es2p.py A pySim/esim/http_json_api.py 2 files changed, 275 insertions(+), 234 deletions(-) Approvals: fixeria: Looks good to me, approved Jenkins Builder: Verified osmith: Looks good to me, but someone else must approve diff --git a/pySim/esim/es2p.py b/pySim/esim/es2p.py index fa21d2c..a73dd0c 100644 --- a/pySim/esim/es2p.py +++ b/pySim/esim/es2p.py @@ -15,98 +15,16 @@ # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. -import abc import requests import logging -import json from datetime import datetime import time -import base64 + +from pySim.esim.http_json_api import * logger = logging.getLogger(__name__) logger.setLevel(logging.DEBUG) -class ApiParam(abc.ABC): - """A class reprsenting a single parameter in the ES2+ API.""" - @classmethod - def verify_decoded(cls, data): - """Verify the decoded reprsentation of a value. Should raise an exception if somthing is odd.""" - pass - - @classmethod - def verify_encoded(cls, data): - """Verify the encoded reprsentation of a value. Should raise an exception if somthing is odd.""" - pass - - @classmethod - def encode(cls, data): - """[Validate and] Encode the given value.""" - cls.verify_decoded(data) - encoded = cls._encode(data) - cls.verify_decoded(encoded) - return encoded - - @classmethod - def _encode(cls, data): - """encoder function, typically [but not always] overridden by derived class.""" - return data - - @classmethod - def decode(cls, data): - """[Validate and] Decode the given value.""" - cls.verify_encoded(data) - decoded = cls._decode(data) - cls.verify_decoded(decoded) - return decoded - - @classmethod - def _decode(cls, data): - """decoder function, typically [but not always] overridden by derived class.""" - return data - -class ApiParamString(ApiParam): - """Base class representing an API parameter of 'string' type.""" - pass - - -class ApiParamInteger(ApiParam): - """Base class representing an API parameter of 'integer' type.""" - @classmethod - def _decode(cls, data): - return int(data) - - @classmethod - def _encode(cls, data): - return str(data) - - @classmethod - def verify_decoded(cls, data): - if not isinstance(data, int): - raise TypeError('Expected an integer input data type') - - @classmethod - def verify_encoded(cls, data): - if isinstance(data, int): - return - if not data.isdecimal(): - raise ValueError('integer (%s) contains non-decimal characters' % data) - assert str(int(data)) == data - -class ApiParamBoolean(ApiParam): - """Base class representing an API parameter of 'boolean' type.""" - @classmethod - def _encode(cls, data): - return bool(data) - -class ApiParamFqdn(ApiParam): - """String, as a list of domain labels concatenated using the full stop (dot, period) character as - separator between labels. Labels are restricted to the Alphanumeric mode character set defined in table 5 - of ISO/IEC 18004""" - @classmethod - def verify_encoded(cls, data): - # FIXME - pass - class param: class Iccid(ApiParamString): """String representation of 19 or 20 digits, where the 20th digit MAY optionally be the padding @@ -172,9 +90,6 @@ class SmdsAddress(ApiParamFqdn): pass - class SmdpAddress(ApiParamFqdn): - pass - class ReleaseFlag(ApiParamBoolean): pass @@ -197,150 +112,12 @@ class NotificationPointStatus(ApiParam): pass - class ResultData(ApiParam): - @classmethod - def _decode(cls, data): - return base64.b64decode(data) + class ResultData(ApiParamBase64): + pass - @classmethod - def _encode(cls, data): - return base64.b64encode(data) - - class JsonResponseHeader(ApiParam): - """SGP.22 section 6.5.1.4.""" - @classmethod - def verify_decoded(cls, data): - fe_status = data.get('functionExecutionStatus') - if not fe_status: - raise ValueError('Missing mandatory functionExecutionStatus in header') - status = fe_status.get('status') - if not status: - raise ValueError('Missing mandatory status in header functionExecutionStatus') - if status not in ['Executed-Success', 'Executed-WithWarning', 'Failed', 'Expired']: - raise ValueError('Unknown/unspecified status "%s"' % status) - - -class HttpStatusError(Exception): - pass - -class HttpHeaderError(Exception): - pass - -class Es2PlusApiError(Exception): - """Exception representing an error at the ES2+ API level (status != Executed).""" - def __init__(self, func_ex_status: dict): - self.status = func_ex_status['status'] - sec = { - 'subjectCode': None, - 'reasonCode': None, - 'subjectIdentifier': None, - 'message': None, - } - actual_sec = func_ex_status.get('statusCodeData', None) - sec.update(actual_sec) - self.subject_code = sec['subjectCode'] - self.reason_code = sec['reasonCode'] - self.subject_id = sec['subjectIdentifier'] - self.message = sec['message'] - - def __str__(self): - return f'{self.status}("{self.subject_code}","{self.reason_code}","{self.subject_id}","{self.message}")' - -class Es2PlusApiFunction(abc.ABC): +class Es2PlusApiFunction(JsonHttpApiFunction): """Base classs for representing an ES2+ API Function.""" - # the below class variables are expected to be overridden in derived classes - - path = None - # dictionary of input parameters. key is parameter name, value is ApiParam class - input_params = {} - # list of mandatory input parameters - input_mandatory = [] - # dictionary of output parameters. key is parameter name, value is ApiParam class - output_params = {} - # list of mandatory output parameters (for successful response) - output_mandatory = [] - # expected HTTP status code of the response - expected_http_status = 200 - # the HTTP method used (GET, OPTIONS, HEAD, POST, PUT, PATCH or DELETE) - http_method = 'POST' - - def __init__(self, url_prefix: str, func_req_id: str, session): - self.url_prefix = url_prefix - self.func_req_id = func_req_id - self.session = session - - def encode(self, data: dict, func_call_id: str) -> dict: - """Validate an encode input dict into JSON-serializable dict for request body.""" - output = { - 'header': { - 'functionRequesterIdentifier': self.func_req_id, - 'functionCallIdentifier': func_call_id - } - } - for p in self.input_mandatory: - if not p in data: - raise ValueError('Mandatory input parameter %s missing' % p) - for p, v in data.items(): - p_class = self.input_params.get(p) - if not p_class: - logger.warning('Unexpected/unsupported input parameter %s=%s', p, v) - output[p] = v - else: - output[p] = p_class.encode(v) - return output - - - def decode(self, data: dict) -> dict: - """[further] Decode and validate the JSON-Dict of the respnse body.""" - output = {} - # let's first do the header, it's special - if not 'header' in data: - raise ValueError('Mandatory output parameter "header" missing') - hdr_class = self.output_params.get('header') - output['header'] = hdr_class.decode(data['header']) - - if output['header']['functionExecutionStatus']['status'] not in ['Executed-Success','Executed-WithWarning']: - raise Es2PlusApiError(output['header']['functionExecutionStatus']) - # we can only expect mandatory parameters to be present in case of successful execution - for p in self.output_mandatory: - if p == 'header': - continue - if not p in data: - raise ValueError('Mandatory output parameter "%s" missing' % p) - for p, v in data.items(): - p_class = self.output_params.get(p) - if not p_class: - logger.warning('Unexpected/unsupported output parameter "%s"="%s"', p, v) - output[p] = v - else: - output[p] = p_class.decode(v) - return output - - def call(self, data: dict, func_call_id:str, timeout=10) -> dict: - """Make an API call to the ES2+ API endpoint represented by this object. - Input data is passed in `data` as json-serializable dict. Output data - is returned as json-deserialized dict.""" - url = self.url_prefix + self.path - encoded = json.dumps(self.encode(data, func_call_id)) - headers = { - 'Content-Type': 'application/json', - 'X-Admin-Protocol': 'gsma/rsp/v2.5.0', - } - - logger.debug("HTTP REQ %s - '%s'" % (url, encoded)) - response = self.session.request(self.http_method, url, data=encoded, headers=headers, timeout=timeout) - logger.debug("HTTP RSP-STS: [%u] hdr: %s" % (response.status_code, response.headers)) - logger.debug("HTTP RSP: %s" % (response.content)) - - if response.status_code != self.expected_http_status: - raise HttpStatusError(response) - if not response.headers.get('Content-Type').startswith(headers['Content-Type']): - raise HttpHeaderError(response) - if not response.headers.get('X-Admin-Protocol', 'gsma/rsp/v2.unknown').startswith('gsma/rsp/v2.'): - raise HttpHeaderError(response) - - return self.decode(response.json()) - + pass # ES2+ DownloadOrder function (SGP.22 section 5.3.1) class DownloadOrder(Es2PlusApiFunction): @@ -351,7 +128,7 @@ 'profileType': param.ProfileType } output_params = { - 'header': param.JsonResponseHeader, + 'header': JsonResponseHeader, 'iccid': param.Iccid, } output_mandatory = ['header', 'iccid'] @@ -369,10 +146,10 @@ } input_mandatory = ['iccid', 'releaseFlag'] output_params = { - 'header': param.JsonResponseHeader, + 'header': JsonResponseHeader, 'eid': param.Eid, 'matchingId': param.MatchingId, - 'smdpAddress': param.SmdpAddress, + 'smdpAddress': SmdpAddress, } output_mandatory = ['header', 'matchingId'] @@ -387,7 +164,7 @@ } input_mandatory = ['finalProfileStatusIndicator', 'iccid'] output_params = { - 'header': param.JsonResponseHeader, + 'header': JsonResponseHeader, } output_mandatory = ['header'] @@ -399,7 +176,7 @@ } input_mandatory = ['iccid'] output_params = { - 'header': param.JsonResponseHeader, + 'header': JsonResponseHeader, } output_mandatory = ['header'] diff --git a/pySim/esim/http_json_api.py b/pySim/esim/http_json_api.py new file mode 100644 index 0000000..d396b46 --- /dev/null +++ b/pySim/esim/http_json_api.py @@ -0,0 +1,252 @@ +"""GSMA eSIM RSP HTTP/REST/JSON interface according to SGP.22 v2.5""" + +# (C) 2024 by Harald Welte <lafo...@osmocom.org> +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +import abc +import requests +import logging +import json +import base64 + +logger = logging.getLogger(__name__) +logger.setLevel(logging.DEBUG) + +class ApiParam(abc.ABC): + """A class reprsenting a single parameter in the API.""" + @classmethod + def verify_decoded(cls, data): + """Verify the decoded reprsentation of a value. Should raise an exception if somthing is odd.""" + pass + + @classmethod + def verify_encoded(cls, data): + """Verify the encoded reprsentation of a value. Should raise an exception if somthing is odd.""" + pass + + @classmethod + def encode(cls, data): + """[Validate and] Encode the given value.""" + cls.verify_decoded(data) + encoded = cls._encode(data) + cls.verify_decoded(encoded) + return encoded + + @classmethod + def _encode(cls, data): + """encoder function, typically [but not always] overridden by derived class.""" + return data + + @classmethod + def decode(cls, data): + """[Validate and] Decode the given value.""" + cls.verify_encoded(data) + decoded = cls._decode(data) + cls.verify_decoded(decoded) + return decoded + + @classmethod + def _decode(cls, data): + """decoder function, typically [but not always] overridden by derived class.""" + return data + +class ApiParamString(ApiParam): + """Base class representing an API parameter of 'string' type.""" + pass + + +class ApiParamInteger(ApiParam): + """Base class representing an API parameter of 'integer' type.""" + @classmethod + def _decode(cls, data): + return int(data) + + @classmethod + def _encode(cls, data): + return str(data) + + @classmethod + def verify_decoded(cls, data): + if not isinstance(data, int): + raise TypeError('Expected an integer input data type') + + @classmethod + def verify_encoded(cls, data): + if isinstance(data, int): + return + if not data.isdecimal(): + raise ValueError('integer (%s) contains non-decimal characters' % data) + assert str(int(data)) == data + +class ApiParamBoolean(ApiParam): + """Base class representing an API parameter of 'boolean' type.""" + @classmethod + def _encode(cls, data): + return bool(data) + +class ApiParamFqdn(ApiParam): + """String, as a list of domain labels concatenated using the full stop (dot, period) character as + separator between labels. Labels are restricted to the Alphanumeric mode character set defined in table 5 + of ISO/IEC 18004""" + @classmethod + def verify_encoded(cls, data): + # FIXME + pass + +class ApiParamBase64(ApiParam): + @classmethod + def _decode(cls, data): + return base64.b64decode(data) + + @classmethod + def _encode(cls, data): + return base64.b64encode(data).decode('ascii') + +class SmdpAddress(ApiParamFqdn): + pass + +class JsonResponseHeader(ApiParam): + """SGP.22 section 6.5.1.4.""" + @classmethod + def verify_decoded(cls, data): + fe_status = data.get('functionExecutionStatus') + if not fe_status: + raise ValueError('Missing mandatory functionExecutionStatus in header') + status = fe_status.get('status') + if not status: + raise ValueError('Missing mandatory status in header functionExecutionStatus') + if status not in ['Executed-Success', 'Executed-WithWarning', 'Failed', 'Expired']: + raise ValueError('Unknown/unspecified status "%s"' % status) + + +class HttpStatusError(Exception): + pass + +class HttpHeaderError(Exception): + pass + +class ApiError(Exception): + """Exception representing an error at the API level (status != Executed).""" + def __init__(self, func_ex_status: dict): + self.status = func_ex_status['status'] + sec = { + 'subjectCode': None, + 'reasonCode': None, + 'subjectIdentifier': None, + 'message': None, + } + actual_sec = func_ex_status.get('statusCodeData', None) + sec.update(actual_sec) + self.subject_code = sec['subjectCode'] + self.reason_code = sec['reasonCode'] + self.subject_id = sec['subjectIdentifier'] + self.message = sec['message'] + + def __str__(self): + return f'{self.status}("{self.subject_code}","{self.reason_code}","{self.subject_id}","{self.message}")' + +class JsonHttpApiFunction(abc.ABC): + """Base classs for representing an HTTP[s] API Function.""" + # the below class variables are expected to be overridden in derived classes + + path = None + # dictionary of input parameters. key is parameter name, value is ApiParam class + input_params = {} + # list of mandatory input parameters + input_mandatory = [] + # dictionary of output parameters. key is parameter name, value is ApiParam class + output_params = {} + # list of mandatory output parameters (for successful response) + output_mandatory = [] + # expected HTTP status code of the response + expected_http_status = 200 + # the HTTP method used (GET, OPTIONS, HEAD, POST, PUT, PATCH or DELETE) + http_method = 'POST' + + def __init__(self, url_prefix: str, func_req_id: str, session: requests.Session): + self.url_prefix = url_prefix + self.func_req_id = func_req_id + self.session = session + + def encode(self, data: dict, func_call_id: str) -> dict: + """Validate an encode input dict into JSON-serializable dict for request body.""" + output = { + 'header': { + 'functionRequesterIdentifier': self.func_req_id, + 'functionCallIdentifier': func_call_id + } + } + for p in self.input_mandatory: + if not p in data: + raise ValueError('Mandatory input parameter %s missing' % p) + for p, v in data.items(): + p_class = self.input_params.get(p) + if not p_class: + logger.warning('Unexpected/unsupported input parameter %s=%s', p, v) + output[p] = v + else: + output[p] = p_class.encode(v) + return output + + def decode(self, data: dict) -> dict: + """[further] Decode and validate the JSON-Dict of the respnse body.""" + output = {} + # let's first do the header, it's special + if not 'header' in data: + raise ValueError('Mandatory output parameter "header" missing') + hdr_class = self.output_params.get('header') + output['header'] = hdr_class.decode(data['header']) + + if output['header']['functionExecutionStatus']['status'] not in ['Executed-Success','Executed-WithWarning']: + raise ApiError(output['header']['functionExecutionStatus']) + # we can only expect mandatory parameters to be present in case of successful execution + for p in self.output_mandatory: + if p == 'header': + continue + if not p in data: + raise ValueError('Mandatory output parameter "%s" missing' % p) + for p, v in data.items(): + p_class = self.output_params.get(p) + if not p_class: + logger.warning('Unexpected/unsupported output parameter "%s"="%s"', p, v) + output[p] = v + else: + output[p] = p_class.decode(v) + return output + + def call(self, data: dict, func_call_id:str, timeout=10) -> dict: + """Make an API call to the HTTP API endpoint represented by this object. + Input data is passed in `data` as json-serializable dict. Output data + is returned as json-deserialized dict.""" + url = self.url_prefix + self.path + encoded = json.dumps(self.encode(data, func_call_id)) + headers = { + 'Content-Type': 'application/json', + 'X-Admin-Protocol': 'gsma/rsp/v2.5.0', + } + + logger.debug("HTTP REQ %s - '%s'" % (url, encoded)) + response = self.session.request(self.http_method, url, data=encoded, headers=headers, timeout=timeout) + logger.debug("HTTP RSP-STS: [%u] hdr: %s" % (response.status_code, response.headers)) + logger.debug("HTTP RSP: %s" % (response.content)) + + if response.status_code != self.expected_http_status: + raise HttpStatusError(response) + if not response.headers.get('Content-Type').startswith(headers['Content-Type']): + raise HttpHeaderError(response) + if not response.headers.get('X-Admin-Protocol', 'gsma/rsp/v2.unknown').startswith('gsma/rsp/v2.'): + raise HttpHeaderError(response) + + return self.decode(response.json()) -- To view, visit https://gerrit.osmocom.org/c/pysim/+/36972?usp=email To unsubscribe, or for help writing mail filters, visit https://gerrit.osmocom.org/settings Gerrit-Project: pysim Gerrit-Branch: master Gerrit-Change-Id: I468041da40a88875e8df15b04d3ad508e06f16f7 Gerrit-Change-Number: 36972 Gerrit-PatchSet: 6 Gerrit-Owner: laforge <lafo...@osmocom.org> Gerrit-Reviewer: Jenkins Builder Gerrit-Reviewer: fixeria <vyanits...@sysmocom.de> Gerrit-Reviewer: laforge <lafo...@osmocom.org> Gerrit-Reviewer: osmith <osm...@sysmocom.de> Gerrit-MessageType: merged