kajocina opened a new issue, #581:
URL: https://github.com/apache/burr/issues/581
When an action in a streaming state machine raises an error, there is no way
to gracefully shut down the stream from the `finally` clause of a
try/except/finally block.
## Steps to replicate behavior
This is a self-contained snippet to reproduce the behaviour. It's a FastAPI
endpoint which you can run with Python and then curl. The code deliberately
raises an error when the counter equals 5; if you comment the raise out, the
stream finishes without this error:
`curl: (18) transfer closed with outstanding read data remaining`
Python snippet:
```
import datetime
from typing import AsyncGenerator

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import uvicorn

from burr.core import State, ApplicationBuilder, expr
from burr.core.action import streaming_action

app = FastAPI(title="Bug Report Demo")


@streaming_action(reads=["counter"], writes=["counter"])
async def increment(state: State):
    """Mock action for report - increment the counter by 1"""
    current_count = state["counter"]
    exception_occurred = False
    try:
        if current_count == 5:
            raise ValueError(
                "Raising to show some exception from some function in the action!"
            )
    except ValueError:
        yield {"response": "error! some helpful info for the frontend..."}, None
        exception_occurred = True
        raise
    finally:
        if exception_occurred:
            yield {"response": "gracefully stopping the stream..."}, None
            yield {}, state
    current_count += 1
    print("Count: ", current_count)
    yield {"response": "increment"}, None
    yield {}, state.update(counter=current_count)


@streaming_action(reads=["counter"], writes=[])
async def exit_counter(state: State):
    """Print the current count and the current time"""
    current_count = state["counter"]
    print(
        f"Finished counting to {current_count} at "
        f"{datetime.datetime.now():%H:%M:%S %Y-%m-%d}"
    )
    yield {"response": "exit_counter"}, None
    yield {}, state


async def build_app():
    """Build the state machine application"""
    return await (
        ApplicationBuilder()
        .with_actions(increment, exit_counter)
        .with_transitions(
            ("increment", "increment", expr("counter < 10")),
            ("increment", "exit_counter"),
        )
        .with_state(counter=0)
        .with_entrypoint("increment")
        .abuild()
    )


async def generate_stream() -> AsyncGenerator[str, None]:
    """Generate the stream of responses"""
    app = await build_app()
    async for action, streaming_container in app.astream_iterate(
        inputs={},
        halt_after=["exit_counter"],
    ):
        async for item in streaming_container:
            yield str(item) + "\n\n"


@app.get("/stream")
async def stream_endpoint():
    """Endpoint to demonstrate the bug report with streaming response"""
    return StreamingResponse(
        generate_stream(),
        media_type="text/plain",
    )


if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8000)
```
## Stack Traces
Curl output:
`curl -N http://localhost:8000/stream
{'response': 'increment'}
{'response': 'increment'}
{'response': 'increment'}
{'response': 'increment'}
{'response': 'increment'}
{'response': 'error! some helpful info for the frontend...'}
{'response': 'gracefully stopping the stream...'}
curl: (18) transfer closed with outstanding read data remaining`
# Expected behavior
I would expect to be able to close the stream gracefully, i.e. stream the
error contents back to the frontend (or any other consumer) after raising the
exception, and then close the Burr operation by yielding the State instead of
None. Right now this doesn't seem possible, because the last yield in
`finally` ("gracefully stopping the stream...") is the last thing that ever
runs.
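The control flow behind this can be checked without Burr or FastAPI. Below is
a minimal sketch using a plain async generator (the names `action` and
`consume` are made up for illustration, and nothing here is Burr's API): a
`yield` inside `finally` is still delivered, but the re-raised exception then
propagates to whoever is iterating, so nothing after that point can be yielded.

```python
import asyncio


async def action():
    """Plain async generator that mimics the try/except/finally shape above."""
    failed = False
    try:
        raise ValueError("boom")
    except ValueError:
        yield "error info"
        failed = True
        raise  # propagates only after the finally block has finished
    finally:
        if failed:
            yield "graceful stop"


async def consume():
    """Collect every item, then record how the generator ended."""
    seen = []
    try:
        async for item in action():
            seen.append(item)
        ending = "clean"
    except ValueError:
        ending = "raised"
    return seen, ending


seen, ending = asyncio.run(consume())
print(seen, ending)
```

Both items reach the consumer, and the iteration then ends with the
`ValueError` rather than a normal stop, which matches the curl output above.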
It would perhaps be good to be able to yield a textual response while
closing the stream with:
`yield response_with_error_info, state`
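Until something like that exists, one possible stopgap (an assumption, not
something Burr documents) is to catch the exception in the generator handed to
`StreamingResponse` and emit a final chunk there. The sketch below uses plain
async generators so it runs standalone; `failing_stream` and `close_cleanly`
are hypothetical names, and it only changes how the stream ends for the
consumer, not what Burr records as the final state.

```python
import asyncio
from typing import AsyncGenerator, AsyncIterator, List


async def failing_stream() -> AsyncGenerator[str, None]:
    """Stand-in for the nested astream_iterate loops; fails part-way through."""
    yield "increment"
    yield "increment"
    raise ValueError("boom")


async def close_cleanly(source: AsyncIterator[str]) -> AsyncGenerator[str, None]:
    """Forward items from source; on failure emit one last chunk and return."""
    try:
        async for item in source:
            yield item
    except Exception as exc:
        # Swallowing the error is a deliberate choice of this sketch: the
        # consumer gets a final message and the generator ends normally.
        yield f"error: {exc}"


async def main() -> List[str]:
    return [chunk async for chunk in close_cleanly(failing_stream())]


chunks = asyncio.run(main())
print(chunks)
```

In the reproduction above, the equivalent change would be a try/except around
the loops in `generate_stream`, which should let the HTTP response finish
normally instead of being cut off.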