Copilot commented on code in PR #12939:
URL: https://github.com/apache/trafficserver/pull/12939#discussion_r2886851312


##########
tools/hrw4u/src/visitor.py:
##########
@@ -211,17 +233,445 @@ def _resolve_identifier_with_validation(self, name: str) 
-> tuple[str, bool]:
 
         return symbol, default_expr
 
+    def _get_value_text(self, val_ctx) -> str:
+        if val_ctx.paramRef():
+            name = val_ctx.paramRef().IDENT().getText()
+            if name not in self._proc_bindings:
+                raise Hrw4uSyntaxError(
+                    self.filename, val_ctx.start.line, val_ctx.start.column, 
f"'${name}' used outside procedure context", "")
+            return self._proc_bindings[name]
+        return val_ctx.getText()
+
+    def _collect_proc_params(self, param_list_ctx) -> list[ProcParam]:
+        return [ProcParam(name=p.IDENT().getText(), default_ctx=p.value() if 
p.value() else None) for p in param_list_ctx.param()]
+
+    def _load_proc_file(self, path: Path, load_stack: list[str], use_spec: str 
| None = None) -> None:
+        """Parse a procedure file and register its declarations."""
+        abs_path = str(path.resolve())
+        if abs_path in self._proc_loaded:
+            return
+        if abs_path in load_stack:
+            cycle = ' -> '.join([*load_stack, abs_path])
+            raise Hrw4uSyntaxError(str(path), 1, 0, f"circular use dependency: 
{cycle}", "")
+
+        # Derive expected namespace prefix from the use spec.
+        # 'Apple::Common' → 'Apple::', 'Apple::Simple::All' → 'Apple::Simple::'
+        expected_ns = None
+        if use_spec and '::' in use_spec:
+            expected_ns = use_spec[:use_spec.rindex('::') + 2]
+
+        text = path.read_text(encoding='utf-8')
+        listener = ThrowingErrorListener(filename=str(path))
+
+        lexer = hrw4uLexer(InputStream(text))
+        lexer.removeErrorListeners()
+        lexer.addErrorListener(listener)
+
+        stream = CommonTokenStream(lexer)
+        parser = hrw4uParser(stream)
+        parser.removeErrorListeners()
+        parser.addErrorListener(listener)
+        parser.errorHandler = BailErrorStrategy()
+        tree = parser.program()
+
+        new_stack = [*load_stack, abs_path]
+        found_proc = False
+
+        for item in tree.programItem():
+            if item.useDirective():
+                spec = item.useDirective().QUALIFIED_IDENT().getText()
+                sub_path = resolve_use_path(spec, self._proc_search_paths)
+                if sub_path is None:
+                    raise Hrw4uSyntaxError(
+                        str(path),
+                        item.useDirective().start.line, 0, f"use '{spec}': 
file not found in procedures path", "")
+                self._load_proc_file(sub_path, new_stack, use_spec=spec)
+                found_proc = True
+            elif item.procedureDecl():
+                ctx = item.procedureDecl()
+                name = ctx.QUALIFIED_IDENT().getText()
+                if '::' not in name:
+                    raise Hrw4uSyntaxError(
+                        str(path), ctx.start.line, ctx.start.column, 
f"procedure name '{name}' must be qualified (e.g. 'ns::name')",
+                        "")
+                if expected_ns and not name.startswith(expected_ns):
+                    raise Hrw4uSyntaxError(
+                        str(path), ctx.start.line, ctx.start.column,
+                        f"procedure '{name}' does not match namespace 
'{expected_ns[:-2]}' "
+                        f"(expected from 'use {use_spec}')", "")
+                if name in self._proc_registry:
+                    existing = self._proc_registry[name]
+                    raise Hrw4uSyntaxError(
+                        str(path), ctx.start.line, 0, f"procedure '{name}' 
already declared in {existing.source_file}", "")
+                params = self._collect_proc_params(ctx.paramList()) if 
ctx.paramList() else []
+                self._proc_registry[name] = ProcSig(name, params, ctx.block(), 
str(path), text)
+                found_proc = True
+
+        if not found_proc:
+            raise Hrw4uSyntaxError(str(path), 1, 0, f"no 'procedure' 
declarations found in {path.name}", "")
+
+        self._proc_loaded.add(abs_path)
+
+    def _visit_block_items(self, block_ctx) -> None:
+        """Visit a block's items at the current indent level (no extra indent 
added)."""
+        for item in block_ctx.blockItem():
+            if item.statement():
+                self.visit(item.statement())
+            elif item.conditional():
+                self.emit_statement("if")
+                saved = self.stmt_indent, self.cond_indent
+                self.stmt_indent += 1
+                self.cond_indent = self.stmt_indent
+                self.visit(item.conditional())
+                self.stmt_indent, self.cond_indent = saved
+                self.emit_statement("endif")
+            elif item.commentLine() and self.preserve_comments:
+                self.visit(item.commentLine())
+
+    def _bind_proc_args(self, sig: ProcSig, call_ctx) -> dict[str, str]:
+        """Resolve call arguments against a procedure signature, returning 
bindings."""
+        call_args: list[str] = []
+        if call_ctx.argumentList():
+            for val_ctx in call_ctx.argumentList().value():
+                text = self._get_value_text(val_ctx)
+                if text.startswith('"') and text.endswith('"'):
+                    text = self._substitute_strings(text, call_ctx)[1:-1]
+                call_args.append(text)
+
+        required = sum(1 for p in sig.params if p.default_ctx is None)
+        if not (required <= len(call_args) <= len(sig.params)):
+            expected = f"{required}-{len(sig.params)}" if required < 
len(sig.params) else str(len(sig.params))
+            raise Hrw4uSyntaxError(
+                self.filename, call_ctx.start.line, call_ctx.start.column,
+                f"procedure '{sig.qualified_name}': expected {expected} 
arg(s), got {len(call_args)}", "")
+
+        bindings: dict[str, str] = {}
+        for i, param in enumerate(sig.params):
+            if i < len(call_args):
+                bindings[param.name] = call_args[i]
+            else:
+                default = self._get_value_text(param.default_ctx)
+                if default.startswith('"') and default.endswith('"'):
+                    default = default[1:-1]
+                bindings[param.name] = default

Review Comment:
   `_bind_proc_args()` can crash when a required parameter appears after an 
optional one (e.g. `procedure ns::p($a="x", $b)`). In that case `required` 
allows too few call args, and the `else` branch calls `_get_value_text(None)` 
for the missing required param, raising an unhelpful AttributeError. Consider 
validating in `visitProcedureDecl()` that all non-default params precede 
default params, and/or explicitly error when `param.default_ctx is None` and no 
arg was provided.



##########
tools/hrw4u/src/visitor.py:
##########
@@ -211,17 +233,445 @@ def _resolve_identifier_with_validation(self, name: str) 
-> tuple[str, bool]:
 
         return symbol, default_expr
 
+    def _get_value_text(self, val_ctx) -> str:
+        if val_ctx.paramRef():
+            name = val_ctx.paramRef().IDENT().getText()
+            if name not in self._proc_bindings:
+                raise Hrw4uSyntaxError(
+                    self.filename, val_ctx.start.line, val_ctx.start.column, 
f"'${name}' used outside procedure context", "")
+            return self._proc_bindings[name]

Review Comment:
   Several `Hrw4uSyntaxError` instances created here pass `source_line=""`, 
which means the rendered error output loses the actual line-of-source context 
even though `val_ctx` has access to the input stream. Consider extracting the 
source line from `val_ctx.start.getInputStream()` (or raising a simpler 
exception and letting `trap()/hrw4u_error()` wrap it) so errors include the 
offending line and caret position.



##########
tools/hrw4u/tests/test_units.py:
##########
@@ -284,5 +251,74 @@ def test_regex_validator_factory(self):
             validator("TEST123")
 
 
+class TestHumanizeErrorMessage:
+
+    def test_replaces_qualified_ident(self):
+        msg = "mismatched input 'Apple' expecting QUALIFIED_IDENT"
+        result = humanize_error_message(msg)
+        assert "QUALIFIED_IDENT" not in result
+        assert "qualified name" in result
+
+    def test_replaces_punctuation_tokens(self):
+        msg = "mismatched input '{' expecting LPAREN"
+        result = humanize_error_message(msg)
+        assert "LPAREN" not in result
+        assert "'('" in result
+
+    def test_replaces_multiple_tokens(self):
+        msg = "expecting {IDENT, QUALIFIED_IDENT, SEMICOLON}"
+        result = humanize_error_message(msg)
+        assert "IDENT" not in result
+        assert "QUALIFIED_IDENT" not in result
+        assert "SEMICOLON" not in result
+
+    def test_preserves_normal_text(self):
+        msg = "unknown procedure 'test::foo': not loaded via 'use'"
+        result = humanize_error_message(msg)
+        assert result == msg
+
+    def test_no_partial_word_replacement(self):
+        msg = "some IDENTIFIER_LIKE text"
+        result = humanize_error_message(msg)
+        assert result == msg
+
+
+class TestParseErrorMessages:
+    """Verify that ANTLR parse errors use human-friendly token names."""
+
+    def _collect_errors(self, text: str) -> list[str]:
+        collector = ErrorCollector(max_errors=10)
+        try:
+            create_parse_tree(text, "<test>", hrw4uLexer, hrw4uParser, "test", 
collect_errors=True, max_errors=10)
+        except Exception:
+            pass
+        return [str(e) for e in collector.errors]

Review Comment:
   `TestParseErrorMessages._collect_errors()` instantiates an `ErrorCollector` 
but never wires it into parsing; `create_parse_tree()` creates and returns its 
own collector. As written this helper always returns an empty list and can 
mislead future tests. Either remove `_collect_errors()` or use the `collector` 
returned by `create_parse_tree()` (or extend `create_parse_tree()` to accept an 
injected collector).



##########
tools/hrw4u/src/visitor.py:
##########
@@ -211,17 +233,445 @@ def _resolve_identifier_with_validation(self, name: str) 
-> tuple[str, bool]:
 
         return symbol, default_expr
 
+    def _get_value_text(self, val_ctx) -> str:
+        if val_ctx.paramRef():
+            name = val_ctx.paramRef().IDENT().getText()
+            if name not in self._proc_bindings:
+                raise Hrw4uSyntaxError(
+                    self.filename, val_ctx.start.line, val_ctx.start.column, 
f"'${name}' used outside procedure context", "")
+            return self._proc_bindings[name]
+        return val_ctx.getText()
+
+    def _collect_proc_params(self, param_list_ctx) -> list[ProcParam]:
+        return [ProcParam(name=p.IDENT().getText(), default_ctx=p.value() if 
p.value() else None) for p in param_list_ctx.param()]
+
+    def _load_proc_file(self, path: Path, load_stack: list[str], use_spec: str 
| None = None) -> None:
+        """Parse a procedure file and register its declarations."""
+        abs_path = str(path.resolve())
+        if abs_path in self._proc_loaded:
+            return
+        if abs_path in load_stack:
+            cycle = ' -> '.join([*load_stack, abs_path])
+            raise Hrw4uSyntaxError(str(path), 1, 0, f"circular use dependency: 
{cycle}", "")
+
+        # Derive expected namespace prefix from the use spec.
+        # 'Apple::Common' → 'Apple::', 'Apple::Simple::All' → 'Apple::Simple::'
+        expected_ns = None
+        if use_spec and '::' in use_spec:
+            expected_ns = use_spec[:use_spec.rindex('::') + 2]
+
+        text = path.read_text(encoding='utf-8')
+        listener = ThrowingErrorListener(filename=str(path))
+
+        lexer = hrw4uLexer(InputStream(text))
+        lexer.removeErrorListeners()
+        lexer.addErrorListener(listener)
+
+        stream = CommonTokenStream(lexer)
+        parser = hrw4uParser(stream)
+        parser.removeErrorListeners()
+        parser.addErrorListener(listener)
+        parser.errorHandler = BailErrorStrategy()
+        tree = parser.program()
+
+        new_stack = [*load_stack, abs_path]
+        found_proc = False
+
+        for item in tree.programItem():
+            if item.useDirective():
+                spec = item.useDirective().QUALIFIED_IDENT().getText()
+                sub_path = resolve_use_path(spec, self._proc_search_paths)
+                if sub_path is None:
+                    raise Hrw4uSyntaxError(
+                        str(path),
+                        item.useDirective().start.line, 0, f"use '{spec}': 
file not found in procedures path", "")
+                self._load_proc_file(sub_path, new_stack, use_spec=spec)
+                found_proc = True
+            elif item.procedureDecl():
+                ctx = item.procedureDecl()
+                name = ctx.QUALIFIED_IDENT().getText()
+                if '::' not in name:
+                    raise Hrw4uSyntaxError(
+                        str(path), ctx.start.line, ctx.start.column, 
f"procedure name '{name}' must be qualified (e.g. 'ns::name')",
+                        "")
+                if expected_ns and not name.startswith(expected_ns):
+                    raise Hrw4uSyntaxError(
+                        str(path), ctx.start.line, ctx.start.column,
+                        f"procedure '{name}' does not match namespace 
'{expected_ns[:-2]}' "
+                        f"(expected from 'use {use_spec}')", "")
+                if name in self._proc_registry:
+                    existing = self._proc_registry[name]
+                    raise Hrw4uSyntaxError(
+                        str(path), ctx.start.line, 0, f"procedure '{name}' 
already declared in {existing.source_file}", "")
+                params = self._collect_proc_params(ctx.paramList()) if 
ctx.paramList() else []
+                self._proc_registry[name] = ProcSig(name, params, ctx.block(), 
str(path), text)
+                found_proc = True
+
+        if not found_proc:
+            raise Hrw4uSyntaxError(str(path), 1, 0, f"no 'procedure' 
declarations found in {path.name}", "")
+
+        self._proc_loaded.add(abs_path)
+
+    def _visit_block_items(self, block_ctx) -> None:
+        """Visit a block's items at the current indent level (no extra indent 
added)."""
+        for item in block_ctx.blockItem():
+            if item.statement():
+                self.visit(item.statement())
+            elif item.conditional():
+                self.emit_statement("if")
+                saved = self.stmt_indent, self.cond_indent
+                self.stmt_indent += 1
+                self.cond_indent = self.stmt_indent
+                self.visit(item.conditional())
+                self.stmt_indent, self.cond_indent = saved
+                self.emit_statement("endif")
+            elif item.commentLine() and self.preserve_comments:
+                self.visit(item.commentLine())
+
+    def _bind_proc_args(self, sig: ProcSig, call_ctx) -> dict[str, str]:
+        """Resolve call arguments against a procedure signature, returning 
bindings."""
+        call_args: list[str] = []
+        if call_ctx.argumentList():
+            for val_ctx in call_ctx.argumentList().value():
+                text = self._get_value_text(val_ctx)
+                if text.startswith('"') and text.endswith('"'):
+                    text = self._substitute_strings(text, call_ctx)[1:-1]
+                call_args.append(text)
+
+        required = sum(1 for p in sig.params if p.default_ctx is None)
+        if not (required <= len(call_args) <= len(sig.params)):
+            expected = f"{required}-{len(sig.params)}" if required < 
len(sig.params) else str(len(sig.params))
+            raise Hrw4uSyntaxError(
+                self.filename, call_ctx.start.line, call_ctx.start.column,
+                f"procedure '{sig.qualified_name}': expected {expected} 
arg(s), got {len(call_args)}", "")
+
+        bindings: dict[str, str] = {}
+        for i, param in enumerate(sig.params):
+            if i < len(call_args):
+                bindings[param.name] = call_args[i]
+            else:
+                default = self._get_value_text(param.default_ctx)
+                if default.startswith('"') and default.endswith('"'):
+                    default = default[1:-1]
+                bindings[param.name] = default
+
+        return bindings
+
+    @contextmanager
+    def _proc_context(self, sig: ProcSig, bindings: dict[str, str]):
+        """Context manager that saves/restores procedure expansion state."""
+        saved_bindings = self._proc_bindings
+        saved_stack = self._proc_call_stack
+        saved_filename = self.filename
+
+        self._proc_bindings = bindings
+        self._proc_call_stack = [*saved_stack, sig.qualified_name]
+        self.filename = sig.source_file
+        try:
+            yield
+        finally:
+            self._proc_bindings = saved_bindings
+            self._proc_call_stack = saved_stack
+            self.filename = saved_filename
 
-#
-# Visitor Methods
-#
+    def _expand_proc_as_section_body(self, block_ctx, hook: str, 
in_statement_block: bool) -> bool:
+        """Expand a procedure body using section-body semantics (hook 
re-emission).
+
+        Returns the final in_statement_block state.
+        """
+        items = block_ctx.blockItem()
+        is_first = not in_statement_block
+
+        for idx, item in enumerate(items):
+            is_conditional = item.conditional() is not None
+            is_comment = item.commentLine() is not None
+            proc_info = self._get_proc_call_info(item)
+
+            if is_comment:
+                if self.preserve_comments:
+                    self.visit(item.commentLine())
+            elif proc_info:
+                _, call_ctx = proc_info
+                in_statement_block = self._section_expand_proc_call(call_ctx, 
hook, in_statement_block, is_first and idx == 0)
+            elif is_conditional or not in_statement_block:
+                if not (is_first and idx == 0):
+                    self._flush_condition()
+                    self.output.append("")
+
+                self._emit_section_header(hook, [])
+
+                if is_conditional:
+                    self.visit(item)
+                    in_statement_block = False
+                else:
+                    in_statement_block = True
+                    with self.stmt_indented():
+                        self.visit(item.statement())
+            else:
+                with self.stmt_indented():
+                    self.visit(item.statement())
+
+        return in_statement_block
+
+    def _section_expand_proc_call(self, call_ctx, hook: str, 
in_statement_block: bool, is_first_item: bool) -> bool:
+        """Expand a procedure call within a section body context. Returns 
in_statement_block."""
+        name = call_ctx.funcName.text
+        sig = self._proc_registry[name]
+        bindings = self._bind_proc_args(sig, call_ctx)
+
+        with self._proc_context(sig, bindings):
+            return self._expand_proc_as_section_body(sig.body_ctx, hook, 
in_statement_block)
+
+    def _get_proc_call_info(self, item) -> tuple[ProcSig, Any] | None:
+        """If item (sectionBody or blockItem) is a procedure call, return 
(sig, call_ctx)."""
+        stmt = item.statement()
+        if stmt and stmt.functionCall():
+            func_name = stmt.functionCall().funcName.text
+            sig = self._proc_registry.get(func_name)
+            if sig:
+                return sig, stmt.functionCall()
+        return None
+
+    def _expand_procedure_call(self, call_ctx) -> None:
+        """Expand a procedure call inline at the current indent level."""
+        name = call_ctx.funcName.text
+        sig = self._proc_registry.get(name)
+
+        if sig is None:
+            raise Hrw4uSyntaxError(
+                self.filename, call_ctx.start.line, call_ctx.start.column, 
f"unknown procedure '{name}': not loaded via 'use'", "")
+
+        if name in self._proc_call_stack:
+            cycle = ' -> '.join([*self._proc_call_stack, name])
+            raise Hrw4uSyntaxError(
+                self.filename, call_ctx.start.line, call_ctx.start.column, 
f"circular procedure call: {cycle}", "")
+
+        bindings = self._bind_proc_args(sig, call_ctx)
+
+        with self._proc_context(sig, bindings):
+            self._visit_block_items(sig.body_ctx)
+
+    @staticmethod
+    def _get_source_text(ctx, source_text: str) -> str:
+        """Extract original source text for a parse tree node."""
+        return source_text[ctx.start.start:ctx.stop.stop + 1]
+
+    def _flatten_substitute_params(self, text: str, bindings: dict[str, str]) 
-> str:
+        """Replace $param references in source text with bound values."""
+        if not bindings:
+            return text
+        return self._PARAM_REF_PATTERN.sub(lambda m: bindings.get(m.group(1), 
m.group(0)), text)
+
+    def _flatten_reindent(self, text: str, indent: str, source_indent: str | 
None = None) -> list[str]:
+        """Re-indent text: replace source indentation with target indent, 
preserving relative nesting.
+
+        If source_indent is None, it is auto-detected from the first non-empty 
line.
+        """
+        lines: list[str] = []
+
+        for line in text.splitlines():
+            stripped = line.strip()
+            if not stripped:
+                lines.append("")
+                continue
+
+            if source_indent is None:
+                source_indent = line[:len(line) - len(line.lstrip())]
+
+            if line.startswith(source_indent):
+                lines.append(f"{indent}{line[len(source_indent):]}")
+            else:
+                lines.append(f"{indent}{stripped}")
+
+        return lines
+
+    @staticmethod
+    def _source_indent_at(ctx, source_text: str) -> str:
+        """Detect the source indentation of a parse tree node from its 
position in source text."""
+        start = ctx.start.start
+        line_start = source_text.rfind('\n', 0, start)
+        line_start = 0 if line_start == -1 else line_start + 1
+        prefix = source_text[line_start:start]
+        return prefix if prefix.isspace() or not prefix else ""
+
+    def _has_proc_calls(self, ctx) -> bool:
+        """Check if a block or conditional contains any procedure calls 
(recursively)."""
+        if hasattr(ctx, 'blockItem'):
+            for item in ctx.blockItem():
+                if self._get_proc_call_info(item):
+                    return True
+                if item.conditional() and 
self._has_proc_calls(item.conditional()):
+                    return True
+            return False
+
+        if self._has_proc_calls(ctx.ifStatement().block()):
+            return True
+        for elif_ctx in ctx.elifClause():
+            if self._has_proc_calls(elif_ctx.block()):
+                return True
+        if ctx.elseClause() and self._has_proc_calls(ctx.elseClause().block()):
+            return True
+        return False
+
+    def _flatten_items(self, items, indent: str, source_text: str, bindings: 
dict[str, str] | None = None) -> list[str]:
+        """Flatten a list of sectionBody or blockItem nodes, expanding 
procedure calls."""
+        if bindings is None:
+            bindings = {}
+        lines: list[str] = []
+
+        for item in items:
+            if item.commentLine() is not None:
+                if self.preserve_comments:
+                    comment_text = self._get_source_text(item.commentLine(), 
source_text)
+                    lines.extend(self._flatten_reindent(comment_text, indent))
+                continue
+
+            proc_info = self._get_proc_call_info(item)
+            if proc_info:
+                sig, call_ctx = proc_info
+                nested_bindings = self._bind_proc_args(sig, call_ctx)
+                lines.extend(self._flatten_items(sig.body_ctx.blockItem(), 
indent, sig.source_text, nested_bindings))
+                continue
+
+            if item.conditional() and self._has_proc_calls(item.conditional()):
+                lines.extend(self._flatten_conditional(item.conditional(), 
indent, source_text, bindings))
+                continue
+
+            item_text = self._get_source_text(item, source_text)
+            item_text = self._flatten_substitute_params(item_text, bindings)
+            source_indent = self._source_indent_at(item, source_text)
+            lines.extend(self._flatten_reindent(item_text, indent, 
source_indent))
+
+        return lines
+
+    def _flatten_conditional(self, cond_ctx, indent: str, source_text: str, 
bindings: dict[str, str]) -> list[str]:
+        """Flatten a conditional block, expanding proc calls within its 
branches."""
+        lines: list[str] = []
+        inner_indent = indent + "    "

Review Comment:
   `flatten()` hard-codes indentation width (`"    "` / `" " * 4`). Since the 
rest of hrw4u formatting uses `SystemDefaults.INDENT_SPACES`, this can drift if 
the project-wide indent changes. Consider deriving both `indent` and 
`inner_indent` from `SystemDefaults.INDENT_SPACES` instead of embedding 4-space 
literals.
   ```suggestion
           inner_indent = indent + (" " * SystemDefaults.INDENT_SPACES)
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to