gemini-code-assist[bot] commented on code in PR #38701:
URL: https://github.com/apache/beam/pull/38701#discussion_r3307231570


##########
sdks/python/apache_beam/ml/inference/vllm_inference.py:
##########
@@ -109,46 +111,214 @@ def getAsyncVLLMClient(port) -> AsyncOpenAI:
   )
 
 
+# Embedded Dynamo runtime defaults proven on the smoke test: etcd discovery,
+# TCP request plane, ZMQ event plane, KV events disabled. KV-aware routing,
+# disaggregated prefill/decode, and the Planner are not active in this mode.
+_DYNAMO_FRONTEND_DEFAULT_KWARGS: dict[str, Optional[str]] = {
+    'discovery-backend': 'etcd',
+    'request-plane': 'tcp',
+    'event-plane': 'zmq',
+    'router-mode': 'round-robin',
+    'no-router-kv-events': None,
+}
+
+_DYNAMO_ENGINE_DEFAULT_KWARGS: dict[str, Optional[str]] = {
+    'discovery-backend': 'etcd',
+    'request-plane': 'tcp',
+    'event-plane': 'zmq',
+    'kv-events-config': '{"enable_kv_cache_events": false}',
+}
+
+
+def _append_kwargs(cmd: list[str], kwargs: dict[str, Optional[str]]) -> None:
+  for k, v in kwargs.items():
+    cmd.append(f'--{k}')
+    # Only add values for commands with value part.
+    if v is not None:
+      cmd.append(v)
+
+
+def _uses_etcd_discovery(kwargs: dict[str, Optional[str]]) -> bool:
+  return kwargs.get('discovery-backend') == 'etcd'
+
+
 class _VLLMModelServer():
-  def __init__(self, model_name: str, vllm_server_kwargs: dict[str, str]):
+  def __init__(
+      self,
+      model_name: str,
+      vllm_server_kwargs: dict[str, Optional[str]],
+      dynamo_frontend_kwargs: Optional[dict[str, Optional[str]]] = None,
+      use_dynamo: bool = False):
     self._model_name = model_name
     self._vllm_server_kwargs = vllm_server_kwargs
+    self._dynamo_frontend_kwargs = dynamo_frontend_kwargs or {}
     self._server_started = False
     self._server_process = None
+    self._dynamo_process = None
+    self._etcd_process = None
+    self._managed_etcd_endpoint = None
     self._server_port: int = -1
     self._server_process_lock = threading.RLock()
+    self._use_dynamo = use_dynamo

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   Initialize `self._etcd_data_dir` to `None` in `__init__` to track the 
temporary etcd data directory for proper cleanup.
   
   ```suggestion
       self._model_name = model_name
       self._vllm_server_kwargs = vllm_server_kwargs
       self._dynamo_frontend_kwargs = dynamo_frontend_kwargs or {}
       self._server_started = False
       self._server_process = None
       self._dynamo_process = None
       self._etcd_process = None
       self._etcd_data_dir = None
       self._managed_etcd_endpoint = None
       self._server_port: int = -1
       self._server_process_lock = threading.RLock()
       self._use_dynamo = use_dynamo
   ```



##########
sdks/python/apache_beam/ml/inference/vllm_inference.py:
##########
@@ -109,46 +111,214 @@ def getAsyncVLLMClient(port) -> AsyncOpenAI:
   )
 
 
