Hi Ganesh,
I've hit a setback.
I am getting a 'Failure: builtins.tuple: (<class 'ValueError'>,
ValueError('Metric namespace must be non-empty'), <traceback object at
0x00000255D2BDEFC0>)'
Here's my handler. Am I missing something?
class CloudRunAgentHandler(RemoteModelHandler):
    """RemoteModelHandler that forwards prompts to an agent's ``/run_sse`` endpoint.

    Each prompt in a batch is POSTed to ``{app_url}/run_sse`` with an ID-token
    bearer header, and the text of the last ``data:`` line in the reply is
    returned as the inference.

    Args:
        app_url: Base URL of the service; also used as the ID-token audience.
        app_name: Value sent as ``app_name`` in every request body.
        user_id: Value sent as ``user_id`` in every request body.
        metric_namespace: Non-empty namespace used for metrics and logging.
    """

    def __init__(self, app_url: str, app_name: str, user_id: str,
                 metric_namespace: str):
        # The base class builds its metrics from the namespace handed to its
        # own constructor. Calling super().__init__() with no arguments leaves
        # that namespace empty, which is what raised
        # "Metric namespace must be non-empty".
        super().__init__(namespace=metric_namespace)
        self._model_id = f"CloudRun_{app_name}"
        self.app_url = app_url
        self.app_name = app_name
        self.user_id = user_id
        self.metric_namespace = metric_namespace

    def share_model_across_processes(self) -> bool:
        # NOTE(review): confirm the HTTP client behaves correctly when shared
        # across processes; return False if it does not.
        return True

    def get_metrics_namespace(self) -> str:
        """Return the namespace given at construction (must be non-empty)."""
        return self.metric_namespace

    def create_client(self) -> httpx.Client:
        """Create the HTTP client that is passed to ``request`` as ``model``.

        A synchronous client is used because ``request`` is invoked as a
        plain (non-awaited) call.
        """
        return httpx.Client(timeout=60.0)

    def _get_token(self) -> str:
        """Fetch an ID token whose audience is ``self.app_url``."""
        auth_req = google.auth.transport.requests.Request()
        return id_token.fetch_id_token(auth_req, self.app_url)

    @staticmethod
    def _extract_final_text(raw_text: str) -> str:
        """Return the agent text carried by the last ``data:`` line.

        Returns ``"No Data"`` when the body has no ``data:`` line, and
        ``"Error: <reason>"`` when the last one cannot be decoded.
        """
        import json

        # Strip each line *before* slicing off the "data:" prefix so that
        # leading whitespace cannot shift the slice.
        data_lines = [
            stripped
            for stripped in (line.strip() for line in raw_text.splitlines())
            if stripped.startswith("data:")
        ]
        if not data_lines:
            return "No Data"
        try:
            event = json.loads(data_lines[-1][len("data:"):])
            return event.get('content', {}).get('parts', [{}])[0].get('text', '')
        except (ValueError, AttributeError, IndexError, KeyError,
                TypeError) as exc:
            return f"Error: {exc}"

    def request(
        self,
        batch: list[str],
        model: httpx.Client,
        inference_args: Optional[Dict[str, Any]] = None,
    ) -> list[PredictionResult]:
        """Send every prompt in ``batch`` to the agent; return one result each.

        Args:
            batch: Prompts to send.
            model: Client returned by ``create_client``.
            inference_args: Accepted for interface compatibility; unused.

        Returns:
            One ``PredictionResult`` per prompt, in input order.

        Raises:
            httpx.HTTPStatusError: If the service answers with an error status.
        """
        import hashlib

        # One token for the whole batch rather than one per prompt.
        token = self._get_token()
        headers = {
            "Authorization": f"Bearer {token}",
            "Content-Type": "application/json",
        }

        results = []
        for item in batch:
            # The builtin hash() of a str is salted per process, so it is not
            # stable across workers or retries; use a content digest instead.
            digest = hashlib.sha256(item.encode("utf-8")).hexdigest()[:16]
            run_data = {
                "app_name": self.app_name,
                "user_id": self.user_id,
                "session_id": f"beam_task_{digest}",
                "new_message": {"role": "user", "parts": [{"text": item}]},
                "streaming": False,
            }
            # NOTE(review): the service may require the session to exist
            # before /run_sse is called -- confirm against the agent's API.
            response = model.post(
                f"{self.app_url}/run_sse", headers=headers, json=run_data)
            # Surface HTTP failures instead of reporting them as "No Data".
            response.raise_for_status()
            results.append(
                PredictionResult(
                    example=item,
                    inference=self._extract_final_text(response.text.strip()),
                ))
        return results
