Julien Castets has proposed merging ~jcastets/cloud-init:scaleway-datasource 
into cloud-init:master.

Requested reviews:
  cloud-init commiters (cloud-init-dev)

For more details, see:
https://code.launchpad.net/~jcastets/cloud-init/+git/cloud-init/+merge/325740

Implements Scaleway datasource with user and vendor data.
-- 
Your team cloud-init commiters is requested to review the proposed merge of 
~jcastets/cloud-init:scaleway-datasource into cloud-init:master.
diff --git a/cloudinit/sources/DataSourceScaleway.py b/cloudinit/sources/DataSourceScaleway.py
new file mode 100644
index 0000000..7ac1b1a
--- /dev/null
+++ b/cloudinit/sources/DataSourceScaleway.py
@@ -0,0 +1,264 @@
+# vi: ts=4 expandtab
+#
+#    Author: Julien Castets <castet...@gmail.com>
+#
+#    This program is free software: you can redistribute it and/or modify
+#    it under the terms of the GNU General Public License version 3, as
+#    published by the Free Software Foundation.
+#
+#    This program is distributed in the hope that it will be useful,
+#    but WITHOUT ANY WARRANTY; without even the implied warranty of
+#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+#    GNU General Public License for more details.
+#
+#    You should have received a copy of the GNU General Public License
+#    along with this program.  If not, see <http://www.gnu.org/licenses/>.
+
+import json
+import socket
+import time
+
+import requests
+
+# pylint fails to import the two modules below.
+# pylint: disable=E0401
+from requests.packages.urllib3.connection import HTTPConnection
+from requests.packages.urllib3.poolmanager import PoolManager
+
+from cloudinit import log as logging
+from cloudinit import sources
+from cloudinit import url_helper
+from cloudinit import util
+
+
+LOG = logging.getLogger(__name__)
+
+BUILTIN_DS_CONFIG = {
+    'metadata_url': 'http://169.254.42.42/conf?format=json',
+    'userdata_url': 'http://169.254.42.42/user_data/cloud-init',
+    'vendordata_url': 'http://169.254.42.42/vendor_data/cloud-init'
+}
+# Fallbacks for the 'retries' and 'timeout' datasource settings.
+DEF_MD_RETRIES = 5
+DEF_MD_TIMEOUT = 10
+
+
+def on_scaleway(user_data_url, retries, timeout=DEF_MD_TIMEOUT):
+    """
+    Check if we are on Scaleway.
+
+    The only backward-compatible way to check if you are on a Scaleway instance
+    is to query the metadata API. For security reasons, the special endpoint
+    /user_data/ can only be accessed from a privileged TCP source port (i.e.
+    below 1024), otherwise the API returns a HTTP/403.
+
+    In other words, on Scaleway:
+
+    #> curl http://169.254.42.42/user_data
+    [...] 403 error
+
+    #> curl --local-port 1-1024 http://169.254.42.42/user_data
+    [...] 200 OK
+
+    This function sends a HEAD request to `user_data_url` (waiting at most
+    `timeout` seconds per attempt) and returns True if a HTTP/403 is returned.
+    HTTP/429 and HTTP/5xx are retried; any other outcome returns False.
+    """
+    tries = max(int(retries) + 1, 1)
+    for ntry in range(tries):
+        try:
+            # Without a timeout, requests may wait forever for an answer.
+            code = requests.head(user_data_url, timeout=timeout).status_code
+            if code == 403:
+                return True
+            # If we are rate limited or if there is a server error, we might
+            # not be talking to the Scaleway metadata API and we need to try
+            # again.
+            if code != 429 and code < 500:
+                return False
+        # Couldn't query the API.
+        except (requests.exceptions.ConnectionError,
+                requests.exceptions.Timeout):
+            return False
+        # Be nice and wait a bit before retrying, unless this was the last try.
+        if ntry + 1 < tries:
+            time.sleep(5)
+
+    return False
+
+
+class SourceAddressAdapter(requests.adapters.HTTPAdapter):
+    """Adapter for requests to choose the local (address, port) to bind to."""
+
+    def __init__(self, source_address, **kwargs):
+        # Set before calling the parent constructor, which (in requests)
+        # calls init_poolmanager() and therefore needs this attribute.
+        self.source_address = source_address
+        super(SourceAddressAdapter, self).__init__(**kwargs)
+
+    def init_poolmanager(self, connections, maxsize, block=False, **kwargs):
+        # Extra keyword arguments are forwarded like HTTPAdapter does.
+        socket_options = HTTPConnection.default_socket_options + [
+            (socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
+        ]
+        self.poolmanager = PoolManager(
+            num_pools=connections, maxsize=maxsize, block=block,
+            source_address=self.source_address,
+            socket_options=socket_options, **kwargs)
+
+
+def _get_type_data(typedata_address, timeout, requests_session):
+    """
+    Retrieve user data or vendor data.
+
+    Scaleway userdata/vendordata API returns HTTP/404 if user/vendor data is
+    not set.
+
+    This function calls `url_helper.readurl` but instead of considering
+    HTTP/404 as an error that requires a retry, it considers it as empty
+    user/vendor data and returns None.
+
+    Also, be aware the user data/vendor API requires the source port to be
+    below 1024. If requests raises ConnectionError (EADDRINUSE), the caller
+    should retry calling this function on another port.
+    """
+    try:
+        resp = url_helper.readurl(
+            typedata_address,
+            data=None,
+            timeout=timeout,
+            # It's the caller's responsibility to call this function again in
+            # case of exception. Don't let url_helper.readurl() retry itself.
+            retries=0,
+            session=requests_session,
+            # On HTTP/404 or EADDRINUSE, go into the except block below.
+            # str(exc) because exceptions have no `.message` on Python 3.
+            exception_cb=lambda _, exc: exc.code == 404 or (
+                isinstance(exc.cause, requests.exceptions.ConnectionError) and
+                'Address already in use' in str(exc)
+            )
+        )
+        return util.decode_binary(resp.contents)
+    except url_helper.UrlError as exc:
+        # Empty user data.
+        if exc.code == 404:
+            return None
+        raise
+
+
+class DataSourceScaleway(sources.DataSource):
+    """Datasource reading instance data from the Scaleway metadata API."""
+
+    def __init__(self, sys_cfg, distro, paths):
+        LOG.debug('Init Scaleway')
+        sources.DataSource.__init__(self, sys_cfg, distro, paths)
+
+        self.metadata = {}
+        self.ds_cfg = util.mergemanydict([
+            util.get_cfg_by_path(sys_cfg, ["datasource", "Scaleway"], {}),
+            BUILTIN_DS_CONFIG
+        ])
+
+        self.metadata_address = self.ds_cfg['metadata_url']
+        self.userdata_address = self.ds_cfg['userdata_url']
+        self.vendordata_address = self.ds_cfg['vendordata_url']
+
+        self.retries = self.ds_cfg.get('retries', DEF_MD_RETRIES)
+        self.timeout = self.ds_cfg.get('timeout', DEF_MD_TIMEOUT)
+
+    def _get_privileged_type(self, type_):
+        """Fetch 'user' or 'vendor' data, binding to a privileged port."""
+        assert type_ in ('user', 'vendor')
+
+        type_address = self.userdata_address \
+            if type_ == 'user' else self.vendordata_address
+
+        # Query user/vendor-data. Try to make a request on the first privileged
+        # port available.
+        ports = range(1, max(int(self.retries), 2))
+        for port in ports:
+            try:
+                LOG.debug('Trying to get %s data (bind on port %d)...',
+                          type_, port)
+                requests_session = requests.Session()
+                adapter = SourceAddressAdapter(('0.0.0.0', port))
+                requests_session.mount('http://', adapter)
+                data = _get_type_data(type_address, timeout=self.timeout,
+                                      requests_session=requests_session)
+                LOG.debug('%s-data downloaded', type_)
+                return data
+            except url_helper.UrlError as exc:
+                # Local port already in use or HTTP/429.
+                last_exc = exc
+                # Wait before trying the next port; no need after the last.
+                if port != ports[-1]:
+                    time.sleep(5)
+
+        # Max number of retries reached.
+        raise last_exc
+
+    def _get_metadata(self):
+        """Return the metadata API JSON, plus the user and vendor data."""
+        resp = url_helper.readurl(self.metadata_address,
+                                  timeout=self.timeout,
+                                  retries=self.retries)
+        metadata = json.loads(util.decode_binary(resp.contents))
+
+        metadata['user-data'] = self._get_privileged_type('user')
+        metadata['vendor-data'] = self._get_privileged_type('vendor')
+
+        return metadata
+
+    def get_data(self):
+        """Populate self.metadata; return False when not on Scaleway."""
+        if not on_scaleway(self.ds_cfg['userdata_url'], self.retries):
+            return False
+
+        metadata = self._get_metadata()
+        self.metadata = {
+            'id': metadata['id'],
+            'hostname': metadata['hostname'],
+            'user-data': metadata['user-data'],
+            'vendor-data': metadata['vendor-data'],
+            'ssh_public_keys': [
+                key['key'] for key in metadata['ssh_public_keys']
+            ]
+        }
+        return True
+
+    @property
+    def launch_index(self):
+        return None
+
+    def get_instance_id(self):
+        return self.metadata['id']
+
+    def get_public_ssh_keys(self):
+        return self.metadata['ssh_public_keys']
+
+    def get_hostname(self, fqdn=False, resolve_ip=False):
+        return self.metadata['hostname']
+
+    def get_userdata_raw(self):
+        return self.metadata['user-data']
+
+    def get_vendordata_raw(self):
+        # The vendor data fetched in get_data() is not returned anywhere else.
+        return self.metadata['vendor-data']
+
+    @property
+    def availability_zone(self):
+        return None
+
+    @property
+    def region(self):
+        return None
+
+
+datasources = [(DataSourceScaleway,
+                (sources.DEP_FILESYSTEM, sources.DEP_NETWORK))]
+
+
+def get_datasource_list(depends):
+    """Filter `datasources` through sources.list_from_depends(depends)."""
+    return sources.list_from_depends(depends, datasources)
diff --git a/cloudinit/url_helper.py b/cloudinit/url_helper.py
index d2b92e6..34458ec 100644
--- a/cloudinit/url_helper.py
+++ b/cloudinit/url_helper.py
@@ -172,7 +172,8 @@ def _get_ssl_args(url, ssl_details):
 
 def readurl(url, data=None, timeout=None, retries=0, sec_between=1,
             headers=None, headers_cb=None, ssl_details=None,
-            check_status=True, allow_redirects=True, exception_cb=None):
+            check_status=True, allow_redirects=True, exception_cb=None,
+            session=None):
     url = _cleanurl(url)
     req_args = {
         'url': url,
@@ -231,7 +232,10 @@ def readurl(url, data=None, timeout=None, retries=0, sec_between=1,
             LOG.debug("[%s/%s] open '%s' with %s configuration", i,
                       manual_tries, url, filtered_req_args)
 
-            r = requests.request(**req_args)
+            if session is None:
+                session = requests.Session()
+            r = session.request(**req_args)
+
             if check_status:
                 r.raise_for_status()
             LOG.debug("Read from %s (%s, %sb) after %s attempts", url,
_______________________________________________
Mailing list: https://launchpad.net/~cloud-init-dev
Post to     : cloud-init-dev@lists.launchpad.net
Unsubscribe : https://launchpad.net/~cloud-init-dev
More help   : https://help.launchpad.net/ListHelp

Reply via email to