Re: [I] Trigger runner process locked with multiple Workflow triggers [airflow]
ashb closed issue #50185: Trigger runner process locked with multiple Workflow triggers URL: https://github.com/apache/airflow/issues/50185 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
x42005e1f commented on issue #50185: URL: https://github.com/apache/airflow/issues/50185#issuecomment-2967452565

To put it another way, the key property of delegating execution to another thread is that the delegated code runs independently of the synchronous code in the current thread. This means that any operation that has been delegated by the asynchronous code, and that the synchronous code needs to wait for, will be able to complete in parallel. This eliminates the dependency, and thus bypasses the problem.
x42005e1f commented on issue #50185: URL: https://github.com/apache/airflow/issues/50185#issuecomment-2967405096

> > the synchronous code will not be able to interrupt its execution to resume execution of asynchronous code. It will only work if they are different threads.
>
> And we don't get that via `sync_to_async` as all calls to that use a single Thread?

`sync_to_async()` delegates execution to a worker thread. This eliminates the requirement to switch from synchronous to asynchronous code (since asynchronous code just waits for a future object), which in turn bypasses the problem - you can even synchronize in synchronous code, as long as synchronous code does not depend on asynchronous code in the same thread.

Of course, this is true only when `sync_to_async()` is used for full operations - if it is used for single reads that are part of the whole `get_message()`, the problem remains.
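A minimal sketch of the point about worker threads, using only the standard library. `loop.run_in_executor()` stands in for `sync_to_async()` here (an assumption made to keep the example self-contained), and `fetch_value` and `blocking_caller` are hypothetical names. The blocking wait succeeds only because it happens in a thread other than the one running the event loop.

```python
import asyncio
import threading


async def fetch_value() -> int:
    # Stand-in for async work that only the event loop can complete.
    await asyncio.sleep(0.01)
    return 42


def blocking_caller(loop: asyncio.AbstractEventLoop, loop_thread_id: int) -> tuple[int, bool]:
    # Runs in a worker thread, so blocking here does not stall the event loop.
    future = asyncio.run_coroutine_threadsafe(fetch_value(), loop)
    value = future.result(timeout=5)
    return value, threading.get_ident() != loop_thread_id


async def main() -> tuple[int, bool]:
    loop = asyncio.get_running_loop()
    # The event loop only awaits a future; the blocking wait happens elsewhere.
    return await loop.run_in_executor(None, blocking_caller, loop, threading.get_ident())


result = asyncio.run(main())
```

Calling `future.result()` directly from the event loop thread instead would block the only thread able to run `fetch_value()`, which is the same-thread dependency described in the comment.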
ashb commented on issue #50185: URL: https://github.com/apache/airflow/issues/50185#issuecomment-2967384988

> the synchronous code will not be able to interrupt its execution to resume execution of asynchronous code. It will only work if they are different threads.

And we don't get that via `sync_to_async` as all calls to that use a single Thread?
x42005e1f commented on issue #50185: URL: https://github.com/apache/airflow/issues/50185#issuecomment-2967340252

> * The code that pulls off of the requests Q and writes to requests/reads from stdin is 100% async code

However, I read it again and realized that this is an attempt to delegate work to asynchronous code from synchronous code. No, it will not work that way because, again, the synchronous code will not be able to interrupt its execution to resume execution of asynchronous code. It will only work if they are different threads.
x42005e1f commented on issue #50185: URL: https://github.com/apache/airflow/issues/50185#issuecomment-2967315728

> Actually, how about this [@x42005e1f](https://github.com/x42005e1f):

If queues are also used for asynchronous code, the problem remains. Let's imagine that asynchronous code sent a request and then the event loop switched execution to another task that called the sync send code. Then the first response will be for the asynchronous code, and the synchronous code will wait forever for that response to be removed from the queue, which will never happen because execution will never switch back to the asynchronous code. This is no longer a deadlock, but a busy loop that loads the processor forever.

If the queues are only for synchronous code, then yes, that might solve the problem. However, in any case, I suggest removing `self._q_responses`, since such polls are almost always a bad thing. Just send a future (`concurrent.futures.Future`) along with the frame. Call `future.result()` in the sync send code, and let the producer put the response in the future via `future.set_result(response)`. Then the consumer will wake up and get their response from the future.
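The suggestion to send a future along with the frame could look roughly like the sketch below. `Request`, `producer` and `send` are hypothetical names, and the producer simply echoes the frame in upper case instead of talking to a supervisor; only `concurrent.futures.Future`, `future.result()` and `future.set_result()` come from the comment itself.

```python
import queue
import threading
from concurrent.futures import Future
from dataclasses import dataclass, field


@dataclass
class Request:
    # Hypothetical request type: the payload plus the future that receives the reply.
    frame: bytes
    future: Future = field(default_factory=Future)


requests: queue.Queue = queue.Queue()


def producer() -> None:
    # Stand-in for the code that talks to the supervisor: it echoes in upper case.
    while (req := requests.get()) is not None:
        req.future.set_result(req.frame.upper())


def send(frame: bytes) -> bytes:
    # Sync send: enqueue the request, then block on its own future.
    # There is no shared response queue and therefore nothing to poll.
    req = Request(frame)
    requests.put(req)
    return req.future.result(timeout=5)


worker = threading.Thread(target=producer, daemon=True)
worker.start()
reply = send(b"ping")
requests.put(None)  # sentinel: stop the producer
worker.join()
```

Because each caller waits on its own future, a response can never be picked up by the wrong consumer, which is the failure mode of the `peek()` loop discussed in the thread.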
ashb commented on issue #50185: URL: https://github.com/apache/airflow/issues/50185#issuecomment-2967206575

Actually, how about this @x42005e1f:

- We remove the lock on CommsDecoder.
- We create a new subclass called TriggererCommsDecoder (so we know when we are operating in this hybrid sync/async env)
- On that we have two queues - one for requests, the other for responses.
- The code that pulls off of the requests Q and writes to requests/reads from stdin is 100% async code
- The sync send code (in my PR I combined `send_msg() + get_response()` into a single function as it's a nicer API anyway) does something like this:

  ```python
  req_id = next(self.id_counter)
  self._q_requests.put(Frame(..., id=req_id))
  while True:
      resp = self._q_responses.peek()
      if resp.id != req_id:
          # Response to someone else's message. Try again
          continue
      # Though this feels racy? Maybe we need a sanity check and to
      # requeue it if the req_id doesn't match?
      return self._q_responses.get()
  ```

Is that actually any better (clearer/easier to understand/more performant) than just using an n=1 thread pool?

WDYT as well @gopidesupavan?
ashb commented on issue #50185: URL: https://github.com/apache/airflow/issues/50185#issuecomment-2967156151

Thanks, yeah. I'm only just now getting back around to looking at this issue and the linked PR in detail. I don't have any reader/writer thread in my WIP right now, and wasn't thinking of adding any, so I don't think my changes will help at all this way.

(In my head I was thinking "we can just send all the responses from sync code, make note of the request_id, then have async code wait for a response to that matching request_id". But that won't help squat here.)

Thanks very much for the detailed and helpful comments @x42005e1f.
x42005e1f commented on issue #50185: URL: https://github.com/apache/airflow/issues/50185#issuecomment-2967111308

> Given that constraint (that we sort of really want to support this code, sadly) I'm not sure my queue etc approach will help at all.

As I said in the [comment on the linked PR](https://github.com/apache/airflow/pull/51279#issuecomment-2934712813), the problem is tied to the async -> sync case in the same thread, which is fundamentally unsolvable. However, it is enough to make the synchronous and asynchronous code independent of each other to bypass the problem, for example by performing every read in a worker thread. If your approach has something similar to this, or otherwise removes the dependency (created due to synchronization), then it solves this issue as well.
ashb commented on issue #50185: URL: https://github.com/apache/airflow/issues/50185#issuecomment-2967053571

Thinking about this a bit more and re-reading your super helpful comments, I'm not sure this approach will get around the fundamental problem of sync_to_async.

We can't easily (today/now) add async versions of get_connection etc, so we have the issue that the calling code ends up doing:

```python
connection = await sync_to_async(self.get_connection)(self.aws_conn_id)
```

Given that constraint (that we sort of really want to support this code, sadly) I'm not sure my queue etc approach will help at all.
x42005e1f commented on issue #50185: URL: https://github.com/apache/airflow/issues/50185#issuecomment-2966998036

> I'm adding a request ID (integer, atomic auto inc using `itertools.count`) and I was wondering if we could remove the lock and replace it with some other primitive from aiologic to send the response back to async code instead (possibly a queue? Maybe a flag? Not sure tbh, just a thought)

If you only need to wait for a result, currently the most appropriate option is to use either a high-level event (`aiologic.Event` if there are many waiters of the same result) or a low-level event (`aiologic.lowlevel.create__event()` (>=0.15.0) / `aiologic.lowlevel.Event` (<0.15.0) if there is only one waiter). Just set the event after the result is saved somewhere.

If you want to sequentially wait for multiple results, then yes, `aiologic.Queue` might be a good option. You may also be interested in [`culsans.Queue`](https://github.com/x42005e1f/culsans), which has more features.

Flags (`aiologic.Flag` (>=0.15.0) / `aiologic.lowlevel.Flag` (<0.15.0)) have a slightly different meaning. They are used in situations where you want to know which thread (or state) was first. The analogy from which they get their name is related to conquering the mountain / moon / and so on.

I have a plan to add some kind of Future / AsyncResult in the future, but I have not decided on the interface yet.
ashb commented on issue #50185: URL: https://github.com/apache/airflow/issues/50185#issuecomment-2966720479

As part of addressing [#46426](https://github.com/apache/airflow/issues/46426) (which I'm working on right now) I'm totally overhauling the protocol between supervisor and child process.

I'm adding a request ID (integer, atomic auto inc using `itertools.count`) and I was wondering if we could remove the lock and replace it with some other primitive from aiologic to send the response back to async code instead (possibly a queue? Maybe a flag? Not sure tbh, just a thought)

My WIP branch is https://github.com/astronomer/airflow/tree/rework-tasksdk-supervisor-comms-protocol
x42005e1f commented on issue #50185: URL: https://github.com/apache/airflow/issues/50185#issuecomment-2928285691

By the way, below is an excerpt from my unwritten section for the aiologic documentation, which illustrates just how expensive context switching can actually be. It does not show the whole picture, but it may be useful for understanding what seldom-mentioned problems asynchronous programming has, especially in [lock-free and wait-free](https://concurrencyfreaks.blogspot.com/2013/05/lock-free-and-wait-free-definition-and.html) contexts.

challenges.md

Challenges
==========

Bridging between concurrency libraries is not the only thing that aiologic was designed to do. The purpose of this section is to show some of the problems considered in its design, in the hope that the interested reader will be able to make the best use of this library by clearly understanding its ideas.

A world full of squares
-----------------------

How much time are you willing to spend to get all the threads up and running? This may seem like a strange question, but it is not as simple as it seems at first glance.

Living in the world of data, we tend to consider the time complexity of the algorithms we know. But what about the asynchronous world? We are used to seeing this world as a black box, forgetting that it is built on the same algorithms, albeit at a level that is not always available to us. And that comes at a price.

Suppose we want to launch N threads to perform some long work. Whether it is for parallel processing of some NumPy arrays, for network operations, for simulating some game processes - it does not matter. Here is an example that models our task:

```python
import threading
import time

N = ...
stopped = False


def work(i):
    global stopped

    if i == N - 1:  # the last thread
        stopped = True  # stop the work

    while not stopped:
        time.sleep(0)  # do some work


for i in range(N):
    threading.Thread(target=work, args=[i]).start()
```

In this example, we run the work in separate threads until the last thread starts. Let's see what happens if we set different N's.

* N=100: 0.17 seconds
* N=200: 0.55 seconds
* N=300: 1.19 seconds
* N=400: 2.14 seconds
* N=500: 3.14 seconds
* N=600: 4.69 seconds
* N=700: 6.38 seconds
* N=800: 8.11 seconds
* N=900: 10.48 seconds
* N=1000: 12.95 seconds

Whoa! Increasing the number of threads by only 10 times increased the time by over 50 times! We can clearly see that the dependence of execution time on the number of threads is not linear - in fact, it is quadratic. Why does this happen?

Starting a thread in Python is based on two main operations:

1. Asking the operating system to start a thread.
2. Waiting for that thread to start (e.g. to detect memory leaks).

Starting each new thread forces the main thread to do a context switch. However, the operating system needs to emulate the concurrent execution of all threads, so the picture will usually not look like ping-pong between the main thread and the newly created thread - it will need to give CPU resources to the already running threads to execute as well. With a fair scheduling strategy that might look something like this:

1. main thread
2. thread 1 -> main thread
3. thread 1 -> thread 2 -> main thread
4. thread 1 -> thread 2 -> thread 3 -> main thread
5. ...

With each new thread, the required number of context switches to start the next one increases. We see a triangle, which becomes a *square* when the constant is discarded - that is where the quadratic complexity comes from!

Again, this is not specific to Apache Airflow, however as a learning material it may have some value. Just as a footnote to how non-trivial the impact of approaches that rely on context switching is.
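The comment does not show how the timings were taken. One possible harness is sketched below, under the assumption that the measured interval is the time the main thread spends starting all N threads. `time_thread_startup` is a hypothetical helper, and a `nonlocal` flag replaces the global one so that several values of N can be tried in one run. Absolute numbers depend on the machine and the Python version, so none are claimed here.

```python
import threading
import time


def time_thread_startup(n: int) -> float:
    """Return the seconds the main thread needs to start n busy threads."""
    stopped = False

    def work(i: int) -> None:
        nonlocal stopped
        if i == n - 1:  # the last thread
            stopped = True  # stop the work
        while not stopped:
            time.sleep(0)  # do some work (yields to other threads)

    threads = [threading.Thread(target=work, args=[i]) for i in range(n)]
    start = time.perf_counter()
    for thread in threads:
        thread.start()
    elapsed = time.perf_counter() - start
    for thread in threads:
        thread.join()
    return elapsed


timings = {n: time_thread_startup(n) for n in (25, 50, 100)}
for n, seconds in timings.items():
    print(f"N={n}: {seconds:.3f} seconds")
```

If the triangle argument holds, starting N threads costs on the order of 1 + 2 + ... + N = N(N + 1)/2 context switches, so doubling N should roughly quadruple the measured time once N is large enough for scheduling to dominate.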
gopidesupavan commented on issue #50185: URL: https://github.com/apache/airflow/issues/50185#issuecomment-2928182102

Cool, thanks. Can you please have a look? https://github.com/apache/airflow/pull/51279
x42005e1f commented on issue #50185: URL: https://github.com/apache/airflow/issues/50185#issuecomment-2928146671

The difference can be seen only in some very subtle cases where performing context switching might be too expensive (usually a large number of running threads that give O(n) time, possibly with additional cycles), but this is hardly the Apache Airflow case.

And the futures solution to the problem is explained quite simply. When we use any kind of synchronization in the same thread, we have a deadlock because the previous context cannot continue its execution. However, when we use ThreadPoolExecutor, both operations are executed separately, so they can be executed without deadlocks - the second, synchronous wait can be safely done without completing the first, asynchronous one.
gopidesupavan commented on issue #50185: URL: https://github.com/apache/airflow/issues/50185#issuecomment-2928074586

@x42005e1f it looks like the solution is working. I don't see much of a performance issue, though it may just be unnoticeable in my local environment. Good to know that the locks are not required, since it uses a future object to wait.
gopidesupavan commented on issue #50185: URL: https://github.com/apache/airflow/issues/50185#issuecomment-2925180245

> > [@x42005e1f](https://github.com/x42005e1f) sorry, I was having a hard time the last two days at my day job; back to this now. Could you please elaborate with an example? I couldn't work out what you are referring to, sorry.
>
> Collisions occur due to simultaneous reading of a single descriptor - usually the one referenced by `sys.stdin`. In `CommsDecoder.get_message()`, it is read synchronously via `sys.stdin` directly. In `TriggerRunner.sync_state_to_supervisor()`, it is read asynchronously via `asyncio.StreamReader`. The idea is that to resolve collisions and avoid deadlocks, it is sufficient to allow the file descriptor to be read in mixed mode in the same thread, but associate the read with the one who sent the request.
>
> When using the lock described above (which can be implemented via GLock or independently), there is only one such mixed read situation - async -> sync. In this case we can, for example, do three things:
>
> 1. Replace `asyncio.StreamReader` with `concurrent.futures.ThreadPoolExecutor(1)`.
> 2. Call `executor.submit()` in `TriggerRunner.sync_state_to_supervisor()`, store a reference to the future in `SUPERVISOR_COMMS`, wait for the future asynchronously with `asyncio.wrap_future()`.
> 3. In `CommsDecoder.get_message()`, wait for the future synchronously, if any, and only then read the `sys.stdin` content itself.
>
> That way we can eliminate this type of collision with minimal pain. At least hypothetically.

Looks promising, looking at it now. :) Thanks.
gopidesupavan commented on issue #50185: URL: https://github.com/apache/airflow/issues/50185#issuecomment-2925164338

> Also note that if we make all communication as `executor.submit()` calls, we can do without synchronization. The calls will be executed by a single worker thread, and the futures can be waited for either synchronously or asynchronously. The disadvantage is only a slight performance loss.
>
> Obviously, this will also mean that you will no longer need aiologic, at least not for the kind of `SUPERVISOR_COMMS` synchronization you have now. Well, sophisticated solutions are rarely ever really needed in a proper architecture.

Agreed. People use triggers heavily nowadays, so I am not sure how much of a performance hit it would be.
x42005e1f commented on issue #50185: URL: https://github.com/apache/airflow/issues/50185#issuecomment-2924992192

Also note that if we make all communication as `executor.submit()` calls, we can do without synchronization. The calls will be executed by a single worker thread, and the futures can be waited for either synchronously or asynchronously. The disadvantage is only a slight performance loss.
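As a rough illustration of routing all communication through `executor.submit()`, the sketch below uses a single-worker `ThreadPoolExecutor` so that exchanges are serialized without any lock. `exchange`, `send`, `sync_caller` and `async_caller` are hypothetical names, and the "exchange" only records the request rather than talking to a supervisor.

```python
import asyncio
from concurrent.futures import Future, ThreadPoolExecutor

# One worker thread means requests are handled strictly one at a time.
executor = ThreadPoolExecutor(max_workers=1)
log: list[str] = []


def exchange(request: str) -> str:
    # Hypothetical stand-in for "write the request, read the response".
    log.append(request)
    return f"response to {request}"


def send(request: str) -> Future:
    return executor.submit(exchange, request)


def sync_caller() -> str:
    # Synchronous code blocks on the future directly.
    return send("sync request").result(timeout=5)


async def async_caller() -> tuple[str, str]:
    # Asynchronous code awaits the same kind of future without blocking the loop.
    first = await asyncio.wrap_future(send("async request"))
    # A sync helper called from the same task cannot deadlock either, because
    # the worker thread makes progress independently of this thread.
    second = sync_caller()
    return first, second


results = asyncio.run(async_caller())
executor.shutdown()
```

The cost is one thread hop per exchange, which matches the "slight performance loss" mentioned in the comment.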
x42005e1f commented on issue #50185: URL: https://github.com/apache/airflow/issues/50185#issuecomment-2924954561

> [@x42005e1f](https://github.com/x42005e1f) sorry, I was having a hard time the last two days at my day job; back to this now. Could you please elaborate with an example? I couldn't work out what you are referring to, sorry.

Collisions occur due to simultaneous reading of a single descriptor - usually the one referenced by `sys.stdin`. In `CommsDecoder.get_message()`, it is read synchronously via `sys.stdin` directly. In `TriggerRunner.sync_state_to_supervisor()`, it is read asynchronously via `asyncio.StreamReader`. The idea is that to resolve collisions and avoid deadlocks, it is sufficient to allow the file descriptor to be read in mixed mode in the same thread, but associate the read with the one who sent the request.

When using the lock described above (which can be implemented via GLock or independently), there is only one such mixed read situation - async -> sync. In this case we can, for example, do three things:

1. Replace `asyncio.StreamReader` with `concurrent.futures.ThreadPoolExecutor(1)`.
2. Call `executor.submit()` in `TriggerRunner.sync_state_to_supervisor()`, store a reference to the future in `SUPERVISOR_COMMS`, wait for the future asynchronously with `asyncio.wrap_future()`.
3. In `CommsDecoder.get_message()`, wait for the future synchronously, if any, and only then read the `sys.stdin` content itself.

That way we can eliminate this type of collision with minimal pain. At least hypothetically.
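The three steps can be sketched as follows. This is not Airflow's code: a `queue.Queue` stands in for the `sys.stdin` descriptor, a module-level `pending` variable stands in for the reference stored in `SUPERVISOR_COMMS`, and `sync_state` and `get_message` are simplified stand-ins for `TriggerRunner.sync_state_to_supervisor()` and `CommsDecoder.get_message()`.

```python
import asyncio
import queue
from concurrent.futures import Future, ThreadPoolExecutor

incoming: queue.Queue = queue.Queue()  # stand-in for the sys.stdin descriptor
reader = ThreadPoolExecutor(max_workers=1)  # step 1: replaces asyncio.StreamReader
pending: "Future | None" = None  # step 2: reference visible to sync code


def read_message() -> str:
    return incoming.get(timeout=5)


async def sync_state() -> str:
    # Step 2: submit the read and await it without blocking the event loop.
    global pending
    pending = reader.submit(read_message)
    try:
        return await asyncio.wrap_future(pending)
    finally:
        pending = None


def get_message() -> str:
    # Step 3: let any in-flight async read finish first, then do our own read.
    if pending is not None:
        pending.result(timeout=5)
    return read_message()


async def main() -> tuple[str, str]:
    task = asyncio.create_task(sync_state())
    await asyncio.sleep(0)  # let the task submit its read
    incoming.put("reply for async task")
    incoming.put("reply for sync caller")
    sync_reply = get_message()  # sync call made while the async read is in flight
    return await task, sync_reply


results = asyncio.run(main())
reader.shutdown()
```

The synchronous wait in `get_message()` completes even though the event loop is blocked, because the read runs in the worker thread; the async task simply picks up its already-finished future once control returns to the loop.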
gopidesupavan commented on issue #50185: URL: https://github.com/apache/airflow/issues/50185#issuecomment-2924809405

@x42005e1f sorry, I was having a hard time the last two days at my day job; back to this now. Could you please elaborate with an example? I couldn't work out what you are referring to, sorry.

If not, I will go ahead and try to implement the above CommsLock approach with sync_or_async. I feel it's worth doing.
x42005e1f commented on issue #50185: URL: https://github.com/apache/airflow/issues/50185#issuecomment-2917903234

Perhaps I should clarify the thought. Instead of waiting in the synchronous function to finish reading the message in the asynchronous task, which obviously will never complete without switching back to the event loop, we can finish reading the message in the synchronous function. Once we finish reading that message, we will send that remainder to the asynchronous task on the next switch. And before that, the synchronous function will make the request it wants to make.

Also, instead of adding a separate asynchronous lock, we can introduce separate `default_group_factory` for green (sync) and async APIs respectively (`=current_thread_ident` for the former, `=current_async_task_ident` for the latter).
x42005e1f commented on issue #50185: URL: https://github.com/apache/airflow/issues/50185#issuecomment-2917869438

> [@kaxil](https://github.com/kaxil) [@ashb](https://github.com/ashb) It seems to me that having a separate channel for triggerer workloads and API communication would be ideal in this scenario to avoid deadlocks IMHO.

There is also a very audacious solution. Let's go back to the thread-level lock. If we add an asynchronous lock to it for the current event loop, it will:

1. Guarantee exclusive access for the thread.
2. Guarantee exclusive access for the task.
3. But allow access through a synchronous function called by another task.

And let's just suspend the message transmission to the asynchronous task and let the response come to the synchronous function first, and resume sending to the asynchronous function afterwards. This way we will eliminate both collisions and deadlocks - due to cooperative multitasking it can be done relatively easily. But this mechanism will have to be implemented properly.

> Can this `greenback.ensure_portal()` be opened at the global level in the trigger process? I am asking because it is a bit problematic to add this before every function that is called from the triggerer. Also, users have their own custom triggers, and they would have to update them to add this before every function call.

Just call it somewhere within the current task that will make sync-or-async calls. `greenback.ensure_portal()` keeps track of registered tasks, so it can be called multiple times.
gopidesupavan commented on issue #50185: URL: https://github.com/apache/airflow/issues/50185#issuecomment-2917855355

> > [@x42005e1f](https://github.com/x42005e1f) please add your thoughts.
>
> I do not think that making the function return type dependent on runtime is a good pattern. This approach is poorly compatible with static analyzers and it will also not work correctly in all cases. Instead, I suggest shifting the focus not on how to create greenlets, but on how to pass awaitable objects from them. Here is how it can be implemented with a ready-made example for `aiologic.Lock`:
>
>     from __future__ import annotations
>
>     import inspect
>     import sys
>
>     from collections.abc import Awaitable, Callable
>     from functools import wraps
>     from typing import TypeVar, cast
>
>     import aiologic
>     import anyio
>     import greenback
>
>     if sys.version_info >= (3, 10):
>         from typing import ParamSpec
>     else:
>         from typing_extensions import ParamSpec
>
>     _T = TypeVar("_T")
>     _P = ParamSpec("_P")
>
>
>     def sync_or_async(
>         sync_func: Callable[_P, _T],
>         async_func: Callable[_P, _T] | Callable[_P, Awaitable[_T]],
>     ) -> Callable[_P, _T]:
>         if inspect.iscoroutinefunction(async_func):
>             async_impl = greenback.autoawait(async_func)
>         else:
>             async_impl = async_func
>
>         async_impl = cast(Callable[_P, _T], async_impl)
>
>         @wraps(sync_func)
>         def wrapper(*args: _P.args, **kwargs: _P.kwargs) -> _T:
>             if greenback.has_portal():
>                 return async_impl(*args, **kwargs)
>             else:
>                 return sync_func(*args, **kwargs)
>
>         return wrapper
>
>
>     class CommsLock(aiologic.Lock):
>         __slots__ = ()
>
>         async def _async_acquire_with_timeout(
>             self,
>             /,
>             *,
>             blocking: bool = True,
>             timeout: float | None = None,
>         ) -> bool:
>             with anyio.move_on_after(timeout):
>                 return await self.async_acquire(blocking=blocking)
>
>             return False
>
>         green_acquire = sync_or_async(
>             aiologic.Lock.green_acquire,
>             _async_acquire_with_timeout,  # type: ignore[arg-type]
>         )
>         green_release = sync_or_async(
>             aiologic.Lock.green_release,
>             aiologic.Lock.async_release,
>         )
>
>     lock = CommsLock()
>
>     async def noop() -> None:
>         pass
>
>     async def holding() -> None:
>         async with lock:
>             await asyncio.sleep(0)  # switch back to the event loop
>
>     def acquire_release() -> None:
>         with lock:
>             pass  # do something
>
>     # make CommsLock implicitly awaitable for the current task
>     await greenback.ensure_portal()
>
>     # hold the lock with another task
>     holder = asyncio.create_task(holding())
>     await asyncio.sleep(0)
>     assert lock.locked()
>
>     # a task to verify that CommsLock does indeed yield to the event loop
>     task = asyncio.create_task(noop())
>     assert not task.done()
>
>     acquire_release()  # sync-or-async call
>     assert task.done()  # there was a context switch!

Can this `greenback.ensure_portal()` be opened at the global level in the trigger process? I am asking because it is a bit problematic to add this before every function that is called from the triggerer. Also, users have their own custom triggers, and they would have to update them to add this before every function call.
Re: [I] Trigger runner process locked with multiple Workflow triggers [airflow]
gopidesupavan commented on issue #50185:
URL: https://github.com/apache/airflow/issues/50185#issuecomment-2917851474

@x42005e1f Really appreciate your thoughts here and all the suggestions :)

Yes, I agree, having an async version would be a good way to avoid all these patches. If I remember correctly, the lock was introduced primarily to prevent API communication messages from being mixed with triggerer workloads.

@kaxil @ashb It seems to me that having a separate channel for triggerer workloads and API communication would be ideal in this scenario to avoid deadlocks, IMHO. Alternatively, implementing async versions of all relevant methods could also work. Or, as @x42005e1f suggested, we could adopt the method he mentioned, but as he noted, we would need to closely monitor its coroutine-safety.
Re: [I] Trigger runner process locked with multiple Workflow triggers [airflow]
x42005e1f commented on issue #50185:
URL: https://github.com/apache/airflow/issues/50185#issuecomment-2917832050

Well, I think the best solution is not to synchronize at all. If the communication can be rewritten so that messages are sent truly atomically (or without collisions), that will get rid of this whole abstruse headache.
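As a rough illustration of the "messages without collisions" idea above, one possible shape is to tag every request with an id and let a single reader route each response to the waiting caller, so no lock is held across a round trip. This is a minimal stdlib-only sketch, not Airflow's actual comms code: `Multiplexer`, `send`, `on_frame`, and the frame layout are all hypothetical names.

```python
import asyncio
import itertools


class Multiplexer:
    """Match responses to requests by id instead of locking the channel."""

    def __init__(self, send):
        self._send = send  # hypothetical callable that writes one whole frame
        self._ids = itertools.count(1)
        self._pending = {}  # request id -> future awaiting the response

    async def request(self, payload):
        req_id = next(self._ids)
        future = asyncio.get_running_loop().create_future()
        self._pending[req_id] = future
        self._send({"id": req_id, "body": payload})  # one frame per call
        try:
            return await future
        finally:
            self._pending.pop(req_id, None)

    def on_frame(self, frame):
        """Called by the single reader whenever a full response frame arrives."""
        future = self._pending.get(frame["id"])
        if future is not None and not future.done():
            future.set_result(frame["body"])


async def demo():
    outbox = []
    mux = Multiplexer(outbox.append)

    first = asyncio.create_task(mux.request("count"))
    second = asyncio.create_task(mux.request("states"))
    await asyncio.sleep(0)  # let both requests be sent

    # Responses arrive out of order; each still reaches the right caller.
    for frame in reversed(outbox):
        mux.on_frame({"id": frame["id"], "body": frame["body"].upper()})

    return await asyncio.gather(first, second)
```

The trade-off is that the wire format has to carry an id and a single task has to own the read side; whether that fits the supervisor protocol is a separate question.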
Re: [I] Trigger runner process locked with multiple Workflow triggers [airflow]
x42005e1f commented on issue #50185:
URL: https://github.com/apache/airflow/issues/50185#issuecomment-2917802668

Also note that this greenlet approach requires interrupt support - synchronous functions that contain such sync-or-async calls at least indirectly (via subcalls) must be aware that they can be interrupted and called again (i.e. they must be coroutine-safe). Otherwise, the effect will be like a multithreaded function call, but with cooperative multitasking, the possibility of deadlocks when using primitives from the threading module, and broken `threading.local`.

If it is possible to write asynchronous versions for functions that use `SUPERVISOR_COMMS.lock` with little effort, it is better to do so. If not, the above approach can be considered as a temporary or permanent solution (depending on whether you are willing to keep an eye on coroutine-safety).

Note that selecting the first option (separate asynchronous functions) requires that there be such functions for each use case of `SUPERVISOR_COMMS.lock` via the asynchronous API. If an asynchronous function calls at least one synchronous function that directly or indirectly uses `SUPERVISOR_COMMS.lock`, this immediately risks deadlocks, possibly even hard-to-detect ones. This caveat is only true for calls within the same thread. If `SUPERVISOR_COMMS.lock` is used synchronously in a separate thread (e.g. via `sync_to_async()`), everything is fine.
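The last point (a synchronous lock used from a separate thread is fine) can be sketched with the standard library alone. This assumes `asyncio.to_thread` (Python 3.9+) as a stand-in for `sync_to_async()` and a plain `threading.Lock` as a stand-in for `SUPERVISOR_COMMS.lock`; `sync_request` is a hypothetical function.

```python
import asyncio
import threading
import time

comms_lock = threading.Lock()  # stand-in for SUPERVISOR_COMMS.lock


def sync_request(delay):
    """Hypothetical synchronous call that holds the lock for a full round trip."""
    with comms_lock:
        time.sleep(delay)
        return "ok"


async def demo():
    ticks = 0

    async def ticker():
        # Counts how often the event loop gets to run during the requests.
        nonlocal ticks
        while True:
            ticks += 1
            await asyncio.sleep(0.01)

    tick_task = asyncio.create_task(ticker())
    # Both calls contend for the lock, but they only block worker threads.
    results = await asyncio.gather(
        asyncio.to_thread(sync_request, 0.05),
        asyncio.to_thread(sync_request, 0.05),
    )
    tick_task.cancel()
    return results, ticks
```

The ticker keeps advancing while the two requests queue up on the lock, which is the independence from the event loop thread that the comment describes. Calling `sync_request` directly from a coroutine would stall the ticker instead.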
Re: [I] Trigger runner process locked with multiple Workflow triggers [airflow]
x42005e1f commented on issue #50185:
URL: https://github.com/apache/airflow/issues/50185#issuecomment-2917682923

> [@x42005e1f](https://github.com/x42005e1f) please add your thoughts.

I do not think that making the function return type dependent on runtime is a good pattern. This approach is poorly compatible with static analyzers and it will also not work correctly in all cases. Instead, I suggest shifting the focus not on how to create greenlets, but on how to pass awaitable objects from them. Here is how it can be implemented with a ready-made example for `aiologic.Lock`:

```python
from __future__ import annotations

import inspect
import sys

from collections.abc import Awaitable, Callable
from functools import wraps
from typing import TypeVar, cast

import aiologic
import greenback

if sys.version_info >= (3, 10):
    from typing import ParamSpec
else:
    from typing_extensions import ParamSpec

_T = TypeVar("_T")
_P = ParamSpec("_P")


def sync_or_async(
    sync_func: Callable[_P, _T],
    async_func: Callable[_P, _T] | Callable[_P, Awaitable[_T]],
) -> Callable[_P, _T]:
    if inspect.iscoroutinefunction(async_func):
        async_impl = greenback.autoawait(async_func)
    else:
        async_impl = async_func

    async_impl = cast(Callable[_P, _T], async_impl)

    @wraps(sync_func)
    def wrapper(*args: _P.args, **kwargs: _P.kwargs) -> _T:
        if greenback.has_portal():
            return async_impl(*args, **kwargs)
        else:
            return sync_func(*args, **kwargs)

    return wrapper


class CommsLock(aiologic.Lock):
    __slots__ = ()

    async def _async_acquire_with_timeout(
        self,
        /,
        *,
        blocking: bool = True,
        timeout: float | None = None,
    ) -> bool:
        return await self.async_acquire(blocking=blocking)

    green_acquire = sync_or_async(
        aiologic.Lock.green_acquire,
        _async_acquire_with_timeout,  # type: ignore[arg-type]
    )
    green_release = sync_or_async(
        aiologic.Lock.green_release,
        aiologic.Lock.async_release,
    )
```

```python
lock = CommsLock()


async def noop() -> None:
    pass


async def holding() -> None:
    async with lock:
        await asyncio.sleep(0)  # switch back to the event loop


def acquire_release() -> None:
    with lock:
        pass  # do something


# make CommsLock implicitly awaitable for the current task
await greenback.ensure_portal()

# hold the lock with another task
holder = asyncio.create_task(holding())
await asyncio.sleep(0)
assert lock.locked()

# a task to verify that CommsLock does indeed yield to the event loop
task = asyncio.create_task(noop())
assert not task.done()

acquire_release()  # sync-or-async call
assert task.done()  # there was a context switch!
```
Re: [I] Trigger runner process locked with multiple Workflow triggers [airflow]
gopidesupavan commented on issue #50185:
URL: https://github.com/apache/airflow/issues/50185#issuecomment-2917340184

@kaxil @uranusjr Considering all of the discussion above, I think this leaves two options, IMHO. What are your suggestions or thoughts?

1. Create async versions of all the comms methods that are accessed via the triggerer.
2. Use something like [greenback](https://github.com/oremanj/greenback) and execute the sync version inside a greenback portal. This way we do not need to keep a separate async copy.

For option two, I am thinking of something like the code below: we can put decorators on the methods, and if the comms methods are called from an async function, they execute in a greenback portal. For normal sync invocation it works without any issues.

```python
import asyncio
import functools

import aiologic
import greenback

lock = aiologic.Lock()

# Set to True once the process is running inside an event loop.
is_async_process = False


def sync_or_async(func):
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        if is_async_process:
            asyncio.get_running_loop()  # raises RuntimeError outside an event loop

            async def async_wrapper():
                return await greenback.with_portal_run_sync(func, *args, **kwargs)

            return async_wrapper()
        else:
            return func(*args, **kwargs)

    return wrapper


@sync_or_async
def sample_func(a, b):
    with lock:
        return a + b


async def async_main():
    global is_async_process
    is_async_process = True
    await sample_func(4, 4)


if __name__ == "__main__":
    sample_func(1, 2)
    asyncio.run(async_main())
```
Re: [I] Trigger runner process locked with multiple Workflow triggers [airflow]
gopidesupavan commented on issue #50185:
URL: https://github.com/apache/airflow/issues/50185#issuecomment-2917225014

> > Looks like this package is not maintained or not sure, no activity since last Feb 2024.
>
> It is not always possible to tell from the commit activity of such packages whether they are maintained or not. Sometimes they are not updated just because there are no serious issues, in which case the author can focus on other projects. Even more ambiguous is the situation when tests are regularly run for new versions of dependencies and a lockfile is updated - in this case the repository may look alive with a huge number of commits, but in fact it has not been updated for a very long time, having very few commits in the source code for the whole history.
>
> There are also alternatives such as [greenletio](https://github.com/miguelgrinberg/greenletio) or [awaitlet](https://github.com/sqlalchemy/awaitlet). But if you want, you can implement something similar on your side - for example, SQLAlchemy has long used [its own module](https://github.com/sqlalchemy/sqlalchemy/blob/05b2442132d5ae31cfcc7a1fe95e0f6b739aa995/lib/sqlalchemy/util/concurrency.py) as part of asynchronous API implementation.
>
> These solutions have some disadvantages associated with stack growth, but they are usually insignificant. A more comprehensive solution is to implement (generators and) coroutines via greenlets - what I called (genlets and) corolets 3 years ago (for a non-public tutorial on asynchronous library design). But it is redundant for this task.

Agreed :)
Re: [I] Trigger runner process locked with multiple Workflow triggers [airflow]
x42005e1f commented on issue #50185:
URL: https://github.com/apache/airflow/issues/50185#issuecomment-2913968644

> Looks like this package is not maintained or not sure, no activity since last Feb 2024.

It is not always possible to tell from the commit activity of such packages whether they are maintained or not. Sometimes they are not updated just because there are no serious issues, in which case the author can focus on other projects. Even more ambiguous is the situation when tests are regularly run for new versions of dependencies and a lockfile is updated - in this case the repository may look alive with a huge number of commits, but in fact it has not been updated for a very long time, having very few commits in the source code for the whole history.

There are also alternatives such as [greenletio](https://github.com/miguelgrinberg/greenletio) or [awaitlet](https://github.com/sqlalchemy/awaitlet). But if you want, you can implement something similar on your side - for example, SQLAlchemy has long used [its own module](https://github.com/sqlalchemy/sqlalchemy/blob/05b2442132d5ae31cfcc7a1fe95e0f6b739aa995/lib/sqlalchemy/util/concurrency.py) as part of asynchronous API implementation.

These solutions have some disadvantages associated with stack growth, but they are usually insignificant. A more comprehensive solution is to implement (generators and) coroutines via greenlets - what I called (genlets and) corolets 3 years ago (for a non-public tutorial on asynchronous library design). But it is redundant for this task.
Re: [I] Trigger runner process locked with multiple Workflow triggers [airflow]
gopidesupavan commented on issue #50185:
URL: https://github.com/apache/airflow/issues/50185#issuecomment-2913817328

> If the problem is the same as what I described (which is likely), the (relatively) easiest way to solve it is to use libraries like [greenback](https://github.com/oremanj/greenback) to execute an asynchronous version when the code runs in a thread with an event loop. Requires explicit portal creation, greenlet support, and interrupt support on the synchronous code side.
>
> It is also possible to create asynchronous versions of functions to use directly, or delegate them to execute sequentially in the worker thread via `sync_to_async()`, but this can be difficult, especially since [`SUPERVISOR_COMMS.lock` is used even in `__getitem__()`](https://github.com/apache/airflow/blob/7ebba78236b945e0ce569607480f397e5f2e58ba/task-sdk/src/airflow/sdk/execution_time/lazy_sequence.py#L147).
>
> > BTW GLock where can i find this package i dont see it on PyPI? i will try with this
>
> You can directly copy the code from gist and make the mentioned change. GLock [was created to provide readers-writer locks](https://github.com/x42005e1f/aiologic/discussions/6) before Grouper appears in a future version of aiologic. Right now my efforts are focused on moving aiologic to the beta development stage, so releases are currently delayed.
>
> But be aware that if you choose GLock, you will have to deal with the loss of coroutine-safety: in particular, the asynchronous usage you mentioned will be executed in one thread at a time, but will also be able to be executed in different tasks at the same time.

@x42005e1f That's a good suggestion. greenback seems to work: I ran some tests and everything looks fine, with no triggers blocked.

It looks like this package may not be maintained, though I am not sure: there has been no activity since Feb 2024.
Re: [I] Trigger runner process locked with multiple Workflow triggers [airflow]
x42005e1f commented on issue #50185:
URL: https://github.com/apache/airflow/issues/50185#issuecomment-2912697685

If the problem is the same as what I described (which is likely), the easiest way to solve it is to use libraries like [greenback](https://github.com/oremanj/greenback) to execute an asynchronous version when the code runs in a thread with an event loop. Requires explicit portal creation, greenlet support, and interrupt support on the synchronous code side.

It is also possible to create asynchronous versions of functions to use directly, or delegate them to execute sequentially in the worker thread via `sync_to_async()`, but this can be difficult, especially since [`SUPERVISOR_COMMS.lock` is used even in `__getitem__()`](https://github.com/apache/airflow/blob/7ebba78236b945e0ce569607480f397e5f2e58ba/task-sdk/src/airflow/sdk/execution_time/lazy_sequence.py#L147).

> BTW GLock where can i find this package i dont see it on PyPI? i will try with this

You can directly copy the code from gist and make the mentioned change. GLock [was created to provide readers-writer locks](https://github.com/x42005e1f/aiologic/discussions/6) before Grouper appears in a future version of aiologic. Right now my efforts are focused on moving aiologic to the beta development stage, so releases are currently delayed.

But be aware that if you choose GLock, you will have to deal with the loss of coroutine-safety: in particular, the asynchronous usage you mentioned will be executed in one thread at a time, but will also be able to be executed in different tasks at the same time.
Re: [I] Trigger runner process locked with multiple Workflow triggers [airflow]
gopidesupavan commented on issue #50185:
URL: https://github.com/apache/airflow/issues/50185#issuecomment-2912628296

> Also note that the different APIs that aiologic provides should not be mixed in the same thread if threading is used for the green (sync) API. This is a fundamental problem due to the fact that primitives work at the task level: `lock.green_acquire()` can block the event loop when `aiologic.lowlevel.current_green_library() == "threading"`, and as a result lock will never be released. This is well demonstrated in the following example:
>
> example.py
> This is the same as using `threading.Lock` along with asyncio tasks. If you have such calls somewhere, it can be the cause of deadlocks.
>
> This problem of mixing two APIs in the same thread can be solved by using [GLock](https://gist.github.com/x42005e1f/a50d0744013b7bbbd7ded608d6a3845b), which can be a thread-level lock - if you use the read-preferring readers-writer locks condition (`self.group is None or self.group == group`), its APIs can safely mix (just replace `lock = aiologic.Lock()` with `lock = GLock(default_group_factory=current_thread_ident)` and that example will work without hanging). However, since it would no longer be a task level, it would not be coroutine-safe.

It looks like we use the lock in a similar way in some places. This is the async one: https://github.com/apache/airflow/blob/main/airflow-core/src/airflow/jobs/triggerer_job_runner.py#L965 and in other places we use it as `with SUPERVISOR_COMMS.lock:`.

BTW, where can I find the GLock package? I don't see it on PyPI. I will try with this.
Re: [I] Trigger runner process locked with multiple Workflow triggers [airflow]
x42005e1f commented on issue #50185:
URL: https://github.com/apache/airflow/issues/50185#issuecomment-2912414727

Also note that the different APIs that aiologic provides should not be mixed in the same thread if threading is used for the green (sync) API. This is a fundamental problem due to the fact that primitives work at the task level: `lock.green_acquire()` can block the event loop when `aiologic.lowlevel.current_green_library() == "threading"`, and as a result lock will never be released. This is well demonstrated in the following example:

example.py

```python
import asyncio
import time

import aiologic


async def main():
    lock = aiologic.Lock()

    async def a():
        async with lock:
            print("before")
            await asyncio.sleep(0)
            print("after")  # will never be printed!

    async def b():
        with lock:
            time.sleep(0)

    await asyncio.gather(a(), b())


if __name__ == "__main__":
    asyncio.run(main())
```

This is the same as using `threading.Lock` along with asyncio tasks. If you have such calls somewhere, it can be the cause of deadlocks.

This problem of mixing two APIs in the same thread can be solved by using [GLock](https://gist.github.com/x42005e1f/a50d0744013b7bbbd7ded608d6a3845b), which can be a thread-level lock - if you use the read-preferring readers-writer locks condition, its APIs can safely mix. However, since it would no longer be a task level, it would not be coroutine-safe.
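The comparison with `threading.Lock` can be checked directly with the standard library. This is a minimal sketch that assumes a timeout on the blocking acquire purely so the demo terminates instead of hanging; `holder` and `demo` are illustrative names.

```python
import asyncio
import threading

lock = threading.Lock()


async def holder(entered, release):
    with lock:
        entered.set()
        await release.wait()  # suspended while still holding the lock


async def demo():
    entered, release = asyncio.Event(), asyncio.Event()
    task = asyncio.create_task(holder(entered, release))
    await entered.wait()

    # A blocking acquire here stops the event loop, so `holder` cannot run
    # and release the lock. The timeout is only there to end the demo.
    acquired = lock.acquire(timeout=0.2)

    release.set()
    await task
    return acquired
```

Without the timeout, `lock.acquire()` would wait forever on a lock whose owner can only make progress on the same, now blocked, thread.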
Re: [I] Trigger runner process locked with multiple Workflow triggers [airflow]
gopidesupavan commented on issue #50185:
URL: https://github.com/apache/airflow/issues/50185#issuecomment-2912041108

@x42005e1f Could you share the example you tested? In our trigger process we use the lock inside `sync_to_async`.
Re: [I] Trigger runner process locked with multiple Workflow triggers [airflow]
x42005e1f commented on issue #50185:
URL: https://github.com/apache/airflow/issues/50185#issuecomment-2912116437

> [@x42005e1f](https://github.com/x42005e1f) can i have your example you tested? In our trigger process we use lock inside sync_to_async.

Yes, here is a simple mixed test:

```python
import asyncio
import sys

from concurrent.futures import ThreadPoolExecutor, as_completed

import aiologic

from asgiref.sync import sync_to_async

if sys.version_info >= (3, 11):
    WaitTimeout = TimeoutError
else:
    from concurrent.futures import TimeoutError as WaitTimeout

TIMEOUT = 6.0  # must be reached (success)
THREADS = 100
TASKS = 100

lock = aiologic.Lock()

try:
    stopped = aiologic.Flag()
except AttributeError:  # aiologic<0.15.0
    stopped = aiologic.lowlevel.Flag()


def sync_acquire_release():
    with lock:
        pass


async def async_acquire_release(i):
    if i % 2:
        while True:
            await sync_to_async(sync_acquire_release)()

            if stopped:
                break
    else:
        while True:
            async with lock:
                await asyncio.sleep(0)

            if stopped:
                break


async def hub():
    await asyncio.gather(*(
        asyncio.create_task(async_acquire_release(i))
        for i in range(TASKS)
    ))


if __name__ == "__main__":
    with ThreadPoolExecutor(THREADS) as executor:
        interval = sys.getswitchinterval()
        sys.setswitchinterval(min(1e-6, interval))

        try:
            futures = [
                executor.submit(asyncio.run, hub())
                for _ in range(THREADS)
            ]

            for future in as_completed(futures, timeout=TIMEOUT):
                future.result()  # reraise
        except WaitTimeout:
            pass
        finally:
            sys.setswitchinterval(interval)
            stopped.set()
```

But note that `sync_to_async()` uses only one worker thread by default, which blocks all further calls until the first one is completed. This can be seen, for example, in this test:

```python
import asyncio
import sys
import threading

from asgiref.sync import sync_to_async


async def main():
    loop = asyncio.get_running_loop()

    first = asyncio.Event()
    second = threading.Event()

    def a():
        print("'a' started!")
        loop.call_soon_threadsafe(first.set)
        second.wait()

    def b():
        print("'b' started!")  # will never be printed!
        second.set()

    f1 = asyncio.create_task(sync_to_async(a)())
    await first.wait()
    print("notified!")
    f2 = asyncio.create_task(sync_to_async(b)())

    await asyncio.gather(f1, f2)


if __name__ == "__main__":
    asyncio.run(main())
```
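The same single-worker effect can be reproduced without asgiref. The sketch below assumes a plain `ThreadPoolExecutor` as a stand-in for the worker thread behind `sync_to_async()`, and adds a timeout so it finishes instead of hanging; `run_pair` is a hypothetical helper.

```python
import asyncio
import threading
from concurrent.futures import ThreadPoolExecutor


async def run_pair(max_workers):
    """Run `a` (which waits for `b`) and `b` on one pool.

    Returns True if `b` managed to start while `a` was still waiting.
    """
    loop = asyncio.get_running_loop()
    b_started = threading.Event()

    def a():
        # Blocks its worker thread until `b` runs, or gives up after 0.2 s.
        return b_started.wait(timeout=0.2)

    def b():
        b_started.set()

    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        fa = loop.run_in_executor(pool, a)
        fb = loop.run_in_executor(pool, b)
        result, _ = await asyncio.gather(fa, fb)
    return result
```

With one worker, `b` stays queued behind `a`, so `a` times out. With two workers, `b` runs on the second thread and `a` returns as soon as the event is set. Any synchronous call that waits on another synchronous call queued behind it on the same worker will stall in the same way.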
Re: [I] Trigger runner process locked with multiple Workflow triggers [airflow]
x42005e1f commented on issue #50185:
URL: https://github.com/apache/airflow/issues/50185#issuecomment-2912014652

@ArvidMartensRenson, I would like to clarify: do threads get stuck when trying to acquire the lock when some other thread is already performing some actions holding the lock, or do they get stuck for no reason? I ask because in the second case it would mean that the problem is most likely on my side - in which case I would consider it a critical bug and release a fix version. However, I tested the lock with `sync_to_async()` yesterday right after the PR appeared and was unable to reproduce the problem in isolated tests (with both stable and the latest version of aiologic).
Re: [I] Trigger runner process locked with multiple Workflow triggers [airflow]
gopidesupavan commented on issue #50185:
URL: https://github.com/apache/airflow/issues/50185#issuecomment-2910223518

@ArvidMartensRenson It would be helpful if you could test this change: https://github.com/apache/airflow/pull/51085

I ran it with around 400 triggers and did not see any locks.
Re: [I] Trigger runner process locked with multiple Workflow triggers [airflow]
gopidesupavan commented on issue #50185:
URL: https://github.com/apache/airflow/issues/50185#issuecomment-2908470186

It looks like the lock inside the `sync_to_async` function is causing the issue; it is likely interfering with the threads used inside `sync_to_async`. We use `get_ti_count`, `get_task_states`, and `get_dr_count` from `RuntimeTaskInstance`, which is likely not a good choice.

I have moved those functions to async versions that do not use `sync_to_async`, and they work fine. I ran a stress test with around 400 triggers: nothing locked and everything ran fine.

@ArvidMartensRenson We may not need an async client. I will raise a PR that moves the functions to async; could you please test it once it is up? An async client would also be good, but I don't see a need for it in this case, since it works without one.
Re: [I] Trigger runner process locked with multiple Workflow triggers [airflow]
gopidesupavan commented on issue #50185:
URL: https://github.com/apache/airflow/issues/50185#issuecomment-2906002321

> I created an async client from the sync client, using httpx.AsyncClient as the base (like httpx.Client is for the sync client). Inherited the init client from the sync task client. Only problem I had was that the operations had to be rewritten as an async method, so I copied the code from the sync client and changed the method definition to async. I found this implementation nicer than using plain api calls with the async httpx client.
>
> Traffic is over localhost so even then a token is required, I reused the methods that are used in the creation of the task workload to get the token. I considered dropping the token requirement but it seemed safer and whenever the trigger service and api server are separated it will be needed.
>
> I implemented this solution last Thursday and for now the trigger service has not been stuck and the external workload triggers all completed. So for now I would conclude that the solution works.
>
> If it is desirable to have an async client I am happy to help on creating the client, however I am a little worried with duplicate code I have now, same code for the sync and async client. If someone knows a solution for this or a way to handle that problem, that would be nice.

I don't see any issues with using an async client: it's part of the triggerer, and most triggers are already asynchronous. I do agree that there's some code duplication. Please feel free to submit your approach; it'll be great to have more eyes on it and gather suggestions to refine the solution. :)

> For the record, I am using a custom implementation of the workload trigger. I was an early adopter of the trigger paradigm and the external workload trigger did not exist at that time, so we had to implement our own trigger. We never came around converting to the standard provider implementation.
Re: [I] Trigger runner process locked with multiple Workflow triggers [airflow]
gopidesupavan commented on issue #50185:
URL: https://github.com/apache/airflow/issues/50185#issuecomment-2905998488

Thanks @ArvidMartensRenson, that explains what's happening. During debugging, I noticed that requests are being sent, but no responses are being returned. Since we hold a lock when sending requests, the absence of a response causes the lock to remain held, which in turn blocks any further triggers from being processed.

I will look into the async client part this week.
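One way to keep a missing response from holding the lock forever is to bound the wait. This is a minimal sketch, assuming an `asyncio.Lock` and a hypothetical `read_response` coroutine; it is not what the triggerer code does, only an illustration of the failure mode described above.

```python
import asyncio


async def send_and_receive(lock, read_response, timeout):
    """Hold the lock for one round trip, but never longer than `timeout` seconds."""
    async with lock:
        try:
            return await asyncio.wait_for(read_response(), timeout)
        except asyncio.TimeoutError:
            return None  # the lock is released on the way out


async def demo():
    lock = asyncio.Lock()

    async def never_answers():
        await asyncio.Event().wait()  # simulates a response that never arrives

    async def answers():
        return "pong"

    first = await send_and_receive(lock, never_answers, timeout=0.05)
    second = await send_and_receive(lock, answers, timeout=0.05)
    return first, second, lock.locked()
```

A timeout only stops the lock from being held indefinitely. If the late response eventually arrives, the channel would still need to discard or re-match it, which this sketch does not attempt.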
Re: [I] Trigger runner process locked with multiple Workflow triggers [airflow]
ArvidMartensRenson commented on issue #50185:
URL: https://github.com/apache/airflow/issues/50185#issuecomment-2891834967

I created an async client from the sync client, using `httpx.AsyncClient` as the base (as `httpx.Client` is for the sync client). I inherited the client init from the sync task client. The only problem was that the operations had to be rewritten as async methods, so I copied the code from the sync client and changed the method definitions to async. I found this implementation nicer than making plain API calls with the async httpx client.

Traffic goes over localhost, but a token is still required, so I reused the methods that are used in the creation of the task workload to get the token. I considered dropping the token requirement, but keeping it seemed safer, and it will be needed whenever the trigger service and API server are separated.

I implemented this solution last Thursday, and so far the trigger service has not got stuck and the external workload triggers have all completed. So for now I would conclude that the solution works.

If an async client is desirable, I am happy to help create it. However, I am a little worried about the duplicate code I have now: the same code for the sync and async clients. If someone knows a solution for this or a way to handle the problem, that would be nice.

For the record, I am using a custom implementation of the workload trigger. I was an early adopter of the trigger paradigm, and the external workload trigger did not exist at that time, so we had to implement our own trigger. We never got around to converting to the standard provider implementation.
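On the duplication concern: one common way to limit it is to keep request building and response parsing in transport-independent functions and have thin sync and async clients call them. This is a minimal sketch under that assumption; the class names, the `send` callables, and the `/task-instances/count` path are hypothetical and not the Task SDK's API.

```python
import asyncio
from dataclasses import dataclass


@dataclass(frozen=True)
class Request:
    method: str
    path: str
    params: dict


class TaskStateOperations:
    """Transport-independent part: builds requests and parses responses."""

    @staticmethod
    def build_count(dag_id):
        # Hypothetical endpoint, used only for illustration.
        return Request("GET", "/task-instances/count", {"dag_id": dag_id})

    @staticmethod
    def parse_count(payload):
        return int(payload["count"])


class SyncClient:
    def __init__(self, send):
        self._send = send  # hypothetical blocking transport

    def get_count(self, dag_id):
        payload = self._send(TaskStateOperations.build_count(dag_id))
        return TaskStateOperations.parse_count(payload)


class AsyncClient:
    def __init__(self, send):
        self._send = send  # hypothetical awaitable transport

    async def get_count(self, dag_id):
        payload = await self._send(TaskStateOperations.build_count(dag_id))
        return TaskStateOperations.parse_count(payload)
```

Only the one-line `send` call differs between the two clients, so a new operation means one new build/parse pair plus two short wrappers. In a real client the `send` callables could wrap `httpx.Client.request` and `httpx.AsyncClient.request` respectively.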
Re: [I] Trigger runner process locked with multiple Workflow triggers [airflow]
gopidesupavan commented on issue #50185: URL: https://github.com/apache/airflow/issues/50185#issuecomment-2891075878

> For now I solved this through the creation of an async Task SDK client which the triggers use directly. In my setup the triggers always have the API server running on the same instances, hence I could use localhost traffic. In this way it is rather similar to the in-process traffic, but I do not overload the data pipe between trigger runner and supervisor. From my investigation it seems that using too many external workload triggers (count calls) together with the supervisor-runner syncs congests that pipe in such a way that all calls were blocked indefinitely, resulting in a trigger process that seems alive (heartbeats still worked) but in reality is stuck.
>
> Probably this is not the designed pattern, but for now this solution makes it possible for me to roll out Airflow 3.0 at our office.

@ArvidMartensRenson we don't have an async client in the Task SDK. Did you overwrite the Triggerer client method, and did that fix it?
Re: [I] Trigger runner process locked with multiple Workflow triggers [airflow]
gopidesupavan commented on issue #50185: URL: https://github.com/apache/airflow/issues/50185#issuecomment-2889946586 I am able to reproduce it with one of the example ExternalTaskSensor dags. It looks like something is blocking, and triggers are not getting added for further processing. I need to work out why it's blocking even though we use an async lock. Will check and update.
Re: [I] Trigger runner process locked with multiple Workflow triggers [airflow]
ArvidMartensRenson commented on issue #50185: URL: https://github.com/apache/airflow/issues/50185#issuecomment-2889837087

For now I solved this through the creation of an async Task SDK client which the triggers use directly. In my setup the triggers always have the API server running on the same instances, hence I could use localhost traffic. In this way it is rather similar to the in-process traffic, but I do not overload the data pipe between trigger runner and supervisor. From my investigation it seems that using too many external workload triggers (count calls) together with the supervisor-runner syncs congests that pipe in such a way that all calls were blocked indefinitely, resulting in a trigger process that seems alive (heartbeats still worked) but in reality is stuck.

Probably this is not the designed pattern, but for now this solution makes it possible for me to roll out Airflow 3.0 at our office.
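The symptom described above (heartbeats keep working while every call is stuck) can be reproduced in plain asyncio. The sketch below is only an illustration under assumptions and not a diagnosis of the Airflow code: it assumes a single shared channel guarded by an `asyncio.Lock` and a reply that never arrives. The names `heartbeat`, `request_over_shared_channel`, and `main` are hypothetical.

```python
import asyncio


async def heartbeat(beats: list) -> None:
    """Keeps ticking as long as the event loop itself is not blocked."""
    while True:
        beats.append(1)
        await asyncio.sleep(0.01)


async def request_over_shared_channel(lock: asyncio.Lock, reply: asyncio.Event) -> None:
    """Takes the channel lock, then waits for a reply before releasing it."""
    async with lock:
        # If the reply never arrives, the lock is never released and every
        # other request queues up behind this one.
        await reply.wait()


async def main() -> tuple:
    lock = asyncio.Lock()
    never_replied = asyncio.Event()  # stands in for a lost or congested reply
    beats: list = []

    hb = asyncio.create_task(heartbeat(beats))
    requests = [
        asyncio.create_task(request_over_shared_channel(lock, never_replied))
        for _ in range(3)
    ]

    await asyncio.sleep(0.1)  # let the system run for a moment
    pending = sum(not task.done() for task in requests)

    # Clean up so the event loop can shut down.
    for task in (hb, *requests):
        task.cancel()
    await asyncio.gather(hb, *requests, return_exceptions=True)
    return len(beats), pending
```

Running `asyncio.run(main())` returns the number of heartbeats recorded and the number of requests still pending. The heartbeat counter keeps growing while no request completes, which is why liveness checks based on heartbeats alone would not detect this kind of stall.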
Re: [I] Trigger runner process locked with multiple Workflow triggers [airflow]
vatsrahul1001 commented on issue #50185: URL: https://github.com/apache/airflow/issues/50185#issuecomment-2889660176 @gopidesupavan are you still looking into this?
Re: [I] Trigger runner process locked with multiple Workflow triggers [airflow]
gopidesupavan commented on issue #50185: URL: https://github.com/apache/airflow/issues/50185#issuecomment-2872703606 Yeah will look at this today.
Re: [I] Trigger runner process locked with multiple Workflow triggers [airflow]
uranusjr commented on issue #50185: URL: https://github.com/apache/airflow/issues/50185#issuecomment-2857139721 Is it possible to write a dag that can reliably reproduce this?