+# Embedded Dynamo runtime defaults proven on the smoke test: etcd discovery,
+# TCP request plane, ZMQ event plane, KV events disabled. KV-aware routing,
+# disaggregated prefill/decode, and the Planner are not active in this mode.
+_DYNAMO_FRONTEND_DEFAULT_KWARGS: dict[str, Optional[str]] = {
+    'discovery-backend': 'etcd',
+    'request-plane': 'tcp',
+    'event-plane': 'zmq',
+    'router-mode': 'round-robin',
+    'no-router-kv-events': None,
+}
+
+_DYNAMO_ENGINE_DEFAULT_KWARGS: dict[str, Optional[str]] = {
+    'discovery-backend': 'etcd',
+    'request-plane': 'tcp',
+    'event-plane': 'zmq',
+    'kv-events-config': '{"enable_kv_cache_events": false}',
+}
+
+
+def _append_kwargs(cmd: list[str], kwargs: dict[str, Optional[str]]) -> None:
+  for k, v in kwargs.items():
+    cmd.append(f'--{k}')
+    # Only add values for commands with value part.
+    if v is not None:
+      cmd.append(v)
+
+
+def _uses_etcd_discovery(kwargs: dict[str, Optional[str]]) -> bool:
+  return kwargs.get('discovery-backend') == 'etcd'
+
+
 class _VLLMModelServer():
-  def __init__(self, model_name: str, vllm_server_kwargs: dict[str, str]):
+  def __init__(
+      self,
+      model_name: str,
+      vllm_server_kwargs: dict[str, Optional[str]],
+      dynamo_frontend_kwargs: Optional[dict[str, Optional[str]]] = None,
+      use_dynamo: bool = False):
     self._model_name = model_name
     self._vllm_server_kwargs = vllm_server_kwargs
+    self._dynamo_frontend_kwargs = dynamo_frontend_kwargs or {}
     self._server_started = False
     self._server_process = None
+    self._dynamo_process = None
+    self._etcd_process = None
+    self._managed_etcd_endpoint = None
     self._server_port: int = -1
     self._server_process_lock = threading.RLock()
+    self._use_dynamo = use_dynamo
 
     self.start_server()
 
+  @staticmethod
+  def _stop_process(process: Optional[subprocess.Popen]) -> None:
+    if process is None or process.poll() is not None:
+      return
+    process.terminate()
+    try:
+      process.wait(timeout=10)
+    except subprocess.TimeoutExpired:
+      process.kill()
+      process.wait()
+
+  def _stop_processes(self) -> None:
+    self._stop_process(self._dynamo_process)
+    self._stop_process(self._server_process)
+    self._stop_process(self._etcd_process)
+    if (self._managed_etcd_endpoint is not None and
+        os.environ.get('ETCD_ENDPOINTS') == self._managed_etcd_endpoint):
+      del os.environ['ETCD_ENDPOINTS']
+    self._dynamo_process = None
+    self._server_process = None
+    self._etcd_process = None
+    self._managed_etcd_endpoint = None
+    self._server_started = False
+    self._server_port = -1

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   To prevent disk space leaks in `/tmp` from abandoned etcd data directories, 
and to avoid a potential `AttributeError` or `NameError` when 
`_stop_processes` is invoked from `__del__` during interpreter shutdown (when 
module-level globals such as `os` or `shutil` may already have been cleared), 
we should:
   1. Clean up the temporary etcd data directory using `shutil.rmtree` in 
`_stop_processes`.
   2. Import `os` and `shutil` locally inside `_stop_processes` so that they 
are still available during interpreter shutdown.
   3. Use `local_os.environ.pop` with a default to remove the environment 
variable safely, without raising a `KeyError`.
   
   ```python
     def _stop_processes(self) -> None:
       import os as local_os
       import shutil as local_shutil
       self._stop_process(self._dynamo_process)
       self._stop_process(self._server_process)
       self._stop_process(self._etcd_process)
       if (self._managed_etcd_endpoint is not None and
           local_os.environ.get('ETCD_ENDPOINTS') == self._managed_etcd_endpoint):
         local_os.environ.pop('ETCD_ENDPOINTS', None)
       if getattr(self, '_etcd_data_dir', None):
         local_shutil.rmtree(self._etcd_data_dir, ignore_errors=True)
         self._etcd_data_dir = None
       self._dynamo_process = None
       self._server_process = None
       self._etcd_process = None
       self._managed_etcd_endpoint = None
       self._server_started = False
       self._server_port = -1
   ```



