Hi Ganesh,
 I've hit a setback.
I am getting a  'Failure: builtins.tuple: (<class 'ValueError'>,
ValueError('Metric namespace must be non-empty'), <traceback object at
0x00000255D2BDEFC0>)'

Here's my handler. Am I missing something?

class CloudRunAgentHandler(RemoteModelHandler):
    """Beam ``RemoteModelHandler`` that sends a prompt to an agent on Cloud Run.

    The prompt is POSTed to ``{app_url}/run_sse`` with an ID token for
    ``app_url`` as the bearer credential. The ``text`` of the first part of
    the last ``data:`` line in the response body becomes the inference.
    """

    def __init__(self, app_url: str, app_name: str, user_id: str,
                 metric_namespace: str):
        """Store the connection settings and initialise the base handler.

        Args:
            app_url: Base URL of the service; also used as the ID-token
                audience.
            app_name: Sent as ``app_name`` in every request payload.
            user_id: Sent as ``user_id`` in every request payload.
            metric_namespace: Namespace for the handler's metrics. Must be
                non-empty.
        """
        self._model_id = f"CloudRun_{app_name}"
        # The namespace has to be handed to the base initialiser: calling
        # super().__init__() without it is what produced
        # "ValueError: Metric namespace must be non-empty". Overriding
        # get_metrics_namespace() alone does not help, because that method
        # is not consulted while the base class is being initialised.
        super().__init__(namespace=metric_namespace)
        self.app_url = app_url
        self.app_name = app_name
        self.user_id = user_id
        self.metric_namespace = metric_namespace

    def share_model_across_processes(self) -> bool:
        """Return True so the client is shared across processes."""
        return True

    def get_metrics_namespace(self) -> str:
        """Return the namespace given at construction time."""
        return self.metric_namespace

    def create_client(self) -> httpx.AsyncClient:
        """Create the HTTP client (60 second timeout) used by ``request``."""
        return httpx.AsyncClient(timeout=60.0)

    def _get_token(self) -> str:
        """Fetch an ID token whose audience is ``app_url``."""
        auth_req = google.auth.transport.requests.Request()
        return id_token.fetch_id_token(auth_req, self.app_url)

    async def request(
            self,
            item: str,
            client: httpx.AsyncClient,
            inference_args: Optional[Dict[str, Any]] = None
    ) -> PredictionResult:
        """Send ``item`` to the agent and wrap the reply in a PredictionResult.

        Args:
            item: Prompt text to send as the user message.
            client: Client returned by ``create_client``.
            inference_args: Accepted for interface compatibility; unused.

        Returns:
            A ``PredictionResult`` whose ``inference`` is the agent's text,
            ``"No Data"`` when the response has no ``data:`` line, or
            ``"Error: ..."`` when the last ``data:`` line cannot be parsed.
        """
        # NOTE(review): RemoteModelHandler.run_inference appears to call
        # request(batch, model, inference_args) synchronously with a *batch*
        # of examples. Confirm that an async, single-item request is really
        # awaited by the runner; otherwise this must become a sync method
        # that loops over the batch and returns one result per example.
        import json

        token = self._get_token()
        headers = {
            "Authorization": f"Bearer {token}",
            "Content-Type": "application/json",
        }

        run_data = {
            "app_name": self.app_name,
            "user_id": self.user_id,
            # hash() of a str is salted per process, so this id is not
            # stable across workers or retries.
            "session_id": f"beam_task_{hash(item)}",
            "new_message": {"role": "user", "parts": [{"text": item}]},
            "streaming": False
        }

        response = await client.post(
            f"{self.app_url}/run_sse", headers=headers, json=run_data)

        # Strip each line *before* slicing off the "data:" prefix so that
        # leading whitespace or a trailing "\r" cannot shift the offset.
        data_lines = [
            line.strip()
            for line in response.text.strip().split('\n')
            if line.strip().startswith("data:")
        ]
        if not data_lines:
            return PredictionResult(example=item, inference="No Data")

        try:
            last_json = json.loads(data_lines[-1][len("data:"):])
            final_text = last_json.get('content', {}).get(
                'parts', [{}])[0].get('text', '')
        except Exception as e:
            # Best effort: surface the parse problem in the output element
            # instead of failing the bundle.
            return PredictionResult(example=item, inference=f"Error: {e}")
        return PredictionResult(example=item, inference=final_text)


def test_cloudagent(self):
    """Run one prompt through ``RunInference`` with the Cloud Run handler.

    Builds a ``CloudRunAgentHandler``, feeds a single prompt through a test
    pipeline and prints whatever ``RunInference`` emits.
    """
    from apache_beam.ml.inference.base import RunInference

    agent_handler = CloudRunAgentHandler(
        app_url="xxxxxxxxxx",
        app_name="stock_agent",
        user_id="user_123",
        metric_namespace="stock_agent_inference"
    )
    sink = beam.Map(print)
    # The prompt is split over two adjacent literals; a single string literal
    # must not span physical lines.
    prompt = (
        "Run a technical analysis for today's stock picks and "
        "give me your recommendations"
    )
    with TestPipeline(options=PipelineOptions()) as pipeline:
        (pipeline
         | 'Sourcinig prompt' >> beam.Create([prompt])
         | 'ClouodagentRun' >> RunInference(agent_handler)
         | sink
         )

kind regards
 Marco

On Mon, Jan 5, 2026 at 10:06 AM Marc _ <[email protected]> wrote:

> ye no worry.... it runs only on my project. i will re deploy service
>
>
> On Mon, Jan 5, 2026 at 9:19 AM Ganesh Sivakumar <
> [email protected]> wrote:
>
>> Hey, it's not safe to expose your cloud run url in public forums. Your
>> code snippet had hardcoded url.
>>
>> On Mon, 5 Jan, 2026, 1:47 pm Marc _, <[email protected]> wrote:
>>
>>> Thanks   a lot Ganesh!  I'll have a look and report back if stuck
>>> Kind regards
>>>
>>> On Mon, Jan 5, 2026 at 8:11 AM Ganesh Sivakumar <
>>> [email protected]> wrote:
>>>
>>>> Hey Marco, best pattern for your use case would be to separate data
>>>> processing and inference(invoking your agent). implement a custom `
>>>> RemoteModelHandler
>>>> <https://beam.apache.org/releases/pydoc/current/apache_beam.ml.inference.base.html#apache_beam.ml.inference.base.RemoteModelHandler>`
>>>> for calling the agent and use it with RunInference transform. Pipeline like
>>>> Read(stocks data) ---> RunInference(your agent handler) --> Sink.
>>>>
>>>> Thanks,
>>>> Ganesh.
>>>>
>>>> On Mon, Jan 5, 2026 at 1:04 PM Marc _ <[email protected]> wrote:
>>>>
>>>>> Hi all
>>>>>  i have an ADK agent running on CloudRun and wanted to invoke it via
>>>>> dataflow
>>>>> I have the following pipeline and DoFn but i am getting this exception
>>>>>
>>>>> Anyone could advise?
>>>>>
>>>>> Kind regards
>>>>> Marco
>>>>>
>>>>> EOF when reading a line [while running 'ClouodagentRun-ptransform-32']
>>>>> Traceback (most recent call last): File "apache_beam/runners/common.py",
>>>>> line 1498, in apache_beam.runners.common.DoFnRunner.process File
>>>>> "apache_beam/runners/common.py", line 685, in
>>>>> apache_beam.runners.common.SimpleInvoker.invoke_process File
>>>>> "/template/shareloader/modules/obb_utils.py", line 726, in process return
>>>>> runner.run(self.amain(element)) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File
>>>>> "/usr/local/lib/python3.11/asyncio/runners.py", line 118, in run return
>>>>> self._loop.run_until_complete(task) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
>>>>> File "/usr/local/lib/python3.11/asyncio/base_events.py", line 654, in
>>>>> run_until_complete return future.result() ^^^^^^^^^^^^^^^ File
>>>>> "/template/shareloader/modules/obb_utils.py", line 713, in amain await
>>>>> self.chat(client, self.SESSION_ID) File
>>>>> "/template/shareloader/modules/obb_utils.py", line 690, in chat raise e
>>>>> File "/template/shareloader/modules/obb_utils.py", line 683, in chat
>>>>> user_input = await asyncio.to_thread(input, f"[{self.USER_ID}]: ")
>>>>> ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File
>>>>> "/usr/local/lib/python3.11/asyncio/threads.py", line 25, in to_thread
>>>>> return await loop.run_in_executor(None, func_call)
>>>>> ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File
>>>>> "/usr/local/lib/python3.11/concurrent/futures/thread.py", line 58, in run
>>>>> result = self.fn(*self.args, **self.kwargs)
>>>>> ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ EOFError: EOF when reading a line
>>>>>
>>>>> def run_gcloud_agent(pipeline, debugSink):
>>>>>     from shareloader.modules.obb_utils import AsyncCloudRunAgent
>>>>>     (pipeline | 'Sourcinig prompt' >> beam.Create(["Run a technical 
>>>>> analysis for today's stock picks and give me your recommendations"])
>>>>>             | 'ClouodagentRun' >> beam.ParDo(AsyncCloudRunAgent())
>>>>>              |  debugSink
>>>>>             )
>>>>>
>>>>>
>>>>> class AsyncCloudRunAgent(AsyncProcess):
>>>>>
>>>>>     def __init__(self):
>>>>>         # --- Configuration (Dynamic) ---
>>>>>         self.APP_URL = 
>>>>> "https://stock-agent-service-682143946483.us-central1.run.app";
>>>>>         self.USER_ID = "user_123"
>>>>>         # Generate a single session ID for the entire conversation loop
>>>>>         self.SESSION_ID = 
>>>>> f"session_{datetime.now().strftime('%Y%m%d%H%M%S')}"
>>>>>         self.APP_NAME = "stock_agent"
>>>>>
>>>>>
>>>>>
>>>>>     # --- Authentication Function (ASYNC) ---
>>>>>
>>>>>     async def get_auth_token(self) -> str:
>>>>>         """
>>>>>         Programmatically fetches an ID token for the Cloud Run service 
>>>>> audience.
>>>>>         'target_audience' should be the URL of your Cloud Run service.
>>>>>         """
>>>>>         # Run the synchronous google-auth call in a thread to keep it 
>>>>> async-friendly
>>>>>         loop = asyncio.get_event_loop()
>>>>>
>>>>>         def fetch_token():
>>>>>             auth_req = google.auth.transport.requests.Request()
>>>>>             # This automatically uses the Dataflow Worker Service Account
>>>>>             return id_token.fetch_id_token(auth_req, self.APP_URL)
>>>>>
>>>>>         try:
>>>>>             token = await loop.run_in_executor(None, fetch_token)
>>>>>             return token
>>>>>         except Exception as e:
>>>>>             raise RuntimeError(f"Failed to fetch ID token: {e}")
>>>>>     # --- API Interaction Functions (ASYNC) ---
>>>>>
>>>>>     async def make_request(self, client: httpx.AsyncClient, method: str, 
>>>>> endpoint: str, data: Dict[str, Any] = None) -> httpx.Response:
>>>>>         """Helper function for authenticated asynchronous requests using 
>>>>> httpx."""
>>>>>         token = await self.get_auth_token()
>>>>>         headers = {
>>>>>             "Authorization": f"Bearer {token}",
>>>>>             "Content-Type": "application/json"
>>>>>         }
>>>>>         url = f"{self.APP_URL}{endpoint}"
>>>>>
>>>>>         try:
>>>>>             if method.upper() == 'POST':
>>>>>                 response = await client.post(url, headers=headers, 
>>>>> json=data)
>>>>>             elif method.upper() == 'DELETE':
>>>>>                 response = await client.delete(url, headers=headers)
>>>>>             else:
>>>>>                 raise ValueError(f"Unsupported method: {method}")
>>>>>
>>>>>             response.raise_for_status()
>>>>>             return response
>>>>>         except httpx.HTTPStatusError as errh:
>>>>>             print(f"\n❌ **HTTP ERROR:** Status {response.status_code} for 
>>>>> {url}")
>>>>>             print(f"❌ **Server Response (Raw):**\n{response.text}")
>>>>>             raise
>>>>>         except httpx.RequestError as err:
>>>>>             print(f"\n❌ An unexpected request error occurred: {err}")
>>>>>             raise
>>>>>
>>>>>     async def run_agent_request(self, client: httpx.AsyncClient, 
>>>>> session_id: str, message: str):
>>>>>         """Executes a single POST request to the /run_sse endpoint."""
>>>>>
>>>>>         print(f"\n[User] -> Sending message: '{message}'")
>>>>>
>>>>>         run_data = {
>>>>>             "app_name": self.APP_NAME,
>>>>>             "user_id": self.USER_ID,
>>>>>             "session_id": session_id,
>>>>>             "new_message": {"role": "user", "parts": [{"text": message}]},
>>>>>             "streaming": False
>>>>>         }
>>>>>
>>>>>         try:
>>>>>             response = await self.make_request(client, "POST", 
>>>>> "/run_sse", data=run_data)
>>>>>             current_status = response.status_code
>>>>>             # print(f"**Request Status Code:** {current_status}")
>>>>>
>>>>>             raw_text = response.text.strip()
>>>>>
>>>>>             # Multi-line SSE parsing logic
>>>>>             data_lines = [
>>>>>                 line.strip()
>>>>>                 for line in raw_text.split('\n')
>>>>>                 if line.strip().startswith("data:")
>>>>>             ]
>>>>>
>>>>>             if not data_lines:
>>>>>                 raise json.JSONDecodeError("No 'data:' lines found in 200 
>>>>> response.", raw_text, 0)
>>>>>
>>>>>             last_data_line = data_lines[-1]
>>>>>             json_payload = last_data_line[len("data:"):].strip()
>>>>>             agent_response = json.loads(json_payload)
>>>>>
>>>>>             # Extract the final text
>>>>>             final_text = agent_response.get('content', {}).get('parts', 
>>>>> [{}])[0].get('text', 'Agent response structure not recognized.')
>>>>>
>>>>>             print(f"[Agent] -> {final_text}")
>>>>>
>>>>>         except json.JSONDecodeError as e:
>>>>>             print(f"\n🚨 **JSON PARSING FAILED**!")
>>>>>             print(f"   Error: {e}")
>>>>>             print("   --- RAW SERVER CONTENT ---")
>>>>>             print(raw_text)
>>>>>             print("   --------------------------")
>>>>>
>>>>>         except Exception as e:
>>>>>             print(f"❌ Agent request failed: {e}")
>>>>>
>>>>>     # --- Interactive Chat Loop ---
>>>>>
>>>>>     async def chat(self, client: httpx.AsyncClient, session_id: str):
>>>>>         """Runs the main conversation loop, handling user input 
>>>>> asynchronously."""
>>>>>         print("--- 💬 Start Chatting ---")
>>>>>
>>>>>         try:
>>>>>             # Use asyncio.to_thread to run blocking input() without 
>>>>> freezing the event loop
>>>>>             user_input = await asyncio.to_thread(input, 
>>>>> f"[{self.USER_ID}]: ")
>>>>>
>>>>>             # Send the message to the agent
>>>>>             await self.run_agent_request(client, session_id, user_input)
>>>>>
>>>>>         except Exception as e:
>>>>>             print(f"An unexpected error occurred in the loop: {e}")
>>>>>             raise e
>>>>>
>>>>>     # --- Main Logic (ASYNC) ---
>>>>>
>>>>>     async def amain(self, element):
>>>>>         """Main asynchronous function to set up the session and start the 
>>>>> loop."""
>>>>>         print(f"\n🤖 Starting Interactive Client with Session ID: 
>>>>> **{self.SESSION_ID}**")
>>>>>         session_data = {"state": {"preferred_language": "English", 
>>>>> "visit_count": 5}}
>>>>>         current_session_endpoint = 
>>>>> f"/apps/{self.APP_NAME}/users/{self.USER_ID}/sessions/{self.SESSION_ID}"
>>>>>
>>>>>         # httpx.AsyncClient is used as a context manager to manage 
>>>>> connections
>>>>>         async with httpx.AsyncClient(timeout=30.0) as client:
>>>>>
>>>>>             # 1. Create Session
>>>>>             print("\n## 1. Creating Session")
>>>>>             try:
>>>>>                 await self.make_request(client, "POST", 
>>>>> current_session_endpoint, data=session_data)
>>>>>                 print(f"✅ Session created successfully. Status 200.")
>>>>>             except Exception as e:
>>>>>                 print(f"❌ Could not start session: {e}")
>>>>>                 return
>>>>>
>>>>>             # 2. Start the Interactive Loop
>>>>>             await self.chat(client, self.SESSION_ID)
>>>>>
>>>>>             # 3. Cleanup: Delete Session (Best Practice)
>>>>>             print(f"\n## 3. Deleting Session: {self.SESSION_ID}")
>>>>>             try:
>>>>>                 await self.make_request(client, "DELETE", 
>>>>> current_session_endpoint)
>>>>>                 print("✅ Session deleted successfully.")
>>>>>             except Exception as e:
>>>>>                 print(f"⚠️ Warning: Failed to delete session. {e}")
>>>>>
>>>>>     def process(self, element: str):
>>>>>         logging.info(f'Input elements:{element}')
>>>>>         with asyncio.Runner() as runner:
>>>>>             return runner.run(self.amain(element))
>>>>>
>>>>>
>>>>>

Reply via email to