This is an automated email from the git hooks/post-receive script. tille pushed a commit to branch master in repository cwltool.
commit 7009b0d7f68352d99b123944c1104bad21ea61fb Author: Andreas Tille <[email protected]> Date: Mon Dec 19 22:14:05 2016 +0100 New upstream version 1.0.20161216212910 --- PKG-INFO | 2 +- cwltool.egg-info/PKG-INFO | 2 +- cwltool.egg-info/requires.txt | 10 ++-- cwltool/builder.py | 8 ++- cwltool/cwlrdf.py | 8 +-- cwltool/draft2tool.py | 122 ++++++++++++++++++++++-------------------- cwltool/expression.py | 66 ++++++++++++++++------- cwltool/job.py | 2 +- cwltool/load_tool.py | 37 +++++++------ cwltool/main.py | 14 ++--- cwltool/pathmapper.py | 27 +++++----- cwltool/process.py | 77 +++++++++++++++----------- cwltool/sandboxjs.py | 24 +++++++-- cwltool/update.py | 56 +++++++++++++------ cwltool/workflow.py | 39 ++++++++------ setup.cfg | 2 +- setup.py | 10 ++-- 17 files changed, 301 insertions(+), 205 deletions(-) diff --git a/PKG-INFO b/PKG-INFO index b5c4a12..1681910 100644 --- a/PKG-INFO +++ b/PKG-INFO @@ -1,6 +1,6 @@ Metadata-Version: 1.1 Name: cwltool -Version: 1.0.20161207161158 +Version: 1.0.20161216212910 Summary: Common workflow language reference implementation Home-page: https://github.com/common-workflow-language/cwltool Author: Common workflow language working group diff --git a/cwltool.egg-info/PKG-INFO b/cwltool.egg-info/PKG-INFO index b5c4a12..1681910 100644 --- a/cwltool.egg-info/PKG-INFO +++ b/cwltool.egg-info/PKG-INFO @@ -1,6 +1,6 @@ Metadata-Version: 1.1 Name: cwltool -Version: 1.0.20161207161158 +Version: 1.0.20161216212910 Summary: Common workflow language reference implementation Home-page: https://github.com/common-workflow-language/cwltool Author: Common workflow language working group diff --git a/cwltool.egg-info/requires.txt b/cwltool.egg-info/requires.txt index f443740..961c503 100644 --- a/cwltool.egg-info/requires.txt +++ b/cwltool.egg-info/requires.txt @@ -1,8 +1,8 @@ setuptools -requests>=1.0 -ruamel.yaml == 0.12.4 +requests >= 1.0 +ruamel.yaml >= 0.12.4, < 0.12.5 rdflib >= 4.2.0, < 4.3.0 -shellescape -schema-salad >= 
1.21.20161206204028, < 2 -typing >= 3.5.2 +shellescape >= 3.4.1, < 3.5 +schema-salad >= 2.1.20161216210732, < 3 +typing >= 3.5.2, < 3.6 cwltest >= 1.0.20160907111242 diff --git a/cwltool/builder.py b/cwltool/builder.py index 8ddc797..40cb789 100644 --- a/cwltool/builder.py +++ b/cwltool/builder.py @@ -3,6 +3,7 @@ from .utils import aslist from . import expression import avro import schema_salad.validate as validate +from schema_salad.sourceline import SourceLine from typing import Any, Callable, Text, Type, Union from .errors import WorkflowException from .stdfsaccess import StdFsAccess @@ -36,6 +37,7 @@ class Builder(object): self.stagedir = None # type: Text self.make_fs_access = None # type: Type[StdFsAccess] self.build_job_script = None # type: Callable[[List[str]], Text] + self.debug = False # type: bool def bind_input(self, schema, datum, lead_pos=[], tail_pos=[]): # type: (Dict[Text, Any], Any, Union[int, List[int]], List[int]) -> List[Dict[Text, Any]] @@ -155,7 +157,8 @@ class Builder(object): def generate_arg(self, binding): # type: (Dict[Text,Any]) -> List[Text] value = binding.get("datum") if "valueFrom" in binding: - value = self.do_eval(binding["valueFrom"], context=value) + with SourceLine(binding, "valueFrom", WorkflowException): + value = self.do_eval(binding["valueFrom"], context=value) prefix = binding.get("prefix") sep = binding.get("separate", True) @@ -203,4 +206,5 @@ class Builder(object): self.outdir, self.tmpdir, self.resources, context=context, pull_image=pull_image, - timeout=self.timeout) + timeout=self.timeout, + debug=self.debug) diff --git a/cwltool/cwlrdf.py b/cwltool/cwlrdf.py index b3b6739..950357b 100644 --- a/cwltool/cwlrdf.py +++ b/cwltool/cwlrdf.py @@ -1,13 +1,13 @@ import json import urlparse from .process import Process -from schema_salad.ref_resolver import Loader +from schema_salad.ref_resolver import Loader, ContextType from schema_salad.jsonld_context import makerdf from rdflib import Graph, plugin, URIRef from 
rdflib.serializer import Serializer from typing import Any, Dict, IO, Text, Union -def gather(tool, ctx): # type: (Process, Loader.ContextType) -> Graph +def gather(tool, ctx): # type: (Process, ContextType) -> Graph g = Graph() def visitor(t): @@ -17,7 +17,7 @@ def gather(tool, ctx): # type: (Process, Loader.ContextType) -> Graph return g def printrdf(wf, ctx, sr, stdout): - # type: (Process, Loader.ContextType, Text, IO[Any]) -> None + # type: (Process, ContextType, Text, IO[Any]) -> None stdout.write(gather(wf, ctx).serialize(format=sr)) def lastpart(uri): # type: (Any) -> Text @@ -158,7 +158,7 @@ def dot_without_parameters(g, stdout): # type: (Graph, IO[Any]) -> None def printdot(wf, ctx, stdout, include_parameters=False): - # type: (Process, Loader.ContextType, Any, bool) -> None + # type: (Process, ContextType, Any, bool) -> None g = gather(wf, ctx) stdout.write("digraph {") diff --git a/cwltool/draft2tool.py b/cwltool/draft2tool.py index a1b2a09..0a5b5ad 100644 --- a/cwltool/draft2tool.py +++ b/cwltool/draft2tool.py @@ -25,6 +25,8 @@ from .pathmapper import PathMapper from .job import CommandLineJob from .stdfsaccess import StdFsAccess +from schema_salad.sourceline import SourceLine, indent + ACCEPTLIST_EN_STRICT_RE = re.compile(r"^[a-zA-Z0-9._+-]+$") ACCEPTLIST_EN_RELAXED_RE = re.compile(r"^[ a-zA-Z0-9._+-]+$") # with spaces ACCEPTLIST_RE = ACCEPTLIST_EN_STRICT_RE @@ -159,11 +161,7 @@ class CommandLineTool(Process): def makePathMapper(self, reffiles, stagedir, **kwargs): # type: (List[Any], Text, **Any) -> PathMapper dockerReq, _ = self.get_requirement("DockerRequirement") - try: - return PathMapper(reffiles, kwargs["basedir"], stagedir) - except OSError as e: - if e.errno == errno.ENOENT: - raise WorkflowException(u"Missing input file %s" % e) + return PathMapper(reffiles, kwargs["basedir"], stagedir) def job(self, joborder, output_callback, **kwargs): # type: (Dict[Text, Text], Callable[..., Any], **Any) -> Generator[Union[CommandLineJob, CallbackJob], 
None, None] @@ -286,18 +284,21 @@ class CommandLineTool(Process): adjustDirObjs(builder.bindings, _check_adjust) if self.tool.get("stdin"): - j.stdin = builder.do_eval(self.tool["stdin"]) - reffiles.append({"class": "File", "path": j.stdin}) + with SourceLine(self.tool, "stdin", validate.ValidationException): + j.stdin = builder.do_eval(self.tool["stdin"]) + reffiles.append({"class": "File", "path": j.stdin}) if self.tool.get("stderr"): - j.stderr = builder.do_eval(self.tool["stderr"]) - if os.path.isabs(j.stderr) or ".." in j.stderr: - raise validate.ValidationException("stderr must be a relative path") + with SourceLine(self.tool, "stderr", validate.ValidationException): + j.stderr = builder.do_eval(self.tool["stderr"]) + if os.path.isabs(j.stderr) or ".." in j.stderr: + raise validate.ValidationException("stderr must be a relative path, got '%s'" % j.stderr) if self.tool.get("stdout"): - j.stdout = builder.do_eval(self.tool["stdout"]) - if os.path.isabs(j.stdout) or ".." in j.stdout or not j.stdout: - raise validate.ValidationException("stdout must be a relative path") + with SourceLine(self.tool, "stdout", validate.ValidationException): + j.stdout = builder.do_eval(self.tool["stdout"]) + if os.path.isabs(j.stdout) or ".." 
in j.stdout or not j.stdout: + raise validate.ValidationException("stdout must be a relative path, got '%s'" % j.stdout) if _logger.isEnabledFor(logging.DEBUG): _logger.debug(u"[job %s] command line bindings is %s", j.name, json.dumps(builder.bindings, indent=4)) @@ -389,17 +390,18 @@ class CommandLineTool(Process): if _logger.isEnabledFor(logging.DEBUG): _logger.debug(u"Raw output from %s: %s", custom_output, json.dumps(ret, indent=4)) else: - for port in ports: - fragment = shortname(port["id"]) - try: - ret[fragment] = self.collect_output(port, builder, outdir, fs_access, compute_checksum=compute_checksum) - except Exception as e: - _logger.debug( - u"Error collecting output for parameter '%s'" - % shortname(port["id"]), exc_info=True) - raise WorkflowException( - u"Error collecting output for parameter '%s': %s" - % (shortname(port["id"]), e)) + for i, port in enumerate(ports): + with SourceLine(ports, i, WorkflowException): + fragment = shortname(port["id"]) + try: + ret[fragment] = self.collect_output(port, builder, outdir, fs_access, compute_checksum=compute_checksum) + except Exception as e: + _logger.debug( + u"Error collecting output for parameter '%s'" + % shortname(port["id"]), exc_info=True) + raise WorkflowException( + u"Error collecting output for parameter '%s':\n%s" + % (shortname(port["id"]), indent(unicode(e)))) if ret: adjustFileObjs(ret, @@ -427,24 +429,25 @@ class CommandLineTool(Process): revmap = partial(revmap_file, builder, outdir) if "glob" in binding: - for gb in aslist(binding["glob"]): - gb = builder.do_eval(gb) - if gb: - globpatterns.extend(aslist(gb)) - - for gb in globpatterns: - if gb.startswith(outdir): - gb = gb[len(outdir)+1:] - elif gb == ".": - gb = outdir - elif gb.startswith("/"): - raise WorkflowException("glob patterns must not start with '/'") - try: - r.extend([{"location": g, - "class": "File" if fs_access.isfile(g) else "Directory"} - for g in fs_access.glob(fs_access.join(outdir, gb))]) - except (OSError, IOError) as 
e: - _logger.warn(Text(e)) + with SourceLine(binding, "glob", WorkflowException): + for gb in aslist(binding["glob"]): + gb = builder.do_eval(gb) + if gb: + globpatterns.extend(aslist(gb)) + + for gb in globpatterns: + if gb.startswith(outdir): + gb = gb[len(outdir)+1:] + elif gb == ".": + gb = outdir + elif gb.startswith("/"): + raise WorkflowException("glob patterns must not start with '/'") + try: + r.extend([{"location": g, + "class": "File" if fs_access.isfile(g) else "Directory"} + for g in fs_access.glob(fs_access.join(outdir, gb))]) + except (OSError, IOError) as e: + _logger.warn(Text(e)) for files in r: if files["class"] == "Directory" and "listing" not in files: @@ -479,11 +482,13 @@ class CommandLineTool(Process): single = True if "outputEval" in binding: - r = builder.do_eval(binding["outputEval"], context=r) + with SourceLine(binding, "outputEval", WorkflowException): + r = builder.do_eval(binding["outputEval"], context=r) if single: if not r and not optional: - raise WorkflowException("Did not find output file with glob pattern: '{}'".format(globpatterns)) + with SourceLine(binding, "glob", WorkflowException): + raise WorkflowException("Did not find output file with glob pattern: '{}'".format(globpatterns)) elif not r and optional: pass elif isinstance(r, list): @@ -498,20 +503,21 @@ class CommandLineTool(Process): Callable[[Any], Any], revmap)) if "secondaryFiles" in schema: - for primary in aslist(r): - if isinstance(primary, dict): - primary["secondaryFiles"] = [] - for sf in aslist(schema["secondaryFiles"]): - if isinstance(sf, dict) or "$(" in sf or "${" in sf: - sfpath = builder.do_eval(sf, context=primary) - if isinstance(sfpath, basestring): - sfpath = revmap({"location": sfpath, "class": "File"}) - else: - sfpath = {"location": substitute(primary["location"], sf), "class": "File"} - - for sfitem in aslist(sfpath): - if fs_access.exists(sfitem["location"]): - primary["secondaryFiles"].append(sfitem) + with SourceLine(schema, "secondaryFiles", 
WorkflowException): + for primary in aslist(r): + if isinstance(primary, dict): + primary["secondaryFiles"] = [] + for sf in aslist(schema["secondaryFiles"]): + if isinstance(sf, dict) or "$(" in sf or "${" in sf: + sfpath = builder.do_eval(sf, context=primary) + if isinstance(sfpath, basestring): + sfpath = revmap({"location": sfpath, "class": "File"}) + else: + sfpath = {"location": substitute(primary["location"], sf), "class": "File"} + + for sfitem in aslist(sfpath): + if fs_access.exists(sfitem["location"]): + primary["secondaryFiles"].append(sfitem) if not r and optional: r = None diff --git a/cwltool/expression.py b/cwltool/expression.py index fd38cfe..96cbe99 100644 --- a/cwltool/expression.py +++ b/cwltool/expression.py @@ -3,6 +3,7 @@ import json import logging import os import re +import copy from typing import Any, AnyStr, Union, Text, Dict, List import schema_salad.validate as validate @@ -106,33 +107,54 @@ def scanner(scan): # type: (Text) -> List[int] else: return None -def next_seg(remain, obj): # type: (Text, Any)->Text +def next_seg(remain, obj): # type: (Text, Any) -> Any if remain: m = segment_re.match(remain) + key = None # type: Union[str, int] if m.group(0)[0] == '.': - return next_seg(remain[m.end(0):], obj[m.group(0)[1:]]) + key = m.group(0)[1:] elif m.group(0)[1] in ("'", '"'): key = m.group(0)[2:-2].replace("\\'", "'").replace('\\"', '"') - return next_seg(remain[m.end(0):], obj[key]) + + if key: + if isinstance(obj, list) and key == "length" and not remain[m.end(0):]: + return len(obj) + if not isinstance(obj, dict): + raise WorkflowException(" is a %s, cannot index on string '%s'" % (type(obj).__name__, key)) + if key not in obj: + raise WorkflowException(" does not contain key '%s'" % key) else: - key = m.group(0)[1:-1] - return next_seg(remain[m.end(0):], obj[int(key)]) + try: + key = int(m.group(0)[1:-1]) + except ValueError as v: + raise WorkflowException(unicode(v)) + if not isinstance(obj, list): + raise WorkflowException(" is a 
%s, cannot index on int '%s'" % (type(obj).__name__, key)) + if key >= len(obj): + raise WorkflowException(" list index %i out of range" % key) + try: + return next_seg(remain[m.end(0):], obj[key]) + except WorkflowException as w: + raise WorkflowException("%s%s" % (m.group(0), w)) else: return obj -def evaluator(ex, jslib, obj, fullJS=False, timeout=None): - # type: (Text, Text, Dict[Text, Any], bool, int) -> JSON +def evaluator(ex, jslib, obj, fullJS=False, timeout=None, debug=False): + # type: (Text, Text, Dict[Text, Any], bool, int, bool) -> JSON m = param_re.match(ex) if m: - return next_seg(m.group(0)[m.end(1) - m.start(0):-1], obj[m.group(1)]) + try: + return next_seg(m.group(0)[m.end(1) - m.start(0):-1], obj[m.group(1)]) + except Exception as w: + raise WorkflowException("%s%s" % (m.group(1), w)) elif fullJS: - return sandboxjs.execjs(ex, jslib, timeout=timeout) + return sandboxjs.execjs(ex, jslib, timeout=timeout, debug=debug) else: raise sandboxjs.JavascriptException("Syntax error in parameter reference '%s' or used Javascript code without specifying InlineJavascriptRequirement.", ex) def interpolate(scan, rootvars, - timeout=None, fullJS=None, jslib=""): - # type: (Text, Dict[Text, Any], int, bool, Union[str, Text]) -> JSON + timeout=None, fullJS=None, jslib="", debug=False): + # type: (Text, Dict[Text, Any], int, bool, Union[str, Text], bool) -> JSON scan = scan.strip() parts = [] w = scanner(scan) @@ -141,7 +163,7 @@ def interpolate(scan, rootvars, if scan[w[0]] == '$': e = evaluator(scan[w[0]+1:w[1]], jslib, rootvars, fullJS=fullJS, - timeout=timeout) + timeout=timeout, debug=debug) if w[0] == 0 and w[1] == len(scan): return e leaf = json.dumps(e, sort_keys=True) @@ -158,10 +180,10 @@ def interpolate(scan, rootvars, return ''.join(parts) def do_eval(ex, jobinput, requirements, outdir, tmpdir, resources, - context=None, pull_image=True, timeout=None): - # type: (Union[dict, AnyStr], Dict[Text, Union[Dict, List, Text]], List[Dict[Text, Any]], Text, 
Text, Dict[Text, Union[int, Text]], Any, bool, int) -> Any + context=None, pull_image=True, timeout=None, debug=False): + # type: (Union[dict, AnyStr], Dict[Text, Union[Dict, List, Text]], List[Dict[Text, Any]], Text, Text, Dict[Text, Union[int, Text]], Any, bool, int, bool) -> Any - runtime = resources.copy() + runtime = copy.copy(resources) runtime["tmpdir"] = tmpdir runtime["outdir"] = outdir @@ -179,10 +201,14 @@ def do_eval(ex, jobinput, requirements, outdir, tmpdir, resources, jslib = jshead(r.get("expressionLib", []), rootvars) break - return interpolate(ex, - rootvars, - timeout=timeout, - fullJS=fullJS, - jslib=jslib) + try: + return interpolate(ex, + rootvars, + timeout=timeout, + fullJS=fullJS, + jslib=jslib, + debug=debug) + except Exception as e: + raise WorkflowException("Expression evaluation error:\n%s" % e) else: return ex diff --git a/cwltool/job.py b/cwltool/job.py index 40d369b..4aca3d5 100644 --- a/cwltool/job.py +++ b/cwltool/job.py @@ -305,7 +305,7 @@ class CommandLineJob(object): _logger.exception("Exception while running job") processStatus = "permanentFail" except WorkflowException as e: - _logger.error(u"Error while running job: %s" % e) + _logger.error(u"[job %s] Job error:\n%s" % (self.name, e)) processStatus = "permanentFail" except Exception as e: _logger.exception("Exception while running job") diff --git a/cwltool/load_tool.py b/cwltool/load_tool.py index 2f3d887..f2b3dbb 100644 --- a/cwltool/load_tool.py +++ b/cwltool/load_tool.py @@ -7,15 +7,16 @@ import logging import re import urlparse +from typing import Any, AnyStr, Callable, cast, Dict, Text, Tuple, Union +from ruamel.yaml.comments import CommentedSeq, CommentedMap +from avro.schema import Names +import requests.sessions + from schema_salad.ref_resolver import Loader, Fetcher import schema_salad.validate as validate from schema_salad.validate import ValidationException import schema_salad.schema as schema -import requests - -from typing import Any, AnyStr, Callable, cast, 
Dict, Text, Tuple, Union - -from avro.schema import Names +from schema_salad.sourceline import cmap from . import update from . import process @@ -28,14 +29,14 @@ def fetch_document(argsworkflow, # type: Union[Text, dict[Text, Any]] resolver=None, # type: Callable[[Loader, Union[Text, dict[Text, Any]]], Text] fetcher_constructor=None # type: Callable[[Dict[unicode, unicode], requests.sessions.Session], Fetcher] ): - # type: (...) -> Tuple[Loader, Dict[Text, Any], Text] + # type: (...) -> Tuple[Loader, CommentedMap, Text] """Retrieve a CWL document.""" document_loader = Loader({"cwl": "https://w3id.org/cwl/cwl#", "id": "@id"}, fetcher_constructor=fetcher_constructor) uri = None # type: Text - workflowobj = None # type: Dict[Text, Any] + workflowobj = None # type: CommentedMap if isinstance(argsworkflow, basestring): split = urlparse.urlsplit(argsworkflow) if split.scheme: @@ -54,8 +55,8 @@ def fetch_document(argsworkflow, # type: Union[Text, dict[Text, Any]] fileuri = urlparse.urldefrag(uri)[0] workflowobj = document_loader.fetch(fileuri) elif isinstance(argsworkflow, dict): - workflowobj = argsworkflow uri = "#" + Text(id(argsworkflow)) + workflowobj = cast(CommentedMap, cmap(argsworkflow, fn=uri)) else: raise ValidationException("Must be URI or object: '%s'" % argsworkflow) @@ -106,7 +107,7 @@ def _convert_stdstreams_to_files(workflowobj): _convert_stdstreams_to_files(entry) def validate_document(document_loader, # type: Loader - workflowobj, # type: Dict[Text, Any] + workflowobj, # type: CommentedMap uri, # type: Text enable_dev=False, # type: bool strict=True, # type: bool @@ -144,8 +145,8 @@ def validate_document(document_loader, # type: Loader workflowobj["cwlVersion"] = "draft-2" if workflowobj["cwlVersion"] == "draft-2": - workflowobj = update._draft2toDraft3dev1( - workflowobj, document_loader, uri, update_steps=False) + workflowobj = cast(CommentedMap, cmap(update._draft2toDraft3dev1( + workflowobj, document_loader, uri, update_steps=False))) if "@graph" 
in workflowobj: workflowobj["$graph"] = workflowobj["@graph"] del workflowobj["@graph"] @@ -156,21 +157,23 @@ def validate_document(document_loader, # type: Loader if isinstance(avsc_names, Exception): raise avsc_names + processobj = None # type: Union[CommentedMap, CommentedSeq, unicode] document_loader = Loader(sch_document_loader.ctx, schemagraph=sch_document_loader.graph, idx=document_loader.idx, cache=sch_document_loader.cache, fetcher_constructor=fetcher_constructor) workflowobj["id"] = fileuri processobj, metadata = document_loader.resolve_all(workflowobj, fileuri) - if not isinstance(processobj, (dict, list)): + if not isinstance(processobj, (CommentedMap, CommentedSeq)): raise ValidationException("Workflow must be a dict or list.") if not metadata: if not isinstance(processobj, dict): raise ValidationException("Draft-2 workflows must be a dict.") - metadata = {"$namespaces": processobj.get("$namespaces", {}), - "$schemas": processobj.get("$schemas", []), - "cwlVersion": processobj["cwlVersion"]} + metadata = cast(CommentedMap, cmap({"$namespaces": processobj.get("$namespaces", {}), + "$schemas": processobj.get("$schemas", []), + "cwlVersion": processobj["cwlVersion"]}, + fn=fileuri)) _convert_stdstreams_to_files(workflowobj) @@ -180,8 +183,8 @@ def validate_document(document_loader, # type: Loader schema.validate_doc(avsc_names, processobj, document_loader, strict) if metadata.get("cwlVersion") != update.LATEST: - processobj = update.update( - processobj, document_loader, fileuri, enable_dev, metadata) + processobj = cast(CommentedMap, cmap(update.update( + processobj, document_loader, fileuri, enable_dev, metadata))) if jobobj: metadata[u"cwl:defaults"] = jobobj diff --git a/cwltool/main.py b/cwltool/main.py index b5f74b8..a8279bb 100755 --- a/cwltool/main.py +++ b/cwltool/main.py @@ -22,6 +22,7 @@ from schema_salad.ref_resolver import Loader, Fetcher import schema_salad.validate as validate import schema_salad.jsonld_context import schema_salad.makedoc 
+from schema_salad.sourceline import strip_dup_lineno from . import workflow from .errors import WorkflowException, UnsupportedRequirement @@ -43,7 +44,6 @@ _logger.setLevel(logging.INFO) def arg_parser(): # type: () -> argparse.ArgumentParser parser = argparse.ArgumentParser(description='Reference executor for Common Workflow Language') - parser.add_argument("--conformance-test", action="store_true") parser.add_argument("--basedir", type=Text) parser.add_argument("--outdir", type=Text, default=os.path.abspath('.'), help="Output directory, default current directory") @@ -113,9 +113,6 @@ def arg_parser(): # type: () -> argparse.ArgumentParser exgroup.add_argument("--disable-pull", default=True, action="store_false", help="Do not try to pull Docker images", dest="enable_pull") - parser.add_argument("--dry-run", action="store_true", - help="Load and validate but do not execute") - parser.add_argument("--rdf-serializer", help="Output RDF serialization format used by --print-rdf (one of turtle (default), n3, nt, xml)", default="turtle") @@ -134,6 +131,7 @@ def arg_parser(): # type: () -> argparse.ArgumentParser exgroup.add_argument("--print-input-deps", action="store_true", help="Print input object document dependencies.") exgroup.add_argument("--pack", action="store_true", help="Combine components into single document and print.") exgroup.add_argument("--version", action="store_true", help="Print version and exit") + exgroup.add_argument("--validate", action="store_true", help="Validate CWL document only.") exgroup = parser.add_mutually_exclusive_group() exgroup.add_argument("--strict", action="store_true", help="Strict validation (unrecognized or out of place fields are error)", @@ -599,7 +597,8 @@ def main(argsl=None, # type: List[str] 'workflow': None, 'job_order': None, 'pack': False, - 'on_error': 'continue'}.iteritems(): + 'on_error': 'continue', + 'relax_path_checks': False}.iteritems(): if not hasattr(args, k): setattr(args, k, v) @@ -637,6 +636,9 @@ def 
main(argsl=None, # type: List[str] preprocess_only=args.print_pre or args.pack, fetcher_constructor=fetcher_constructor) + if args.validate: + return 0 + if args.pack: stdout.write(print_pack(document_loader, processobj, uri, metadata)) return 0 @@ -742,7 +744,7 @@ def main(argsl=None, # type: List[str] except WorkflowException as exc: _logger.error( u"Workflow error, try again with --debug for more " - "information:\n %s", exc, exc_info=args.debug) + "information:\n%s", strip_dup_lineno(unicode(exc)), exc_info=args.debug) return 1 except Exception as exc: _logger.error( diff --git a/cwltool/pathmapper.py b/cwltool/pathmapper.py index ac811d9..b50538a 100644 --- a/cwltool/pathmapper.py +++ b/cwltool/pathmapper.py @@ -7,6 +7,7 @@ import urlparse from functools import partial from typing import Any, Callable, Set, Text, Tuple, Union import schema_salad.validate as validate +from schema_salad.sourceline import SourceLine _logger = logging.getLogger("cwltool") @@ -166,7 +167,17 @@ class PathMapper(object): if copy: self._pathmap[path] = MapperEnt(ab, tgt, "WritableFile") else: - self._pathmap[path] = MapperEnt(ab, tgt, "File") + with SourceLine(obj, "location", validate.ValidationException): + # Dereference symbolic links + deref = ab + st = os.lstat(deref) + while stat.S_ISLNK(st.st_mode): + rl = os.readlink(deref) + deref = rl if os.path.isabs(rl) else os.path.join( + os.path.dirname(deref), rl) + st = os.lstat(deref) + + self._pathmap[path] = MapperEnt(deref, tgt, "File") self.visitlisting(obj.get("secondaryFiles", []), stagedir, basedir) def setup(self, referenced_files, basedir): @@ -180,20 +191,6 @@ class PathMapper(object): stagedir = os.path.join(self.stagedir, "stg%s" % uuid.uuid4()) self.visit(fob, stagedir, basedir) - # Dereference symbolic links - for path, (ab, tgt, type) in self._pathmap.items(): - if type != "File": # or not os.path.exists(ab): - continue - deref = ab - st = os.lstat(deref) - while stat.S_ISLNK(st.st_mode): - rl = os.readlink(deref) - 
deref = rl if os.path.isabs(rl) else os.path.join( - os.path.dirname(deref), rl) - st = os.lstat(deref) - - self._pathmap[path] = MapperEnt(deref, tgt, "File") - def mapper(self, src): # type: (Text) -> MapperEnt if u"#" in src: i = src.index(u"#") diff --git a/cwltool/process.py b/cwltool/process.py index b4e9001..38547c8 100644 --- a/cwltool/process.py +++ b/cwltool/process.py @@ -17,6 +17,7 @@ import abc import schema_salad.validate as validate import schema_salad.schema from schema_salad.ref_resolver import Loader +from schema_salad.sourceline import SourceLine import avro.schema from typing import (Any, AnyStr, Callable, cast, Dict, List, Generator, IO, Text, Tuple, Union) @@ -25,6 +26,9 @@ from rdflib.namespace import RDFS, OWL from rdflib import Graph from pkg_resources import resource_stream + +from ruamel.yaml.comments import CommentedSeq, CommentedMap + from .utils import aslist, get_feature from .stdfsaccess import StdFsAccess from .builder import Builder, adjustFileObjs, adjustDirObjs @@ -128,9 +132,10 @@ def checkRequirements(rec, supportedProcessRequirements): # type: (Any, Iterable[Any]) -> None if isinstance(rec, dict): if "requirements" in rec: - for r in rec["requirements"]: - if r["class"] not in supportedProcessRequirements: - raise UnsupportedRequirement(u"Unsupported requirement %s" % r["class"]) + for i, r in enumerate(rec["requirements"]): + with SourceLine(rec["requirements"], i, UnsupportedRequirement): + if r["class"] not in supportedProcessRequirements: + raise UnsupportedRequirement(u"Unsupported requirement %s" % r["class"]) for d in rec: checkRequirements(rec[d], supportedProcessRequirements) if isinstance(rec, list): @@ -281,15 +286,16 @@ def checkFormat(actualFile, inputFormats, ontology): def fillInDefaults(inputs, job): # type: (List[Dict[Text, Text]], Dict[Text, Union[Dict[Text, Any], List, Text]]) -> None - for inp in inputs: - if shortname(inp[u"id"]) in job: - pass - elif shortname(inp[u"id"]) not in job and u"default" in inp: 
- job[shortname(inp[u"id"])] = copy.copy(inp[u"default"]) - elif shortname(inp[u"id"]) not in job and inp[u"type"][0] == u"null": - pass - else: - raise validate.ValidationException("Missing input parameter `%s`" % shortname(inp["id"])) + for e, inp in enumerate(inputs): + with SourceLine(inputs, e, WorkflowException): + if shortname(inp[u"id"]) in job: + pass + elif shortname(inp[u"id"]) not in job and u"default" in inp: + job[shortname(inp[u"id"])] = copy.copy(inp[u"default"]) + elif shortname(inp[u"id"]) not in job and aslist(inp[u"type"])[0] == u"null": + pass + else: + raise WorkflowException("Missing required input parameter `%s`" % shortname(inp["id"])) def avroize_type(field_type, name_prefix=""): @@ -432,23 +438,23 @@ class Process(object): builder.job = cast(Dict[Text, Union[Dict[Text, Any], List, Text]], copy.deepcopy(joborder)) - fillInDefaults(self.tool[u"inputs"], builder.job) - normalizeFilesDirs(builder.job) - # Validate job order try: + fillInDefaults(self.tool[u"inputs"], builder.job) + normalizeFilesDirs(builder.job) validate.validate_ex(self.names.get_name("input_record_schema", ""), builder.job) - except validate.ValidationException as e: - raise WorkflowException("Error validating input record, " + Text(e)) + except (validate.ValidationException, WorkflowException) as e: + raise WorkflowException("Invalid job input record:\n" + Text(e)) builder.files = [] - builder.bindings = [] + builder.bindings = CommentedSeq() builder.schemaDefs = self.schemaDefs builder.names = self.names builder.requirements = self.requirements builder.hints = self.hints builder.resources = {} builder.timeout = kwargs.get("eval_timeout") + builder.debug = kwargs.get("debug") dockerReq, is_req = self.get_requirement("DockerRequirement") @@ -484,6 +490,9 @@ class Process(object): if self.tool.get("arguments"): for i, a in enumerate(self.tool["arguments"]): + lc = self.tool["arguments"].lc.data[i] + fn = self.tool["arguments"].lc.filename + 
builder.bindings.lc.add_kv_line_col(len(builder.bindings), lc) if isinstance(a, dict): a = copy.copy(a) if a.get("position"): @@ -492,15 +501,21 @@ class Process(object): a["position"] = [0, i] builder.bindings.append(a) elif ("$(" in a) or ("${" in a): - builder.bindings.append({ - "position": [0, i], - "valueFrom": a - }) + cm = CommentedMap(( + ("position", [0, i]), + ("valueFrom", a) + )) + cm.lc.add_kv_line_col("valueFrom", lc) + cm.lc.filename = fn + builder.bindings.append(cm) else: - builder.bindings.append({ - "position": [0, i], - "datum": a - }) + cm = CommentedMap(( + ("position", [0, i]), + ("datum", a) + )) + cm.lc.add_kv_line_col("datum", lc) + cm.lc.filename = fn + builder.bindings.append(cm) builder.bindings.sort(key=lambda a: a["position"]) @@ -551,8 +566,9 @@ class Process(object): def validate_hints(self, avsc_names, hints, strict): # type: (Any, List[Dict[Text, Any]], bool) -> None - for r in hints: - try: + for i, r in enumerate(hints): + sl = SourceLine(hints, i, validate.ValidationException) + with sl: if avsc_names.get_name(r["class"], "") is not None: plain_hint = dict((key,r[key]) for key in r if key not in self.doc_loader.identifiers) # strip identifiers @@ -560,10 +576,7 @@ class Process(object): avsc_names.get_name(plain_hint["class"], ""), plain_hint, strict=strict) else: - _logger.info(Text(validate.ValidationException( - u"Unknown hint %s" % (r["class"])))) - except validate.ValidationException as v: - raise validate.ValidationException(u"Validating hint `%s`: %s" % (r["class"], Text(v))) + _logger.info(sl.makeError(u"Unknown hint %s" % (r["class"]))) def get_requirement(self, feature): # type: (Any) -> Tuple[Any, bool] return get_feature(self, feature) diff --git a/cwltool/sandboxjs.py b/cwltool/sandboxjs.py index 0885d71..70a9f19 100644 --- a/cwltool/sandboxjs.py +++ b/cwltool/sandboxjs.py @@ -73,7 +73,7 @@ def new_js_proc(): return nodejs -def execjs(js, jslib, timeout=None): # type: (Union[Mapping, Text], Any, int) -> JSON +def 
execjs(js, jslib, timeout=None, debug=False): # type: (Union[Mapping, Text], Any, int, bool) -> JSON if not hasattr(localdata, "proc") or localdata.proc.poll() is not None: localdata.proc = new_js_proc() @@ -128,13 +128,29 @@ def execjs(js, jslib, timeout=None): # type: (Union[Mapping, Text], Any, int) - stderrdata = stderr_buf.getvalue() def fn_linenum(): # type: () -> Text - return u"\n".join(u"%04i %s" % (i+1, b) for i, b in enumerate(fn.split("\n"))) + lines = fn.splitlines() + ofs = 0 + maxlines = 99 + if len(lines) > maxlines: + ofs = len(lines)-maxlines + lines = lines[-maxlines:] + return u"\n".join(u"%02i %s" % (i+ofs+1, b) for i, b in enumerate(lines)) + + def stdfmt(data): # type: (unicode) -> unicode + if "\n" in data: + return "\n" + data.strip() + return data + + if debug: + info = u"returncode was: %s\nscript was:\n%s\nstdout was: %s\nstderr was: %s\n" % (nodejs.returncode, fn_linenum(), stdfmt(stdoutdata), stdfmt(stderrdata)) + else: + info = stdfmt(stderrdata) if nodejs.poll() not in (None, 0): if killed: - raise JavascriptException(u"Long-running script killed after %s seconds.\nscript was:\n%s\n" % (timeout, fn_linenum())) + raise JavascriptException(u"Long-running script killed after %s seconds: %s" % (timeout, info)) else: - raise JavascriptException(u"Returncode was: %s\nscript was:\n%s\nstdout was: '%s'\nstderr was: '%s'\n" % (nodejs.returncode, fn_linenum(), stdoutdata, stderrdata)) + raise JavascriptException(info) else: try: return json.loads(stdoutdata) diff --git a/cwltool/update.py b/cwltool/update.py index 7dfacce..762ad03 100644 --- a/cwltool/update.py +++ b/cwltool/update.py @@ -3,11 +3,14 @@ import urlparse import json import re import traceback +import copy from schema_salad.ref_resolver import Loader import schema_salad.validate from typing import Any, Callable, Dict, List, Text, Tuple, Union # pylint: disable=unused-import +from ruamel.yaml.comments import CommentedSeq, CommentedMap + from .utils import aslist def findId(doc, 
frg): # type: (Any, Any) -> Dict @@ -28,7 +31,9 @@ def findId(doc, frg): # type: (Any, Any) -> Dict def fixType(doc): # type: (Any) -> Any if isinstance(doc, list): - return [fixType(f) for f in doc] + for i, f in enumerate(doc): + doc[i] = fixType(f) + return doc if isinstance(doc, (str, Text)): if doc not in ( @@ -82,7 +87,8 @@ def _draft2toDraft3dev1(doc, loader, baseuri, update_steps=True): doc[a] = _draft2toDraft3dev1(doc[a], loader, baseuri) if isinstance(doc, list): - return [_draft2toDraft3dev1(a, loader, baseuri) for a in doc] + for i, a in enumerate(doc): + doc[i] = _draft2toDraft3dev1(a, loader, baseuri) return doc except Exception as e: @@ -165,7 +171,10 @@ def _draftDraft3dev1toDev2(doc, loader, baseuri): "expressionLib": [updateScript(sc) for sc in aslist(r["engineConfig"])] }) added = True - doc["requirements"] = [rq for rq in doc["requirements"] if rq["class"] != "ExpressionEngineRequirement"] + for i, rq in enumerate(doc["requirements"]): + if rq["class"] == "ExpressionEngineRequirement": + del doc["requirements"][i] + break break else: doc["requirements"] = [] @@ -173,7 +182,8 @@ def _draftDraft3dev1toDev2(doc, loader, baseuri): doc["requirements"].append({"class":"InlineJavascriptRequirement"}) elif isinstance(doc, list): - return [_draftDraft3dev1toDev2(a, loader, baseuri) for a in doc] + for i, a in enumerate(doc): + doc[i] = _draftDraft3dev1toDev2(a, loader, baseuri) return doc @@ -213,7 +223,8 @@ def _draftDraft3dev2toDev3(doc, loader, baseuri): doc[a] = _draftDraft3dev2toDev3(doc[a], loader, baseuri) if isinstance(doc, list): - return [_draftDraft3dev2toDev3(a, loader, baseuri) for a in doc] + for i, a in enumerate(doc): + doc[i] = _draftDraft3dev2toDev3(a, loader, baseuri) return doc except Exception as e: @@ -269,7 +280,8 @@ def _draftDraft3dev3toDev4(doc, loader, baseuri): doc[a] = _draftDraft3dev3toDev4(doc[a], loader, baseuri) if isinstance(doc, list): - return [_draftDraft3dev3toDev4(a, loader, baseuri) for a in doc] + for i, a in 
enumerate(doc): + doc[i] = _draftDraft3dev3toDev4(a, loader, baseuri) return doc except Exception as e: @@ -303,7 +315,8 @@ def _draftDraft3dev4toDev5(doc, loader, baseuri): doc[a] = _draftDraft3dev4toDev5(doc[a], loader, baseuri) if isinstance(doc, list): - return [_draftDraft3dev4toDev5(a, loader, baseuri) for a in doc] + for i, a in enumerate(doc): + doc[i] = _draftDraft3dev4toDev5(a, loader, baseuri) return doc except Exception as e: @@ -351,7 +364,8 @@ def _draft3toDraft4dev1(doc, loader, baseuri): for key, value in doc.items(): doc[key] = _draft3toDraft4dev1(value, loader, baseuri) elif isinstance(doc, list): - doc = [_draft3toDraft4dev1(item, loader, baseuri) for item in doc] + for i, a in enumerate(doc): + doc[i] = _draft3toDraft4dev1(a, loader, baseuri) return doc @@ -370,7 +384,8 @@ def _draft4Dev1toDev2(doc, loader, baseuri): for key, value in doc.items(): doc[key] = _draft4Dev1toDev2(value, loader, baseuri) elif isinstance(doc, list): - doc = [_draft4Dev1toDev2(item, loader, baseuri) for item in doc] + for i, a in enumerate(doc): + doc[i] = _draft4Dev1toDev2(a, loader, baseuri) return doc @@ -403,7 +418,8 @@ def _draft4Dev2toDev3(doc, loader, baseuri): for key, value in doc.items(): doc[key] = _draft4Dev2toDev3(value, loader, baseuri) elif isinstance(doc, list): - doc = [_draft4Dev2toDev3(item, loader, baseuri) for item in doc] + for i, a in enumerate(doc): + doc[i] = _draft4Dev2toDev3(a, loader, baseuri) return doc @@ -421,7 +437,8 @@ def _draft4Dev3to1_0dev4(doc, loader, baseuri): for key, value in doc.items(): doc[key] = _draft4Dev3to1_0dev4(value, loader, baseuri) elif isinstance(doc, list): - doc = [_draft4Dev3to1_0dev4(item, loader, baseuri) for item in doc] + for i, a in enumerate(doc): + doc[i] = _draft4Dev3to1_0dev4(a, loader, baseuri) return doc def draft4Dev3to1_0dev4(doc, loader, baseuri): @@ -471,17 +488,24 @@ def identity(doc, loader, baseuri): # pylint: disable=unused-argument return (doc, doc["cwlVersion"]) def checkversion(doc, 
metadata, enable_dev): - # type: (Union[List[Dict[Text, Any]], Dict[Text, Any]], Dict[Text, Any], bool) -> Tuple[Dict[Text, Any], Text] # pylint: disable=line-too-long + # type: (Union[CommentedSeq, CommentedMap], CommentedMap, bool) -> Tuple[Dict[Text, Any], Text] # pylint: disable=line-too-long """Checks the validity of the version of the give CWL document. Returns the document and the validated version string. """ - if isinstance(doc, list): - metadata = metadata.copy() + + cdoc = None # type: CommentedMap + if isinstance(doc, CommentedSeq): + lc = metadata.lc + metadata = copy.copy(metadata) + metadata.lc.data = copy.copy(lc.data) + metadata.lc.filename = lc.filename metadata[u"$graph"] = doc cdoc = metadata - else: + elif isinstance(doc, CommentedMap): cdoc = doc + else: + raise Exception("Expected CommentedMap or CommentedSeq") version = cdoc[u"cwlVersion"] @@ -503,7 +527,7 @@ def checkversion(doc, metadata, enable_dev): return (cdoc, version) def update(doc, loader, baseuri, enable_dev, metadata): - # type: (Union[List[Dict[Text, Any]], Dict[Text, Any]], Loader, Text, bool, Any) -> Dict[Text, Any] + # type: (Union[CommentedSeq, CommentedMap], Loader, Text, bool, Any) -> dict (cdoc, version) = checkversion(doc, metadata, enable_dev) diff --git a/cwltool/workflow.py b/cwltool/workflow.py index f2a07d0..9f93b9d 100644 --- a/cwltool/workflow.py +++ b/cwltool/workflow.py @@ -1,23 +1,27 @@ -from . import job -from . 
import draft2tool -from .utils import aslist -from .process import Process, get_feature, empty_subtree, shortname, uniquename -from .errors import WorkflowException import copy import logging import random import os from collections import namedtuple import functools -import schema_salad.validate as validate import urlparse import tempfile import shutil import json -import schema_salad + +from typing import Any, Callable, cast, Generator, Iterable, List, Text, Union + +import schema_salad.validate as validate +from schema_salad.sourceline import SourceLine + +from . import job +from . import draft2tool +from .utils import aslist +from .process import Process, get_feature, empty_subtree, shortname, uniquename +from .errors import WorkflowException from . import expression from .load_tool import load_tool -from typing import Any, Callable, cast, Generator, Iterable, List, Text, Union + _logger = logging.getLogger("cwltool") @@ -344,14 +348,15 @@ class WorkflowJob(object): if "outdir" in kwargs: del kwargs["outdir"] - for i in self.tool["inputs"]: - iid = shortname(i["id"]) - if iid in joborder: - self.state[i["id"]] = WorkflowStateItem(i, copy.deepcopy(joborder[iid])) - elif "default" in i: - self.state[i["id"]] = WorkflowStateItem(i, copy.deepcopy(i["default"])) - else: - raise WorkflowException(u"Input '%s' not in input object and does not have a default value." % (i["id"])) + for e, i in enumerate(self.tool["inputs"]): + with SourceLine(self.tool["inputs"], e, WorkflowException): + iid = shortname(i["id"]) + if iid in joborder: + self.state[i["id"]] = WorkflowStateItem(i, copy.deepcopy(joborder[iid])) + elif "default" in i: + self.state[i["id"]] = WorkflowStateItem(i, copy.deepcopy(i["default"])) + else: + raise WorkflowException(u"Input '%s' not in input object and does not have a default value." 
% (i["id"])) for s in self.steps: for out in s.tool["outputs"]: @@ -460,7 +465,7 @@ class WorkflowStep(Process): param = {} # type: Dict[Text, Any] inputid = step_entry else: - param = step_entry.copy() + param = copy.copy(step_entry) inputid = step_entry["id"] shortinputid = shortname(inputid) diff --git a/setup.cfg b/setup.cfg index 8930401..da4d62a 100644 --- a/setup.cfg +++ b/setup.cfg @@ -5,7 +5,7 @@ exclude = cwltool/schemas [easy_install] [egg_info] -tag_build = .20161207161158 +tag_build = .20161216212910 tag_date = 0 tag_svn_revision = 0 diff --git a/setup.py b/setup.py index 405f0fc..9d5c14c 100755 --- a/setup.py +++ b/setup.py @@ -43,12 +43,12 @@ setup(name='cwltool', 'cwlNodeEngine.js']}, install_requires=[ 'setuptools', - 'requests>=1.0', - 'ruamel.yaml == 0.12.4', + 'requests >= 1.0', + 'ruamel.yaml >= 0.12.4, < 0.12.5', 'rdflib >= 4.2.0, < 4.3.0', - 'shellescape', - 'schema-salad >= 1.21.20161206204028, < 2', - 'typing >= 3.5.2', + 'shellescape >= 3.4.1, < 3.5', + 'schema-salad >= 2.1.20161216210732, < 3', + 'typing >= 3.5.2, < 3.6', 'cwltest >= 1.0.20160907111242'], test_suite='tests', tests_require=[], -- Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/debian-med/cwltool.git _______________________________________________ debian-med-commit mailing list debian-med-commit@lists.alioth.debian.org http://lists.alioth.debian.org/cgi-bin/mailman/listinfo/debian-med-commit