##########
sdks/python/apache_beam/ml/inference/vllm_inference.py:
##########
@@ -109,46 +111,214 @@ def getAsyncVLLMClient(port) -> AsyncOpenAI:
   )
 
 
+# Embedded Dynamo runtime defaults proven on the smoke test: etcd discovery,
+# TCP request plane, ZMQ event plane, KV events disabled. KV-aware routing,
+# disaggregated prefill/decode, and the Planner are not active in this mode.
+_DYNAMO_FRONTEND_DEFAULT_KWARGS: dict[str, Optional[str]] = {
+    'discovery-backend': 'etcd',
+    'request-plane': 'tcp',
+    'event-plane': 'zmq',
+    'router-mode': 'round-robin',
+    'no-router-kv-events': None,
+}
+
+_DYNAMO_ENGINE_DEFAULT_KWARGS: dict[str, Optional[str]] = {
+    'discovery-backend': 'etcd',
+    'request-plane': 'tcp',
+    'event-plane': 'zmq',
+    'kv-events-config': '{"enable_kv_cache_events": false}',
+}
+
+
+def _append_kwargs(cmd: list[str], kwargs: dict[str, Optional[str]]) -> None:
+  for k, v in kwargs.items():
+    cmd.append(f'--{k}')
+    # Only add values for commands with value part.
+    if v is not None:
+      cmd.append(v)
+
+
+def _uses_etcd_discovery(kwargs: dict[str, Optional[str]]) -> bool:
+  return kwargs.get('discovery-backend') == 'etcd'
+
+
 class _VLLMModelServer():
-  def __init__(self, model_name: str, vllm_server_kwargs: dict[str, str]):
+  def __init__(
+      self,
+      model_name: str,
+      vllm_server_kwargs: dict[str, Optional[str]],
+      dynamo_frontend_kwargs: Optional[dict[str, Optional[str]]] = None,
+      use_dynamo: bool = False):
     self._model_name = model_name
     self._vllm_server_kwargs = vllm_server_kwargs
+    self._dynamo_frontend_kwargs = dynamo_frontend_kwargs or {}
     self._server_started = False
     self._server_process = None
+    self._dynamo_process = None
+    self._etcd_process = None
+    self._managed_etcd_endpoint = None
     self._server_port: int = -1
     self._server_process_lock = threading.RLock()
+    self._use_dynamo = use_dynamo
 
     self.start_server()
 
