Hi Marco, try passing the metric namespace directly to the super constructor - super().__init__(namespace="stock_agent_inference") Thanks, Ganesh.
On Wed, Jan 14, 2026 at 12:46 PM Marc _ <[email protected]> wrote: > Hi Ganesh > got some setback.... > I am getting a 'Failure: builtins.tuple: (<class 'ValueError'>, > ValueError('Metric namespace must be non-empty'), <traceback object at > 0x00000255D2BDEFC0>)' > > Here's my handler.. am i missing something > > class CloudRunAgentHandler(RemoteModelHandler): > def __init__(self, app_url: str, app_name: str, user_id: str, > metric_namespace:str): > # We initialize with a model_id to ensure the metrics namespace is > populated > self._model_id = f"CloudRun_{app_name}" > super().__init__() > self.app_url = app_url > self.app_name = app_name > self.user_id = user_id > self.metric_namespace = metric_namespace > > def share_model_across_processes(self) -> bool: > return True > > def get_metrics_namespace(self) -> str: > # This MUST return a non-empty string > return self.metric_namespace > > def create_client(self) -> httpx.AsyncClient: > return httpx.AsyncClient(timeout=60.0) > > def _get_token(self) -> str: > auth_req = google.auth.transport.requests.Request() > return id_token.fetch_id_token(auth_req, self.app_url) > > async def request( > self, > item: str, > client: httpx.AsyncClient, > inference_args: Optional[Dict[str, Any]] = None > ) -> PredictionResult: > token = self._get_token() > headers = {"Authorization": f"Bearer {token}", "Content-Type": > "application/json"} > > run_data = { > "app_name": self.app_name, > "user_id": self.user_id, > "session_id": f"beam_task_{hash(item)}", > "new_message": {"role": "user", "parts": [{"text": item}]}, > "streaming": False > } > > response = await client.post(f"{self.app_url}/run_sse", > headers=headers, json=run_data) > > raw_text = response.text.strip() > data_lines = [l for l in raw_text.split('\n') if > l.strip().startswith("data:")] > > if data_lines: > try: > import json > last_json = json.loads(data_lines[-1][5:]) > final_text = last_json.get('content', {}).get('parts', > [{}])[0].get('text', '') > return 
PredictionResult(example=item, inference=final_text) > except Exception as e: > return PredictionResult(example=item, inference=f"Error: {e}") > > return PredictionResult(example=item, inference="No Data") > > > def test_cloudagent(self): > from apache_beam.ml.inference.base import RunInference, PredictionResult > > agent_handler = CloudRunAgentHandler( > app_url="xxxxxxxxxx", > app_name="stock_agent", > user_id="user_123", > metric_namespace="stock_agent_inference" > ) > sink = beam.Map(print) > with TestPipeline(options=PipelineOptions()) as pipeline: > (pipeline | 'Sourcinig prompt' >> beam.Create( > ["Run a technical analysis for today's stock picks and give me > your recommendations"]) > | 'ClouodagentRun' >> RunInference(agent_handler) > | sink > ) > > kind regards > Marco > > On Mon, Jan 5, 2026 at 10:06 AM Marc _ <[email protected]> wrote: > >> ye no worry.... it runs only on my project. i will re deploy service >> >> >> On Mon, Jan 5, 2026 at 9:19 AM Ganesh Sivakumar < >> [email protected]> wrote: >> >>> Hey, it's not safe to expose your cloud run url in public forums. Your >>> code snippet had hardcoded url. >>> >>> On Mon, 5 Jan, 2026, 1:47 pm Marc _, <[email protected]> wrote: >>> >>>> Thanks a lot Ganesh! I'll have a look and report back if stuck >>>> Kind regards >>>> >>>> On Mon, Jan 5, 2026 at 8:11 AM Ganesh Sivakumar < >>>> [email protected]> wrote: >>>> >>>>> Hey Marco, best pattern for your use case would be to separate data >>>>> processing and inference(invoking your agent). implement a custom ` >>>>> RemoteModelHandler >>>>> <https://beam.apache.org/releases/pydoc/current/apache_beam.ml.inference.base.html#apache_beam.ml.inference.base.RemoteModelHandler>` >>>>> for calling the agent and use it with RunInference transform. Pipeline >>>>> like >>>>> Read(stocks data) ---> RunInference(your agent handler) --> Sink. >>>>> >>>>> Thanks, >>>>> Ganesh. 
>>>>> >>>>> On Mon, Jan 5, 2026 at 1:04 PM Marc _ <[email protected]> wrote: >>>>> >>>>>> Hi all >>>>>> i have an ADK agent running on CloudRun and wanted to invoke it via >>>>>> dataflow >>>>>> I have the following pipeline and DoFn but i am getting this exception >>>>>> >>>>>> Anyone could advise? >>>>>> >>>>>> Kind regards >>>>>> Marco >>>>>> >>>>>> EOF when reading a line [while running >>>>>> 'ClouodagentRun-ptransform-32'] Traceback (most recent call last): File >>>>>> "apache_beam/runners/common.py", line 1498, in >>>>>> apache_beam.runners.common.DoFnRunner.process File >>>>>> "apache_beam/runners/common.py", line 685, in >>>>>> apache_beam.runners.common.SimpleInvoker.invoke_process File >>>>>> "/template/shareloader/modules/obb_utils.py", line 726, in process return >>>>>> runner.run(self.amain(element)) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File >>>>>> "/usr/local/lib/python3.11/asyncio/runners.py", line 118, in run return >>>>>> self._loop.run_until_complete(task) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ >>>>>> File "/usr/local/lib/python3.11/asyncio/base_events.py", line 654, in >>>>>> run_until_complete return future.result() ^^^^^^^^^^^^^^^ File >>>>>> "/template/shareloader/modules/obb_utils.py", line 713, in amain await >>>>>> self.chat(client, self.SESSION_ID) File >>>>>> "/template/shareloader/modules/obb_utils.py", line 690, in chat raise e >>>>>> File "/template/shareloader/modules/obb_utils.py", line 683, in chat >>>>>> user_input = await asyncio.to_thread(input, f"[{self.USER_ID}]: ") >>>>>> ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File >>>>>> "/usr/local/lib/python3.11/asyncio/threads.py", line 25, in to_thread >>>>>> return await loop.run_in_executor(None, func_call) >>>>>> ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File >>>>>> "/usr/local/lib/python3.11/concurrent/futures/thread.py", line 58, in run >>>>>> result = self.fn(*self.args, **self.kwargs) >>>>>> ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ EOFError: EOF when reading a line >>>>>> 
>>>>>> def run_gcloud_agent(pipeline, debugSink): >>>>>> from shareloader.modules.obb_utils import AsyncCloudRunAgent >>>>>> (pipeline | 'Sourcinig prompt' >> beam.Create(["Run a technical >>>>>> analysis for today's stock picks and give me your recommendations"]) >>>>>> | 'ClouodagentRun' >> beam.ParDo(AsyncCloudRunAgent()) >>>>>> | debugSink >>>>>> ) >>>>>> >>>>>> >>>>>> class AsyncCloudRunAgent(AsyncProcess): >>>>>> >>>>>> def __init__(self): >>>>>> # --- Configuration (Dynamic) --- >>>>>> self.APP_URL = >>>>>> "https://stock-agent-service-682143946483.us-central1.run.app" >>>>>> self.USER_ID = "user_123" >>>>>> # Generate a single session ID for the entire conversation loop >>>>>> self.SESSION_ID = >>>>>> f"session_{datetime.now().strftime('%Y%m%d%H%M%S')}" >>>>>> self.APP_NAME = "stock_agent" >>>>>> >>>>>> >>>>>> >>>>>> # --- Authentication Function (ASYNC) --- >>>>>> >>>>>> async def get_auth_token(self) -> str: >>>>>> """ >>>>>> Programmatically fetches an ID token for the Cloud Run service >>>>>> audience. >>>>>> 'target_audience' should be the URL of your Cloud Run service. 
>>>>>> """ >>>>>> # Run the synchronous google-auth call in a thread to keep it >>>>>> async-friendly >>>>>> loop = asyncio.get_event_loop() >>>>>> >>>>>> def fetch_token(): >>>>>> auth_req = google.auth.transport.requests.Request() >>>>>> # This automatically uses the Dataflow Worker Service Account >>>>>> return id_token.fetch_id_token(auth_req, self.APP_URL) >>>>>> >>>>>> try: >>>>>> token = await loop.run_in_executor(None, fetch_token) >>>>>> return token >>>>>> except Exception as e: >>>>>> raise RuntimeError(f"Failed to fetch ID token: {e}") >>>>>> # --- API Interaction Functions (ASYNC) --- >>>>>> >>>>>> async def make_request(self, client: httpx.AsyncClient, method: str, >>>>>> endpoint: str, data: Dict[str, Any] = None) -> httpx.Response: >>>>>> """Helper function for authenticated asynchronous requests using >>>>>> httpx.""" >>>>>> token = await self.get_auth_token() >>>>>> headers = { >>>>>> "Authorization": f"Bearer {token}", >>>>>> "Content-Type": "application/json" >>>>>> } >>>>>> url = f"{self.APP_URL}{endpoint}" >>>>>> >>>>>> try: >>>>>> if method.upper() == 'POST': >>>>>> response = await client.post(url, headers=headers, >>>>>> json=data) >>>>>> elif method.upper() == 'DELETE': >>>>>> response = await client.delete(url, headers=headers) >>>>>> else: >>>>>> raise ValueError(f"Unsupported method: {method}") >>>>>> >>>>>> response.raise_for_status() >>>>>> return response >>>>>> except httpx.HTTPStatusError as errh: >>>>>> print(f"\n❌ **HTTP ERROR:** Status {response.status_code} >>>>>> for {url}") >>>>>> print(f"❌ **Server Response (Raw):**\n{response.text}") >>>>>> raise >>>>>> except httpx.RequestError as err: >>>>>> print(f"\n❌ An unexpected request error occurred: {err}") >>>>>> raise >>>>>> >>>>>> async def run_agent_request(self, client: httpx.AsyncClient, >>>>>> session_id: str, message: str): >>>>>> """Executes a single POST request to the /run_sse endpoint.""" >>>>>> >>>>>> print(f"\n[User] -> Sending message: '{message}'") >>>>>> >>>>>> 
run_data = { >>>>>> "app_name": self.APP_NAME, >>>>>> "user_id": self.USER_ID, >>>>>> "session_id": session_id, >>>>>> "new_message": {"role": "user", "parts": [{"text": >>>>>> message}]}, >>>>>> "streaming": False >>>>>> } >>>>>> >>>>>> try: >>>>>> response = await self.make_request(client, "POST", >>>>>> "/run_sse", data=run_data) >>>>>> current_status = response.status_code >>>>>> # print(f"**Request Status Code:** {current_status}") >>>>>> >>>>>> raw_text = response.text.strip() >>>>>> >>>>>> # Multi-line SSE parsing logic >>>>>> data_lines = [ >>>>>> line.strip() >>>>>> for line in raw_text.split('\n') >>>>>> if line.strip().startswith("data:") >>>>>> ] >>>>>> >>>>>> if not data_lines: >>>>>> raise json.JSONDecodeError("No 'data:' lines found in >>>>>> 200 response.", raw_text, 0) >>>>>> >>>>>> last_data_line = data_lines[-1] >>>>>> json_payload = last_data_line[len("data:"):].strip() >>>>>> agent_response = json.loads(json_payload) >>>>>> >>>>>> # Extract the final text >>>>>> final_text = agent_response.get('content', {}).get('parts', >>>>>> [{}])[0].get('text', 'Agent response structure not recognized.') >>>>>> >>>>>> print(f"[Agent] -> {final_text}") >>>>>> >>>>>> except json.JSONDecodeError as e: >>>>>> print(f"\n🚨 **JSON PARSING FAILED**!") >>>>>> print(f" Error: {e}") >>>>>> print(" --- RAW SERVER CONTENT ---") >>>>>> print(raw_text) >>>>>> print(" --------------------------") >>>>>> >>>>>> except Exception as e: >>>>>> print(f"❌ Agent request failed: {e}") >>>>>> >>>>>> # --- Interactive Chat Loop --- >>>>>> >>>>>> async def chat(self, client: httpx.AsyncClient, session_id: str): >>>>>> """Runs the main conversation loop, handling user input >>>>>> asynchronously.""" >>>>>> print("--- 💬 Start Chatting ---") >>>>>> >>>>>> try: >>>>>> # Use asyncio.to_thread to run blocking input() without >>>>>> freezing the event loop >>>>>> user_input = await asyncio.to_thread(input, >>>>>> f"[{self.USER_ID}]: ") >>>>>> >>>>>> # Send the message to the agent >>>>>> 
await self.run_agent_request(client, session_id, user_input) >>>>>> >>>>>> except Exception as e: >>>>>> print(f"An unexpected error occurred in the loop: {e}") >>>>>> raise e >>>>>> >>>>>> # --- Main Logic (ASYNC) --- >>>>>> >>>>>> async def amain(self, element): >>>>>> """Main asynchronous function to set up the session and start >>>>>> the loop.""" >>>>>> print(f"\n🤖 Starting Interactive Client with Session ID: >>>>>> **{self.SESSION_ID}**") >>>>>> session_data = {"state": {"preferred_language": "English", >>>>>> "visit_count": 5}} >>>>>> current_session_endpoint = >>>>>> f"/apps/{self.APP_NAME}/users/{self.USER_ID}/sessions/{self.SESSION_ID}" >>>>>> >>>>>> # httpx.AsyncClient is used as a context manager to manage >>>>>> connections >>>>>> async with httpx.AsyncClient(timeout=30.0) as client: >>>>>> >>>>>> # 1. Create Session >>>>>> print("\n## 1. Creating Session") >>>>>> try: >>>>>> await self.make_request(client, "POST", >>>>>> current_session_endpoint, data=session_data) >>>>>> print(f"✅ Session created successfully. Status 200.") >>>>>> except Exception as e: >>>>>> print(f"❌ Could not start session: {e}") >>>>>> return >>>>>> >>>>>> # 2. Start the Interactive Loop >>>>>> await self.chat(client, self.SESSION_ID) >>>>>> >>>>>> # 3. Cleanup: Delete Session (Best Practice) >>>>>> print(f"\n## 3. Deleting Session: {self.SESSION_ID}") >>>>>> try: >>>>>> await self.make_request(client, "DELETE", >>>>>> current_session_endpoint) >>>>>> print("✅ Session deleted successfully.") >>>>>> except Exception as e: >>>>>> print(f"⚠️ Warning: Failed to delete session. {e}") >>>>>> >>>>>> def process(self, element: str): >>>>>> logging.info(f'Input elements:{element}') >>>>>> with asyncio.Runner() as runner: >>>>>> return runner.run(self.amain(element)) >>>>>> >>>>>> >>>>>>
