Julien Castets has proposed merging ~jcastets/cloud-init:scaleway-datasource into cloud-init:master.
Requested reviews:
  cloud-init commiters (cloud-init-dev)

For more details, see:
https://code.launchpad.net/~jcastets/cloud-init/+git/cloud-init/+merge/325740

Implements Scaleway datasource with user and vendor data.
--
Your team cloud-init commiters is requested to review the proposed merge of ~jcastets/cloud-init:scaleway-datasource into cloud-init:master.
diff --git a/cloudinit/sources/DataSourceScaleway.py b/cloudinit/sources/DataSourceScaleway.py
new file mode 100644
index 0000000..7ac1b1a
--- /dev/null
+++ b/cloudinit/sources/DataSourceScaleway.py
@@ -0,0 +1,301 @@
+# vi: ts=4 expandtab
+#
+# Author: Julien Castets <castet...@gmail.com>
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License version 3, as
+# published by the Free Software Foundation.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+import json
+import socket
+import time
+
+import requests
+
+# pylint fails to import the two modules below.
+# pylint: disable=E0401
+from requests.packages.urllib3.connection import HTTPConnection
+from requests.packages.urllib3.poolmanager import PoolManager
+
+from cloudinit import log as logging
+from cloudinit import sources
+from cloudinit import url_helper
+from cloudinit import util
+
+
+LOG = logging.getLogger(__name__)
+
+BUILTIN_DS_CONFIG = {
+    'metadata_url': 'http://169.254.42.42/conf?format=json',
+    'userdata_url': 'http://169.254.42.42/user_data/cloud-init',
+    'vendordata_url': 'http://169.254.42.42/vendor_data/cloud-init'
+}
+
+DEF_MD_RETRIES = 5
+DEF_MD_TIMEOUT = 10
+
+# Seconds to wait between two attempts to reach the metadata API.
+DEF_MD_RETRY_DELAY = 5
+
+
+def on_scaleway(user_data_url, retries, timeout=DEF_MD_TIMEOUT):
+    """
+    Check if we are on Scaleway.
+
+    The only backward-compatible way to check if you are on a Scaleway instance
+    is to query the metadata API. For security reasons, the special endpoint
+    /user_data/ can only be accessed from a privileged TCP source port (ie.
+    below 1024), otherwise the API returns a HTTP/403.
+
+    In other words, on Scaleway:
+
+        #> curl http://169.254.42.42/user_data
+        [...] 403 error
+
+        #> curl --local-port 1-1024 http://169.254.42.42/user_data
+        [...] 200 OK
+
+    This function queries the endpoint /user_data/ and returns True if a
+    HTTP/403 is returned.
+
+    :param user_data_url: URL of the user data endpoint to probe.
+    :param retries: number of extra attempts made while the API answers with
+        HTTP/429 or a HTTP/5xx error.
+    :param timeout: timeout, in seconds, of each HEAD request. Without it,
+        requests may wait indefinitely for a response.
+    :return: True if running on Scaleway, False otherwise.
+    """
+    tries = max(int(retries) + 1, 1)
+    for ntry in range(tries):
+        if ntry:
+            # Be nice, and wait a bit before retrying (never before the first
+            # attempt, nor after the last one).
+            time.sleep(DEF_MD_RETRY_DELAY)
+
+        try:
+            code = requests.head(user_data_url, timeout=timeout).status_code
+        except (requests.exceptions.ConnectionError,
+                requests.exceptions.Timeout):
+            # Couldn't query the API.
+            return False
+
+        if code == 403:
+            return True
+
+        # If we are rate limited or if there is a server error, we cannot
+        # tell yet whether we are talking to the Scaleway metadata API: try
+        # again. Any other status means we are not on Scaleway.
+        if code != 429 and code < 500:
+            return False
+
+    return False
+
+
+class SourceAddressAdapter(requests.adapters.HTTPAdapter):
+    """
+    Adapter for requests to choose the local address to bind to.
+    """
+
+    def __init__(self, source_address, **kwargs):
+        self.source_address = source_address
+        super(SourceAddressAdapter, self).__init__(**kwargs)
+
+    def init_poolmanager(self, connections, maxsize, block=False,
+                         **pool_kwargs):
+        # urllib3's default socket options plus SO_REUSEPORT, presumably so
+        # the fixed privileged source port can be bound again later.
+        socket_options = HTTPConnection.default_socket_options + [
+            (socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
+        ]
+        # Forward extra pool arguments like HTTPAdapter.init_poolmanager does.
+        self.poolmanager = PoolManager(num_pools=connections,
+                                       maxsize=maxsize,
+                                       block=block,
+                                       source_address=self.source_address,
+                                       socket_options=socket_options,
+                                       **pool_kwargs)
+
+
+def _get_type_data(typedata_address, timeout, requests_session):
+    """
+    Retrieve user data or vendor data.
+
+    Scaleway userdata/vendordata API returns HTTP/404 if user/vendor data is
+    not set.
+
+    This function calls `url_helper.readurl` but instead of considering
+    HTTP/404 as an error that requires a retry, it considers it as empty
+    user/vendor data.
+
+    Also, be aware the user data/vendor API requires the source port to be
+    below 1024. If requests raises ConnectionError (EADDRINUSE), the caller
+    should call this function again on another port.
+
+    :return: the decoded data, or None when no data is set (HTTP/404).
+    :raises url_helper.UrlError: on any other failure.
+    """
+    try:
+        resp = url_helper.readurl(
+            typedata_address,
+            data=None,
+            timeout=timeout,
+            # It's the caller's responsibility to call this function again in
+            # case of exception, so url_helper.readurl() must try only once.
+            retries=0,
+            session=requests_session,
+        )
+        return util.decode_binary(resp.contents)
+    except url_helper.UrlError as exc:
+        # Empty user data.
+        if exc.code == 404:
+            return None
+        raise
+
+
+class DataSourceScaleway(sources.DataSource):
+
+    def __init__(self, sys_cfg, distro, paths):
+        LOG.debug('Init Scaleway')
+
+        sources.DataSource.__init__(self, sys_cfg, distro, paths)
+
+        self.metadata = {}
+        self.ds_cfg = util.mergemanydict([
+            util.get_cfg_by_path(sys_cfg, ["datasource", "Scaleway"], {}),
+            BUILTIN_DS_CONFIG
+        ])
+
+        self.metadata_address = self.ds_cfg['metadata_url']
+        self.userdata_address = self.ds_cfg['userdata_url']
+        self.vendordata_address = self.ds_cfg['vendordata_url']
+
+        self.retries = self.ds_cfg.get('retries', DEF_MD_RETRIES)
+        self.timeout = self.ds_cfg.get('timeout', DEF_MD_TIMEOUT)
+
+    def _get_privileged_type(self, type_):
+        """
+        Download user data or vendor data from a privileged source port.
+
+        Attempt n binds the request to local port n (1, 2, ...), so a port
+        already in use is retried on the next one. There is one attempt plus
+        self.retries retries, as in on_scaleway().
+
+        :param type_: 'user' or 'vendor'.
+        :return: the data, or None if it is not set.
+        :raises url_helper.UrlError: if every attempt failed.
+        """
+        if type_ not in ('user', 'vendor'):
+            raise ValueError('Unknown data type: %r' % (type_,))
+
+        if type_ == 'user':
+            type_address = self.userdata_address
+        else:
+            type_address = self.vendordata_address
+
+        tries = max(int(self.retries) + 1, 1)
+        last_exc = None
+        for port in range(1, tries + 1):
+            if last_exc is not None:
+                # Local port already in use or HTTP/429: wait before retrying.
+                time.sleep(DEF_MD_RETRY_DELAY)
+
+            LOG.debug(
+                'Trying to get %s data (bind on port %d)...', type_, port
+            )
+            requests_session = requests.Session()
+            requests_session.mount(
+                'http://',
+                SourceAddressAdapter(source_address=('0.0.0.0', port))
+            )
+            try:
+                data = _get_type_data(
+                    type_address,
+                    timeout=self.timeout,
+                    requests_session=requests_session
+                )
+            except url_helper.UrlError as exc:
+                last_exc = exc
+                continue
+            finally:
+                # Release the connection pool bound to this source port.
+                requests_session.close()
+
+            LOG.debug('%s-data downloaded', type_)
+            return data
+
+        # Max number of retries reached.
+        raise last_exc
+
+    def _get_metadata(self):
+        resp = url_helper.readurl(self.metadata_address,
+                                  timeout=self.timeout,
+                                  retries=self.retries)
+
+        metadata = json.loads(util.decode_binary(resp.contents))
+
+        metadata['user-data'] = self._get_privileged_type('user')
+        metadata['vendor-data'] = self._get_privileged_type('vendor')
+
+        return metadata
+
+    def get_data(self):
+        if not on_scaleway(self.userdata_address, self.retries,
+                           timeout=self.timeout):
+            return False
+
+        metadata = self._get_metadata()
+
+        self.metadata = {
+            'id': metadata['id'],
+            'hostname': metadata['hostname'],
+            'user-data': metadata['user-data'],
+            'vendor-data': metadata['vendor-data'],
+            'ssh_public_keys': [
+                key['key'] for key in metadata['ssh_public_keys']
+            ]
+        }
+        return True
+
+    @property
+    def launch_index(self):
+        return None
+
+    def get_instance_id(self):
+        return self.metadata['id']
+
+    def get_public_ssh_keys(self):
+        return self.metadata['ssh_public_keys']
+
+    def get_hostname(self, fqdn=False, resolve_ip=False):
+        return self.metadata['hostname']
+
+    def get_userdata_raw(self):
+        return self.metadata['user-data']
+
+    def get_vendordata_raw(self):
+        return self.metadata['vendor-data']
+
+    @property
+    def availability_zone(self):
+        return None
+
+    @property
+    def region(self):
+        return None
+
+
+datasources = [
+    (DataSourceScaleway, (sources.DEP_FILESYSTEM, sources.DEP_NETWORK)),
+]
+
+
+def get_datasource_list(depends):
+    return sources.list_from_depends(depends, datasources)
diff --git a/cloudinit/url_helper.py b/cloudinit/url_helper.py
index d2b92e6..34458ec 100644
--- a/cloudinit/url_helper.py
+++ b/cloudinit/url_helper.py
@@ -172,7 +172,8 @@ def _get_ssl_args(url, ssl_details):
 
 def readurl(url, data=None, timeout=None, retries=0, sec_between=1,
             headers=None, headers_cb=None, ssl_details=None,
-            check_status=True, allow_redirects=True, exception_cb=None):
+            check_status=True, allow_redirects=True, exception_cb=None,
+            session=None):
     url = _cleanurl(url)
     req_args = {
         'url': url,
@@ -231,7 +232,10 @@ def readurl(url, data=None, timeout=None, retries=0, sec_between=1,
             LOG.debug("[%s/%s] open '%s' with %s configuration", i,
                       manual_tries, url, filtered_req_args)
 
-            r = requests.request(**req_args)
+            if session is None:
+                r = requests.request(**req_args)
+            else:
+                r = session.request(**req_args)
             if check_status:
                 r.raise_for_status()
             LOG.debug("Read from %s (%s, %sb) after %s attempts", url,
_______________________________________________
Mailing list: https://launchpad.net/~cloud-init-dev
Post to     : cloud-init-dev@lists.launchpad.net
Unsubscribe : https://launchpad.net/~cloud-init-dev
More help   : https://help.launchpad.net/ListHelp