Hi Marco, try passing the metric namespace directly to the super constructor
(your handler currently calls super().__init__() with no arguments):
super().__init__(namespace="stock_agent_inference")
Thanks,
Ganesh.

On Wed, Jan 14, 2026 at 12:46 PM Marc _ <[email protected]> wrote:

> Hi Ganesh,
> I have hit a setback.
> I am getting a 'Failure: builtins.tuple: (<class 'ValueError'>,
> ValueError('Metric namespace must be non-empty'), <traceback object at
> 0x00000255D2BDEFC0>)'
>
> Here's my handler. Am I missing something?
>
> class CloudRunAgentHandler(RemoteModelHandler):
>     def __init__(self, app_url: str, app_name: str, user_id: str, 
> metric_namespace:str):
>         # We initialize with a model_id to ensure the metrics namespace is 
> populated
>         self._model_id = f"CloudRun_{app_name}"
>         super().__init__()
>         self.app_url = app_url
>         self.app_name = app_name
>         self.user_id = user_id
>         self.metric_namespace = metric_namespace
>
>     def share_model_across_processes(self) -> bool:
>         return True
>
>     def get_metrics_namespace(self) -> str:
>         # This MUST return a non-empty string
>         return self.metric_namespace
>
>     def create_client(self) -> httpx.AsyncClient:
>         return httpx.AsyncClient(timeout=60.0)
>
>     def _get_token(self) -> str:
>         auth_req = google.auth.transport.requests.Request()
>         return id_token.fetch_id_token(auth_req, self.app_url)
>
>     async def request(
>             self,
>             item: str,
>             client: httpx.AsyncClient,
>             inference_args: Optional[Dict[str, Any]] = None
>     ) -> PredictionResult:
>         token = self._get_token()
>         headers = {"Authorization": f"Bearer {token}", "Content-Type": 
> "application/json"}
>
>         run_data = {
>             "app_name": self.app_name,
>             "user_id": self.user_id,
>             "session_id": f"beam_task_{hash(item)}",
>             "new_message": {"role": "user", "parts": [{"text": item}]},
>             "streaming": False
>         }
>
>         response = await client.post(f"{self.app_url}/run_sse", 
> headers=headers, json=run_data)
>
>         raw_text = response.text.strip()
>         data_lines = [l for l in raw_text.split('\n') if 
> l.strip().startswith("data:")]
>
>         if data_lines:
>             try:
>                 import json
>                 last_json = json.loads(data_lines[-1][5:])
>                 final_text = last_json.get('content', {}).get('parts', 
> [{}])[0].get('text', '')
>                 return PredictionResult(example=item, inference=final_text)
>             except Exception as e:
>                 return PredictionResult(example=item, inference=f"Error: {e}")
>
>         return PredictionResult(example=item, inference="No Data")
>
>
> def test_cloudagent(self):
>     from apache_beam.ml.inference.base import RunInference, PredictionResult
>
>     agent_handler = CloudRunAgentHandler(
>         app_url="xxxxxxxxxx",
>         app_name="stock_agent",
>         user_id="user_123",
>         metric_namespace="stock_agent_inference"
>     )
>     sink = beam.Map(print)
>     with TestPipeline(options=PipelineOptions()) as pipeline:
>         (pipeline | 'Sourcinig prompt' >> beam.Create(
>             ["Run a technical analysis for today's stock picks and give me 
> your recommendations"])
>          | 'ClouodagentRun' >> RunInference(agent_handler)
>          | sink
>          )
>
> kind regards
>  Marco
>
> On Mon, Jan 5, 2026 at 10:06 AM Marc _ <[email protected]> wrote:
>
>> Yes, no worries. It runs only on my project. I will redeploy the service.
>>
>>
>> On Mon, Jan 5, 2026 at 9:19 AM Ganesh Sivakumar <
>> [email protected]> wrote:
>>
>>> Hey, it's not safe to expose your Cloud Run URL in public forums. Your
>>> code snippet had a hardcoded URL.
>>>
>>> On Mon, 5 Jan, 2026, 1:47 pm Marc _, <[email protected]> wrote:
>>>
>>>> Thanks a lot, Ganesh! I'll have a look and report back if I get stuck.
>>>> Kind regards
>>>>
>>>> On Mon, Jan 5, 2026 at 8:11 AM Ganesh Sivakumar <
>>>> [email protected]> wrote:
>>>>
>>>>> Hey Marco, the best pattern for your use case would be to separate data
>>>>> processing from inference (invoking your agent). Implement a custom
>>>>> `RemoteModelHandler`
>>>>> <https://beam.apache.org/releases/pydoc/current/apache_beam.ml.inference.base.html#apache_beam.ml.inference.base.RemoteModelHandler>
>>>>> for calling the agent and use it with the RunInference transform. The
>>>>> pipeline would look like:
>>>>> Read(stocks data) ---> RunInference(your agent handler) ---> Sink.
>>>>>
>>>>> Thanks,
>>>>> Ganesh.
>>>>>
>>>>> On Mon, Jan 5, 2026 at 1:04 PM Marc _ <[email protected]> wrote:
>>>>>
>>>>>> Hi all,
>>>>>> I have an ADK agent running on Cloud Run and want to invoke it via
>>>>>> Dataflow.
>>>>>> I have the following pipeline and DoFn, but I am getting the exception below.
>>>>>>
>>>>>> Could anyone advise?
>>>>>>
>>>>>> Kind regards
>>>>>> Marco
>>>>>>
>>>>>> EOF when reading a line [while running
>>>>>> 'ClouodagentRun-ptransform-32'] Traceback (most recent call last): File
>>>>>> "apache_beam/runners/common.py", line 1498, in
>>>>>> apache_beam.runners.common.DoFnRunner.process File
>>>>>> "apache_beam/runners/common.py", line 685, in
>>>>>> apache_beam.runners.common.SimpleInvoker.invoke_process File
>>>>>> "/template/shareloader/modules/obb_utils.py", line 726, in process return
>>>>>> runner.run(self.amain(element)) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File
>>>>>> "/usr/local/lib/python3.11/asyncio/runners.py", line 118, in run return
>>>>>> self._loop.run_until_complete(task) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
>>>>>> File "/usr/local/lib/python3.11/asyncio/base_events.py", line 654, in
>>>>>> run_until_complete return future.result() ^^^^^^^^^^^^^^^ File
>>>>>> "/template/shareloader/modules/obb_utils.py", line 713, in amain await
>>>>>> self.chat(client, self.SESSION_ID) File
>>>>>> "/template/shareloader/modules/obb_utils.py", line 690, in chat raise e
>>>>>> File "/template/shareloader/modules/obb_utils.py", line 683, in chat
>>>>>> user_input = await asyncio.to_thread(input, f"[{self.USER_ID}]: ")
>>>>>> ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File
>>>>>> "/usr/local/lib/python3.11/asyncio/threads.py", line 25, in to_thread
>>>>>> return await loop.run_in_executor(None, func_call)
>>>>>> ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File
>>>>>> "/usr/local/lib/python3.11/concurrent/futures/thread.py", line 58, in run
>>>>>> result = self.fn(*self.args, **self.kwargs)
>>>>>> ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ EOFError: EOF when reading a line
>>>>>>
>>>>>> def run_gcloud_agent(pipeline, debugSink):
>>>>>>     from shareloader.modules.obb_utils import AsyncCloudRunAgent
>>>>>>     (pipeline | 'Sourcinig prompt' >> beam.Create(["Run a technical 
>>>>>> analysis for today's stock picks and give me your recommendations"])
>>>>>>             | 'ClouodagentRun' >> beam.ParDo(AsyncCloudRunAgent())
>>>>>>              |  debugSink
>>>>>>             )
>>>>>>
>>>>>>
>>>>>> class AsyncCloudRunAgent(AsyncProcess):
>>>>>>
>>>>>>     def __init__(self):
>>>>>>         # --- Configuration (Dynamic) ---
>>>>>>         self.APP_URL = 
>>>>>> "https://stock-agent-service-682143946483.us-central1.run.app";
>>>>>>         self.USER_ID = "user_123"
>>>>>>         # Generate a single session ID for the entire conversation loop
>>>>>>         self.SESSION_ID = 
>>>>>> f"session_{datetime.now().strftime('%Y%m%d%H%M%S')}"
>>>>>>         self.APP_NAME = "stock_agent"
>>>>>>
>>>>>>
>>>>>>
>>>>>>     # --- Authentication Function (ASYNC) ---
>>>>>>
>>>>>>     async def get_auth_token(self) -> str:
>>>>>>         """
>>>>>>         Programmatically fetches an ID token for the Cloud Run service 
>>>>>> audience.
>>>>>>         'target_audience' should be the URL of your Cloud Run service.
>>>>>>         """
>>>>>>         # Run the synchronous google-auth call in a thread to keep it 
>>>>>> async-friendly
>>>>>>         loop = asyncio.get_event_loop()
>>>>>>
>>>>>>         def fetch_token():
>>>>>>             auth_req = google.auth.transport.requests.Request()
>>>>>>             # This automatically uses the Dataflow Worker Service Account
>>>>>>             return id_token.fetch_id_token(auth_req, self.APP_URL)
>>>>>>
>>>>>>         try:
>>>>>>             token = await loop.run_in_executor(None, fetch_token)
>>>>>>             return token
>>>>>>         except Exception as e:
>>>>>>             raise RuntimeError(f"Failed to fetch ID token: {e}")
>>>>>>     # --- API Interaction Functions (ASYNC) ---
>>>>>>
>>>>>>     async def make_request(self, client: httpx.AsyncClient, method: str, 
>>>>>> endpoint: str, data: Dict[str, Any] = None) -> httpx.Response:
>>>>>>         """Helper function for authenticated asynchronous requests using 
>>>>>> httpx."""
>>>>>>         token = await self.get_auth_token()
>>>>>>         headers = {
>>>>>>             "Authorization": f"Bearer {token}",
>>>>>>             "Content-Type": "application/json"
>>>>>>         }
>>>>>>         url = f"{self.APP_URL}{endpoint}"
>>>>>>
>>>>>>         try:
>>>>>>             if method.upper() == 'POST':
>>>>>>                 response = await client.post(url, headers=headers, 
>>>>>> json=data)
>>>>>>             elif method.upper() == 'DELETE':
>>>>>>                 response = await client.delete(url, headers=headers)
>>>>>>             else:
>>>>>>                 raise ValueError(f"Unsupported method: {method}")
>>>>>>
>>>>>>             response.raise_for_status()
>>>>>>             return response
>>>>>>         except httpx.HTTPStatusError as errh:
>>>>>>             print(f"\n❌ **HTTP ERROR:** Status {response.status_code} 
>>>>>> for {url}")
>>>>>>             print(f"❌ **Server Response (Raw):**\n{response.text}")
>>>>>>             raise
>>>>>>         except httpx.RequestError as err:
>>>>>>             print(f"\n❌ An unexpected request error occurred: {err}")
>>>>>>             raise
>>>>>>
>>>>>>     async def run_agent_request(self, client: httpx.AsyncClient, 
>>>>>> session_id: str, message: str):
>>>>>>         """Executes a single POST request to the /run_sse endpoint."""
>>>>>>
>>>>>>         print(f"\n[User] -> Sending message: '{message}'")
>>>>>>
>>>>>>         run_data = {
>>>>>>             "app_name": self.APP_NAME,
>>>>>>             "user_id": self.USER_ID,
>>>>>>             "session_id": session_id,
>>>>>>             "new_message": {"role": "user", "parts": [{"text": 
>>>>>> message}]},
>>>>>>             "streaming": False
>>>>>>         }
>>>>>>
>>>>>>         try:
>>>>>>             response = await self.make_request(client, "POST", 
>>>>>> "/run_sse", data=run_data)
>>>>>>             current_status = response.status_code
>>>>>>             # print(f"**Request Status Code:** {current_status}")
>>>>>>
>>>>>>             raw_text = response.text.strip()
>>>>>>
>>>>>>             # Multi-line SSE parsing logic
>>>>>>             data_lines = [
>>>>>>                 line.strip()
>>>>>>                 for line in raw_text.split('\n')
>>>>>>                 if line.strip().startswith("data:")
>>>>>>             ]
>>>>>>
>>>>>>             if not data_lines:
>>>>>>                 raise json.JSONDecodeError("No 'data:' lines found in 
>>>>>> 200 response.", raw_text, 0)
>>>>>>
>>>>>>             last_data_line = data_lines[-1]
>>>>>>             json_payload = last_data_line[len("data:"):].strip()
>>>>>>             agent_response = json.loads(json_payload)
>>>>>>
>>>>>>             # Extract the final text
>>>>>>             final_text = agent_response.get('content', {}).get('parts', 
>>>>>> [{}])[0].get('text', 'Agent response structure not recognized.')
>>>>>>
>>>>>>             print(f"[Agent] -> {final_text}")
>>>>>>
>>>>>>         except json.JSONDecodeError as e:
>>>>>>             print(f"\n🚨 **JSON PARSING FAILED**!")
>>>>>>             print(f"   Error: {e}")
>>>>>>             print("   --- RAW SERVER CONTENT ---")
>>>>>>             print(raw_text)
>>>>>>             print("   --------------------------")
>>>>>>
>>>>>>         except Exception as e:
>>>>>>             print(f"❌ Agent request failed: {e}")
>>>>>>
>>>>>>     # --- Interactive Chat Loop ---
>>>>>>
>>>>>>     async def chat(self, client: httpx.AsyncClient, session_id: str):
>>>>>>         """Runs the main conversation loop, handling user input 
>>>>>> asynchronously."""
>>>>>>         print("--- 💬 Start Chatting ---")
>>>>>>
>>>>>>         try:
>>>>>>             # Use asyncio.to_thread to run blocking input() without 
>>>>>> freezing the event loop
>>>>>>             user_input = await asyncio.to_thread(input, 
>>>>>> f"[{self.USER_ID}]: ")
>>>>>>
>>>>>>             # Send the message to the agent
>>>>>>             await self.run_agent_request(client, session_id, user_input)
>>>>>>
>>>>>>         except Exception as e:
>>>>>>             print(f"An unexpected error occurred in the loop: {e}")
>>>>>>             raise e
>>>>>>
>>>>>>     # --- Main Logic (ASYNC) ---
>>>>>>
>>>>>>     async def amain(self, element):
>>>>>>         """Main asynchronous function to set up the session and start 
>>>>>> the loop."""
>>>>>>         print(f"\n🤖 Starting Interactive Client with Session ID: 
>>>>>> **{self.SESSION_ID}**")
>>>>>>         session_data = {"state": {"preferred_language": "English", 
>>>>>> "visit_count": 5}}
>>>>>>         current_session_endpoint = 
>>>>>> f"/apps/{self.APP_NAME}/users/{self.USER_ID}/sessions/{self.SESSION_ID}"
>>>>>>
>>>>>>         # httpx.AsyncClient is used as a context manager to manage 
>>>>>> connections
>>>>>>         async with httpx.AsyncClient(timeout=30.0) as client:
>>>>>>
>>>>>>             # 1. Create Session
>>>>>>             print("\n## 1. Creating Session")
>>>>>>             try:
>>>>>>                 await self.make_request(client, "POST", 
>>>>>> current_session_endpoint, data=session_data)
>>>>>>                 print(f"✅ Session created successfully. Status 200.")
>>>>>>             except Exception as e:
>>>>>>                 print(f"❌ Could not start session: {e}")
>>>>>>                 return
>>>>>>
>>>>>>             # 2. Start the Interactive Loop
>>>>>>             await self.chat(client, self.SESSION_ID)
>>>>>>
>>>>>>             # 3. Cleanup: Delete Session (Best Practice)
>>>>>>             print(f"\n## 3. Deleting Session: {self.SESSION_ID}")
>>>>>>             try:
>>>>>>                 await self.make_request(client, "DELETE", 
>>>>>> current_session_endpoint)
>>>>>>                 print("✅ Session deleted successfully.")
>>>>>>             except Exception as e:
>>>>>>                 print(f"⚠️ Warning: Failed to delete session. {e}")
>>>>>>
>>>>>>     def process(self, element: str):
>>>>>>         logging.info(f'Input elements:{element}')
>>>>>>         with asyncio.Runner() as runner:
>>>>>>             return runner.run(self.amain(element))
>>>>>>
>>>>>>
>>>>>>

Reply via email to