kaxil commented on code in PR #62790: URL: https://github.com/apache/airflow/pull/62790#discussion_r2956603914
########## providers/ibm/mq/provider.yaml: ########## @@ -0,0 +1,55 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +--- +package-name: apache-airflow-providers-ibm-mq +name: IBM MQ + +state: ready +lifecycle: incubation +source-date-epoch: 1758787200 +description: | + `IBM MQ <https://www.ibm.com/products/mq/>`__ +# Note that those versions are maintained by release manager - do not update them manually +# with the exception of case where other provider in sources has >= new provider version. +# In such case adding >= NEW_VERSION and bumping to NEW_VERSION in a provider have +# to be done in the same PR +versions: + - 0.1.0 + +integrations: + - integration-name: IBM MQ + external-doc-url: https://www.ibm.com/products/mq + logo: /docs/integration-logos/ibm-mq.png + tags: [apache] + Review Comment: `tags: [apache]` looks like a leftover from copying the Kafka provider template. IBM MQ is a commercial product, so this should be `tags: [software]`. ########## providers/ibm/mq/tests/unit/ibm/mq/hooks/test_mq.py: ########## @@ -0,0 +1,141 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. 
See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +from unittest import mock +from unittest.mock import AsyncMock, MagicMock, patch + +import pytest + +from airflow.models import Connection +from airflow.providers.ibm.mq.hooks.mq import IBMMQHook + +MQ_PAYLOAD = """RFH x"MQSTR <mcd><Msd>jms_map</Msd></mcd> <jms><Dst>topic://localhost/topic</Dst><Tms>1772121947476</Tms><Dlv>2</Dlv><Uci dt='bin.hex'>414D5143514D4941303054202020202069774D7092F81057</Uci></jms>L<usr><XMSC_CLIENT_ID>local</XMSC_CLIENT_ID><release>26.01.00</release></usr> 4<mqps><Top>topic</Top></mqps> {}""" + + +async def fake_get(*args, **kwargs): + import ibmmq + + raise ibmmq.MQMIError("connection broken", reason=ibmmq.CMQC.MQRC_CONNECTION_BROKEN) + + [email protected] +class TestIBMMQHook: + """Tests for the IBM MQ hook.""" + + @pytest.fixture(autouse=True) + def setup_connections(self, create_connection_without_db): + # Add a valid MQ connection + create_connection_without_db( + Connection( + conn_id="mq_conn", + conn_type="mq", + host="mq.example.com", + login="user", + password="pass", + port=1414, + extra='{"queue_manager": "QM1", "channel": "DEV.APP.SVRCONN"}', + ) + ) + self.hook = IBMMQHook("mq_conn") + + @patch("airflow.providers.ibm.mq.hooks.mq.get_async_connection", new_callable=AsyncMock) + @patch("ibmmq.connect") + 
@patch("ibmmq.Queue") + @patch("airflow.providers.ibm.mq.hooks.mq.sync_to_async") + async def test_consume_message( + self, mock_sync_to_async, mock_queue_class, mock_connect, mock_get_async_conn + ): + """Test consuming a single message.""" Review Comment: These tests patch `airflow.providers.ibm.mq.hooks.mq.get_async_connection` but the hook never imports or uses that function. The hook calls `BaseHook.get_connection` synchronously inside `get_conn()`. These `@patch` decorators are no-ops, so the tests aren't exercising the real connection/consume path. The same issue applies to `test_produce_message` (line 96) and `test_consume_connection_broken` (line 127). ########## providers/ibm/mq/docs/connections/mq.rst: ########## @@ -0,0 +1,27 @@ + .. Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + .. http://www.apache.org/licenses/LICENSE-2.0 + + .. Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + +.. _howto/connection:mq: + +MQ connection +============= + +The MQ connection type enables connection to an IBM MQ. + +.. |Kafka Connection| image:: mq_connection.png + :width: 400 Review Comment: Copy-paste from the Kafka provider: `|Kafka Connection|` should be `|MQ Connection|`. 
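On the mock-target point flagged in `test_mq.py` above, the underlying rule is "patch where the name is looked up, not where it is defined." A self-contained sketch of that rule follows; `BaseHook`, `async_helpers`, and `DemoHook` here are stand-ins for this illustration only, not the real Airflow or `ibmmq` APIs:

```python
from unittest import mock


# Stand-ins for this sketch only -- not the real Airflow or ibmmq APIs.
class BaseHook:
    @classmethod
    def get_connection(cls, conn_id):
        raise RuntimeError("real lookup: would hit the connection backend")


class async_helpers:
    @staticmethod
    def get_async_connection(conn_id):
        raise NotImplementedError("never called by the hook")


class DemoHook:
    # Mirrors the shape of IBMMQHook.get_conn(): a *synchronous*
    # BaseHook.get_connection call, with no async helper involved.
    def __init__(self, conn_id):
        self.conn_id = conn_id

    def get_conn(self):
        return BaseHook.get_connection(self.conn_id)


# Patching the unused helper is a no-op: the real lookup still runs.
with mock.patch.object(async_helpers, "get_async_connection") as unused_mock:
    try:
        DemoHook("mq_conn").get_conn()
        intercepted = True
    except RuntimeError:
        intercepted = False

# Patching the name the hook actually resolves does intercept it.
with mock.patch.object(BaseHook, "get_connection", return_value="fake-conn"):
    result = DemoHook("mq_conn").get_conn()

print(intercepted, unused_mock.call_count, result)  # -> False 0 fake-conn
```

Applied to the real tests, the decorators would target the names the hook actually resolves -- e.g. `mock.patch("airflow.providers.ibm.mq.hooks.mq.BaseHook.get_connection")` (assuming `BaseHook` is imported into the hook module, as the diff shows) and `mock.patch("ibmmq.connect")` -- rather than an unused `get_async_connection`.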
########## providers/ibm/mq/src/airflow/providers/ibm/mq/hooks/mq.py: ########## @@ -0,0 +1,322 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +import json +import logging +import threading +from contextlib import contextmanager, suppress +from typing import Any + +from asgiref.sync import sync_to_async + +from airflow.sdk.bases.hook import BaseHook + + +class IBMMQHook(BaseHook): + conn_name_attr = "conn_id" + default_conn_name = "mq_default" + conn_type = "mq" + hook_name = "IBM MQ" + default_open_options = "MQOO_INPUT_SHARED" + + def __init__( + self, + conn_id: str = default_conn_name, + queue_manager: str | None = None, + channel: str | None = None, + open_options: int | None = None, + ): + super().__init__() + self.conn_id = conn_id + self.queue_manager = queue_manager + self.channel = channel + self.open_options = open_options + self._conn = None + + @classmethod + def get_ui_field_behaviour(cls) -> dict[str, Any]: + """Return custom UI field behaviour for IBM MQ Connection.""" + return { + "hidden_fields": ["schema"], + "placeholders": { + "host": "mq.example.com", + "port": "1414", + "login": "app_user", + "extra": json.dumps( + { + "queue_manager": "QM1", + "channel": 
"DEV.APP.SVRCONN", + "open_options": cls.default_open_options, + }, + indent=2, + ), + }, + } + + @classmethod + def get_open_options_flags(cls, open_options: int) -> list[str]: + """ + Return the symbolic MQ open option flags set in a given bitmask. + + Each flag corresponds to a constant in `ibmmq.CMQC` that starts with 'MQOO_'. + + :param open_options: The integer bitmask used when opening an MQ queue + (e.g., `MQOO_INPUT_EXCLUSIVE | MQOO_FAIL_IF_QUIESCING`). + + :return: A list of the names of the MQ open flags that are set in the bitmask. + For example, ['MQOO_INPUT_EXCLUSIVE', 'MQOO_FAIL_IF_QUIESCING']. + + Example: + >>> open_options = ibmmq.CMQC.MQOO_INPUT_SHARED | ibmmq.CMQC.MQOO_FAIL_IF_QUIESCING + >>> cls.get_open_options_flags(open_options) + ['MQOO_INPUT_SHARED', 'MQOO_FAIL_IF_QUIESCING'] + """ + import ibmmq + + return [ + name + for name, value in vars(ibmmq.CMQC).items() + if name.startswith("MQOO_") and (open_options & value) + ] + + @contextmanager + def get_conn(self): + """ + Sync context manager for IBM MQ connection lifecycle. + + Must be called from the executor thread (not the event loop thread). + Retrieves the Airflow connection, extracts MQ parameters, and manages + the IBM MQ connection lifecycle. 
+ + :yield: IBM MQ connection object + """ + import ibmmq + + connection = BaseHook.get_connection(self.conn_id) + config = connection.extra_dejson + queue_manager = self.queue_manager or config.get("queue_manager") + channel = self.channel or config.get("channel") + + if not queue_manager: + raise ValueError("queue_manager must be set in Connection extra config or hook init") + if not channel: + raise ValueError("channel must be set in Connection extra config or hook init") + + self.open_options = self.open_options or getattr( + ibmmq.CMQC, + config.get("open_options", self.default_open_options), + ibmmq.CMQC.MQOO_INPUT_EXCLUSIVE, + ) + + csp = ibmmq.CSP() + csp.CSPUserId = connection.login + csp.CSPPassword = connection.password + + conn = None + try: + conn = ibmmq.connect( + queue_manager, + channel, + f"{connection.host}({connection.port})", + csp=csp, + ) + yield conn + finally: + if conn: + with suppress(Exception): + conn.disconnect() + + def _process_message(self, message: bytes) -> str: + """ + Process a raw MQ message. + + If the message contains an RFH2 header, the header is unpacked and the + payload following the header is returned. If unpacking fails, the raw + message is returned decoded as UTF-8. + + :param message: Raw message received from IBM MQ. + :return: Decoded message payload. + """ + import ibmmq + + try: + rfh2 = ibmmq.RFH2() + rfh2.unpack(message) + + payload_offset = rfh2.get_length() + payload = message[payload_offset:] + + decoded = payload.decode("utf-8", errors="ignore") + self.log.info("Message received from MQ (RFH2 decoded): %s", decoded) + return decoded + except ibmmq.PYIFError as error: # RFH2 header not present or unpack failed + self.log.warning( + "Failed to unpack RFH2 header (%s). 
Returning raw message payload: %s", error, message + ) + return message.decode("utf-8", errors="ignore") + + async def consume(self, queue_name: str, poll_interval: float = 5) -> str | None: + """ + Wait for a single message from the specified IBM MQ queue and return its decoded payload. + + All blocking IBM MQ operations (connect, open queue, get, close, disconnect) run in a + separate thread via sync_to_async to satisfy the IBM MQ C client's thread-affinity + requirement — every operation on a connection must happen from the thread that created it. + + A :class:`threading.Event` stop signal is passed to the worker so that, when this + coroutine is canceled (e.g. because the Airflow triggerer reassigns the watcher to + another pod), the background thread exits cleanly after the current ``q.get`` call + times out (at most ``poll_interval`` seconds). Without this, an orphaned thread could + silently consume a message after cancellation, causing the event to be lost and the + DAG never to run. + + :param queue_name: Name of the IBM MQ queue to consume messages from. + :param poll_interval: Interval in seconds used to wait for messages and to control + how long the underlying MQ get operation blocks before checking again. + :return: The decoded message payload if a message is received, otherwise ``None``. + """ + stop_event = threading.Event() + try: + return await sync_to_async(self._consume_sync)(queue_name, poll_interval, stop_event) + finally: + # Signal background thread to exit cleanly when coroutine is canceled + stop_event.set() + + def _consume_sync( + self, + queue_name: str, + poll_interval: float, + stop_event: threading.Event, + ) -> str | None: + """ + Blocking implementation of :meth:`consume` — must be called from a single thread. + + All IBM MQ handles (queue manager connection, queue) are created **and used** within + this method, satisfying the thread-affinity requirement of the IBM MQ C client library. 
+ The ``stop_event`` is checked between ``q.get`` calls so the thread terminates promptly + after the coroutine side is canceled. + """ + import ibmmq + + od = ibmmq.OD() + od.ObjectName = queue_name + + md = ibmmq.MD() + md.Format = ibmmq.CMQC.MQFMT_STRING + md.CodedCharSetId = 1208 + md.Encoding = ibmmq.CMQC.MQENC_NATIVE + + gmo = ibmmq.GMO() + gmo.Options = ibmmq.CMQC.MQGMO_WAIT | ibmmq.CMQC.MQGMO_NO_SYNCPOINT | ibmmq.CMQC.MQGMO_CONVERT + gmo.WaitInterval = int(poll_interval * 1000) + + try: + with self.get_conn() as conn: + if self.log.isEnabledFor(logging.INFO) and self.open_options is not None: + flag_names = self.get_open_options_flags(self.open_options) + self.log.info( + "Opening MQ queue '%s' with open_options=%s (%s)", + queue_name, + self.open_options, + ", ".join(flag_names), + ) + + q = ibmmq.Queue(conn, od, self.open_options) + try: + # WaitInterval already blocks for poll_interval seconds when no message is + # available, so no additional sleep is needed between iterations. + while not stop_event.is_set(): + try: + message = q.get(None, md, gmo) + if message: + return self._process_message(message) + except ibmmq.MQMIError as e: + if e.reason == ibmmq.CMQC.MQRC_NO_MSG_AVAILABLE: + self.log.debug("No message available...") + continue + if e.reason == ibmmq.CMQC.MQRC_CONNECTION_BROKEN: + self.log.warning( + "MQ connection broken on queue '%s', will exit consume; next trigger instance will reconnect", + queue_name, + ) + return None + self.log.error( + "IBM MQ error on queue '%s': completion_code=%s reason_code=%s", + queue_name, + e.comp, + e.reason, + ) + raise + finally: + with suppress(Exception): + q.close() + except Exception: Review Comment: This outer `except Exception` catches *everything* — including programming bugs like `TypeError` or `AttributeError` — logs them, and returns `None`. Combined with the trigger not retrying (see comment on `triggers/mq.py`), a single bug in message processing silently kills the entire watcher with no retry. 
Consider re-raising non-MQ exceptions, or at least narrowing the catch to MQ-specific error types. ########## providers/ibm/mq/src/airflow/providers/ibm/mq/triggers/mq.py: ########## @@ -0,0 +1,64 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +import asyncio +from typing import Any + +from airflow.providers.ibm.mq.hooks.mq import IBMMQHook +from airflow.providers.ibm.mq.version_compat import AIRFLOW_V_3_0_PLUS +from airflow.triggers.base import TriggerEvent + +if AIRFLOW_V_3_0_PLUS: + from airflow.triggers.base import BaseEventTrigger +else: + from airflow.triggers.base import BaseTrigger as BaseEventTrigger # type: ignore + + +class AwaitMessageTrigger(BaseEventTrigger): + def __init__( + self, + mq_conn_id: str, + queue_name: str, + poll_interval: float = 5, + ) -> None: + super().__init__() + self.mq_conn_id = mq_conn_id + self.queue_name = queue_name + self.poll_interval = poll_interval + + def serialize(self) -> tuple[str, dict[str, Any]]: + return ( + f"{self.__class__.__module__}.{self.__class__.__name__}", + { + "mq_conn_id": self.mq_conn_id, + "queue_name": self.queue_name, + "poll_interval": self.poll_interval, + }, + ) + + async def run(self): + try: + event = await 
IBMMQHook(self.mq_conn_id).consume( +                queue_name=self.queue_name, Review Comment: If `consume()` returns `None` (connection broken, timeout, or any exception swallowed by the hook), the trigger exits silently without yielding a `TriggerEvent`. For an AssetWatcher that should stay alive indefinitely, this kills the watcher and the DAG never triggers. Compare with Kafka's `AwaitMessageTrigger`, which loops with `while True` to keep polling. This needs a retry loop, probably with a backoff delay on transient failures. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
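One way to sketch the retry loop suggested for `AwaitMessageTrigger.run()`. The names and structure here are illustrative, not the provider's actual API: the generator is parameterized by a `consume` callable so the sketch stays self-contained, and the blanket `except Exception` is a placeholder for whatever transient-error set the hook ends up raising:

```python
import asyncio
from collections.abc import AsyncIterator, Awaitable, Callable


async def await_message(
    consume: Callable[[], Awaitable[str | None]],
    backoff: float = 5.0,
) -> AsyncIterator[str]:
    # Keep polling forever: a None result (timeout, broken connection) or a
    # transient error triggers a backoff sleep instead of ending the
    # generator, so the watcher stays alive.
    while True:
        try:
            message = await consume()
        except Exception:
            # Transient failure: back off, then retry (the hook reconnects
            # on each consume() call).
            await asyncio.sleep(backoff)
            continue
        if message is None:
            await asyncio.sleep(backoff)
            continue
        yield message


# Demo: consume() times out once, errors once, then delivers a message.
async def demo() -> str:
    outcomes = iter([None, ConnectionError("broken"), "payload"])

    async def fake_consume() -> str | None:
        outcome = next(outcomes)
        if isinstance(outcome, Exception):
            raise outcome
        return outcome

    async for message in await_message(fake_consume, backoff=0.01):
        return message  # the first real message ends the demo
    raise AssertionError("unreachable: the loop never ends on its own")


print(asyncio.run(demo()))  # -> payload
```

Wired into the trigger, the callable would be the existing `IBMMQHook(self.mq_conn_id).consume(queue_name=..., poll_interval=...)` call, and each yielded payload would be wrapped in a `TriggerEvent` before being yielded from `run()`.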
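The narrowing suggested for the hook's outer `except Exception` in `hooks/mq.py` could look like the following sketch. `MQMIError` and `PYIFError` are stand-in classes here, representing the `ibmmq` exception types the diff already handles:

```python
# Stand-ins for ibmmq.MQMIError / ibmmq.PYIFError (sketch only).
class MQMIError(Exception):
    pass


class PYIFError(Exception):
    pass


def guarded_consume(consume_once):
    """Swallow only MQ-level failures; let programming bugs propagate.

    Returning None for (MQMIError, PYIFError) preserves the current
    "no message this round" behavior, while a TypeError or AttributeError
    in message processing now surfaces instead of being logged and lost.
    """
    try:
        return consume_once()
    except (MQMIError, PYIFError):
        return None


def broken_connection():
    raise MQMIError("MQRC_CONNECTION_BROKEN")


def buggy_processing():
    raise TypeError("bug in _process_message")


assert guarded_consume(broken_connection) is None  # MQ error: swallowed
try:
    guarded_consume(buggy_processing)
    bug_surfaced = False
except TypeError:
    bug_surfaced = True  # programming bug: re-raised to the caller
print(bug_surfaced)  # -> True
```

Combined with a retry loop in the trigger, this would mean transient MQ failures are retried while genuine bugs fail loudly in the triggerer logs.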