+  @staticmethod
+  def _stop_process(process: Optional[subprocess.Popen]) -> None:
+    if process is None or process.poll() is not None:
+      return
+    process.terminate()
+    try:
+      process.wait(timeout=10)
+    except subprocess.TimeoutExpired:
+      process.kill()
+      process.wait()
+
+  def _stop_processes(self) -> None:
+    self._stop_process(self._dynamo_process)
+    self._stop_process(self._server_process)
+    self._stop_process(self._etcd_process)
+    if (self._managed_etcd_endpoint is not None and
+        os.environ.get('ETCD_ENDPOINTS') == self._managed_etcd_endpoint):
+      del os.environ['ETCD_ENDPOINTS']
+    self._dynamo_process = None
+    self._server_process = None
+    self._etcd_process = None
+    self._managed_etcd_endpoint = None
+    self._server_started = False
+    self._server_port = -1
+
+  def _process_status(self) -> str:
+    process_status = []
+    if self._server_process is not None:
+      process_status.append(
+          'frontend/server exit code: %s' % self._server_process.poll())
+    if self._dynamo_process is not None:
+      process_status.append(
+          'dynamo worker exit code: %s' % self._dynamo_process.poll())
+    if self._etcd_process is not None:
+      process_status.append('etcd exit code: %s' % self._etcd_process.poll())
+    return ', '.join(process_status) or 'no process status available'
+
+  def __del__(self):
+    self._stop_processes()
+
+  def _uses_embedded_etcd(self) -> bool:
+    return (
+        self._use_dynamo and
+        _uses_etcd_discovery(self._dynamo_frontend_kwargs) and
+        _uses_etcd_discovery(self._vllm_server_kwargs) and
+        'ETCD_ENDPOINTS' not in os.environ)
+
+  def _wait_for_etcd(self, endpoint: str, timeout_secs=30) -> None:
+    deadline = time.time() + timeout_secs
+    health_url = endpoint.rstrip('/') + '/health'
+    while time.time() < deadline and self._etcd_process.poll() is None:
+      try:
+        with urllib.request.urlopen(health_url, timeout=2) as response:
+          if response.status < 500:
+            return
+      except Exception:  # pylint: disable=broad-except
+        time.sleep(1)
+
+    process_status = self._process_status()
+    self._stop_processes()
+    raise RuntimeError(
+        "Failed to start embedded etcd for Dynamo. Process status: "
+        f"{process_status}. Install etcd in the worker container or set "
+        "ETCD_ENDPOINTS to an external etcd service.")
+
+  def _ensure_etcd(self) -> None:
+    if not self._uses_embedded_etcd():
+      return
+    if shutil.which('etcd') is None:
+      raise RuntimeError(
+          "Embedded Dynamo mode requires etcd when ETCD_ENDPOINTS is not "
+          "set. Install etcd in the worker container or set ETCD_ENDPOINTS "
+          "to an external etcd service.")
+
+    etcd_name = f'beam-dynamo-etcd-{uuid.uuid4().hex}'
+    etcd_data_dir = f'/tmp/{etcd_name}'
+    peer_port, = subprocess_server.pick_port(None)
+    etcd_cmd = [
+        'etcd',
+        '--name',
+        etcd_name,
+        '--listen-client-urls',
+        'http://127.0.0.1:{{PORT}}',
+        '--advertise-client-urls',
+        'http://127.0.0.1:{{PORT}}',
+        '--listen-peer-urls',
+        f'http://127.0.0.1:{peer_port}',
+        '--initial-advertise-peer-urls',
+        f'http://127.0.0.1:{peer_port}',
+        '--initial-cluster',
+        f'{etcd_name}=http://127.0.0.1:{peer_port}',
+        '--data-dir',
+        etcd_data_dir,
+        '--log-level',
+        'warn',
+    ]

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   Store the temporary etcd data directory path in `self._etcd_data_dir` 
instead of a local variable so that it can be cleaned up when stopping the 
processes.
   
   ```python
       etcd_name = f'beam-dynamo-etcd-{uuid.uuid4().hex}'
       self._etcd_data_dir = f'/tmp/{etcd_name}'
       peer_port, = subprocess_server.pick_port(None)
       etcd_cmd = [
           'etcd',
           '--name',
           etcd_name,
           '--listen-client-urls',
           'http://127.0.0.1:{{PORT}}',
           '--advertise-client-urls',
           'http://127.0.0.1:{{PORT}}',
           '--listen-peer-urls',
           f'http://127.0.0.1:{peer_port}',
           '--initial-advertise-peer-urls',
           f'http://127.0.0.1:{peer_port}',
           '--initial-cluster',
           f'{etcd_name}=http://127.0.0.1:{peer_port}',
           '--data-dir',
           self._etcd_data_dir,
           '--log-level',
           'warn',
       ]
   ```



##########
sdks/python/apache_beam/ml/inference/vllm_inference.py:
##########
@@ -109,46 +111,214 @@ def getAsyncVLLMClient(port) -> AsyncOpenAI:
   )
 
 
+# Embedded Dynamo runtime defaults proven on the smoke test: etcd discovery,
+# TCP request plane, ZMQ event plane, KV events disabled. KV-aware routing,
+# disaggregated prefill/decode, and the Planner are not active in this mode.
+_DYNAMO_FRONTEND_DEFAULT_KWARGS: dict[str, Optional[str]] = {
+    'discovery-backend': 'etcd',
+    'request-plane': 'tcp',
+    'event-plane': 'zmq',
+    'router-mode': 'round-robin',
+    'no-router-kv-events': None,
+}
+
+_DYNAMO_ENGINE_DEFAULT_KWARGS: dict[str, Optional[str]] = {
+    'discovery-backend': 'etcd',
+    'request-plane': 'tcp',
+    'event-plane': 'zmq',
+    'kv-events-config': '{"enable_kv_cache_events": false}',
+}
+
+
+def _append_kwargs(cmd: list[str], kwargs: dict[str, Optional[str]]) -> None:
+  for k, v in kwargs.items():
+    cmd.append(f'--{k}')
+    # Only add values for commands with value part.
+    if v is not None:
+      cmd.append(v)
+
+
+def _uses_etcd_discovery(kwargs: dict[str, Optional[str]]) -> bool:
+  return kwargs.get('discovery-backend') == 'etcd'
+
+
 class _VLLMModelServer():
-  def __init__(self, model_name: str, vllm_server_kwargs: dict[str, str]):
+  def __init__(
+      self,
+      model_name: str,
+      vllm_server_kwargs: dict[str, Optional[str]],
+      dynamo_frontend_kwargs: Optional[dict[str, Optional[str]]] = None,
+      use_dynamo: bool = False):
     self._model_name = model_name
     self._vllm_server_kwargs = vllm_server_kwargs
+    self._dynamo_frontend_kwargs = dynamo_frontend_kwargs or {}
     self._server_started = False
     self._server_process = None
+    self._dynamo_process = None
+    self._etcd_process = None
+    self._managed_etcd_endpoint = None
     self._server_port: int = -1
     self._server_process_lock = threading.RLock()
+    self._use_dynamo = use_dynamo
 
     self.start_server()
 
+  @staticmethod
+  def _stop_process(process: Optional[subprocess.Popen]) -> None:
+    if process is None or process.poll() is not None:
+      return
+    process.terminate()
+    try:
+      process.wait(timeout=10)
+    except subprocess.TimeoutExpired:
+      process.kill()
+      process.wait()

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   Wrap the process termination sequence in a `try...except OSError` block. If 
a process exits concurrently between the `poll()` check and 
`terminate()`/`kill()`, calling these methods can raise an `OSError` (such as 
`ProcessLookupError`), which would crash the cleanup sequence and leave other 
processes running.
   
   ```suggestion
     @staticmethod
     def _stop_process(process: Optional[subprocess.Popen]) -> None:
       if process is None or process.poll() is not None:
         return
       try:
         process.terminate()
         try:
           process.wait(timeout=10)
         except subprocess.TimeoutExpired:
           process.kill()
           process.wait()
       except OSError:
         pass
   ```



##########
sdks/python/apache_beam/ml/inference/vllm_inference.py:
##########
@@ -109,46 +111,214 @@ def getAsyncVLLMClient(port) -> AsyncOpenAI:
   )
 
 
+# Embedded Dynamo runtime defaults proven on the smoke test: etcd discovery,
+# TCP request plane, ZMQ event plane, KV events disabled. KV-aware routing,
+# disaggregated prefill/decode, and the Planner are not active in this mode.
+_DYNAMO_FRONTEND_DEFAULT_KWARGS: dict[str, Optional[str]] = {
+    'discovery-backend': 'etcd',
+    'request-plane': 'tcp',
+    'event-plane': 'zmq',
+    'router-mode': 'round-robin',
+    'no-router-kv-events': None,
+}
+
+_DYNAMO_ENGINE_DEFAULT_KWARGS: dict[str, Optional[str]] = {
+    'discovery-backend': 'etcd',
+    'request-plane': 'tcp',
+    'event-plane': 'zmq',
+    'kv-events-config': '{"enable_kv_cache_events": false}',
+}
+
+
+def _append_kwargs(cmd: list[str], kwargs: dict[str, Optional[str]]) -> None:
+  for k, v in kwargs.items():
+    cmd.append(f'--{k}')
+    # Only add values for commands with value part.
+    if v is not None:
+      cmd.append(v)
+
+
+def _uses_etcd_discovery(kwargs: dict[str, Optional[str]]) -> bool:
+  return kwargs.get('discovery-backend') == 'etcd'
+
+
 class _VLLMModelServer():
-  def __init__(self, model_name: str, vllm_server_kwargs: dict[str, str]):
+  def __init__(
+      self,
+      model_name: str,
+      vllm_server_kwargs: dict[str, Optional[str]],
+      dynamo_frontend_kwargs: Optional[dict[str, Optional[str]]] = None,
+      use_dynamo: bool = False):
     self._model_name = model_name
     self._vllm_server_kwargs = vllm_server_kwargs
+    self._dynamo_frontend_kwargs = dynamo_frontend_kwargs or {}
     self._server_started = False
     self._server_process = None
+    self._dynamo_process = None
+    self._etcd_process = None
+    self._managed_etcd_endpoint = None
     self._server_port: int = -1
     self._server_process_lock = threading.RLock()
+    self._use_dynamo = use_dynamo
 
     self.start_server()
 
+  @staticmethod
+  def _stop_process(process: Optional[subprocess.Popen]) -> None:
+    if process is None or process.poll() is not None:
+      return
+    process.terminate()
+    try:
+      process.wait(timeout=10)
+    except subprocess.TimeoutExpired:
+      process.kill()
+      process.wait()
+
+  def _stop_processes(self) -> None:
+    self._stop_process(self._dynamo_process)
+    self._stop_process(self._server_process)
+    self._stop_process(self._etcd_process)
+    if (self._managed_etcd_endpoint is not None and
+        os.environ.get('ETCD_ENDPOINTS') == self._managed_etcd_endpoint):
+      del os.environ['ETCD_ENDPOINTS']
+    self._dynamo_process = None
+    self._server_process = None
+    self._etcd_process = None
+    self._managed_etcd_endpoint = None
+    self._server_started = False
+    self._server_port = -1
+
+  def _process_status(self) -> str:
+    process_status = []
+    if self._server_process is not None:
+      process_status.append(
+          'frontend/server exit code: %s' % self._server_process.poll())
+    if self._dynamo_process is not None:
+      process_status.append(
+          'dynamo worker exit code: %s' % self._dynamo_process.poll())
+    if self._etcd_process is not None:
+      process_status.append('etcd exit code: %s' % self._etcd_process.poll())
+    return ', '.join(process_status) or 'no process status available'
+
+  def __del__(self):
+    self._stop_processes()
+
+  def _uses_embedded_etcd(self) -> bool:
+    return (
+        self._use_dynamo and
+        _uses_etcd_discovery(self._dynamo_frontend_kwargs) and
+        _uses_etcd_discovery(self._vllm_server_kwargs) and
+        'ETCD_ENDPOINTS' not in os.environ)
+
+  def _wait_for_etcd(self, endpoint: str, timeout_secs=30) -> None:
+    deadline = time.time() + timeout_secs
+    health_url = endpoint.rstrip('/') + '/health'
+    while time.time() < deadline and self._etcd_process.poll() is None:
+      try:
+        with urllib.request.urlopen(health_url, timeout=2) as response:
+          if response.status < 500:
+            return
+      except Exception:  # pylint: disable=broad-except
+        time.sleep(1)
+
+    process_status = self._process_status()
+    self._stop_processes()
+    raise RuntimeError(
+        "Failed to start embedded etcd for Dynamo. Process status: "
+        f"{process_status}. Install etcd in the worker container or set "
+        "ETCD_ENDPOINTS to an external etcd service.")
+
+  def _ensure_etcd(self) -> None:
+    if not self._uses_embedded_etcd():
+      return
+    if shutil.which('etcd') is None:
+      raise RuntimeError(
+          "Embedded Dynamo mode requires etcd when ETCD_ENDPOINTS is not "
+          "set. Install etcd in the worker container or set ETCD_ENDPOINTS "
+          "to an external etcd service.")
+
+    etcd_name = f'beam-dynamo-etcd-{uuid.uuid4().hex}'
+    etcd_data_dir = f'/tmp/{etcd_name}'
+    peer_port, = subprocess_server.pick_port(None)
+    etcd_cmd = [
+        'etcd',
+        '--name',
+        etcd_name,
+        '--listen-client-urls',
+        'http://127.0.0.1:{{PORT}}',
+        '--advertise-client-urls',
+        'http://127.0.0.1:{{PORT}}',
+        '--listen-peer-urls',
+        f'http://127.0.0.1:{peer_port}',
+        '--initial-advertise-peer-urls',
+        f'http://127.0.0.1:{peer_port}',
+        '--initial-cluster',
+        f'{etcd_name}=http://127.0.0.1:{peer_port}',
+        '--data-dir',
+        etcd_data_dir,
+        '--log-level',
+        'warn',
+    ]
+    self._etcd_process, etcd_port = start_process(etcd_cmd)
+    endpoint = f'http://127.0.0.1:{etcd_port}'
+    os.environ['ETCD_ENDPOINTS'] = endpoint
+    self._managed_etcd_endpoint = endpoint
+    self._wait_for_etcd(endpoint)
+
   def start_server(self, retries=3):
     with self._server_process_lock:
       if not self._server_started:
-        server_cmd = [
-            sys.executable,
-            '-m',
-            'vllm.entrypoints.openai.api_server',
-            '--model',
-            self._model_name,
-            '--port',
-            '{{PORT}}',
-        ]
-        for k, v in self._vllm_server_kwargs.items():
-          server_cmd.append(f'--{k}')
-          # Only add values for commands with value part.
-          if v is not None:
-            server_cmd.append(v)
+        self._stop_processes()
+        self._ensure_etcd()
+        if self._use_dynamo:
+          # Dynamo embedded mode uses the frontend as its OpenAI-compatible
+          # local endpoint and a separate vLLM worker process.
+          server_cmd = [
+              sys.executable,
+              '-m',
+              'dynamo.frontend',
+              '--http-port',
+              '{{PORT}}',
+          ]
+          _append_kwargs(server_cmd, self._dynamo_frontend_kwargs)
+        else:
+          server_cmd = [
+              sys.executable,
+              '-m',
+              'vllm.entrypoints.openai.api_server',
+              '--model',
+              self._model_name,
+              '--port',
+              '{{PORT}}',
+          ]
+          _append_kwargs(server_cmd, self._vllm_server_kwargs)
         self._server_process, self._server_port = start_process(server_cmd)
 
+        if self._use_dynamo:
+          server_cmd = [
+              sys.executable,
+              '-m',
+              'dynamo.vllm',
+              '--model',
+              self._model_name,
+          ]
+          _append_kwargs(server_cmd, self._vllm_server_kwargs)
+          self._dynamo_process, _ = start_process(server_cmd)
+
       self.check_connectivity(retries)
 
   def get_server_port(self) -> int:
     if not self._server_started:
       self.start_server()
     return self._server_port
 
-  def check_connectivity(self, retries=3):
+  def check_connectivity(self, retries=3, timeout_secs=600):
+    start_time = time.time()
     with getVLLMClient(self._server_port) as client:
-      while self._server_process.poll() is None:
+      while (time.time() - start_time < timeout_secs and
+             self._server_process.poll() is None and
+             (self._dynamo_process is None or
+              self._dynamo_process.poll() is None)):

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   Add a check for `self._etcd_process` in the connectivity polling loop. If 
the embedded etcd process dies, the frontend and engine won't be able to 
communicate, and the server will fail. Checking it here allows the loop to 
fail fast instead of waiting for the full 10-minute timeout 
(`timeout_secs=600`).
   
   ```python
         while (time.time() - start_time < timeout_secs and
                self._server_process.poll() is None and
                (self._dynamo_process is None or
                 self._dynamo_process.poll() is None) and
                (self._etcd_process is None or
                 self._etcd_process.poll() is None)):
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to