ArielGlenn has submitted this change and it was merged.
Change subject: move readline completion and ignored entry lists out to separate files
......................................................................
move readline completion and ignored entry lists out to separate files
Change-Id: I1e8d632d6443d32afd9f0c81b62998babecd3014
---
M dataretention/retention/cli.py
A dataretention/retention/completion.py
A dataretention/retention/ignores.py
3 files changed, 590 insertions(+), 521 deletions(-)
Approvals:
ArielGlenn: Verified; Looks good to me, approved
diff --git a/dataretention/retention/cli.py b/dataretention/retention/cli.py
index 4394ac5..4541a25 100644
--- a/dataretention/retention/cli.py
+++ b/dataretention/retention/cli.py
@@ -2,7 +2,6 @@
import sys
import time
import json
-import runpy
import readline
import traceback
@@ -22,486 +21,120 @@
import retention.ruleutils
from retention.userconfretriever import RemoteUserCfRetriever
import retention.cliutils
+from retention.ignores import Ignores
+from retention.completion import Completion
-class CommandLine(object):
- '''
- prompt user at the command line for actions to take on a given
- directory or file, show results
- '''
- # todo: down and up should check you really are (descending,
- # ascending path)
+class CurrentEnv(object):
- def __init__(self, store_filepath, timeout, audit_type, hosts_expr=None):
- self.cdb = RuleStore(store_filepath)
- self.cdb.store_db_init(None)
- self.timeout = timeout
- self.audit_type = audit_type
- self.locations = audit_type + "_locations"
- self.hosts_expr = hosts_expr
-
- self.host = None
- self.today = time.strftime("%Y%m%d", time.gmtime())
- self.basedir = None
- self.current_dir = None
- self.prompt = None
- retention.cliutils.init_readline_hist()
- self.hostlist = None
- self.dirs_problem = None
- self.dirs_skipped = None
- self.current_dir_contents_list = None
- self.current_dir_contents_dict = None
- # this is arbitrary, can tweak it later
- # how many levels down we keep in our list of
- # top-level dirs from which the user can start
- # their interactive session
- self.max_depth_top_level = 3
-
- self.choices = None
- self.choice_default = None
-
- self.filtertype = 'all'
-
- # fixme completely wrong
- self.batchno = 1
-
- self.ignored = None
- self.local_ignores = None
-
- self.perhost_ignores = {}
- self.perhost_rules_from_file = None
- self.get_perhostcf_from_file()
- self.perhost_ignores_from_rules = {}
- self.get_perhost_ignores_from_rules()
-
- def get_perhost_ignores_from_rules(self, hosts=None):
- if hosts is None:
- hosts = self.cdb.store_db_list_all_hosts()
- for host in hosts:
- self.perhost_rules_from_store = retention.ruleutils.get_rules(
- self.cdb, host, Status.text_to_status('good'))
-
- if self.perhost_rules_from_store is not None:
- if host not in self.perhost_ignores_from_rules:
- self.perhost_ignores_from_rules[host] = {}
- self.perhost_ignores_from_rules[host]['dirs'] = {}
- self.perhost_ignores_from_rules[host]['dirs']['/'] = []
- self.perhost_ignores_from_rules[host]['files'] = {}
- self.perhost_ignores_from_rules[host]['files']['/'] = []
-
- if (self.perhost_rules_from_file is not None and
- 'ignored_dirs' in self.perhost_rules_from_file and
- host in self.perhost_rules_from_file['ignored_dirs']):
- for path in self.perhost_rules_from_file['ignored_dirs'][host]:
- if (path.startswith('/') and
- path not in self.perhost_ignores_from_rules[host][
- 'dirs']['/']):
- if path[-1] == '/':
- path = path[:-1]
- self.perhost_ignores_from_rules[host][
- 'dirs']['/'].append(path)
- if (self.perhost_rules_from_file is not None and
- 'ignored_files' in self.perhost_rules_from_file and
- host in self.perhost_rules_from_file['ignored_files']):
- for path in self.perhost_rules_from_file['ignored_files'][host]:
- if (path.startswith('/') and
- path not in self.perhost_ignores_from_rules[
- host]['files']['/']):
- self.perhost_ignores_from_rules[host]['files']['/'].append(path)
-
- def get_perhostcf_from_file(self):
- if os.path.exists('audit_files_perhost_config.py'):
- try:
- self.perhost_rules_from_file = runpy.run_path(
- 'audit_files_perhost_config.py')['perhostcf']
- except:
- self.perhost_rules_from_file = None
-
- if self.perhost_rules_from_file is not None:
- if 'ignored_dirs' in self.perhost_rules_from_file:
- for host in self.perhost_rules_from_file['ignored_dirs']:
- if host not in self.perhost_ignores:
- self.perhost_ignores[host] = {}
- self.perhost_ignores[host]['dirs'] = {}
- self.perhost_ignores[host]['dirs']['/'] = [
- (lambda path: path[:-1] if path[-1] == '/'
- else path)(p)
- for p in self.perhost_rules_from_file[
- 'ignored_dirs'][host]]
- if 'ignored_files' in self.perhost_rules_from_file:
- for host in self.perhost_rules_from_file['ignored_files']:
- if host not in self.perhost_ignores:
- self.perhost_ignores[host] = {}
- self.perhost_ignores[host]['files'] = {}
- self.perhost_ignores[host]['files']['/'] = (
- self.perhost_rules_from_file['ignored_files'][host])
-
- def host_completion(self, text, state):
- if text == "":
- matches = self.hostlist
- else:
- matches = [h for h in self.hostlist
- if h.startswith(text)]
- if len(matches) > 1 and state == 0:
- for m in matches:
- print m,
- print
-
- try:
- return matches[state]
- except IndexError:
- return None
-
- def prompt_for_host(self):
- '''
- prompt user for host in self.hostlist,
- with tab completion
- '''
- readline.set_completer(self.host_completion)
- while True:
- host_todo = raw_input(
- "Host on which to examine dirs/files (blank to exit): ")
- host_todo = host_todo.strip()
- if host_todo == "":
- return None
- if host_todo in self.hostlist:
- return host_todo
- else:
- print "Please choose one of the following hosts:"
- retention.cliutils.print_columns(self.hostlist, 4)
-
- def dir_completion(self, text, state):
- if self.current_dir is None:
- dirs_problem_to_depth = [retention.cliutils.get_path_prefix(
- d, self.max_depth_top_level) for d in self.dirs_problem]
- dirs_skipped = [s for s in self.dirs_skipped
- if s not in dirs_problem_to_depth]
- relevant_dirs = (sorted(list(set(dirs_problem_to_depth)))
- + sorted(list(set(dirs_skipped))))
- else:
- if self.current_dir_contents_list is None:
- self.get_dir_contents(self.current_dir, self.batchno)
- relevant_dirs = sorted([s for s in self.current_dir_contents_dict
- if self.current_dir_contents_dict[s]['type'] == 'dir'])
- if text == "":
- matches = relevant_dirs
- else:
- depth = text.count(os.path.sep)
- # how many path elts do we have in the text, show
- # matches for basedir of it plus next elt
- matches = ([d for d in relevant_dirs
- if d.startswith(text) and
- d.count(os.path.sep) == depth])
- try:
- return matches[state]
- except IndexError:
- return None
-
- def dir_entries_completion(self, text, state):
- if not self.current_dir_contents_list:
- self.get_dir_contents(self.current_dir, self.batchno)
- entries = sorted([s for s in self.current_dir_contents_dict
- if (self.current_dir_contents_dict[s]['type'] == 'file' or
- self.current_dir_contents_dict[s]['type'] == 'dir')])
- if text == "":
- matches = entries
- else:
- depth = text.count(os.path.sep)
- # how many path elts do we have in the text, show
- # matches for basedir of it plus next elt
- matches = ([d for d in entries
- if d.startswith(text) and
- d.count(os.path.sep) == depth])
- try:
- return matches[state]
- except IndexError:
- return None
-
- def prompt_for_dir(self):
- '''
- prompt user for host in self.hostlist,
- with tab completion
- '''
-
- readline.set_completer(self.dir_completion)
- dir_todo = raw_input("Directory (blank to exit): ")
- dir_todo = dir_todo.strip()
- if dir_todo == "":
- return None
- else:
- return dir_todo
-
- def choices_completion(self, text, state):
- matches = self.choices
- if text == "":
- matches = [self.choice_default]
- try:
- return matches[state]
- except IndexError:
- return None
-
- def do_one_host(self, host, report):
- self.set_host(host)
- if not retention.utils.running_locally(self.host):
- self.get_perhost_ignores_from_rules([host])
-
- if retention.utils.running_locally(self.host):
- self.dirs_problem, self.dirs_skipped = retention.remotefileauditor.get_dirs_toexamine(report)
- else:
- if host not in report:
- self.dirs_problem = None
- self.dirs_skipped = None
- else:
- self.dirs_problem, self.dirs_skipped = retention.remotefileauditor.get_dirs_toexamine(report[host])
- if self.dirs_problem is None and self.dirs_skipped is None:
- print "No report available from this host"
- elif len(self.dirs_problem) == 0 and len(self.dirs_skipped) == 0:
- print "No problem dirs and no skipped dirs on this host"
- else:
- dirs_problem_to_depth = [retention.cliutils.get_path_prefix(
- d, self.max_depth_top_level)
- for d in self.dirs_problem]
- dirs_skipped = [s for s in self.dirs_skipped
- if s not in dirs_problem_to_depth]
- relevant_dirs = (sorted(list(set(dirs_problem_to_depth)))
- + sorted(list(set(dirs_skipped))))
- while True:
- dir_todo = self.prompt_for_dir()
- if dir_todo is None:
- print "Done with this host"
- break
- elif dir_todo not in relevant_dirs:
- print "Please choose one of the following directories:"
- # fixme another arbitrary setting
- retention.cliutils.print_columns(relevant_dirs, 5)
- else:
- self.basedir = None
- self.current_dir = None
- self.do_one_directory(dir_todo)
-
- def run(self, report, ignored):
- '''
- call with full report output (not summary) across
- hosts, this will permit the user to examine
- directories and files of specified hosts and
- add/update rules for those dirs and files
- '''
- self.ignored = ignored
- if retention.utils.running_locally(self.hosts_expr):
- host_todo = "localhost"
- self.do_one_host(host_todo, report)
- return
-
- self.hostlist = report.keys()
- while True:
- host_todo = self.prompt_for_host()
- if host_todo is None:
- print "exiting at user request"
- break
- else:
- local_ign = RemoteUserCfRetriever(host_todo, self.timeout, self.audit_type)
- self.local_ignores = local_ign.run(True)
- local_ignored_dirs, local_ignored_files = LocalHomesAuditor.process_local_ignores(
- self.local_ignores, self.ignored)
- self.do_one_host(host_todo, report)
-
- def set_host(self, host):
+ def __init__(self, host=None, hostlist=None, path=None, problems=None, skipped=None):
self.host = host
+ self.hostlist = hostlist
+ self.cwdir = path
+ self.problem_dirs = problems
+ self.skipped_dirs = skipped
- def do_one_directory(self, path):
+ def clear(self):
+ self.host = None
+ self.hostlist = None
+ self.cwdir = None
+ self.problem_dirs = None
+ self.skipped_dirs = None
+
+ def set_hosts(self, hostlist):
+ self.hostlist = hostlist
+
+ def set_reported_dirs(self, problems, skipped):
+ self.problem_dirs = problems
+ self.skipped_dirs = skipped
+
+
+class CurrentDirContents(object):
+ '''
+ keep track of current directory contents
+ '''
+ def __init__(self, timeout):
+ self.timeout = timeout
+ self.entries = None
+ self.entries_dict = None
+ self.host = None
+ self.path = None
+
+ def get(self, host, path, batchno, force=False):
'''
- given a list which contains absolute paths for the
- subdirectories / files of a given directory, (we don't
- go more than one level down, it's likely to be too much),
- ask the user what status to give this directory, and
- show the user information for each contained dir/file if
- desired, as well as info about the directory
+ via salt get the directory contents for the first N = 1000
+ entries, unsorted.
+ if the contents for this host and path have already been
+ retrieved and not yet tossed/replaced, don't get them
+ again unless 'force' is True; you may want this in case
+ you expect the directory contents to have been updated
+ since the last invocation
'''
- while True:
- todo = self.get_do_command(path)
- if todo is None:
- break
- def get_do_command(self, path):
- command = self.show_menu(path, 'top')
- return self.do_command(command, 'top', path)
-
- def show_menu(self, path, level):
- if level == 'top':
- self.choices = ['S', 'E', 'I', 'F', 'R', 'Q']
- self.choice_default = 'S'
- readline.set_completer(self.choices_completion)
- command = raw_input("S(set status)/E(examine directory)/"
- "Filter directory listings/"
- "I(ignore)/R(manage rules)/Q(quit menu) [S]: ")
- command = command.strip()
- if command == "":
- command = self.choice_default
- elif level == 'status':
- self.choices = Status.STATUSES + ['Q']
- self.choice_default = Status.text_to_status('good')
- readline.set_completer(self.choices_completion)
- statuses_text = Status.get_statuses_prompt(", ")
- command = raw_input(statuses_text + ", Q(quit status menu) [%s]: "
- % self.choice_default)
- command = command.strip()
- if command == "":
- command = self.choice_default
- elif command == 'Q' or command == 'q':
- level = 'top'
- elif level == 'examine':
- self.choices = ['D', 'U', 'E', 'F', 'C', 'R', 'M', 'Q']
- self.choice_default = 'E'
- readline.set_completer(self.choices_completion)
- command = raw_input("D(down a level)/U(up a level)/E(show entries)/"
- "C(show contents of file)/R(show rules)/"
- "F(filter directory listings/"
- "M(mark file(s))/Q(quit examine menu) [E]: ")
- command = command.strip()
- if command == "":
- command = self.choice_default
- elif command == 'Q' or command == 'q':
- level = 'top'
- elif level == 'rule':
- self.choices = ['S', 'C', 'A', 'R', 'E', 'I', 'Q']
- self.choice_default = 'D'
- readline.set_completer(self.choices_completion)
- command = raw_input("S(show all rules of type)/D(show rules covering dir)/"
- "C(show rules covering dir contents)/"
- "A(add rule to rules store)/"
- "R(remove rule from rules store/"
- "E(export rules from store to file)/"
- "I(import rules from file to store)/Q(quit rule menu) [D]: ")
- command = command.strip()
- if command == "":
- command = self.choice_default
- elif command == 'Q' or command == 'q':
- level = 'top'
- else:
- command = None
- return command
-
- def get_dir_contents(self, path, batchno):
- # via salt get the directory contents for the first N = 1000
- # entries, unsorted.
+ if (host is not None and path is not None and
+ host == self.host and path == self.path and
+ (not force or (self.entries is not None))):
+ return
# fixme batchno? batchno should increment too
# for now more than 1000 entries in a dir = we silently toss them
- direxamin = RemoteDirExaminer(path, self.host, batchno, 1000, self.timeout, prettyprint=False)
+ direxamin = RemoteDirExaminer(path, host, batchno, 1000, self.timeout, prettyprint=False)
contents = direxamin.run(True)
- if contents is not None:
- contents = contents.split("\n")
-
- self.current_dir_contents_list = []
- self.current_dir_contents_dict = {}
-
if contents is None:
return
+
+ contents = contents.split("\n")
+ self.host = host
+ self.path = path
+
+ self.entries = []
+ self.entries_dict = {}
for item in contents:
try:
result = json.loads(item, object_hook=JsonHelper.decode_dict)
- self.current_dir_contents_list.append(result)
- self.current_dir_contents_dict[result['path']] = result
+ self.entries.append(result)
+ self.entries_dict[result['path']] = result
except:
print "WARNING: problem getting dir contents, retrieved", item
# exc_type, exc_value, exc_traceback = sys.exc_info()
# sys.stderr.write(repr(traceback.format_exception(
# exc_type, exc_value, exc_traceback)))
- def get_file_contents(self, path):
- # get 20 lines and hope that's enough for the user to evaluate
- # fixme the number of lines should be configurable
- fileexamin = RemoteFileExaminer(path, self.host, 20, self.timeout, quiet=True)
- contents = fileexamin.run()
- return contents
- def get_basedir_from_path(self, path):
- for location in Config.cf[self.locations]:
- if path == location or path.startswith(location + os.path.sep):
- return location
- # fixme is this really the right fallback? check it
- return '/'
-
- def entry_is_not_ignored(self, path, entrytype):
- basedir = self.get_basedir_from_path(path)
- if self.audit_type == 'logs' and entrytype == 'file':
- path = LocalLogsAuditor.normalize(path)
-
- if entrytype == 'file':
- if retention.fileutils.file_is_ignored(path, basedir, self.ignored):
- return False
-
- # check perhost file
- if self.host in self.perhost_ignores:
- if retention.fileutils.file_is_ignored(
- path, basedir,
- self.perhost_ignores[self.host]):
- return False
-
- # check perhost rules
- if self.host in self.perhost_ignores_from_rules:
- if retention.fileutils.file_is_ignored(
- path, basedir,
- self.perhost_ignores_from_rules[self.host]):
- return False
-
- elif entrytype == 'dir':
- if retention.fileutils.dir_is_ignored(path, self.ignored):
- return False
-
- # check perhost file
- if self.host in self.perhost_ignores:
- if retention.fileutils.dir_is_ignored(
- path, self.perhost_ignores[self.host]):
- return False
-
- # check perhost rules
- if self.host in self.perhost_ignores_from_rules:
- if retention.fileutils.dir_is_ignored(
- path, self.perhost_ignores_from_rules[self.host]):
- return False
- else:
- # unknown type, I guess we skip it then
- return False
-
- return True
-
- def show_dir_contents(self, path, batchno):
- self.get_dir_contents(path, batchno)
+ def show(self, host, path, batchno, filtertype, check_not_ignored, force=False):
+ self.get(host, path, batchno, force)
# fixme this 50 is pretty arbitrary oh well
justify = 50
- keys = self.current_dir_contents_dict.keys()
- if self.filtertype == 'file':
+ keys = self.entries_dict.keys()
+ if filtertype == 'file':
items = (sorted(
item for item in keys
- if self.current_dir_contents_dict[item]['type'] == 'file'))
- elif self.filtertype == 'dir':
+ if self.entries_dict[item]['type'] == 'file'))
+ elif filtertype == 'dir':
items = (sorted(
item for item in keys
- if self.current_dir_contents_dict[item]['type'] == 'dir'))
- elif self.filtertype == 'all':
+ if self.entries_dict[item]['type'] == 'dir'))
+ elif filtertype == 'all':
items = sorted(
item for item in keys
- if self.current_dir_contents_dict[item]['type'] == 'dir')
+ if self.entries_dict[item]['type'] == 'dir')
items = items + sorted(
item for item in keys
- if self.current_dir_contents_dict[item]['type'] == 'file')
- elif self.filtertype == 'check':
+ if self.entries_dict[item]['type'] == 'file')
+ elif filtertype == 'check':
items = sorted(
item for item in keys
- if self.current_dir_contents_dict[item]['type'] == 'dir'
- and self.entry_is_not_ignored(
- self.current_dir_contents_dict[item]['path'],
- self.current_dir_contents_dict[item]['type']))
+ if self.entries_dict[item]['type'] == 'dir'
+ and check_not_ignored(self.entries_dict[item]['path'],
+ self.entries_dict[item]['type']))
items = items + sorted(
item for item in keys
- if self.current_dir_contents_dict[item]['type'] == 'file'
- and self.entry_is_not_ignored(
- self.current_dir_contents_dict[item]['path'],
- self.current_dir_contents_dict[item]['type']))
+ if self.entries_dict[item]['type'] == 'file'
+ and check_not_ignored(self.entries_dict[item]['path'],
+ self.entries_dict[item]['type']))
page = 1
num_per_page = 50 # another arbitrary value
@@ -523,7 +156,7 @@
continue
try:
result = FileInfo.format_pretty_output_from_dict(
- self.current_dir_contents_dict[item], path_justify=justify)
+ self.entries_dict[item], path_justify=justify)
except:
print "item is", item
exc_type, exc_value, exc_traceback = sys.exc_info()
@@ -544,37 +177,262 @@
else:
num_to_show = num_per_page
+ def clear(self):
+ self.entries = None
+ self.entries_dict = None
+ self.host = None
+ self.path = None
+
+
+class CommandLine(object):
+ '''
+ prompt user at the command line for actions to take on a given
+ directory or file, show results
+ '''
+ # todo: down and up should check you really are (descending,
+ # ascending path)
+
+ def __init__(self, store_filepath, timeout, audit_type, hosts_expr=None):
+ self.cdb = RuleStore(store_filepath)
+ self.cdb.store_db_init(None)
+ self.timeout = timeout
+ self.audit_type = audit_type
+ self.locations = audit_type + "_locations"
+ self.hosts_expr = hosts_expr
+
+ self.host = None
+ self.today = time.strftime("%Y%m%d", time.gmtime())
+ self.basedir = None
+ self.prompt = None
+ retention.cliutils.init_readline_hist()
+ # this is arbitrary, can tweak it later
+ # how many levels down we keep in our list of
+ # top-level dirs from which the user can start
+ # their interactive session
+ self.max_depth_top_level = 3
+
+ self.filtertype = 'all'
+
+ # fixme completely wrong
+ self.batchno = 1
+
+ self.ignored = None
+ self.local_ignores = None
+
+ self.ignores = Ignores(self.cdb)
+ self.ignores.get_perhost_from_rules()
+ self.dircontents = CurrentDirContents(self.timeout)
+ self.cenv = CurrentEnv()
+ self.cmpl = Completion(self.dircontents, self.cenv, self.max_depth_top_level)
+
+ def do_one_host(self, host, report):
+ self.set_host(host)
+ self.ignores.get_perhost_from_rules([host])
+
+ if host not in report:
+ dirs_problem = None
+ dirs_skipped = None
+ else:
+ dirs_problem, dirs_skipped = retention.remotefileauditor.get_dirs_toexamine(report[host])
+ self.cenv.set_reported_dirs(dirs_problem, dirs_skipped)
+ if self.cenv.problem_dirs is None and self.cenv.skipped_dirs is None:
+ print "No report available from this host"
+ elif len(self.cenv.problem_dirs) == 0 and len(self.cenv.skipped_dirs) == 0:
+ print "No problem dirs and no skipped dirs on this host"
+ else:
+ dirs_problem_to_depth = [retention.cliutils.get_path_prefix(
+ d, self.max_depth_top_level)
+ for d in dirs_problem]
+ dirs_skipped = [s for s in dirs_skipped
+ if s not in dirs_problem_to_depth]
+ relevant_dirs = (sorted(list(set(dirs_problem_to_depth)))
+ + sorted(list(set(dirs_skipped))))
+ while True:
+ dir_todo = self.cmpl.prompt_for_dir()
+ if dir_todo is None:
+ print "Done with this host"
+ break
+ elif dir_todo not in relevant_dirs:
+ print "Please choose one of the following directories:"
+ # fixme another arbitrary setting
+ retention.cliutils.print_columns(relevant_dirs, 5)
+ else:
+ self.basedir = None
+ self.cenv.cwdir = None
+ self.do_one_directory(dir_todo)
+
+ def run(self, report, ignored):
+ '''
+ call with full report output (not summary) across
+ hosts, this will permit the user to examine
+ directories and files of specified hosts and
+ add/update rules for those dirs and files
+ '''
+ self.ignored = ignored
+
+ self.cenv.set_hosts(report.keys())
+ while True:
+ host_todo = self.cmpl.prompt_for_host()
+ if host_todo is None:
+ print "exiting at user request"
+ break
+ else:
+ local_ign = RemoteUserCfRetriever(host_todo, self.timeout, self.audit_type)
+ self.local_ignores = local_ign.run(True)
+ local_ignored_dirs, local_ignored_files = LocalHomesAuditor.process_local_ignores(
+ self.local_ignores, self.ignored)
+ self.do_one_host(host_todo, report)
+
+ def set_host(self, host):
+ self.cenv.host = host
+
+ def do_one_directory(self, path):
+ '''
+ given a list which contains absolute paths for the
+ subdirectories / files of a given directory, (we don't
+ go more than one level down, it's likely to be too much),
+ ask the user what status to give this directory, and
+ show the user information for each contained dir/file if
+ desired, as well as info about the directory
+ '''
+ while True:
+ todo = self.get_do_command(path)
+ if todo is None:
+ break
+
+ def get_do_command(self, path):
+ command = self.show_menu(path, 'top')
+ return self.do_command(command, 'top', path)
+
+ def get_menu_entry(self, choices, default, text):
+ self.cmpl.set_choices_completion(choices, default)
+ command = raw_input(text + " [%s]: " % default)
+ command = command.strip()
+ if command == "":
+ command = default
+ return command
+
+ def show_menu(self, path, level):
+ if level == 'top':
+ text = ("S(set status)/E(examine directory)/"
+ "Filter directory listings/"
+ "I(ignore)/R(manage rules)/Q(quit menu)")
+ command = self.get_menu_entry(['S', 'E', 'I', 'F', 'R', 'Q'], 'S', text)
+ elif level == 'status':
+ text = Status.get_statuses_prompt(", ") + ", Q(quit status menu)"
+ command = self.get_menu_entry(Status.STATUSES + ['Q'], text, Status.text_to_status('good'))
+ if command == 'Q' or command == 'q':
+ level = 'top'
+ elif level == 'examine':
+ text = ("D(down a level)/U(up a level)/E(show entries)/"
+ "C(show contents of file)/R(show rules)/"
+ "F(filter directory listings/"
+ "M(mark file(s))/Q(quit examine menu)")
+ command = self.get_menu_entry(['D', 'U', 'E', 'F', 'C', 'R', 'M', 'Q'], 'E', text)
+ if command == 'Q' or command == 'q':
+ level = 'top'
+ elif level == 'rule':
+ text = ("S(show all rules of type)/D(show rules covering dir)/"
+ "C(show rules covering dir contents)/"
+ "A(add rule to rules store)/"
+ "R(remove rule from rules store/"
+ "E(export rules from store to file)/"
+ "I(import rules from file to store)/Q(quit rule menu)")
+ command = self.get_menu_entry(['S', 'C', 'A', 'R', 'E', 'I', 'Q'], 'D', text)
+ if command == 'Q' or command == 'q':
+ level = 'top'
+ else:
+ command = None
+ return command
+
+ def get_file_contents(self, path):
+ # get 20 lines and hope that's enough for the user to evaluate
+ # fixme the number of lines should be configurable
+ fileexamin = RemoteFileExaminer(path, self.cenv.host, 20, self.timeout, quiet=True)
+ contents = fileexamin.run()
+ return contents
+
+ def get_basedir_from_path(self, path):
+ for location in Config.cf[self.locations]:
+ if path == location or path.startswith(location + os.path.sep):
+ return location
+ # fixme is this really the right fallback? check it
+ return '/'
+
+ def entry_is_not_ignored(self, path, entrytype):
+ basedir = self.get_basedir_from_path(path)
+ if self.audit_type == 'logs' and entrytype == 'file':
+ path = LocalLogsAuditor.normalize(path)
+
+ if entrytype == 'file':
+ if retention.fileutils.file_is_ignored(path, basedir, self.ignored):
+ return False
+
+ # check perhost file
+ if self.cenv.host in self.ignores.perhost_ignores:
+ if retention.fileutils.file_is_ignored(
+ path, basedir,
+ self.ignores.perhost_ignores[self.cenv.host]):
+ return False
+
+ # check perhost rules
+ if self.cenv.host in self.ignores.perhost_ignores_from_rules:
+ if retention.fileutils.file_is_ignored(
+ path, basedir,
+ self.ignores.perhost_ignores_from_rules[self.cenv.host]):
+ return False
+
+ elif entrytype == 'dir':
+ if retention.fileutils.dir_is_ignored(path, self.ignored):
+ return False
+
+ # check perhost file
+ if self.cenv.host in self.ignores.perhost_ignores:
+ if retention.fileutils.dir_is_ignored(
+ path, self.ignores.perhost_ignores[self.cenv.host]):
+ return False
+
+ # check perhost rules
+ if self.cenv.host in self.ignores.perhost_ignores_from_rules:
+ if retention.fileutils.dir_is_ignored(
+ path, self.ignores.perhost_ignores_from_rules[self.cenv.host]):
+ return False
+ else:
+ # unknown type, I guess we skip it then
+ return False
+
+ return True
+
# fixme use this (also make it have first + last, more helpful prolly)
def set_prompt(self):
- if self.current_dir is None:
+ if self.cenv.cwdir is None:
self.prompt = "> "
- elif len(self.current_dir) < 10:
- self.prompt = self.current_dir + ">"
+ elif len(self.cenv.cwdir) < 10:
+ self.prompt = self.cenv.cwdir + ">"
else:
- self.prompt = "..." + self.current_dir[-7:] + ">"
+ self.prompt = "..." + self.cenv.cwdir[-7:] + ">"
def get_entries_from_wildcard(self, file_expr):
'''
- get entries from current_dir that match the
+ get entries from cwdir that match the
expression
'''
# fixme that dang batchno, what a bad idea it was
- if self.current_dir_contents_list is None:
- self.get_dir_contents(self.current_dir, 1)
+ self.dircontents.get(self.cenv.host, self.cenv.cwdir, 1)
# one wildcard only, them's the breaks
if '*' in file_expr:
start, end = file_expr.split('*', 1)
- return [c for c in self.current_dir_contents_dict
+ return [c for c in self.dircontents.entries_dict
if (c.startswith(start) and
c.endswith(end) and
len(c) >= len(start) + len(end))]
- elif file_expr in self.current_dir_contents_dict:
+ elif file_expr in self.dircontents.entries_dict:
return [file_expr]
else:
return []
def do_mark(self):
- readline.set_completer(self.dir_entries_completion)
+ readline.set_completer(self.cmpl.dir_entries_completion)
file_expr = raw_input("file or dirname expression (empty to quit): ")
file_expr = file_expr.strip()
if file_expr == '':
@@ -585,18 +443,17 @@
entries_todo = self.get_entries_from_wildcard(file_expr)
else:
entries_todo = [file_expr]
- if not self.current_dir_contents_list:
- self.get_dir_contents(self.current_dir, self.batchno)
- if not self.current_dir_contents_list:
- print 'failed to get directory contents for', self.current_dir
+ self.dircontents.get(self.cenv.host, self.cenv.cwdir, self.batchno)
+ if not self.dircontents.entries:
+ print 'failed to get directory contents for', self.cenv.cwdir
print 'marking dirs/files regardless'
for entry in entries_todo:
- if entry not in self.current_dir_contents_dict:
+ if entry not in self.dircontents.entries_dict:
print 'skipping %s, not in current dir listing' % entry
- print self.current_dir_contents_dict
+ print self.dircontents.entries_dict
continue
filetype = retention.ruleutils.entrytype_to_text(
- self.current_dir_contents_dict[entry]['type'])
+ self.dircontents.entries_dict[entry]['type'])
if filetype == 'link':
print 'No need to mark', file_expr, 'links are always skipped'
continue
@@ -604,28 +461,27 @@
print 'Not a dir or regular file, no need to mark, skipping'
continue
status = Status.text_to_status('good')
- retention.ruleutils.do_add_rule(self.cdb, file_expr, filetype, status, self.host)
+ retention.ruleutils.do_add_rule(self.cdb, file_expr, filetype, status, self.cenv.host)
return True
def do_rule(self, command):
if command == 'A' or command == 'a':
# fixme need different completer here I think, that
- # completes relative to self.current_dir
+ # completes relative to self.cwdir
readline.set_completer(None)
path = raw_input("path or wildcard expr in rule (empty to quit): ")
path = path.strip()
if path == '':
return True
- self.choices = Status.STATUSES + ['Q']
- self.choice_default = Status.text_to_status('good')
- readline.set_completer(self.choices_completion)
+ default = Status.text_to_status('good')
+ self.cmpl.set_choices_completion(Status.STATUSES + ['Q'], default)
while True:
statuses_text = Status.get_statuses_prompt(", ")
status = raw_input(statuses_text + " Q(quit)) [%s]: " %
- self.choice_default)
+ default)
status = status.strip()
if status == "":
- status = self.choice_default
+ status = default
if status[0].upper() in Status.STATUSES:
status = status[0].upper()
break
@@ -639,29 +495,28 @@
# in the last component... someday
if path[0] != os.path.sep:
- path = os.path.join(self.current_dir, path)
+ path = os.path.join(self.cenv.cwdir, path)
if path[-1] == os.path.sep:
path = path[:-1]
filetype = retention.ruleutils.text_to_entrytype('dir')
else:
filetype = retention.ruleutils.text_to_entrytype('file')
- retention.ruleutils.do_add_rule(self.cdb, path, filetype, status, self.host)
+ retention.ruleutils.do_add_rule(self.cdb, path, filetype, status, self.cenv.host)
# update the ignores list since we have a new rule
- self.perhost_ignores_from_rules = {}
- self.get_perhost_ignores_from_rules([self.host])
+ self.ignores.perhost_ignores_from_rules = {}
+ self.ignores.get_perhost_from_rules([self.cenv.host])
return True
elif command == 'S' or command == 's':
- self.choices = ['A'] + Status.STATUSES + ['Q']
- self.choice_default = Status.text_to_status('problem')
- readline.set_completer(self.choices_completion)
+ default = Status.text_to_status('problem')
+ self.cmpl.set_choices_completion(['A'] + Status.STATUSES + ['Q'], default)
while True:
statuses_text = Status.get_statuses_prompt(", ")
status = raw_input("status type A(all), " + statuses_text +
- ", Q(quit)) [%s]: " % self.choice_default)
+ ", Q(quit)) [%s]: " % default)
status = status.strip()
if status == "":
- status = self.choice_default
+ status = default
if status == 'q' or status == 'Q':
return None
@@ -675,40 +530,38 @@
if prefix == "":
prefix = "/"
if status == 'a' or status == 'A':
- retention.ruleutils.show_rules(self.cdb, self.host,
prefix=prefix)
+ retention.ruleutils.show_rules(self.cdb, self.cenv.host,
prefix=prefix)
return True
elif status[0].upper() in Status.STATUSES:
- retention.ruleutils.show_rules(self.cdb, self.host,
status[0].upper(),
+ retention.ruleutils.show_rules(self.cdb, self.cenv.host,
status[0].upper(),
prefix=prefix)
return True
elif command == 'D' or command == 'd':
- if not self.current_dir_contents_list:
- self.get_dir_contents(self.current_dir, self.batchno)
- retention.ruleutils.get_rules_for_path(self.cdb, self.current_dir,
self.host)
+ self.dircontents.get(self.cenv.host, self.cenv.cwdir, self.batchno)
+ retention.ruleutils.get_rules_for_path(self.cdb, self.cenv.cwdir,
self.cenv.host)
return True
elif command == 'C' or command == 'c':
- if not self.current_dir_contents_list:
- self.get_dir_contents(self.current_dir, self.batchno)
- retention.ruleutils.get_rules_for_entries(self.cdb,
self.current_dir,
-
self.current_dir_contents_dict,
- self.host)
+ self.dircontents.get(self.cenv.host, self.cenv.cwdir, self.batchno)
+ retention.ruleutils.get_rules_for_entries(self.cdb,
self.cenv.cwdir,
+
self.dircontents.entries_dict,
+ self.cenv.host)
return True
elif command == 'R' or command == 'r':
# fixme need different completer here I think, that
- # completes relative to self.current_dir
+ # completes relative to self.cenv.cwdir
readline.set_completer(None)
path = raw_input("path or wildcard expr in rule (empty to quit): ")
path = path.strip()
if path == '':
return True
elif path[0] != os.path.sep:
- path = os.path.join(self.current_dir, path)
+ path = os.path.join(self.cenv.cwdir, path)
if path[-1] == os.path.sep:
path = path[:-1]
- retention.ruleutils.do_remove_rule(self.cdb, path, self.host)
+ retention.ruleutils.do_remove_rule(self.cdb, path, self.cenv.host)
# update the ignores list since we removed a rule
- self.perhost_ignores_from_rules = {}
- self.get_perhost_ignores_from_rules([self.host])
+ self.ignores.perhost_ignores_from_rules = {}
+ self.ignores.get_perhost_from_rules([self.cenv.host])
return True
elif command == 'I' or command == 'i':
readline.set_completer(None)
@@ -719,7 +572,7 @@
if not retention.cliutils.check_rules_path(rules_path):
print "bad rules file path specified, aborting"
else:
- retention.ruleutils.import_rules(self.cdb, rules_path,
self.host)
+ retention.ruleutils.import_rules(self.cdb, rules_path,
self.cenv.host)
return True
elif command == 'E' or command == 'e':
readline.set_completer(None)
@@ -730,7 +583,7 @@
if not retention.cliutils.check_rules_path(rules_path):
print "bad rules file path specified, aborting"
else:
- retention.ruleutils.export_rules(self.cdb, rules_path,
self.host)
+ retention.ruleutils.export_rules(self.cdb, rules_path,
self.cenv.host)
return True
elif command == 'Q' or command == 'q':
print "quitting this level"
@@ -747,7 +600,7 @@
if filename == '':
return
if filename[0] != os.path.sep:
- filename = os.path.join(self.current_dir, filename)
+ filename = os.path.join(self.cenv.cwdir, filename)
contents = self.get_file_contents(filename)
if contents is not None:
print contents
@@ -755,9 +608,8 @@
print "failed to get contents of file"
def do_filter(self):
- self.choices = ['A', 'D', 'F', 'C', 'Q']
- self.choice_default = 'C'
- readline.set_completer(self.choices_completion)
+ default = 'C'
+ self.cmpl.set_choices_completion(['A', 'D', 'F', 'C', 'Q'], default)
while True:
filtertype = raw_input("filter A(all), D(directories only),"
" F(files only),"
@@ -765,7 +617,7 @@
" Q(quit)) [?]: ")
filtertype = filtertype.strip()
if filtertype == "":
- filtertype = self.choice_default
+ filtertype = default
if filtertype == 'a' or filtertype == 'A':
self.filtertype = 'all'
return True
@@ -788,7 +640,7 @@
if command == 'D' or command == 'd':
while True:
# prompt user for dir to descend
- readline.set_completer(self.dir_completion)
+ readline.set_completer(self.cmpl.dir_completion)
directory = raw_input("directory name (empty to quit): ")
directory = directory.strip()
if directory == '':
@@ -796,30 +648,28 @@
if directory[-1] == os.path.sep:
directory = directory[:-1]
if (directory[0] == '/' and
- not directory.startswith(self.current_dir +
+ not directory.startswith(self.cenv.cwdir +
os.path.sep)):
print 'New directory is not a subdirectory of',
- print self.current_dir, "skipping"
+ print self.cenv.cwdir, "skipping"
else:
- self.current_dir = os.path.join(self.current_dir,
- directory)
- self.current_dir_contents_list = None
- self.current_dir_contents_dict = None
+ self.cenv.cwdir = os.path.join(self.cenv.cwdir,
+ directory)
+ self.dircontents.clear()
self.set_prompt()
- print 'Now at', self.current_dir
+ print 'Now at', self.cenv.cwdir
return True
elif command == 'U' or command == 'u':
- if self.current_dir != self.basedir:
- self.current_dir = os.path.dirname(self.current_dir)
- self.current_dir_contents_list = None
- self.current_dir_contents_dict = None
+ if self.cenv.cwdir != self.basedir:
+ self.cenv.cwdir = os.path.dirname(self.cenv.cwdir)
+ self.dircontents.clear()
self.set_prompt()
- print 'Now at', self.current_dir
+ print 'Now at', self.cenv.cwdir
else:
- print 'Already at top', self.current_dir
+ print 'Already at top', self.cenv.cwdir
return True
elif command == 'E' or command == 'e':
- self.show_dir_contents(self.current_dir, 1)
+ self.dircontents.show(self.cenv.host, self.cenv.cwdir, 1,
self.filtertype, self.entry_is_not_ignored)
return True
elif command == 'C' or command == 'c':
self.do_file_contents()
@@ -830,8 +680,8 @@
elif command == 'R' or command == 'r':
continuing = True
while continuing:
- command = self.show_menu(self.current_dir, 'rule')
- continuing = self.do_command(command, 'rule', self.current_dir)
+ command = self.show_menu(self.cenv.cwdir, 'rule')
+ continuing = self.do_command(command, 'rule', self.cenv.cwdir)
return True
elif command == 'M' or command == 'm':
return self.do_mark()
@@ -850,14 +700,14 @@
continuing = self.do_command(command, 'status', dir_path)
return True
elif command == 'E' or command == 'e':
- self.show_dir_contents(self.current_dir, 1)
+ self.dircontents.show(self.cenv.host, self.cenv.cwdir, 1,
self.filtertype, self.entry_is_not_ignored)
continuing = True
while continuing:
# fixme this should let the user page through batches,
# not use '1' every time
- command = self.show_menu(self.current_dir, 'examine')
+ command = self.show_menu(self.cenv.cwdir, 'examine')
continuing = self.do_command(command, 'examine',
- self.current_dir)
+ self.cenv.cwdir)
return True
elif command == 'F' or command == 'f':
self.do_filter()
@@ -868,8 +718,8 @@
elif command == 'R' or command == 'r':
continuing = True
while continuing:
- command = self.show_menu(self.current_dir, 'rule')
- continuing = self.do_command(command, 'rule', self.current_dir)
+ command = self.show_menu(self.cenv.cwdir, 'rule')
+ continuing = self.do_command(command, 'rule', self.cenv.cwdir)
return True
elif command == 'Q' or command == 'q':
return None
@@ -880,8 +730,8 @@
def do_command(self, command, level, dir_path):
if self.basedir is None:
self.basedir = dir_path
- if self.current_dir is None:
- self.current_dir = dir_path
+ if self.cenv.cwdir is None:
+ self.cenv.cwdir = dir_path
if command is None:
return
@@ -894,7 +744,7 @@
# type is dir every time
retention.ruleutils.do_add_rule(self.cdb, dir_path,
retention.ruleutils.text_to_entrytype('dir'),
- command, self.host)
+ command, self.cenv.host)
return None
elif command == 'Q' or command == 'q':
return None
diff --git a/dataretention/retention/completion.py
b/dataretention/retention/completion.py
new file mode 100644
index 0000000..59f9931
--- /dev/null
+++ b/dataretention/retention/completion.py
@@ -0,0 +1,130 @@
+import os
+import sys
+import readline
+
+sys.path.append('/srv/audits/retention/scripts/')
+
+import retention.remotefileauditor
+import retention.utils
+import retention.fileutils
+import retention.ruleutils
+import retention.cliutils
+
+class Completion(object):
+ '''
+ user prompts, tab completion of entries
+ '''
+ def __init__(self, dircontents, cenv, max_depth_top_level):
+ self.dircontents = dircontents
+ self.cenv = cenv
+ self.max_depth_top_level = max_depth_top_level
+ self.choices = []
+ self.choice_default = None
+ self.batchno = 1 # gotta fix this sometime
+
+ def host_completion(self, text, state):
+ if text == "":
+ matches = self.cenv.hostlist
+ else:
+ matches = [h for h in self.cenv.hostlist
+ if h.startswith(text)]
+ if len(matches) > 1 and state == 0:
+ for match in matches:
+ print match,
+ print
+
+ try:
+ return matches[state]
+ except IndexError:
+ return None
+
+ def prompt_for_host(self):
+ '''
+ prompt user for host in hostlist,
+ with tab completion
+ '''
+ readline.set_completer(self.host_completion)
+ while True:
+ host_todo = raw_input(
+ "Host on which to examine dirs/files (blank to exit): ")
+ host_todo = host_todo.strip()
+ if host_todo == "":
+ return None
+ if host_todo in self.cenv.hostlist:
+ return host_todo
+ else:
+ print "Please choose one of the following hosts:"
+ retention.cliutils.print_columns(self.cenv.hostlist, 4)
+
+ def dir_completion(self, text, state):
+ if self.cenv.cwdir is None:
+ dirs_problem_to_depth = [retention.cliutils.get_path_prefix(
+ d, self.max_depth_top_level) for d in self.cenv.problem_dirs]
+ dirs_skipped = [s for s in self.cenv.skipped_dirs
+ if s not in dirs_problem_to_depth]
+ relevant_dirs = (sorted(list(set(dirs_problem_to_depth)))
+ + sorted(list(set(dirs_skipped))))
+ else:
+ self.dircontents.get(self.cenv.host, self.cenv.cwdir, self.batchno)
+ relevant_dirs = sorted([s for s in self.dircontents.entries_dict
+ if
self.dircontents.entries_dict[s]['type'] == 'dir'])
+ if text == "":
+ matches = relevant_dirs
+ else:
+ depth = text.count(os.path.sep)
+ # how many path elts do we have in the text, show
+ # matches for basedir of it plus next elt
+ matches = ([d for d in relevant_dirs
+ if d.startswith(text) and
+ d.count(os.path.sep) == depth])
+ try:
+ return matches[state]
+ except IndexError:
+ return None
+
+ def dir_entries_completion(self, text, state):
+ self.dircontents.get(self.cenv.host, self.cenv.cwdir, self.batchno)
+ entries = sorted([s for s in self.dircontents.entries_dict
+ if (self.dircontents.entries_dict[s]['type'] ==
'file' or
+ self.dircontents.entries_dict[s]['type'] ==
'dir')])
+ if text == "":
+ matches = entries
+ else:
+ depth = text.count(os.path.sep)
+ # how many path elts do we have in the text, show
+ # matches for basedir of it plus next elt
+ matches = ([d for d in entries
+ if d.startswith(text) and
+ d.count(os.path.sep) == depth])
+ try:
+ return matches[state]
+ except IndexError:
+ return None
+
+ def prompt_for_dir(self):
+ '''
+ prompt user for a directory,
+ with tab completion
+ '''
+
+ readline.set_completer(self.dir_completion)
+ dir_todo = raw_input("Directory (blank to exit): ")
+ dir_todo = dir_todo.strip()
+ if dir_todo == "":
+ return None
+ else:
+ return dir_todo
+
+ def set_choices_completion(self, choices, default):
+ self.choices = choices
+ self.choice_default = default
+ readline.set_completer(self.choices_completion)
+
+ def choices_completion(self, text, state):
+ matches = self.choices
+ if text == "":
+ matches = [self.choice_default]
+ try:
+ return matches[state]
+ except IndexError:
+ return None
diff --git a/dataretention/retention/ignores.py
b/dataretention/retention/ignores.py
new file mode 100644
index 0000000..f324eac
--- /dev/null
+++ b/dataretention/retention/ignores.py
@@ -0,0 +1,89 @@
+import os
+import sys
+import runpy
+
+sys.path.append('/srv/audits/retention/scripts/')
+
+from retention.status import Status
+import retention.remotefileauditor
+import retention.utils
+import retention.fileutils
+import retention.ruleutils
+import retention.cliutils
+
+class Ignores(object):
+ '''
+ collection of files and directories ignored by the audit
+ on a given host
+ '''
+
+ def __init__(self, cdb):
+ self.cdb = cdb
+ self.perhost_rules_from_file = None
+ self.hosts = self.cdb.store_db_list_all_hosts()
+ self.perhost_ignores = {}
+ self.perhost_ignores_from_rules = {}
+ self.perhost_rules_from_store = {}
+ self.get_perhost_cf_from_file()
+
+ def get_perhost_from_rules(self, hosts=None):
+ if hosts == None:
+ hosts = self.hosts
+ for host in hosts:
+ self.perhost_rules_from_store = retention.ruleutils.get_rules(
+ self.cdb, host, Status.text_to_status('good'))
+
+ if self.perhost_rules_from_store is not None:
+ if host not in self.perhost_ignores_from_rules:
+ self.perhost_ignores_from_rules[host] = {}
+ self.perhost_ignores_from_rules[host]['dirs'] = {}
+ self.perhost_ignores_from_rules[host]['dirs']['/'] = []
+ self.perhost_ignores_from_rules[host]['files'] = {}
+ self.perhost_ignores_from_rules[host]['files']['/'] = []
+
+ if (self.perhost_rules_from_file is not None and
+ 'ignored_dirs' in self.perhost_rules_from_file and
+ host in self.perhost_rules_from_file['ignored_dirs']):
+ for path in
self.perhost_rules_from_file['ignored_dirs'][host]:
+ if (path.startswith('/') and
+ path not in
self.perhost_ignores_from_rules[host][
+ 'dirs']['/']):
+ if path[-1] == '/':
+ path = path[:-1]
+ self.perhost_ignores_from_rules[host][
+ 'dirs']['/'].append(path)
+ if (self.perhost_rules_from_file is not None and
+ 'ignored_files' in self.perhost_rules_from_file and
+ host in self.perhost_rules_from_file['ignored_files']):
+ for path in
self.perhost_rules_from_file['ignored_files'][host]:
+ if (path.startswith('/') and
+ path not in self.perhost_ignores_from_rules[
+ host]['files']['/']):
+
self.perhost_ignores_from_rules[host]['files']['/'].append(path)
+
+ def get_perhost_cf_from_file(self):
+ if os.path.exists('audit_files_perhost_config.py'):
+ try:
+ self.perhost_rules_from_file = runpy.run_path(
+ 'audit_files_perhost_config.py')['perhostcf']
+ except:
+ self.perhost_rules_from_file = None
+
+ if self.perhost_rules_from_file is not None:
+ if 'ignored_dirs' in self.perhost_rules_from_file:
+ for host in self.perhost_rules_from_file['ignored_dirs']:
+ if host not in self.perhost_ignores:
+ self.perhost_ignores[host] = {}
+ self.perhost_ignores[host]['dirs'] = {}
+ self.perhost_ignores[host]['dirs']['/'] = [
+ (lambda path: path[:-1] if path[-1] == '/'
+ else path)(p)
+ for p in self.perhost_rules_from_file[
+ 'ignored_dirs'][host]]
+ if 'ignored_files' in self.perhost_rules_from_file:
+ for host in self.perhost_rules_from_file['ignored_files']:
+ if host not in self.perhost_ignores:
+ self.perhost_ignores[host] = {}
+ self.perhost_ignores[host]['files'] = {}
+ self.perhost_ignores[host]['files']['/'] = (
+ self.perhost_rules_from_file['ignored_files'][host])
--
To view, visit https://gerrit.wikimedia.org/r/233463
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings
Gerrit-MessageType: merged
Gerrit-Change-Id: I1e8d632d6443d32afd9f0c81b62998babecd3014
Gerrit-PatchSet: 2
Gerrit-Project: operations/software
Gerrit-Branch: master
Gerrit-Owner: ArielGlenn <[email protected]>
Gerrit-Reviewer: ArielGlenn <[email protected]>
_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits