Mforns has uploaded a new change for review. ( https://gerrit.wikimedia.org/r/359442 )

Change subject: [WIP] Modify EL purging script to not use limit/offset
......................................................................

[WIP] Modify EL purging script to not use limit/offset

Bug: T168071
Change-Id: Ic4c1e5a30ce41d8bc5ea3429f716d145118e3e65
---
M modules/role/files/mariadb/eventlogging_cleaner.py
1 file changed, 58 insertions(+), 87 deletions(-)


  git pull ssh://gerrit.wikimedia.org:29418/operations/puppet refs/changes/42/359442/1

diff --git a/modules/role/files/mariadb/eventlogging_cleaner.py b/modules/role/files/mariadb/eventlogging_cleaner.py
index 4e536f0..f4fc85e 100644
--- a/modules/role/files/mariadb/eventlogging_cleaner.py
+++ b/modules/role/files/mariadb/eventlogging_cleaner.py
@@ -167,11 +167,12 @@
         self.start = self.relative_ts(newer_than)
         self.end = self.relative_ts(older_than)
         self.batch_size = batch_size
+        self.max_interval_size = timedelta(days=30)
         self.sleep_between_batches = sleep_between_batches
         self.dry_run = dry_run
 
     def relative_ts(self, days):
-        return (self.reference_time - 
timedelta(days=days)).strftime(DATE_FORMAT)
+        return self.reference_time - timedelta(days=days)
 
     def purge(self, table):
         """
@@ -184,8 +185,8 @@
             "LIMIT %(batch_size)s".format(table)
         )
         params = {
-            'start_ts': self.start,
-            'end_ts': self.end,
+            'start_ts': self.start.strftime(DATE_FORMAT),
+            'end_ts': self.end.strftime(DATE_FORMAT),
             'batch_size': self.batch_size,
         }
         result = self.database.execute(command, params, dry_run=self.dry_run)
@@ -196,105 +197,75 @@
             result = self.database.execute(command, params, 
dry_run=self.dry_run)
             time.sleep(self.sleep_between_batches)
 
-    def _get_old_uuids(self, table, offset):
+    def get_interval_factor(self, numrows):
+        """The adapt factor tries to maintain the size of the update interval
+        close to the defined batch size. If the given table has lots of events
+        the update interval should be smaller, and if the table has less events
+        then the interval can be greater, so that we need less update queries.
+        The initial update interval is 1 minute, and then depending on the
+        number of updated rows, the adapt factor will multiply the interval 
size
+        to make it closer to the batch size.
         """
-        Return a list of uuids between self.start and self.end limiting
-        the batch with an offset.
-        """
-        command = (
-            "SELECT uuid from {} WHERE timestamp >= %(start_ts)s "
-            "AND timestamp < %(end_ts)s LIMIT %(batch_size)s OFFSET %(offset)s"
-            .format(table)
+        return min( # the min ensures that the factor is not too big
+            # batch_size / numrows should be 1 if the last update had
+            # a number of rows equal to batch_size. If the update was smaller,
+            # the factor should be proportionally greater, and viceversa.
+            self.batch_size / float(max(numrows, 1)), # max prevents division 
by 0
+            10
         )
-        params = {
-            'start_ts': self.start,
-            'end_ts': self.end,
-            'batch_size': self.batch_size,
-            'offset': offset,
-        }
-        result = self.database.execute(command, params, self.dry_run)
-        if result['rows']:
-            return [x[0] for x in result['rows']]
-        else:
-            return []
+
+    def interval_size_is_ok(self, interval_size, numrows):
+        return (
+            numrows < self.batch_size * 1.2 and (
+                numrows > self.batch_size * 0.8 or
+                interval_size == self.max_interval_size
+            )
+        )
+
+    def get_new_interval_size(self, table, interval_start, interval_size):
+        command_template = (
+            "SELECT count(*) FROM {}"
+            "WHERE timestamp >= {} AND timestamp < {{}}"
+            .format(table, interval_start.strftime(DATE_FORMAT))
+        )
+        interval_end = interval_start + interval_size
+        command = command_template.format(interval_end.strftime(DATE_FORMAT))
+        numrows = self.database.execute(command)['rows'][0][0]
+        while not self.interval_size_is_ok(interval_size, numrows):
+            interval_size = min(
+                interval_size * self.get_interval_factor(numrows),
+                self.max_interval_size
+            )
+            interval_end = interval_start + interval_size
+            command = 
command_template.format(interval_end.strftime(DATE_FORMAT))
+            numrows = self.database.execute(command)['rows'][0][0]
+        return interval_size
 
     def sanitize(self, table):
-        """
-        Set all the fields not in the whitelist (for a given table) to NULL.
-        The schema_prefix is needed since the whitelist contains only 
EventLogging
-        schema/table prefixes.
-        """
-        # Get the table's whitelist prefix to retrieve the list of fields to 
save
-        # from the whitelist
-        table_prefix = table.split('_')[0]
-        # Sanity check
-        if table_prefix not in self.whitelist:
-            raise RuntimeError(
-                "Sanitize has been called for table {}, but its "
-                "prefix {} is not in the whitelist. Aborting as precautionary "
-                "measure since this error condition might indicate a bug in 
the code"
-                .format(table, table_prefix)
-            )
         fields = self.database.get_table_fields(table)
-        # Sanity check
-        for field in self.whitelist[table_prefix]:
-            if field not in fields:
-                raise RuntimeError(
-                    "Sanitize has been called for table {}, but the field {} "
-                    "(belonging to the ones to keep/whitelist) is not part of "
-                    "the table fields. This probably indicates an error in the 
"
-                    "whitelist, please re-check it. Aborting as precautionary "
-                    "measue to avoid unnecessary deletions."
-                    .format(table, field)
-                )
+        # Get the schema name (without revision number)
+        table_prefix = table.split('_')[0]
         fields_to_keep = self.whitelist[table_prefix] + 
list(COMMON_PERSISTENT_FIELDS)
         fields_to_purge = filter(lambda f: f not in fields_to_keep, fields)
-
         values_string = ','.join([field + ' = NULL' for field in 
fields_to_purge])
-        offset = 0
-        uuids_current_batch = self._get_old_uuids(table, offset)
         command_template = (
             "UPDATE {0} "
             "SET {1} "
-            "WHERE uuid IN ({{}})"
+            "WHERE timestamp >= {{}} AND timestamp < {{}}"
         ).format(table, values_string)
-
-        while uuids_current_batch:
-            uuids_no = len(uuids_current_batch)
-            if uuids_no > self.batch_size:
-                log.error("The number of uuids to sanitize {} is bigger "
-                          "than the batch size {}, this condition should not "
-                          "be possible, please review the code/data. "
-                          .format(str(uuids_no), str(self.batch_size)))
-                raise RuntimeError('Sanitization stopped as precautionary 
step.')
-
-            uuids_current_batch_escaped = ["'" + x + "'" for x in 
uuids_current_batch]
+        interval_start = self.start
+        interval_size = timedelta(minutes=1)
+        while interval_start < self.end:
+            interval_size = self.get_new_interval_size(table, interval_start, 
interval_size)
+            interval_end = min(interval_start + interval_size, self.end)
             result = self.database.execute(
-                command_template.format(",".join(uuids_current_batch_escaped)),
+                command_template.format(
+                    interval_start.strftime(DATE_FORMAT),
+                    interval_end.strftime(DATE_FORMAT)
+                ),
                 dry_run=self.dry_run
             )
-            if result['numrows'] < uuids_no:
-                log.warn("The number of uuids to sanitize {} is bigger "
-                         "than the number of updated rows in this batch {}. "
-                         "Data already sanitized?"
-                         .format(uuids_no, result['numrows']))
-            elif result['numrows'] > uuids_no:
-                log.error("The number of uuids to sanitize {} is lower "
-                          "than the number of updated rows in this batch {}. "
-                          "This is definitely an error in the code, please 
review it."
-                          .format(uuids_no, result['numrows']))
-                raise RuntimeError('Sanitization stopped as precautionary 
step.')
-
-            if uuids_no < self.batch_size:
-                # Avoid an extra SQL query to the database if the number of
-                # uuids returned are less than BATCH_SIZE, since this value
-                # means that we have already reached the last batch of uuids
-                # to sanitize.
-                uuids_current_batch = []
-            else:
-                offset += self.batch_size
-                uuids_current_batch = self._get_old_uuids(table, offset)
-
+            interval_start = interval_end
             time.sleep(self.sleep_between_batches)
 
 

-- 
To view, visit https://gerrit.wikimedia.org/r/359442
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: Ic4c1e5a30ce41d8bc5ea3429f716d145118e3e65
Gerrit-PatchSet: 1
Gerrit-Project: operations/puppet
Gerrit-Branch: production
Gerrit-Owner: Mforns <mfo...@wikimedia.org>

_______________________________________________
MediaWiki-commits mailing list
MediaWiki-commits@lists.wikimedia.org
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits

Reply via email to