def test_cloudagent(self):
    """Push one prompt through RunInference with CloudRunAgentHandler and print the output."""
    from apache_beam.ml.inference.base import RunInference, PredictionResult

    # Single prompt fed into the pipeline.
    prompt = ("Run a technical analysis for today's stock picks and "
              "give me your recommendations")

    agent_handler = CloudRunAgentHandler(
        app_url="xxxxxxxxxx",
        app_name="stock_agent",
        user_id="user_123",
        metric_namespace="stock_agent_inference",
    )
    sink = beam.Map(print)

    with TestPipeline(options=PipelineOptions()) as pipeline:
        prompts = pipeline | 'Sourcinig prompt' >> beam.Create([prompt])
        predictions = prompts | 'ClouodagentRun' >> RunInference(agent_handler)
        _ = predictions | sink
kind regards
Marco
On Mon, Jan 5, 2026 at 10:06 AM Marc _ <[email protected]> wrote:
> ye no worry.... it runs only on my project. i will re deploy service
>
>
> On Mon, Jan 5, 2026 at 9:19 AM Ganesh Sivakumar <
> [email protected]> wrote:
>
>> Hey, it's not safe to expose your cloud run url in public forums. Your
>> code snippet had hardcoded url.
>>
>> On Mon, 5 Jan, 2026, 1:47 pm Marc _, <[email protected]> wrote:
>>
>>> Thanks a lot Ganesh! I'll have a look and report back if stuck
>>> Kind regards
>>>
>>> On Mon, Jan 5, 2026 at 8:11 AM Ganesh Sivakumar <
>>> [email protected]> wrote:
>>>
>>>> Hey Marco, best pattern for your use case would be to separate data
>>>> processing and inference(invoking your agent). implement a custom `
>>>> RemoteModelHandler
>>>> <https://beam.apache.org/releases/pydoc/current/apache_beam.ml.inference.base.html#apache_beam.ml.inference.base.RemoteModelHandler>`
>>>> for calling the agent and use it with RunInference transform. Pipeline like
>>>> Read(stocks data) ---> RunInference(your agent handler) --> Sink.
>>>>
>>>> Thanks,
>>>> Ganesh.
>>>>
>>>> On Mon, Jan 5, 2026 at 1:04 PM Marc _ <[email protected]> wrote:
>>>>
>>>>> Hi all
>>>>> i have an ADK agent running on CloudRun and wanted to invoke it via
>>>>> dataflow
>>>>> I have the following pipeline and DoFn but i am getting this exception
>>>>>
>>>>> Anyone could advise?
>>>>>
>>>>> Kind regards
>>>>> Marco
>>>>>
>>>>> EOF when reading a line [while running 'ClouodagentRun-ptransform-32']
>>>>> Traceback (most recent call last): File "apache_beam/runners/common.py",
>>>>> line 1498, in apache_beam.runners.common.DoFnRunner.process File
>>>>> "apache_beam/runners/common.py", line 685, in
>>>>> apache_beam.runners.common.SimpleInvoker.invoke_process File
>>>>> "/template/shareloader/modules/obb_utils.py", line 726, in process return
>>>>> runner.run(self.amain(element)) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File
>>>>> "/usr/local/lib/python3.11/asyncio/runners.py", line 118, in run return
>>>>> self._loop.run_until_complete(task) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
>>>>> File "/usr/local/lib/python3.11/asyncio/base_events.py", line 654, in
>>>>> run_until_complete return future.result() ^^^^^^^^^^^^^^^ File
>>>>> "/template/shareloader/modules/obb_utils.py", line 713, in amain await
>>>>> self.chat(client, self.SESSION_ID) File
>>>>> "/template/shareloader/modules/obb_utils.py", line 690, in chat raise e
>>>>> File "/template/shareloader/modules/obb_utils.py", line 683, in chat
>>>>> user_input = await asyncio.to_thread(input, f"[{self.USER_ID}]: ")
>>>>> ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File
>>>>> "/usr/local/lib/python3.11/asyncio/threads.py", line 25, in to_thread
>>>>> return await loop.run_in_executor(None, func_call)
>>>>> ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File
>>>>> "/usr/local/lib/python3.11/concurrent/futures/thread.py", line 58, in run
>>>>> result = self.fn(*self.args, **self.kwargs)
>>>>> ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ EOFError: EOF when reading a line
>>>>>
>>>>> def run_gcloud_agent(pipeline, debugSink):
>>>>> from shareloader.modules.obb_utils import AsyncCloudRunAgent
>>>>> (pipeline | 'Sourcinig prompt' >> beam.Create(["Run a technical
>>>>> analysis for today's stock picks and give me your recommendations"])
>>>>> | 'ClouodagentRun' >> beam.ParDo(AsyncCloudRunAgent())
>>>>> | debugSink
>>>>> )
>>>>>
>>>>>
>>>>> class AsyncCloudRunAgent(AsyncProcess):
>>>>>
>>>>> def __init__(self):
>>>>> # --- Configuration (Dynamic) ---
>>>>> self.APP_URL =
>>>>> "https://stock-agent-service-682143946483.us-central1.run.app"
>>>>> self.USER_ID = "user_123"
>>>>> # Generate a single session ID for the entire conversation loop
>>>>> self.SESSION_ID =
>>>>> f"session_{datetime.now().strftime('%Y%m%d%H%M%S')}"
>>>>> self.APP_NAME = "stock_agent"
>>>>>
>>>>>
>>>>>
>>>>> # --- Authentication Function (ASYNC) ---
>>>>>
>>>>> async def get_auth_token(self) -> str:
>>>>> """
>>>>> Programmatically fetches an ID token for the Cloud Run service
>>>>> audience.
>>>>> 'target_audience' should be the URL of your Cloud Run service.
>>>>> """
>>>>> # Run the synchronous google-auth call in a thread to keep it
>>>>> async-friendly
>>>>> loop = asyncio.get_event_loop()
>>>>>
>>>>> def fetch_token():
>>>>> auth_req = google.auth.transport.requests.Request()
>>>>> # This automatically uses the Dataflow Worker Service Account
>>>>> return id_token.fetch_id_token(auth_req, self.APP_URL)
>>>>>
>>>>> try:
>>>>> token = await loop.run_in_executor(None, fetch_token)
>>>>> return token
>>>>> except Exception as e:
>>>>> raise RuntimeError(f"Failed to fetch ID token: {e}")
>>>>> # --- API Interaction Functions (ASYNC) ---
>>>>>
>>>>> async def make_request(self, client: httpx.AsyncClient, method: str,
>>>>> endpoint: str, data: Dict[str, Any] = None) -> httpx.Response:
>>>>> """Helper function for authenticated asynchronous requests using
>>>>> httpx."""
>>>>> token = await self.get_auth_token()
>>>>> headers = {
>>>>> "Authorization": f"Bearer {token}",
>>>>> "Content-Type": "application/json"
>>>>> }
>>>>> url = f"{self.APP_URL}{endpoint}"
>>>>>
>>>>> try:
>>>>> if method.upper() == 'POST':
>>>>> response = await client.post(url, headers=headers,
>>>>> json=data)
>>>>> elif method.upper() == 'DELETE':
>>>>> response = await client.delete(url, headers=headers)
>>>>> else:
>>>>> raise ValueError(f"Unsupported method: {method}")
>>>>>
>>>>> response.raise_for_status()
>>>>> return response
>>>>> except httpx.HTTPStatusError as errh:
>>>>> print(f"\n❌ **HTTP ERROR:** Status {response.status_code} for
>>>>> {url}")
>>>>> print(f"❌ **Server Response (Raw):**\n{response.text}")
>>>>> raise
>>>>> except httpx.RequestError as err:
>>>>> print(f"\n❌ An unexpected request error occurred: {err}")
>>>>> raise
>>>>>
>>>>> async def run_agent_request(self, client: httpx.AsyncClient,
>>>>> session_id: str, message: str):
>>>>> """Executes a single POST request to the /run_sse endpoint."""
>>>>>
>>>>> print(f"\n[User] -> Sending message: '{message}'")
>>>>>
>>>>> run_data = {
>>>>> "app_name": self.APP_NAME,
>>>>> "user_id": self.USER_ID,
>>>>> "session_id": session_id,
>>>>> "new_message": {"role": "user", "parts": [{"text": message}]},
>>>>> "streaming": False
>>>>> }
>>>>>
>>>>> try:
>>>>> response = await self.make_request(client, "POST",
>>>>> "/run_sse", data=run_data)
>>>>> current_status = response.status_code
>>>>> # print(f"**Request Status Code:** {current_status}")
>>>>>
>>>>> raw_text = response.text.strip()
>>>>>
>>>>> # Multi-line SSE parsing logic
>>>>> data_lines = [
>>>>> line.strip()
>>>>> for line in raw_text.split('\n')
>>>>> if line.strip().startswith("data:")
>>>>> ]
>>>>>
>>>>> if not data_lines:
>>>>> raise json.JSONDecodeError("No 'data:' lines found in 200
>>>>> response.", raw_text, 0)
>>>>>
>>>>> last_data_line = data_lines[-1]
>>>>> json_payload = last_data_line[len("data:"):].strip()
>>>>> agent_response = json.loads(json_payload)
>>>>>
>>>>> # Extract the final text
>>>>> final_text = agent_response.get('content', {}).get('parts',
>>>>> [{}])[0].get('text', 'Agent response structure not recognized.')
>>>>>
>>>>> print(f"[Agent] -> {final_text}")
>>>>>
>>>>> except json.JSONDecodeError as e:
>>>>> print(f"\n🚨 **JSON PARSING FAILED**!")
>>>>> print(f" Error: {e}")
>>>>> print(" --- RAW SERVER CONTENT ---")
>>>>> print(raw_text)
>>>>> print(" --------------------------")
>>>>>
>>>>> except Exception as e:
>>>>> print(f"❌ Agent request failed: {e}")
>>>>>
>>>>> # --- Interactive Chat Loop ---
>>>>>
>>>>> async def chat(self, client: httpx.AsyncClient, session_id: str):
>>>>> """Runs the main conversation loop, handling user input
>>>>> asynchronously."""
>>>>> print("--- 💬 Start Chatting ---")
>>>>>
>>>>> try:
>>>>> # Use asyncio.to_thread to run blocking input() without
>>>>> freezing the event loop
>>>>> user_input = await asyncio.to_thread(input,
>>>>> f"[{self.USER_ID}]: ")
>>>>>
>>>>> # Send the message to the agent
>>>>> await self.run_agent_request(client, session_id, user_input)
>>>>>
>>>>> except Exception as e:
>>>>> print(f"An unexpected error occurred in the loop: {e}")
>>>>> raise e
>>>>>
>>>>> # --- Main Logic (ASYNC) ---
>>>>>
>>>>> async def amain(self, element):
>>>>> """Main asynchronous function to set up the session and start the
>>>>> loop."""
>>>>> print(f"\n🤖 Starting Interactive Client with Session ID:
>>>>> **{self.SESSION_ID}**")
>>>>> session_data = {"state": {"preferred_language": "English",
>>>>> "visit_count": 5}}
>>>>> current_session_endpoint =
>>>>> f"/apps/{self.APP_NAME}/users/{self.USER_ID}/sessions/{self.SESSION_ID}"
>>>>>
>>>>> # httpx.AsyncClient is used as a context manager to manage
>>>>> connections
>>>>> async with httpx.AsyncClient(timeout=30.0) as client:
>>>>>
>>>>> # 1. Create Session
>>>>> print("\n## 1. Creating Session")
>>>>> try:
>>>>> await self.make_request(client, "POST",
>>>>> current_session_endpoint, data=session_data)
>>>>> print(f"✅ Session created successfully. Status 200.")
>>>>> except Exception as e:
>>>>> print(f"❌ Could not start session: {e}")
>>>>> return
>>>>>
>>>>> # 2. Start the Interactive Loop
>>>>> await self.chat(client, self.SESSION_ID)
>>>>>
>>>>> # 3. Cleanup: Delete Session (Best Practice)
>>>>> print(f"\n## 3. Deleting Session: {self.SESSION_ID}")
>>>>> try:
>>>>> await self.make_request(client, "DELETE",
>>>>> current_session_endpoint)
>>>>> print("✅ Session deleted successfully.")
>>>>> except Exception as e:
>>>>> print(f"⚠️ Warning: Failed to delete session. {e}")
>>>>>
>>>>> def process(self, element: str):
>>>>> logging.info(f'Input elements:{element}')
>>>>> with asyncio.Runner() as runner:
>>>>> return runner.run(self.amain(element))
>>>>>
>>>>>
>>>>>