Re: [I] Trigger runner process locked with multiple Workflow triggers [airflow]

2025-06-17 Thread via GitHub


ashb closed issue #50185: Trigger runner process locked with multiple Workflow 
triggers
URL: https://github.com/apache/airflow/issues/50185


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [I] Trigger runner process locked with multiple Workflow triggers [airflow]

2025-06-12 Thread via GitHub


x42005e1f commented on issue #50185:
URL: https://github.com/apache/airflow/issues/50185#issuecomment-2967452565

   To put it another way, the key property of delegating execution to another thread is that the delegated code runs independently of the synchronous code in the current thread. Any operation that the asynchronous code has delegated, and that the synchronous code needs to wait for, can therefore complete in parallel. This removes the dependency and thus bypasses the problem.





Re: [I] Trigger runner process locked with multiple Workflow triggers [airflow]

2025-06-12 Thread via GitHub


x42005e1f commented on issue #50185:
URL: https://github.com/apache/airflow/issues/50185#issuecomment-2967405096

   > > the synchronous code will not be able to interrupt its execution to 
resume execution of asynchronous code. It will only work if they are different 
threads.
   > 
   > And we don't get that via `sync_to_async` as all calls to that use a 
single Thread?
   
   `sync_to_async()` delegates execution to a worker thread. This eliminates 
the requirement to switch from synchronous to asynchronous code (since 
asynchronous code just waits for a future object), which in turn bypasses the 
problem - you can even synchronize in synchronous code, as long as synchronous 
code does not depend on asynchronous code in the same thread. Of course, this 
is true only when `sync_to_async()` is used for full operations - if it is used 
for single reads that are part of the whole `get_message()`, the problem 
remains.
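The worker-thread effect can be illustrated with a small sketch. It uses the standard library's `asyncio.to_thread()` as a stand-in for `sync_to_async()` (an assumption: asgiref's function has its own thread-sensitivity rules), and `comms_lock` and `fetch_from_supervisor()` are hypothetical placeholders for the shared comms channel. An async task holds the lock across an `await`; because the blocking call runs on a worker thread, the event loop stays free to resume that task, which then releases the lock.

```python
import asyncio
import threading

comms_lock = threading.Lock()  # hypothetical lock guarding the comms channel


def fetch_from_supervisor(name: str) -> str:
    # Hypothetical blocking call that needs the lock an async task may be holding.
    with comms_lock:
        return f"conn:{name}"


async def async_holder() -> None:
    # An async task that holds the lock across an await point.
    with comms_lock:
        await asyncio.sleep(0.05)  # yields to the event loop while holding the lock


async def main() -> str:
    holder = asyncio.create_task(async_holder())
    await asyncio.sleep(0)  # let async_holder() take the lock first
    # The blocking call runs on a worker thread, so the event loop stays free to
    # resume async_holder(), which then releases the lock.
    result = await asyncio.to_thread(fetch_from_supervisor, "aws_default")
    await holder
    return result


print(asyncio.run(main()))  # conn:aws_default
```

Calling `fetch_from_supervisor()` directly from `main()` instead would block the event loop thread on the lock, so `async_holder()` could never resume to release it.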





Re: [I] Trigger runner process locked with multiple Workflow triggers [airflow]

2025-06-12 Thread via GitHub


ashb commented on issue #50185:
URL: https://github.com/apache/airflow/issues/50185#issuecomment-2967384988

   > the synchronous code will not be able to interrupt its execution to resume 
execution of asynchronous code. It will only work if they are different threads.
   
   And we don't get that via `sync_to_async` as all calls to that use a single 
Thread?





Re: [I] Trigger runner process locked with multiple Workflow triggers [airflow]

2025-06-12 Thread via GitHub


x42005e1f commented on issue #50185:
URL: https://github.com/apache/airflow/issues/50185#issuecomment-2967340252

   > * The code that pulls off of the requests Q and writes to requests/reads 
from stdin is 100% async code
   
   However, I read it again and realized that this is an attempt to delegate 
work to asynchronous code from synchronous code. No, it will not work that way 
because, again, the synchronous code will not be able to interrupt its 
execution to resume execution of asynchronous code. It will only work if they 
are different threads.
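A minimal sketch of the same-thread case, assuming for illustration that a plain `threading.Lock` (the hypothetical `comms_lock`) guards the channel. One task holds the lock across an `await`; another task then calls synchronous code on the same thread. The demo uses a timeout only so that it terminates; without one, the wait would never end, because the event loop cannot switch back to the task holding the lock.

```python
import asyncio
import threading

comms_lock = threading.Lock()  # hypothetical lock guarding the comms channel


def sync_send() -> bool:
    # Blocking acquire on the event loop thread. The timeout exists only so the
    # demo terminates; without it this call would hang forever.
    acquired = comms_lock.acquire(timeout=0.2)
    if acquired:
        comms_lock.release()
    return acquired


async def async_send() -> None:
    with comms_lock:
        await asyncio.sleep(0.05)  # suspended while still holding the lock


async def sync_caller() -> bool:
    # A task that calls synchronous code directly, as trigger code might.
    return sync_send()


async def main() -> bool:
    sender = asyncio.create_task(async_send())
    await asyncio.sleep(0)  # async_send() now holds the lock and is suspended
    ok = await sync_caller()  # blocks the loop, so async_send() cannot resume
    await sender
    return ok


print(asyncio.run(main()))  # False: the lock was never released during the wait
```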





Re: [I] Trigger runner process locked with multiple Workflow triggers [airflow]

2025-06-12 Thread via GitHub


x42005e1f commented on issue #50185:
URL: https://github.com/apache/airflow/issues/50185#issuecomment-2967315728

   > Actually, how about this [@x42005e1f](https://github.com/x42005e1f):
   
   If queues are also used for asynchronous code, the problem remains. Imagine that asynchronous code sent a request and then the event loop switched execution to another task that called the sync send code. The first response will then be for the asynchronous code, and the synchronous code will wait forever for that response to be removed from the queue, which will never happen because execution will never switch back to the asynchronous code. This is no longer a deadlock, but a busy loop that keeps the CPU loaded forever.
   
   If the queues are only for synchronous code, then yes, that might solve the problem. In any case, though, I suggest removing `self._q_responses`, since polling like this is almost always a bad idea. Just send a future (`concurrent.futures.Future`) along with the frame. Call `future.result()` in the sync send code, and let the producer put the response in the future via `future.set_result(response)`. The consumer will then wake up and get its response from the future.
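A minimal sketch of that suggestion, with hypothetical names (`Frame`, `requests_q`, `writer_loop()`, `send_and_wait()`) and a fake round-trip that just echoes the request: each frame carries its own `concurrent.futures.Future`, the thread that owns the channel resolves it, and the caller blocks in `future.result()` rather than polling a shared response queue.

```python
import concurrent.futures
import queue
import threading
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class Frame:
    # Hypothetical request frame; the future travels together with the request.
    body: str
    future: concurrent.futures.Future = field(default_factory=concurrent.futures.Future)


requests_q: "queue.Queue[Optional[Frame]]" = queue.Queue()


def writer_loop() -> None:
    # Owns the channel: handles each request and resolves that request's future.
    while (frame := requests_q.get()) is not None:
        response = f"echo:{frame.body}"  # stand-in for a real round-trip to the supervisor
        frame.future.set_result(response)


def send_and_wait(body: str, timeout: float = 5.0) -> str:
    frame = Frame(body)
    requests_q.put(frame)
    # No polling: the caller sleeps until its own future is resolved.
    return frame.future.result(timeout=timeout)


worker = threading.Thread(target=writer_loop, daemon=True)
worker.start()
result = send_and_wait("GetConnection")
print(result)  # echo:GetConnection
requests_q.put(None)  # sentinel that stops the writer thread
worker.join()
```

Because each caller waits on its own future, there is no shared response queue to peek at, and an async caller could wait on the same kind of future with `asyncio.wrap_future()`.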





Re: [I] Trigger runner process locked with multiple Workflow triggers [airflow]

2025-06-12 Thread via GitHub


ashb commented on issue #50185:
URL: https://github.com/apache/airflow/issues/50185#issuecomment-2967206575

   Actually, how about this @x42005e1f:
   
   - We remove the lock on CommsDecoder.
   - We create a new subclass called TriggererCommsDecoder (so we know when we 
are operating in this hybrid sync/async env)
   - On that we have two queues - one for requests, the other for responses.
   - The code that pulls off of the requests Q and writes to requests/reads 
from stdin is 100% async code
   - The sync send code (in my PR I combined `send_msg() + get_response()` into a single function as it's a nicer API anyway) does something like this:
   
   ```python
   req_id = next(self.id_counter)
   self._q_requests.put(Frame(..., id=req_id))
   while True:
       resp = self._q_responses.peek()
       if resp.id != req_id:
           # Response to someone else's message. Try again
           continue
       self._q_responses.get()  # Though this feels racy? Maybe we need a sanity check and to requeue it if the req_id doesn't match?
       return resp
   ```
   
   Is that actually any better (clearer/easier to understand/more performant) than just using a thread pool with a single worker (n=1)?
   
   WDYT as well @gopidesupavan?
   
   





Re: [I] Trigger runner process locked with multiple Workflow triggers [airflow]

2025-06-12 Thread via GitHub


ashb commented on issue #50185:
URL: https://github.com/apache/airflow/issues/50185#issuecomment-2967156151

   Thanks, yeah. I'm only just now getting back around to looking at this issue 
and the linked PR in detail.
   
   I don't have any reader/writer thread in my WIP right now, and wasn't thinking of adding any, so I don't think my changes will help at all here. (In my head I was thinking "we can just send all the responses from sync code, make note of the request_id, then have async code wait for a response to that matching request_id." But that won't help squat here.)
   
   Thanks very much for the detailed and helpful comments @x42005e1f.





Re: [I] Trigger runner process locked with multiple Workflow triggers [airflow]

2025-06-12 Thread via GitHub


x42005e1f commented on issue #50185:
URL: https://github.com/apache/airflow/issues/50185#issuecomment-2967111308

   > Given that constraint (that we sort of really want to support this code, 
sadly) I'm not sure my queue etc approach will help at all.
   
   As I said in the [comment on the linked PR](https://github.com/apache/airflow/pull/51279#issuecomment-2934712813), the problem is tied to the async -> sync case in the same thread, which is fundamentally unsolvable. However, it is enough to make the synchronous and asynchronous code independent of each other to bypass the problem, for example by performing every read in a worker thread.
   
   If your approach has something similar to this, or otherwise removes the 
dependency (created due to synchronization), then it solves this issue as well.





Re: [I] Trigger runner process locked with multiple Workflow triggers [airflow]

2025-06-12 Thread via GitHub


ashb commented on issue #50185:
URL: https://github.com/apache/airflow/issues/50185#issuecomment-2967053571

   Thinking about this a bit more and re-reading your super helpful comments, I'm not sure this approach will get around the fundamental problem of sync_to_async.
   
   We can't easily (today/now) add async versions of get_connection etc, so we 
have the issue that the calling code ends up doing:
   
   ```python
   connection = await sync_to_async(self.get_connection)(self.aws_conn_id)
   ```
   
   Given that constraint (that we sort of really want to support this code, 
sadly) I'm not sure my queue etc approach will help at all.





Re: [I] Trigger runner process locked with multiple Workflow triggers [airflow]

2025-06-12 Thread via GitHub


x42005e1f commented on issue #50185:
URL: https://github.com/apache/airflow/issues/50185#issuecomment-2966998036

   > I'm adding a request ID (integer, atomic auto inc using `itertools.count`) and I was wondering if we could remove the lock and replace it with some other primitive from aiologic to send the response back to async code instead (possibly a queue? Maybe a flag? Not sure tbh, just a thought)
   
   If you only need to wait for a result, currently the most appropriate option 
is to use either a high-level event (`aiologic.Event` if there are many waiters 
of the same result) or a low-level event 
(`aiologic.lowlevel.create__event()` (>=0.15.0) / 
`aiologic.lowlevel.Event` (<0.15.0) if there is only one waiter). Just 
set the event after the result is saved somewhere.
   
   If you want to sequentially wait for multiple results, then yes, 
`aiologic.Queue` might be a good option. You may also be interested in 
[`culsans.Queue`](https://github.com/x42005e1f/culsans), which has more 
features.
   
   Flags (`aiologic.Flag` (>=0.15.0) / `aiologic.lowlevel.Flag` (<0.15.0)) have a slightly different meaning. They are used in situations where you want to know which thread (or state) got there first. The name comes from the analogy of planting a flag on a conquered mountain, the moon, and so on.
   
   I have a plan to add some kind of Future / AsyncResult in the future, but I 
have not decided on the interface yet.





Re: [I] Trigger runner process locked with multiple Workflow triggers [airflow]

2025-06-12 Thread via GitHub


ashb commented on issue #50185:
URL: https://github.com/apache/airflow/issues/50185#issuecomment-2966720479

   As part of addressing 
[#46426](https://github.com/apache/airflow/issues/46426) (which I'm working on 
right now) I'm totally overhauling the protocol between supervisor and child 
process.
   
   I'm adding a request ID (integer, atomic auto inc using `itertools.count`) and I was wondering if we could remove the lock and replace it with some other primitive from aiologic to send the response back to async code instead (possibly a queue? Maybe a flag? Not sure tbh, just a thought)
   
   My WIP branch is 
https://github.com/astronomer/airflow/tree/rework-tasksdk-supervisor-comms-protocol
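One way request IDs could replace the lock, sketched with a hypothetical `PendingRequests` registry (not the design in the WIP branch): `itertools.count()` hands out IDs, each caller registers a `concurrent.futures.Future` under its ID, and whatever code reads responses resolves the matching future, even if responses arrive out of order. The small `threading.Lock` here only guards the dictionary and is never held while waiting for I/O.

```python
from __future__ import annotations

import concurrent.futures
import itertools
import threading


class PendingRequests:
    """Hypothetical registry mapping request IDs to futures awaiting a response."""

    def __init__(self) -> None:
        self._ids = itertools.count(1)
        self._futures: dict[int, concurrent.futures.Future] = {}
        self._lock = threading.Lock()  # guards the dict only; never held during I/O

    def register(self) -> tuple[int, concurrent.futures.Future]:
        fut: concurrent.futures.Future = concurrent.futures.Future()
        with self._lock:
            req_id = next(self._ids)
            self._futures[req_id] = fut
        return req_id, fut

    def resolve(self, req_id: int, payload: object) -> None:
        # Called by whatever reads responses; an unknown ID raises KeyError.
        with self._lock:
            fut = self._futures.pop(req_id)
        fut.set_result(payload)


pending = PendingRequests()
id_a, fut_a = pending.register()
id_b, fut_b = pending.register()

# Responses may come back in any order; each one still reaches its own caller.
pending.resolve(id_b, "response for B")
pending.resolve(id_a, "response for A")
print(id_a, fut_a.result(), "|", id_b, fut_b.result())  # 1 response for A | 2 response for B
```

Sync callers can block on `fut.result()`, and async callers can `await asyncio.wrap_future(fut)`, so both kinds of caller share one mechanism.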


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [I] Trigger runner process locked with multiple Workflow triggers [airflow]

2025-06-01 Thread via GitHub


x42005e1f commented on issue #50185:
URL: https://github.com/apache/airflow/issues/50185#issuecomment-2928285691

   By the way, below is an excerpt from a not-yet-published section of the aiologic documentation, which illustrates just how expensive context switching can actually be. It does not show the whole picture, but it may help in understanding some seldom-mentioned problems of asynchronous programming, especially in [lock-free and wait-free](https://concurrencyfreaks.blogspot.com/2013/05/lock-free-and-wait-free-definition-and.html) contexts.
   
   
   challenges.md
   
   Challenges
   ==
   
   Bridging between concurrency libraries is not the only thing that aiologic was
   designed to do. The purpose of this section is to show some of the problems
   considered in its design, in the hope that the interested reader will be able
   to make the best use of this library by clearly understanding its ideas.
   
   A world full of squares
   ---
   
   How much time are you willing to spend to get all the threads up and running?
   This may seem like a strange question, but it is not as simple as it seems at
   first glance.
   
   Living in the world of data, we tend to consider the time complexity of the
   algorithms we know. But what about the asynchronous world? We are used to
   seeing this world as a black box, forgetting that it is built on the same
   algorithms, albeit at a level that is not always available to us. And that
   comes at a price.
   
   Suppose we want to launch N threads to perform some long work. Whether it is
   for parallel processing of some NumPy arrays, for network operations, for
   simulating some game processes - it does not matter. Here is an example that
   models our task:
   
   ```python
   import threading
   import time
   
   N = ...  # the number of threads to start
   
   stopped = False
   
   
   def work(i):
       global stopped
   
       if i == N - 1:  # the last thread
           stopped = True  # stop the work
   
       while not stopped:
           time.sleep(0)  # do some work
   
   
   for i in range(N):
       threading.Thread(target=work, args=[i]).start()
   ```
   
   In this example, we run the work in separate threads until the last thread
   starts. Let's see what happens if we set different N's.
   
   * N=100: 0.17 seconds
   * N=200: 0.55 seconds
   * N=300: 1.19 seconds
   * N=400: 2.14 seconds
   * N=500: 3.14 seconds
   * N=600: 4.69 seconds
   * N=700: 6.38 seconds
   * N=800: 8.11 seconds
   * N=900: 10.48 seconds
   * N=1000: 12.95 seconds
   
   Whoa! Increasing the number of threads tenfold increased the time about 76-fold
   (from 0.17 to 12.95 seconds)! We can clearly see that the dependence of
   execution time on the number of threads is not linear - in fact, it is
   quadratic. Why does this happen?
   
   Starting a thread in Python is based on two main operations:
   
   1. Asking the operating system to start a thread.
   2. Waiting for that thread to start (e.g. to detect memory leaks).
   
   Starting each new thread forces the main thread to do a context switch.
   However, the operating system needs to emulate the concurrent execution of all
   threads, so the picture will usually not look like ping-pong between the main
   thread and the newly created thread - it will need to give CPU resources to the
   already running threads to execute as well. With a fair scheduling strategy
   that might look something like this:
   
   1. main thread
   2. thread 1 -> main thread
   3. thread 1 -> thread 2 -> main thread
   4. thread 1 -> thread 2 -> thread 3 -> main thread
   5. ...
   
   With each new thread, the required number of context switches to start the next
   one increases. We see a triangle, which becomes a *square* when the constant is
   discarded - that is where the quadratic complexity comes from!
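Counting the switches in the schedule above makes the quadratic growth explicit. This is a back-of-the-envelope sketch, assuming (as the list does) that every already-running thread gets exactly one time slice before the main thread resumes: starting thread $k$ then costs $k$ context switches, so starting $N$ threads costs

```math
\sum_{k=1}^{N} k = \frac{N(N+1)}{2} = \Theta(N^2)
```

Under this simplified model, going from N=100 to N=1000 multiplies the switch count by about 99 (5050 to 500500), the same order of magnitude as the roughly 76-fold increase in the measured times (0.17 s to 12.95 s).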
   
   
   
   Again, this is not specific to Apache Airflow, but it may have some value as learning material: a footnote on how non-trivial the cost of approaches that rely on context switching can be.
   





Re: [I] Trigger runner process locked with multiple Workflow triggers [airflow]

2025-06-01 Thread via GitHub


gopidesupavan commented on issue #50185:
URL: https://github.com/apache/airflow/issues/50185#issuecomment-2928182102

   Cool, thanks! Can you please have a look? https://github.com/apache/airflow/pull/51279





Re: [I] Trigger runner process locked with multiple Workflow triggers [airflow]

2025-06-01 Thread via GitHub


x42005e1f commented on issue #50185:
URL: https://github.com/apache/airflow/issues/50185#issuecomment-2928146671

   The difference shows up only in some very subtle cases where context switching might be too expensive (usually a large number of running threads, which gives O(n) time, possibly with additional cycles), but that is hardly the case for Apache Airflow.
   
   And the reason the futures solution works is quite simple. When we use any kind of synchronization in the same thread, we get a deadlock because the previous context cannot continue its execution. With ThreadPoolExecutor, however, the two operations are executed separately, so they can complete without deadlocks: the second, synchronous wait can safely be done without first completing the asynchronous one.





Re: [I] Trigger runner process locked with multiple Workflow triggers [airflow]

2025-06-01 Thread via GitHub


gopidesupavan commented on issue #50185:
URL: https://github.com/apache/airflow/issues/50185#issuecomment-2928074586

   @x42005e1f looks like the solution is working; I don't see any significant performance issue, though it may just be unnoticeable in my local environment. Also good to know the locks are not required, since it uses a future object to wait.





Re: [I] Trigger runner process locked with multiple Workflow triggers [airflow]

2025-05-31 Thread via GitHub


gopidesupavan commented on issue #50185:
URL: https://github.com/apache/airflow/issues/50185#issuecomment-2925180245

   > > [@x42005e1f](https://github.com/x42005e1f) sorry, I was having a hard time at my day job the last two days; back to this now. Could you please elaborate with an example? I couldn't work out what you were referring to, sorry.
   > 
   > Collisions occur due to simultaneous reading of a single descriptor - 
usually the one referenced by `sys.stdin`. In `CommsDecoder.get_message()`, it 
is read synchronously via `sys.stdin` directly. In 
`TriggerRunner.sync_state_to_supervisor()`, it is read asynchronously via 
`asyncio.StreamReader`. The idea is that to resolve collisions and avoid 
deadlocks, it is sufficient to allow the file descriptor to be read in mixed 
mode in the same thread, but associate the read with the one who sent the 
request.
   > 
   > When using the lock described above (which can be implemented via GLock or 
independently), there is only one such mixed read situation - async -> sync. In 
this case we can, for example, do three things:
   > 
   > 1. Replace `asyncio.StreamReader` with 
`concurrent.futures.ThreadPoolExecutor(1)`.
   > 2. Call `executor.submit()` in `TriggerRunner.sync_state_to_supervisor()`, 
store a reference to the future in `SUPERVISOR_COMMS`, wait for the future 
asynchronously with `asyncio.wrap_future()`.
   > 3. In `CommsDecoder.get_message()`, wait for the future synchronously, if 
any, and only then read the `sys.stdin` content itself.
   > 
   > That way we can eliminate this type of collision with minimal pain. At 
least hypothetically.
   
   Looks promising, looking at it now. :) Thanks.





Re: [I] Trigger runner process locked with multiple Workflow triggers [airflow]

2025-05-31 Thread via GitHub


gopidesupavan commented on issue #50185:
URL: https://github.com/apache/airflow/issues/50185#issuecomment-2925164338

   > Also note that if we route all communication through `executor.submit()` calls, we can do without synchronization, since the calls will be executed by a single worker thread and the futures can be waited on either synchronously or asynchronously. The only disadvantage is a slight performance loss.
   > 
   > Obviously, this will also mean that you will no longer need aiologic, at 
least not for the kind of `SUPERVISOR_COMMS` synchronization you have now. 
Well, sophisticated solutions are rarely ever really needed in a proper 
architecture.
   
   Agreed. Triggers are widely used nowadays, so I'm not sure how much of a performance hit it would be.





Re: [I] Trigger runner process locked with multiple Workflow triggers [airflow]

2025-05-31 Thread via GitHub


x42005e1f commented on issue #50185:
URL: https://github.com/apache/airflow/issues/50185#issuecomment-2924992192

   Also note that if we route all communication through `executor.submit()` calls, we can do without synchronization, since the calls will be executed by a single worker thread and the futures can be waited on either synchronously or asynchronously. The only disadvantage is a slight performance loss.
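A minimal sketch of that idea, where `round_trip()` is a hypothetical stand-in for writing a request and reading its response: every exchange is submitted to one `ThreadPoolExecutor(max_workers=1)`, so exchanges never interleave, and the resulting future is waited on with `future.result()` from sync code or `asyncio.wrap_future()` from async code. The demo makes a blocking sync call from inside the event loop while an async request is still pending, the kind of situation that deadlocks when both sides share the event loop thread.

```python
from __future__ import annotations

import asyncio
import concurrent.futures
import threading

# One worker thread serialises every request/response exchange.
comms_executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)
worker_threads: set[str] = set()  # records which thread(s) ran the exchanges


def round_trip(request: str) -> str:
    # Hypothetical stand-in for "write the request, then read its response".
    worker_threads.add(threading.current_thread().name)
    return f"resp:{request}"


def sync_request(request: str) -> str:
    # Sync callers block on the future; they never touch the channel themselves.
    return comms_executor.submit(round_trip, request).result()


async def async_request(request: str) -> str:
    # Async callers await the same kind of future without blocking the event loop.
    return await asyncio.wrap_future(comms_executor.submit(round_trip, request))


async def main() -> tuple[str, str]:
    async_task = asyncio.create_task(async_request("from async"))
    await asyncio.sleep(0)  # the async request is now queued on the worker
    # A blocking sync call from the event loop thread still completes, because the
    # exchange runs on the worker thread and does not need the event loop.
    sync_result = sync_request("from sync")
    return await async_task, sync_result


results = asyncio.run(main())
print(results)  # ('resp:from async', 'resp:from sync')
comms_executor.shutdown()
```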





Re: [I] Trigger runner process locked with multiple Workflow triggers [airflow]

2025-05-31 Thread via GitHub


x42005e1f commented on issue #50185:
URL: https://github.com/apache/airflow/issues/50185#issuecomment-2924954561

   > [@x42005e1f](https://github.com/x42005e1f) sorry, I was having a hard time at my day job the last two days; back to this now. Could you please elaborate with an example? I couldn't work out what you were referring to, sorry.
   
   Collisions occur due to simultaneous reading of a single descriptor - 
usually the one referenced by `sys.stdin`. In `CommsDecoder.get_message()`, it 
is read synchronously via `sys.stdin` directly. In 
`TriggerRunner.sync_state_to_supervisor()`, it is read asynchronously via 
`asyncio.StreamReader`. The idea is that to resolve collisions and avoid 
deadlocks, it is sufficient to allow the file descriptor to be read in mixed 
mode in the same thread, but associate the read with the one who sent the 
request.
   
   When using the lock described above (which can be implemented via GLock or 
independently), there is only one such mixed read situation - async -> sync. In 
this case we can, for example, do three things:
   
   1. Replace `asyncio.StreamReader` with 
`concurrent.futures.ThreadPoolExecutor(1)`.
   2. Call `executor.submit()` in `TriggerRunner.sync_state_to_supervisor()`, 
store a reference to the future in `SUPERVISOR_COMMS`, wait for the future 
asynchronously with `asyncio.wrap_future()`.
   3. In `CommsDecoder.get_message()`, wait for the future synchronously, if 
any, and only then read the `sys.stdin` content itself.
   
   That way we can eliminate this type of collision with minimal pain. At least 
hypothetically.
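A minimal sketch of the three steps, with simplifications that are assumptions rather than the real `CommsDecoder`: a pipe stands in for `sys.stdin`, messages are newline-delimited, and `SketchComms`, `async_read()` and `sync_read()` are hypothetical names. The async side reads through a one-thread executor and records the in-flight future; the sync side waits for that future before reading, so the two readers never consume each other's bytes.

```python
from __future__ import annotations

import asyncio
import concurrent.futures
import os


class SketchComms:
    """Hypothetical, heavily simplified reader shared by sync and async code."""

    def __init__(self, read_fd: int) -> None:
        self._reader = os.fdopen(read_fd, "r")  # stand-in for sys.stdin
        # Step 1: a single-thread executor instead of asyncio.StreamReader.
        self._executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)
        self.pending: concurrent.futures.Future | None = None

    async def async_read(self) -> str:
        # Step 2: submit the read, keep a reference to its future, await it.
        self.pending = self._executor.submit(self._reader.readline)
        line = await asyncio.wrap_future(self.pending)
        return line.rstrip("\n")

    def sync_read(self) -> str:
        # Step 3: wait for any in-flight async read first, then read directly.
        if self.pending is not None:
            self.pending.result()
        return self._reader.readline().rstrip("\n")

    def close(self) -> None:
        self._executor.shutdown()
        self._reader.close()


async def main() -> tuple[str, str]:
    read_fd, write_fd = os.pipe()
    comms = SketchComms(read_fd)
    read_task = asyncio.create_task(comms.async_read())
    await asyncio.sleep(0)  # the async read is now in flight on the worker thread
    os.write(write_fd, b"reply to async request\nreply to sync request\n")
    # A blocking sync read on the event loop thread: it lets the async read take its
    # line first, then reads the next line itself.
    sync_msg = comms.sync_read()
    async_msg = await read_task
    comms.close()
    os.close(write_fd)
    return async_msg, sync_msg


print(asyncio.run(main()))  # ('reply to async request', 'reply to sync request')
```

Both reads go through the same file object, so any data the worker buffered beyond its own line is still available to the sync read.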





Re: [I] Trigger runner process locked with multiple Workflow triggers [airflow]

2025-05-31 Thread via GitHub


gopidesupavan commented on issue #50185:
URL: https://github.com/apache/airflow/issues/50185#issuecomment-2924809405

   @x42005e1f sorry, I was having a hard time at my day job the last two days; back to this now. Could you please elaborate with an example? I couldn't work out what you were referring to, sorry.
   
   If not, I will go ahead and try to implement the above CommsLock approach with `sync_or_async`. I feel it's worth doing.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [I] Trigger runner process locked with multiple Workflow triggers [airflow]

2025-05-28 Thread via GitHub


x42005e1f commented on issue #50185:
URL: https://github.com/apache/airflow/issues/50185#issuecomment-2917903234

   Perhaps I should clarify the thought. Instead of waiting in the synchronous 
function to finish reading the message in the asynchronous task, which 
obviously will never complete without switching back to the event loop, we can 
finish reading the message in the synchronous function. Once we finish reading 
that message, we will send that remainder to the asynchronous task on the next 
switch. And before that, the synchronous function will make the request it 
wants to make.
   
   Also, instead of adding a separate asynchronous lock, we can introduce 
separate `default_group_factory` for green (sync) and async APIs respectively 
(`=current_thread_ident` for the former, `=current_async_task_ident` for the 
latter).





Re: [I] Trigger runner process locked with multiple Workflow triggers [airflow]

2025-05-28 Thread via GitHub


x42005e1f commented on issue #50185:
URL: https://github.com/apache/airflow/issues/50185#issuecomment-2917869438

   > [@kaxil](https://github.com/kaxil) [@ashb](https://github.com/ashb) It 
seems to me that having a separate channel for triggerer workloads and API 
communication would be ideal in this scenario to avoid deadlocks IMHO.
   
   There is also a very audacious solution. Let's go back to the thread-level 
lock. If we add an asynchronous lock to it for the current event loop, it will:
   
   1. Guarantee exclusive access for the thread.
   2. Guarantee exclusive access for the task.
   3. But allow access through a synchronous function called by another task.
   
   And let's just suspend the message transmission to the asynchronous task and 
let the response come to the synchronous function first, and resume sending to 
the asynchronous function afterwards. This way we will eliminate both 
collisions and deadlocks - due to cooperative multitasking it can be done 
relatively easily. But this mechanism will have to be implemented properly.
   
   > Can this `greenback.ensure_portal()` be opened at the global level in the trigger process? I'm asking because it's a bit problematic to add this before every function that is called from the triggerer. Also, users have their own custom triggers, and they would have to add this before every function call.
   
   Just call it somewhere within the current task before it makes sync-or-async calls. `greenback.ensure_portal()` keeps track of the tasks it has already set up, so it can be called multiple times.





Re: [I] Trigger runner process locked with multiple Workflow triggers [airflow]

2025-05-28 Thread via GitHub


gopidesupavan commented on issue #50185:
URL: https://github.com/apache/airflow/issues/50185#issuecomment-2917855355

   > > [@x42005e1f](https://github.com/x42005e1f) please add your thoughts.
   > 
   > I do not think that making the function return type dependent on runtime 
is a good pattern. This approach is poorly compatible with static analyzers and 
it will also not work correctly in all cases. Instead, I suggest shifting the 
focus not on how to create greenlets, but on how to pass awaitable objects from 
them. Here is how it can be implemented with a ready-made example for 
`aiologic.Lock`:
   > 
   > ```python
   > from __future__ import annotations
   >
   > import inspect
   > import sys
   >
   > from collections.abc import Awaitable, Callable
   > from functools import wraps
   > from typing import TypeVar, cast
   >
   > import aiologic
   > import anyio
   > import greenback
   >
   > if sys.version_info >= (3, 10):
   >     from typing import ParamSpec
   > else:
   >     from typing_extensions import ParamSpec
   >
   > _T = TypeVar("_T")
   > _P = ParamSpec("_P")
   >
   >
   > def sync_or_async(
   >     sync_func: Callable[_P, _T],
   >     async_func: Callable[_P, _T] | Callable[_P, Awaitable[_T]],
   > ) -> Callable[_P, _T]:
   >     if inspect.iscoroutinefunction(async_func):
   >         async_impl = greenback.autoawait(async_func)
   >     else:
   >         async_impl = async_func
   >
   >     async_impl = cast(Callable[_P, _T], async_impl)
   >
   >     @wraps(sync_func)
   >     def wrapper(*args: _P.args, **kwargs: _P.kwargs) -> _T:
   >         if greenback.has_portal():
   >             return async_impl(*args, **kwargs)
   >         else:
   >             return sync_func(*args, **kwargs)
   >
   >     return wrapper
   >
   >
   > class CommsLock(aiologic.Lock):
   >     __slots__ = ()
   >
   >     async def _async_acquire_with_timeout(
   >         self,
   >         /,
   >         *,
   >         blocking: bool = True,
   >         timeout: float | None = None,
   >     ) -> bool:
   >         with anyio.move_on_after(timeout):
   >             return await self.async_acquire(blocking=blocking)
   >
   >         return False
   >
   >     green_acquire = sync_or_async(
   >         aiologic.Lock.green_acquire,
   >         _async_acquire_with_timeout,  # type: ignore[arg-type]
   >     )
   >     green_release = sync_or_async(
   >         aiologic.Lock.green_release,
   >         aiologic.Lock.async_release,
   >     )
   > ```
   >
   > ```python
   > lock = CommsLock()
   >
   > async def noop() -> None:
   >     pass
   >
   > async def holding() -> None:
   >     async with lock:
   >         await asyncio.sleep(0)  # switch back to the event loop
   >
   > def acquire_release() -> None:
   >     with lock:
   >         pass  # do something
   >
   > # make CommsLock implicitly awaitable for the current task
   > await greenback.ensure_portal()
   >
   > # hold the lock with another task
   > holder = asyncio.create_task(holding())
   > await asyncio.sleep(0)
   > assert lock.locked()
   >
   > # a task to verify that CommsLock does indeed yield to the event loop
   > task = asyncio.create_task(noop())
   > assert not task.done()
   >
   > acquire_release()  # sync-or-async call
   > assert task.done()  # there was a context switch!
   > ```
   
   Can `greenback.ensure_portal()` be opened at a global level in the triggerer 
process? I'm asking because it's a bit problematic to add this before every 
function that is called from the triggerer. Also, users have their own custom 
triggers, and they would have to add this before every function call.





Re: [I] Trigger runner process locked with multiple Workflow triggers [airflow]

2025-05-28 Thread via GitHub


gopidesupavan commented on issue #50185:
URL: https://github.com/apache/airflow/issues/50185#issuecomment-2917851474

   @x42005e1f Really appreciate your thoughts here and all the suggestions :)
   
   Yes, I agree that having an async version would be a good way to avoid all 
these patches. If I remember correctly, the lock was introduced primarily to 
prevent API communication messages from getting mixed up with triggerer 
workloads.
   
   @kaxil @ashb It seems to me that having a separate channel for triggerer 
workloads and API communication would be ideal in this scenario to avoid 
deadlocks, IMHO. Alternatively, implementing async versions of all relevant 
methods could also work. Or, as @x42005e1f suggested, we could adopt the method 
he mentioned, but as he noted, we'll need to closely monitor its 
coroutine-safety.





Re: [I] Trigger runner process locked with multiple Workflow triggers [airflow]

2025-05-28 Thread via GitHub


x42005e1f commented on issue #50185:
URL: https://github.com/apache/airflow/issues/50185#issuecomment-2917832050

   Well, I think the best solution is not to synchronize. If it is possible to 
rewrite communication so that messages go really atomically (or without 
collisions), it will get rid of all this abstruse headache.
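One way to read "messages without collisions" is request-ID multiplexing: every request is sent as a single frame tagged with an id, and each reply is routed back to whichever caller is waiting on that id, so no lock has to be held across a round trip. The sketch below is a minimal illustration under that assumption; `MultiplexedChannel` and `fake_peer` are hypothetical and are not Airflow's actual supervisor-comms protocol.

```python
from __future__ import annotations

import asyncio
import itertools
from typing import Any


class MultiplexedChannel:
    """Hypothetical channel: every request carries an id, so replies cannot be mixed up."""

    def __init__(self) -> None:
        self._ids = itertools.count()
        self._pending: dict[int, asyncio.Future[Any]] = {}
        self.outgoing: asyncio.Queue[tuple[int, Any]] = asyncio.Queue()

    async def request(self, payload: Any) -> Any:
        req_id = next(self._ids)
        future = asyncio.get_running_loop().create_future()
        self._pending[req_id] = future
        # The whole (id, payload) frame is enqueued in one step, so frames never interleave.
        self.outgoing.put_nowait((req_id, payload))
        return await future

    def deliver(self, req_id: int, response: Any) -> None:
        # Route each reply to the caller waiting on that id, whatever the arrival order.
        self._pending.pop(req_id).set_result(response)


async def fake_peer(channel: MultiplexedChannel, count: int) -> None:
    frames = [await channel.outgoing.get() for _ in range(count)]
    for req_id, payload in reversed(frames):  # answer out of order on purpose
        channel.deliver(req_id, payload * 2)


async def main() -> list[int]:
    channel = MultiplexedChannel()
    peer = asyncio.create_task(fake_peer(channel, 3))
    results = await asyncio.gather(*(channel.request(n) for n in (1, 2, 3)))
    await peer
    return list(results)


print(asyncio.run(main()))  # [2, 4, 6]
```

Even though the fake peer answers in reverse order, each caller gets its own reply, which is the property that removes the need to serialize whole request/response exchanges.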





Re: [I] Trigger runner process locked with multiple Workflow triggers [airflow]

2025-05-28 Thread via GitHub


x42005e1f commented on issue #50185:
URL: https://github.com/apache/airflow/issues/50185#issuecomment-2917802668

   Also note that this greenlet approach requires interrupt support - 
synchronous functions that contain such sync-or-async calls at least indirectly 
(via subcalls) must be aware that they can be interrupted and called again 
(i.e. they must be coroutine-safe). Otherwise, the effect will be like a 
multithreaded function call, but with cooperative multitasking, the possibility 
of deadlocks when using primitives from the threading module, and broken 
`threading.local`.
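The `threading.local` hazard can be reproduced with plain asyncio tasks, which interleave on one thread much like interrupted synchronous calls would. A minimal sketch: a `threading.local` value set before a suspension point is overwritten by another task, while a `contextvars.ContextVar` (a standard-library alternative, swapped in here for contrast) stays per task because each asyncio task runs in its own copy of the context.

```python
from __future__ import annotations

import asyncio
import contextvars
import threading

thread_state = threading.local()
task_state: contextvars.ContextVar[str] = contextvars.ContextVar("task_state")


async def with_thread_local(name: str, log: list) -> None:
    thread_state.current = name  # shared by every task running on this thread
    await asyncio.sleep(0)  # suspension point: another task runs here
    log.append((name, thread_state.current))


async def with_context_var(name: str, log: list) -> None:
    task_state.set(name)  # each asyncio task has its own copy of the context
    await asyncio.sleep(0)
    log.append((name, task_state.get()))


async def main() -> tuple[list, list]:
    local_log: list = []
    ctx_log: list = []
    await asyncio.gather(with_thread_local("a", local_log), with_thread_local("b", local_log))
    await asyncio.gather(with_context_var("a", ctx_log), with_context_var("b", ctx_log))
    return local_log, ctx_log


print(asyncio.run(main()))  # ([('a', 'b'), ('b', 'b')], [('a', 'a'), ('b', 'b')])
```

Task `a` reads back `b`'s value from the thread-local, which is the kind of breakage interrupted synchronous code would also see.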
   
   If it is possible to write asynchronous versions for functions that use 
`SUPERVISOR_COMMS.lock` with little effort, it is better to do so. If not, the 
above approach can be considered as a temporary or permanent solution 
(depending on whether you are willing to keep an eye on coroutine-safety).
   
   Note that selecting the first option (separate asynchronous functions) 
requires that there be such functions for each use case of 
`SUPERVISOR_COMMS.lock` via the asynchronous API. If an asynchronous function 
calls at least one synchronous function that directly or indirectly uses 
`SUPERVISOR_COMMS.lock`, this immediately risks deadlocks, possibly even 
hard-to-detect ones.
   
   This caveat is only true for calls within the same thread. If 
`SUPERVISOR_COMMS.lock` is used synchronously in a separate thread (e.g. via 
`sync_to_async()`), everything is fine.
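A minimal sketch of why a separate thread avoids the deadlock, assuming a plain `threading.Lock` as a stand-in for `SUPERVISOR_COMMS.lock` and the standard library's `asyncio.to_thread()` as a stand-in for `sync_to_async()`. One task holds the lock across an `await`; the blocking call waits in a worker thread, so the event loop keeps running and the holder can release.

```python
import asyncio
import threading

comms_lock = threading.Lock()  # stand-in for a thread-level comms lock


def sync_comms_call() -> str:
    # Blocks whichever thread calls it until the lock is free.
    with comms_lock:
        return "response"


async def holder(release: asyncio.Event) -> None:
    # Another task holds the lock across an await, on the event-loop thread.
    comms_lock.acquire()
    try:
        await release.wait()
    finally:
        comms_lock.release()


async def main() -> str:
    release = asyncio.Event()
    held = asyncio.create_task(holder(release))
    await asyncio.sleep(0)  # let `holder` take the lock
    # Run the blocking call in a worker thread. Calling sync_comms_call() directly
    # here would block the loop, so `holder` could never release the lock.
    call = asyncio.create_task(asyncio.to_thread(sync_comms_call))
    await asyncio.sleep(0.05)  # the worker is waiting, but the loop keeps running
    release.set()
    await held
    return await call


print(asyncio.run(main()))  # response
```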





Re: [I] Trigger runner process locked with multiple Workflow triggers [airflow]

2025-05-28 Thread via GitHub


x42005e1f commented on issue #50185:
URL: https://github.com/apache/airflow/issues/50185#issuecomment-2917682923

   > [@x42005e1f](https://github.com/x42005e1f) please add your thoughts.
   
   I do not think that making the function return type dependent on runtime is 
a good pattern. This approach is poorly compatible with static analyzers and it 
will also not work correctly in all cases. Instead, I suggest shifting the 
focus not on how to create greenlets, but on how to pass awaitable objects from 
them. Here is how it can be implemented with a ready-made example for 
`aiologic.Lock`:
   
   ```python
   from __future__ import annotations
   
   import inspect
   import sys
   
   from collections.abc import Awaitable, Callable
   from functools import wraps
   from typing import TypeVar, cast
   
   import aiologic
   import anyio
   import greenback
   
   if sys.version_info >= (3, 10):
       from typing import ParamSpec
   else:
       from typing_extensions import ParamSpec
   
   _T = TypeVar("_T")
   _P = ParamSpec("_P")
   
   
   def sync_or_async(
       sync_func: Callable[_P, _T],
       async_func: Callable[_P, _T] | Callable[_P, Awaitable[_T]],
   ) -> Callable[_P, _T]:
       if inspect.iscoroutinefunction(async_func):
           async_impl = greenback.autoawait(async_func)
       else:
           async_impl = async_func
   
       async_impl = cast(Callable[_P, _T], async_impl)
   
       @wraps(sync_func)
       def wrapper(*args: _P.args, **kwargs: _P.kwargs) -> _T:
           if greenback.has_portal():
               return async_impl(*args, **kwargs)
           else:
               return sync_func(*args, **kwargs)
   
       return wrapper
   
   
   class CommsLock(aiologic.Lock):
       __slots__ = ()
   
       async def _async_acquire_with_timeout(
           self,
           /,
           *,
           blocking: bool = True,
           timeout: float | None = None,
       ) -> bool:
           # honor the timeout instead of silently ignoring it
           with anyio.move_on_after(timeout):
               return await self.async_acquire(blocking=blocking)
   
           return False
   
       green_acquire = sync_or_async(
           aiologic.Lock.green_acquire,
           _async_acquire_with_timeout,  # type: ignore[arg-type]
       )
       green_release = sync_or_async(
           aiologic.Lock.green_release,
           aiologic.Lock.async_release,
       )
   ```
   
   ```python
   import asyncio
   
   import greenback
   
   # CommsLock is the class defined in the snippet above.
   lock = CommsLock()
   
   
   async def noop() -> None:
       pass
   
   
   async def holding() -> None:
       async with lock:
           await asyncio.sleep(0)  # switch back to the event loop
   
   
   def acquire_release() -> None:
       with lock:
           pass  # do something
   
   
   async def main() -> None:
       # make CommsLock implicitly awaitable for the current task
       await greenback.ensure_portal()
   
       # hold the lock with another task
       holder = asyncio.create_task(holding())
       await asyncio.sleep(0)
       assert lock.locked()
   
       # a task to verify that CommsLock does indeed yield to the event loop
       task = asyncio.create_task(noop())
       assert not task.done()
   
       acquire_release()  # sync-or-async call
       assert task.done()  # there was a context switch!
   
       await holder
   
   
   asyncio.run(main())
   ```





Re: [I] Trigger runner process locked with multiple Workflow triggers [airflow]

2025-05-28 Thread via GitHub


gopidesupavan commented on issue #50185:
URL: https://github.com/apache/airflow/issues/50185#issuecomment-2917340184

   @kaxil @uranusjr Considering all of the discussion above, I think this leaves 
us two options. What are your suggestions or thoughts?
   1. Create async versions of all the comms methods that are accessed via the 
triggerer.
   2. Use something like [greenback](https://github.com/oremanj/greenback) and 
execute the sync version inside a greenback portal. This way we don't need an 
async copy of each method.
   
   For option two, I am thinking of something like the code below: we can put a 
decorator on the methods, and if the comms methods are called from an async 
function, they execute in a greenback portal. For normal sync invocation, it 
works without any issues.
   
   ```python
   import asyncio
   import functools
   
   import aiologic
   import greenback
   
   lock = aiologic.Lock()
   
   is_async_process = False
   
   
   def sync_or_async(func):
       @functools.wraps(func)
       def wrapper(*args, **kwargs):
           if is_async_process:
               asyncio.get_running_loop()  # raises RuntimeError if no loop is running
   
               async def async_wrapper():
                   return await greenback.with_portal_run_sync(func, *args, **kwargs)
   
               return async_wrapper()
           else:
               return func(*args, **kwargs)
   
       return wrapper
   
   
   @sync_or_async
   def sample_func(a, b):
       with lock:
           return a + b
   
   
   async def async_main():
       global is_async_process
       is_async_process = True
       await sample_func(4, 4)
   
   
   if __name__ == "__main__":
       sample_func(1, 2)
       asyncio.run(async_main())
   ```





Re: [I] Trigger runner process locked with multiple Workflow triggers [airflow]

2025-05-28 Thread via GitHub


gopidesupavan commented on issue #50185:
URL: https://github.com/apache/airflow/issues/50185#issuecomment-2917225014

   > > Looks like this package is not maintained or not sure, no activity since 
last Feb 2024.
   > 
   > It is not always possible to tell from the commit activity of such 
packages whether they are maintained or not. Sometimes they are not updated 
just because there are no serious issues, in which case the author can focus on 
other projects. Even more ambiguous is the situation when tests are regularly 
run for new versions of dependencies and a lockfile is updated - in this case 
the repository may look alive with a huge number of commits, but in fact it has 
not been updated for a very long time, having very few commits in the source 
code for the whole history.
   > 
   > There are also alternatives such as 
[greenletio](https://github.com/miguelgrinberg/greenletio) or 
[awaitlet](https://github.com/sqlalchemy/awaitlet). But if you want, you can 
implement something similar on your side - for example, SQLAlchemy has long 
used [its own 
module](https://github.com/sqlalchemy/sqlalchemy/blob/05b2442132d5ae31cfcc7a1fe95e0f6b739aa995/lib/sqlalchemy/util/concurrency.py)
 as part of asynchronous API implementation.
   > 
   > These solutions have some disadvantages associated with stack growth, but 
they are usually insignificant. A more comprehensive solution is to implement 
(generators and) coroutines via greenlets - what I called (genlets and) 
corolets 3 years ago (for a non-public tutorial on asynchronous library 
design). But it is redundant for this task.
   
   agree :)





Re: [I] Trigger runner process locked with multiple Workflow triggers [airflow]

2025-05-27 Thread via GitHub


x42005e1f commented on issue #50185:
URL: https://github.com/apache/airflow/issues/50185#issuecomment-2913968644

   > Looks like this package is not maintained or not sure, no activity since 
last Feb 2024.
   
   It is not always possible to tell from the commit activity of such packages 
whether they are maintained or not. Sometimes they are not updated just because 
there are no serious issues, in which case the author can focus on other 
projects. Even more ambiguous is the situation when tests are regularly run for 
new versions of dependencies and a lockfile is updated - in this case the 
repository may look alive with a huge number of commits, but in fact it has not 
been updated for a very long time, having very few commits in the source code 
for the whole history.
   
   There are also alternatives such as 
[greenletio](https://github.com/miguelgrinberg/greenletio) or 
[awaitlet](https://github.com/sqlalchemy/awaitlet). But if you want, you can 
implement something similar on your side - for example, SQLAlchemy has long 
used [its own 
module](https://github.com/sqlalchemy/sqlalchemy/blob/05b2442132d5ae31cfcc7a1fe95e0f6b739aa995/lib/sqlalchemy/util/concurrency.py)
 as part of asynchronous API implementation.
   
   These solutions have some disadvantages associated with stack growth, but 
they are usually insignificant. A more comprehensive solution is to implement 
(generators and) coroutines via greenlets - what I called (genlets and) 
corolets 3 years ago (for a non-public tutorial on asynchronous library 
design). But it is redundant for this task.





Re: [I] Trigger runner process locked with multiple Workflow triggers [airflow]

2025-05-27 Thread via GitHub


gopidesupavan commented on issue #50185:
URL: https://github.com/apache/airflow/issues/50185#issuecomment-2913817328

   > If the problem is the same as what I described (which is likely), the 
(relatively) easiest way to solve it is to use libraries like 
[greenback](https://github.com/oremanj/greenback) to execute an asynchronous 
version when the code runs in a thread with an event loop. Requires explicit 
portal creation, greenlet support, and interrupt support on the synchronous 
code side.
   > 
   > It is also possible to create asynchronous versions of functions to use 
directly, or delegate them to execute sequentially in the worker thread via 
`sync_to_async()`, but this can be difficult, especially since 
[`SUPERVISOR_COMMS.lock` is used even in 
`__getitem__()`](https://github.com/apache/airflow/blob/7ebba78236b945e0ce569607480f397e5f2e58ba/task-sdk/src/airflow/sdk/execution_time/lazy_sequence.py#L147).
   > 
   > > BTW GLock where can i find this package i dont see it on PyPI? i will 
try with this
   > 
   > You can directly copy the code from gist and make the mentioned change. 
GLock [was created to provide readers-writer 
locks](https://github.com/x42005e1f/aiologic/discussions/6) before Grouper 
appears in a future version of aiologic. Right now my efforts are focused on 
moving aiologic to the beta development stage, so releases are currently 
delayed.
   > 
   > But be aware that if you choose GLock, you will have to deal with the loss 
of coroutine-safety: in particular, the asynchronous usage you mentioned will 
be executed in one thread at a time, but will also be able to be executed in 
different tasks at the same time.
   
   @x42005e1f That's a good suggestion. greenback seems to work: I did some tests 
and everything seems fine, with no triggers blocked.
   
   However, I'm not sure whether this package is maintained; there has been no 
activity since February 2024.





Re: [I] Trigger runner process locked with multiple Workflow triggers [airflow]

2025-05-27 Thread via GitHub


x42005e1f commented on issue #50185:
URL: https://github.com/apache/airflow/issues/50185#issuecomment-2912697685

   If the problem is the same as what I described (which is likely), the 
easiest way to solve it is to use libraries like 
[greenback](https://github.com/oremanj/greenback) to execute an asynchronous 
version when the code runs in a thread with an event loop. Requires explicit 
portal creation, greenlet support, and interrupt support on the synchronous 
code side.
   
   It is also possible to create asynchronous versions of functions to use 
directly, or delegate them to execute sequentially in the worker thread via 
`sync_to_async()`, but this can be difficult, especially since 
[`SUPERVISOR_COMMS.lock` is used even in 
`__getitem__()`](https://github.com/apache/airflow/blob/7ebba78236b945e0ce569607480f397e5f2e58ba/task-sdk/src/airflow/sdk/execution_time/lazy_sequence.py#L147).
   
   > BTW GLock where can i find this package i dont see it on PyPI? i will try 
with this
   
   You can directly copy the code from gist and make the mentioned change. 
GLock [was created to provide readers-writer 
locks](https://github.com/x42005e1f/aiologic/discussions/6) before Grouper 
appears in a future version of aiologic. Right now my efforts are focused on 
moving aiologic to the beta development stage, so releases are currently 
delayed.
   
   But be aware that if you choose GLock, you will have to deal with the loss 
of coroutine-safety: in particular, the asynchronous usage you mentioned will 
be executed in one thread at a time, but will also be able to be executed in 
different tasks at the same time.





Re: [I] Trigger runner process locked with multiple Workflow triggers [airflow]

2025-05-27 Thread via GitHub


gopidesupavan commented on issue #50185:
URL: https://github.com/apache/airflow/issues/50185#issuecomment-2912628296

   > Also note that the different APIs that aiologic provides should not be 
mixed in the same thread if threading is used for the green (sync) API. This is 
a fundamental problem due to the fact that primitives work at the task level: 
`lock.green_acquire()` can block the event loop when 
`aiologic.lowlevel.current_green_library() == "threading"`, and as a result 
lock will never be released. This is well demonstrated in the following example:
   > 
   > example.py
   > This is the same as using `threading.Lock` along with asyncio tasks. If 
you have such calls somewhere, it can be the cause of deadlocks.
   > 
   > This problem of mixing two APIs in the same thread can be solved by using 
[GLock](https://gist.github.com/x42005e1f/a50d0744013b7bbbd7ded608d6a3845b), 
which can be a thread-level lock - if you use the read-preferring 
readers-writer locks condition (`self.group is None or self.group == group`), 
its APIs can safely mix (just replace `lock = aiologic.Lock()` with `lock = 
GLock(default_group_factory=current_thread_ident)` and that example will work 
without hanging). However, since it would no longer be a task level, it would 
not be coroutine-safe.
   
   It looks like we are using it in a similar way in some places. This is the 
async one: 
https://github.com/apache/airflow/blob/main/airflow-core/src/airflow/jobs/triggerer_job_runner.py#L965
 and in other places we use `with SUPERVISOR_COMMS.lock:`.
   
   BTW, where can I find GLock? I don't see it on PyPI. I will try with it.





Re: [I] Trigger runner process locked with multiple Workflow triggers [airflow]

2025-05-27 Thread via GitHub


x42005e1f commented on issue #50185:
URL: https://github.com/apache/airflow/issues/50185#issuecomment-2912414727

   Also note that the different APIs that aiologic provides should not be mixed 
in the same thread if threading is used for the green (sync) API. This is a 
fundamental problem because the primitives work at the task level: 
`lock.green_acquire()` can block the event loop when 
`aiologic.lowlevel.current_green_library() == "threading"`, and as a result the 
lock will never be released. The following example demonstrates this well:
   
   
   example.py
   
   ```python
   import asyncio
   import time
   
   import aiologic
   
   
   async def main():
       lock = aiologic.Lock()
   
       async def a():
           async with lock:
               print("before")
               await asyncio.sleep(0)
               print("after")  # will never be printed!
   
       async def b():
           with lock:
               time.sleep(0)
   
       await asyncio.gather(a(), b())
   
   
   if __name__ == "__main__":
       asyncio.run(main())
   ```
   
   
   
   This is the same as using `threading.Lock` along with asyncio tasks. If you 
have such calls somewhere, they can be the cause of deadlocks.
   
   This problem of mixing two APIs in the same thread can be solved by using 
[GLock](https://gist.github.com/x42005e1f/a50d0744013b7bbbd7ded608d6a3845b), 
which can act as a thread-level lock: if you use the read-preferring 
readers-writer lock condition (`self.group is None or self.group == group`), its 
APIs can safely be mixed. However, since it would no longer work at the task 
level, it would not be coroutine-safe.
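The trade-off can be seen with a much smaller lock. The sketch below is a hypothetical `ThreadGroupLock`, not the GLock from the gist: it only keeps the read-preferring condition (`group is None or group == current group`) with the thread ident as the group. The synchronous `with lock:` no longer blocks the event loop, but two tasks end up inside the critical section at once, which is exactly the loss of coroutine-safety.

```python
from __future__ import annotations

import asyncio
import threading


class ThreadGroupLock:
    """Hypothetical sketch: holders that share a group (the thread ident) enter together."""

    def __init__(self) -> None:
        self._cond = threading.Condition()
        self._group: int | None = None
        self._count = 0

    def acquire(self) -> None:
        group = threading.get_ident()
        with self._cond:
            # read-preferring condition: enter if free or already held by our group
            self._cond.wait_for(lambda: self._group is None or self._group == group)
            self._group = group
            self._count += 1

    def release(self) -> None:
        with self._cond:
            self._count -= 1
            if self._count == 0:
                self._group = None
                self._cond.notify_all()

    def __enter__(self) -> ThreadGroupLock:
        self.acquire()
        return self

    def __exit__(self, *exc_info: object) -> None:
        self.release()


async def main() -> list[str]:
    lock = ThreadGroupLock()
    log: list[str] = []

    async def a() -> None:
        with lock:
            log.append("a entered")
            await asyncio.sleep(0)  # yield to the event loop while holding the lock
            log.append("a leaving")

    async def b() -> None:
        with lock:  # same thread ident as `a`, so this does not block the loop
            log.append("b entered")

    await asyncio.gather(a(), b())
    return log


print(asyncio.run(main()))  # ['a entered', 'b entered', 'a leaving']
```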


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [I] Trigger runner process locked with multiple Workflow triggers [airflow]

2025-05-27 Thread via GitHub


gopidesupavan commented on issue #50185:
URL: https://github.com/apache/airflow/issues/50185#issuecomment-2912041108

   @x42005e1f Could you share the example you tested? In our triggerer process 
we use the lock inside sync_to_async.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [I] Trigger runner process locked with multiple Workflow triggers [airflow]

2025-05-27 Thread via GitHub


x42005e1f commented on issue #50185:
URL: https://github.com/apache/airflow/issues/50185#issuecomment-2912116437

   > [@x42005e1f](https://github.com/x42005e1f) can i have your example you 
tested? In our trigger process we use lock inside sync_to_async.
   
   Yes, here is a simple mixed test:
   
   ```python
   import asyncio
   import sys
   
   from concurrent.futures import ThreadPoolExecutor, as_completed
   
   import aiologic
   
   from asgiref.sync import sync_to_async
   
   if sys.version_info >= (3, 11):
       WaitTimeout = TimeoutError
   else:
       from concurrent.futures import TimeoutError as WaitTimeout
   
   TIMEOUT = 6.0  # must be reached (success)
   THREADS = 100
   TASKS = 100
   
   lock = aiologic.Lock()
   try:
       stopped = aiologic.Flag()
   except AttributeError:  # aiologic<0.15.0
       stopped = aiologic.lowlevel.Flag()
   
   
   def sync_acquire_release():
       with lock:
           pass
   
   
   async def async_acquire_release(i):
       if i % 2:
           while True:
               await sync_to_async(sync_acquire_release)()
   
               if stopped:
                   break
       else:
           while True:
               async with lock:
                   await asyncio.sleep(0)
   
               if stopped:
                   break
   
   
   async def hub():
       await asyncio.gather(*(
           asyncio.create_task(async_acquire_release(i))
           for i in range(TASKS)
       ))
   
   
   if __name__ == "__main__":
       with ThreadPoolExecutor(THREADS) as executor:
           interval = sys.getswitchinterval()
           sys.setswitchinterval(min(1e-6, interval))
   
           try:
               futures = [
                   executor.submit(asyncio.run, hub())
                   for _ in range(THREADS)
               ]
   
               for future in as_completed(futures, timeout=TIMEOUT):
                   future.result()  # reraise
           except WaitTimeout:
               pass
           finally:
               sys.setswitchinterval(interval)
               stopped.set()
   ```
   
   But note that `sync_to_async()` (with the default `thread_sensitive=True`) 
runs all calls in a single shared worker thread, so one blocked call blocks all 
further calls until it completes. This can be seen, for example, in this test:
   
   ```python
   import asyncio
   import sys
   import threading
   
   from asgiref.sync import sync_to_async
   
   
   async def main():
       loop = asyncio.get_running_loop()
       first = asyncio.Event()
       second = threading.Event()
   
       def a():
           print("'a' started!")
           loop.call_soon_threadsafe(first.set)
           second.wait()
   
       def b():
           print("'b' started!")  # will never be printed!
           second.set()
   
       f1 = asyncio.create_task(sync_to_async(a)())
       await first.wait()
       print("notified!")
       f2 = asyncio.create_task(sync_to_async(b)())
   
       await asyncio.gather(f1, f2)
   
   
   if __name__ == "__main__":
       asyncio.run(main())
   ```
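For contrast, here is the same scenario with the standard library's `asyncio.to_thread()`, swapped in as an assumption: it runs on the loop's default multi-worker executor, so `b` gets its own thread and is not stuck behind the blocked `a`. (In asgiref, passing `thread_sensitive=False` to `sync_to_async()` should behave similarly, but this sketch sticks to the standard library.)

```python
import asyncio
import threading


async def main() -> list[str]:
    loop = asyncio.get_running_loop()
    first = asyncio.Event()
    second = threading.Event()
    log: list[str] = []

    def a() -> None:
        log.append("a started")
        loop.call_soon_threadsafe(first.set)
        second.wait(timeout=5)  # blocks only this worker thread

    def b() -> None:
        log.append("b started")  # runs in a different worker thread
        second.set()

    f1 = asyncio.create_task(asyncio.to_thread(a))
    await first.wait()
    f2 = asyncio.create_task(asyncio.to_thread(b))
    await asyncio.gather(f1, f2)
    return log


print(asyncio.run(main()))  # ['a started', 'b started']
```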





Re: [I] Trigger runner process locked with multiple Workflow triggers [airflow]

2025-05-27 Thread via GitHub


x42005e1f commented on issue #50185:
URL: https://github.com/apache/airflow/issues/50185#issuecomment-2912014652

   @ArvidMartensRenson, I would like to clarify: do threads get stuck when 
trying to acquire the lock when some other thread is already performing some 
actions holding the lock, or do they get stuck for no reason? I ask because in 
the second case it would mean that the problem is most likely on my side - in 
which case I would consider it a critical bug and release a fix version. 
However, I tested the lock with `sync_to_async()` yesterday right after the PR 
appeared and was unable to reproduce the problem in isolated tests (with both 
stable and the latest version of aiologic).





Re: [I] Trigger runner process locked with multiple Workflow triggers [airflow]

2025-05-26 Thread via GitHub


gopidesupavan commented on issue #50185:
URL: https://github.com/apache/airflow/issues/50185#issuecomment-2910223518

   @ArvidMartensRenson It would be helpful if you could test this change: 
https://github.com/apache/airflow/pull/51085
   
   I ran it with around 400 triggers and don't see any lock-ups.





Re: [I] Trigger runner process locked with multiple Workflow triggers [airflow]

2025-05-25 Thread via GitHub


gopidesupavan commented on issue #50185:
URL: https://github.com/apache/airflow/issues/50185#issuecomment-2908470186

   It looks like the lock inside the sync_to_async function is causing issues; 
it is likely interfering with the threads acquired inside sync_to_async. We use 
get_ti_count, get_task_states, and get_dr_count from `RuntimeTaskInstance`; this 
is likely not a good choice.
   
   I have moved those functions to async versions that don't use sync_to_async, 
and they are working fine: I ran a stress test with around 400 triggers, nothing 
locked, and everything ran fine.
   
   @ArvidMartensRenson we may not need an async client. I will raise changes 
moving the functions to async; could you please test them once I've raised them? 
An async client would also be good, but in this case I don't see any need, since 
it works without one.





Re: [I] Trigger runner process locked with multiple Workflow triggers [airflow]

2025-05-23 Thread via GitHub


gopidesupavan commented on issue #50185:
URL: https://github.com/apache/airflow/issues/50185#issuecomment-2906002321

   > I created an async client from the sync client, using `httpx.AsyncClient` 
as the base (just as `httpx.Client` is for the sync client), and inherited the 
client initialisation from the sync task client. The only problem was that the 
operations had to be rewritten as async methods, so I copied the code from the 
sync client and changed the method definitions to async. I found this 
implementation nicer than making plain API calls with the async httpx client.
   > 
   > Traffic goes over localhost, but a token is still required, so I reused 
the methods used when creating the task workload to get the token. I 
considered dropping the token requirement, but keeping it seemed safer, and it 
will be needed whenever the trigger service and API server are separated.
   > 
   > I implemented this solution last Thursday, and so far the trigger service 
has not got stuck and the external workload triggers have all completed. So for 
now I would conclude that the solution works.
   > 
   > If an async client is desirable, I am happy to help create it. However, I 
am a little worried about the duplicate code I have now: the same code exists in 
both the sync and async clients. If someone knows a solution for this or a 
way to handle that problem, that would be nice.
   
   I don't see any issues with using an async client, it's part of the 
triggerer, and most triggers are already asynchronous. I do agree that there's 
some code duplication. Please feel free to submit your approach; it’ll be great 
to have more eyes on it and gather suggestions to refine the solution. :) 
   
   > 
   > For the record, I am using a custom implementation of the workload 
trigger. I was an early adopter of the trigger paradigm and the external 
workload trigger did not exist at that time, so we had to implement our own 
trigger. We never got around to converting to the standard provider 
implementation.
   
   





Re: [I] Trigger runner process locked with multiple Workflow triggers [airflow]

2025-05-23 Thread via GitHub


gopidesupavan commented on issue #50185:
URL: https://github.com/apache/airflow/issues/50185#issuecomment-2905998488

   Thanks @ArvidMartensRenson, that explains what's happening. While 
debugging, I noticed that requests are being sent but no responses are being 
returned. Since we use a lock when sending requests, a missing response leaves 
the lock held, which in turn blocks any further triggers from being processed.
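   A minimal asyncio sketch of the failure mode described above, under the assumption that the lock is held across the wait for the reply: one missing reply then blocks every later caller. `LockedChannel` and `second_caller_gets_lock` are hypothetical names, not the Airflow comms code, and the optional `reply_timeout` is only an assumed mitigation used to show the contrast.

```python
import asyncio
from typing import Optional


class LockedChannel:
    """Hypothetical request/response channel serialised by one asyncio.Lock."""

    def __init__(self) -> None:
        self.lock = asyncio.Lock()
        self.replies: "asyncio.Queue[str]" = asyncio.Queue()

    async def request(self, reply_timeout: Optional[float] = None) -> str:
        async with self.lock:
            # ...the request would be written to the pipe here...
            # The lock stays held until a reply arrives; with
            # reply_timeout=None that may be forever.
            return await asyncio.wait_for(self.replies.get(), reply_timeout)


async def second_caller_gets_lock(reply_timeout: Optional[float]) -> bool:
    """Return True if a second caller can take the lock within 0.5 s."""
    chan = LockedChannel()
    # First caller: no reply is ever put on the queue.
    first = asyncio.create_task(chan.request(reply_timeout))
    await asyncio.sleep(0)  # let the first caller acquire the lock
    try:
        await asyncio.wait_for(chan.lock.acquire(), timeout=0.5)
    except asyncio.TimeoutError:
        return False
    else:
        chan.lock.release()
        return True
    finally:
        # Clean up the first caller and collect its outcome.
        first.cancel()
        await asyncio.gather(first, return_exceptions=True)
```

   With `reply_timeout=None` the second caller never gets the lock; with a short timeout the first caller gives up, leaving the `async with` block releases the lock, and the second caller proceeds.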
   
   I will look into the async client part this week.





Re: [I] Trigger runner process locked with multiple Workflow triggers [airflow]

2025-05-19 Thread via GitHub


ArvidMartensRenson commented on issue #50185:
URL: https://github.com/apache/airflow/issues/50185#issuecomment-2891834967

   I created an async client from the sync client, using `httpx.AsyncClient` 
as the base (just as `httpx.Client` is for the sync client), and inherited the 
client initialisation from the sync task client. The only problem was that the 
operations had to be rewritten as async methods, so I copied the code from the 
sync client and changed the method definitions to async. I found this 
implementation nicer than making plain API calls with the async httpx client.
   
   Traffic goes over localhost, but a token is still required, so I reused 
the methods used when creating the task workload to get the token. I 
considered dropping the token requirement, but keeping it seemed safer, and it 
will be needed whenever the trigger service and API server are separated.
   
   I implemented this solution last Thursday, and so far the trigger service 
has not got stuck and the external workload triggers have all completed. So for 
now I would conclude that the solution works.
   
   If an async client is desirable, I am happy to help create it. However, I 
am a little worried about the duplicate code I have now: the same code exists in 
both the sync and async clients. If someone knows a solution for this or a 
way to handle that problem, that would be nice.
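   One common way to reduce the sync/async duplication described above is a "sans-I/O" split: request building and response parsing live in plain functions shared by both clients, and only the thin `send` step differs. The stdlib-only sketch below is an assumption, not the Task SDK design: `Request`, `build_ti_count_request`, the endpoint path and the fake transports are hypothetical, and in practice the two `send` callables would wrap `httpx.Client` and `httpx.AsyncClient`.

```python
import asyncio
import json
from dataclasses import dataclass, field
from typing import Awaitable, Callable, Dict


@dataclass
class Request:
    """Transport-neutral description of one API call (hypothetical)."""
    method: str
    path: str
    params: Dict[str, str] = field(default_factory=dict)


# Shared, I/O-free logic: written once, used by both clients.
def build_ti_count_request(dag_id: str, state: str) -> Request:
    # The path is made up for illustration; it is not a real Task SDK route.
    return Request("GET", "/hypothetical/ti-count", {"dag_id": dag_id, "state": state})


def parse_count(body: bytes) -> int:
    return int(json.loads(body))


class SyncClient:
    def __init__(self, send: Callable[[Request], bytes]) -> None:
        self._send = send  # e.g. a thin wrapper around httpx.Client

    def get_ti_count(self, dag_id: str, state: str) -> int:
        return parse_count(self._send(build_ti_count_request(dag_id, state)))


class AsyncClient:
    def __init__(self, send: Callable[[Request], Awaitable[bytes]]) -> None:
        self._send = send  # e.g. a thin wrapper around httpx.AsyncClient

    async def get_ti_count(self, dag_id: str, state: str) -> int:
        return parse_count(await self._send(build_ti_count_request(dag_id, state)))


# Fake transports so the sketch runs without a server.
def fake_send(req: Request) -> bytes:
    return b"3"


async def fake_async_send(req: Request) -> bytes:
    await asyncio.sleep(0)
    return b"3"
```

   Each new endpoint then adds one builder and one parser plus two one-line methods, rather than a full copy of the request logic.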
   
   For the record, I am using a custom implementation of the workload trigger. 
I was an early adopter of the trigger paradigm and the external workload 
trigger did not exist at that time, so we had to implement our own trigger. We 
never got around to converting to the standard provider implementation.





Re: [I] Trigger runner process locked with multiple Workflow triggers [airflow]

2025-05-19 Thread via GitHub


gopidesupavan commented on issue #50185:
URL: https://github.com/apache/airflow/issues/50185#issuecomment-2891075878

   > For now I solved it by creating an async Task SDK client which the 
triggers use directly. In my setup the triggers always have the API server 
running on the same instances, so I could use localhost traffic. This way it 
is quite similar to the in-process traffic, but I do not overload the data 
pipe between the trigger runner and the supervisor. From my investigation it 
seems that using too many external workload triggers (count calls) together 
with the supervisor-runner syncs congests that pipe so badly that all calls 
were blocked indefinitely, resulting in a trigger process that seems alive 
(heartbeats still worked) but is in reality stuck.
   > 
   > This is probably not the intended pattern, but for now this solution makes 
it possible for me to roll out Airflow 3.0 at our office.
   
   @ArvidMartensRenson, we don't have an async client in the Task SDK. Did you 
override the Triggerer client method, and did that fix it?





Re: [I] Trigger runner process locked with multiple Workflow triggers [airflow]

2025-05-19 Thread via GitHub


gopidesupavan commented on issue #50185:
URL: https://github.com/apache/airflow/issues/50185#issuecomment-2889946586

   I am able to reproduce it with one of the example `ExternalTaskSensor` dags. 
It looks like something is blocking, and triggers are not being picked up for 
further processing.
   
   I need to work out why it's blocking even though we use an async lock. Will 
check and update.





Re: [I] Trigger runner process locked with multiple Workflow triggers [airflow]

2025-05-18 Thread via GitHub


ArvidMartensRenson commented on issue #50185:
URL: https://github.com/apache/airflow/issues/50185#issuecomment-2889837087

   For now I solved it by creating an async Task SDK client which the triggers 
use directly. In my setup the triggers always have the API server running on 
the same instances, so I could use localhost traffic. This way it is quite 
similar to the in-process traffic, but I do not overload the data pipe between 
the trigger runner and the supervisor. From my investigation it seems that 
using too many external workload triggers (count calls) together with the 
supervisor-runner syncs congests that pipe so badly that all calls were 
blocked indefinitely, resulting in a trigger process that seems alive 
(heartbeats still worked) but is in reality stuck.
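   A toy timing sketch of the congestion described above, assuming every call through the shared supervisor pipe must wait its turn behind one lock, while a dedicated async client can have requests in flight concurrently. The 50 ms latency and all function names are hypothetical; nothing here talks to Airflow.

```python
import asyncio
import time

CALL_LATENCY = 0.05  # assumed round-trip time of one count call, in seconds


async def fake_round_trip() -> int:
    # Stand-in for one request/response exchange.
    await asyncio.sleep(CALL_LATENCY)
    return 1


async def via_shared_pipe(n: int) -> float:
    """n calls that must take turns on one locked channel."""
    lock = asyncio.Lock()

    async def call() -> int:
        async with lock:  # only one request in flight at a time
            return await fake_round_trip()

    start = time.perf_counter()
    await asyncio.gather(*(call() for _ in range(n)))
    return time.perf_counter() - start


async def via_own_client(n: int) -> float:
    """n independent calls, e.g. each trigger using its own async client."""
    start = time.perf_counter()
    await asyncio.gather(*(fake_round_trip() for _ in range(n)))
    return time.perf_counter() - start
```

   With 10 calls, the locked path takes roughly ten round trips while the independent path takes roughly one; any other traffic queued on the same pipe lengthens the wait further.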
   
   This is probably not the intended pattern, but for now this solution makes 
it possible for me to roll out Airflow 3.0 at our office.





Re: [I] Trigger runner process locked with multiple Workflow triggers [airflow]

2025-05-18 Thread via GitHub


vatsrahul1001 commented on issue #50185:
URL: https://github.com/apache/airflow/issues/50185#issuecomment-2889660176

   @gopidesupavan are you still looking into this?





Re: [I] Trigger runner process locked with multiple Workflow triggers [airflow]

2025-05-12 Thread via GitHub


gopidesupavan commented on issue #50185:
URL: https://github.com/apache/airflow/issues/50185#issuecomment-2872703606

   Yeah will look at this today.





Re: [I] Trigger runner process locked with multiple Workflow triggers [airflow]

2025-05-06 Thread via GitHub


uranusjr commented on issue #50185:
URL: https://github.com/apache/airflow/issues/50185#issuecomment-2857139721

   Is it possible to write a dag that can reliably reproduce this?

