ashb commented on code in PR #45570:
URL: https://github.com/apache/airflow/pull/45570#discussion_r1911507951
##########
airflow/dag_processing/processor.py:
##########
@@ -203,39 +203,30 @@ def start( # type: ignore[override]
target: Callable[[], None] = _parse_file_entrypoint,
**kwargs,
) -> Self:
- return super().start(path, callbacks, target=target, client=None,
**kwargs) # type:ignore[arg-type]
+ proc = super().start(target=target, **kwargs)
+ proc._on_child_started(callbacks, path)
+ return proc
- def _on_child_started( # type: ignore[override]
- self, callbacks: list[CallbackRequest], path: str | os.PathLike[str],
child_comms_fd: int
- ) -> None:
+ def _on_child_started(self, callbacks: list[CallbackRequest], path: str |
os.PathLike[str]) -> None:
msg = DagFileParseRequest(
file=os.fspath(path),
- requests_fd=child_comms_fd,
+ requests_fd=self._requests_fd,
callback_requests=callbacks,
)
self.stdin.write(msg.model_dump_json().encode() + b"\n")
- def handle_requests(self, log: FilteringBoundLogger) -> Generator[None,
bytes, None]:
- # TODO: Make decoder an instance variable, then this can live in the
base class
- decoder = TypeAdapter[ToParent](ToParent)
-
- while True:
- line = yield
-
- try:
- msg = decoder.validate_json(line)
- except Exception:
- log.exception("Unable to decode message", line=line)
- continue
-
- self._handle_request(msg, log) # type: ignore[arg-type]
-
def _handle_request(self, msg: ToParent, log: FilteringBoundLogger) ->
None: # type: ignore[override]
+ # TODO: GetVariable etc -- parsing a dag can run top level code that
asks for an Airflow Variable
+ resp = None
if isinstance(msg, DagFileParsingResult):
self.parsing_result = msg
return
- # GetVariable etc -- parsing a dag can run top level code that asks
for an Airflow Variable
- super()._handle_request(msg, log)
Review Comment:
This would never have worked: it would have tried to use `self.client`, which
is None, and would have raised an exception.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]