Repository: incubator-airflow
Updated Branches:
  refs/heads/master b4aa64731 -> 384589890
[AIRFLOW-373] Enhance CLI variables functionality Add export/import to/from json file option for CLI variable command. Add delete variable option for CLI variable command. Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/38458989 Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/38458989 Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/38458989 Branch: refs/heads/master Commit: 38458989009a9708b20f8e77f8e1244a3261cddb Parents: b4aa647 Author: Norman Mu <nor...@agari.com> Authored: Wed Jul 27 15:44:38 2016 -0700 Committer: Sid Anand <siddharthan...@yahoo.com> Committed: Thu Jul 28 16:50:50 2016 -0700 ---------------------------------------------------------------------- airflow/bin/cli.py | 71 +++++++++++++++++++++++++++++++++++++++++++++++-- tests/core.py | 48 +++++++++++++++++++++++++++++++++ 2 files changed, 117 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/38458989/airflow/bin/cli.py ---------------------------------------------------------------------- diff --git a/airflow/bin/cli.py b/airflow/bin/cli.py index cf05362..2d02f1e 100755 --- a/airflow/bin/cli.py +++ b/airflow/bin/cli.py @@ -161,6 +161,7 @@ def trigger_dag(args): def variables(args): + if args.get: try: var = Variable.get(args.get, @@ -169,15 +170,69 @@ def variables(args): print(var) except ValueError as e: print(e) + if args.delete: + session = settings.Session() + session.query(Variable).filter_by(key=args.delete).delete() + session.commit() + session.close() if args.set: Variable.set(args.set[0], args.set[1]) - if not args.set and not args.get: + # Work around 'import' as a reserved keyword + imp = getattr(args, 'import') + if imp: + if os.path.exists(imp): + import_helper(imp) + else: + print("Missing variables file.") + if args.export: + 
export_helper(args.export) + if not (args.set or args.get or imp or args.export or args.delete): # list all variables session = settings.Session() vars = session.query(Variable) msg = "\n".join(var.key for var in vars) print(msg) +def import_helper(filepath): + with open(filepath, 'r') as varfile: + var = varfile.read() + + try: + d = json.loads(var) + except Exception: + print("Invalid variables file.") + else: + try: + n = 0 + for k, v in d.items(): + if isinstance(v, dict): + Variable.set(k, v, serialize_json=True) + else: + Variable.set(k, v) + n += 1 + except Exception: + pass + finally: + print("{} of {} variables successfully updated.".format(n, len(d))) + +def export_helper(filepath): + session = settings.Session() + qry = session.query(Variable).all() + session.close() + + var_dict = {} + d = json.JSONDecoder() + for var in qry: + val = None + try: + val = d.decode(var.val) + except Exception: + val = var.val + var_dict[var.key] = val + + with open(filepath, 'w') as varfile: + varfile.write(json.dumps(var_dict, sort_keys=True, indent=4)) + print("{} variables successfully exported to {}".format(len(var_dict), filepath)) def pause(args, dag=None): set_is_paused(True, args, dag) @@ -776,6 +831,18 @@ class CLIFactory(object): ("-j", "--json"), help="Deserialize JSON variable", action="store_true"), + 'var_import': Arg( + ("-i", "--import"), + metavar="FILEPATH", + help="Import variables from JSON file"), + 'var_export': Arg( + ("-e", "--export"), + metavar="FILEPATH", + help="Export variables to JSON file"), + 'var_delete': Arg( + ("-x", "--delete"), + metavar="KEY", + help="Delete a variable"), # kerberos 'principal': Arg( ("principal",), "kerberos principal", @@ -926,7 +993,7 @@ class CLIFactory(object): }, { 'func': variables, 'help': "List all variables", - "args": ('set', 'get', 'json', 'default'), + "args": ('set', 'get', 'json', 'default', 'var_import', 'var_export', 'var_delete'), }, { 'func': kerberos, 'help': "Start a kerberos ticket renewer", 
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/38458989/tests/core.py ---------------------------------------------------------------------- diff --git a/tests/core.py b/tests/core.py index 36b484b..4b1926c 100644 --- a/tests/core.py +++ b/tests/core.py @@ -898,6 +898,7 @@ class CliTests(unittest.TestCase): ) def test_variables(self): + # Checks if all subcommands are properly received cli.variables(self.parser.parse_args([ 'variables', '-s', 'foo', '{"foo":"bar"}'])) cli.variables(self.parser.parse_args([ @@ -906,7 +907,54 @@ class CliTests(unittest.TestCase): 'variables', '-g', 'baz', '-d', 'bar'])) cli.variables(self.parser.parse_args([ 'variables'])) + cli.variables(self.parser.parse_args([ + 'variables', '-x', 'bar'])) + cli.variables(self.parser.parse_args([ + 'variables', '-i', DEV_NULL])) + cli.variables(self.parser.parse_args([ + 'variables', '-e', DEV_NULL])) + + cli.variables(self.parser.parse_args([ + 'variables', '-s', 'bar', 'original'])) + # First export + cli.variables(self.parser.parse_args([ + 'variables', '-e', 'variables1.json'])) + + first_exp = open('variables1.json', 'r') + + cli.variables(self.parser.parse_args([ + 'variables', '-s', 'bar', 'updated'])) + cli.variables(self.parser.parse_args([ + 'variables', '-s', 'foo', '{"foo":"oops"}'])) + cli.variables(self.parser.parse_args([ + 'variables', '-x', 'foo'])) + # First import + cli.variables(self.parser.parse_args([ + 'variables', '-i', 'variables1.json'])) + + assert models.Variable.get('bar') == 'original' + assert models.Variable.get('foo') == '{"foo": "bar"}' + # Second export + cli.variables(self.parser.parse_args([ + 'variables', '-e', 'variables2.json'])) + + second_exp = open('variables2.json', 'r') + assert second_exp.read() == first_exp.read() + second_exp.close() + first_exp.close() + # Second import + cli.variables(self.parser.parse_args([ + 'variables', '-i', 'variables2.json'])) + + assert models.Variable.get('bar') == 'original' + assert 
models.Variable.get('foo') == '{"foo": "bar"}' + session = settings.Session() + session.query(Variable).delete() + session.commit() + session.close() + os.remove('variables1.json') + os.remove('variables2.json') class WebUiTests(unittest.TestCase): def setUp(self):