akshayjadiyanv commented on code in PR #38701:
URL: https://github.com/apache/beam/pull/38701#discussion_r3308624581
##########
sdks/python/apache_beam/ml/inference/vllm_inference.py:
##########
@@ -109,46 +111,214 @@ def getAsyncVLLMClient(port) -> AsyncOpenAI:
)
+# Embedded Dynamo runtime defaults proven on the smoke test: etcd discovery,
+# TCP request plane, ZMQ event plane, KV events disabled. KV-aware routing,
+# disaggregated prefill/decode, and the Planner are not active in this mode.
+_DYNAMO_FRONTEND_DEFAULT_KWARGS: dict[str, Optional[str]] = {
+ 'discovery-backend': 'etcd',
+ 'request-plane': 'tcp',
+ 'event-plane': 'zmq',
+ 'router-mode': 'round-robin',
+ 'no-router-kv-events': None,
+}
+
+_DYNAMO_ENGINE_DEFAULT_KWARGS: dict[str, Optional[str]] = {
+ 'discovery-backend': 'etcd',
+ 'request-plane': 'tcp',
+ 'event-plane': 'zmq',
+ 'kv-events-config': '{"enable_kv_cache_events": false}',
+}
+
+
+def _append_kwargs(cmd: list[str], kwargs: dict[str, Optional[str]]) -> None:
+ for k, v in kwargs.items():
+ cmd.append(f'--{k}')
+ # Only add values for commands with value part.
+ if v is not None:
+ cmd.append(v)
+
+
+def _uses_etcd_discovery(kwargs: dict[str, Optional[str]]) -> bool:
+ return kwargs.get('discovery-backend') == 'etcd'
+
+
class _VLLMModelServer():
- def __init__(self, model_name: str, vllm_server_kwargs: dict[str, str]):
+ def __init__(
+ self,
+ model_name: str,
+ vllm_server_kwargs: dict[str, Optional[str]],
+ dynamo_frontend_kwargs: Optional[dict[str, Optional[str]]] = None,
+ use_dynamo: bool = False):
self._model_name = model_name
self._vllm_server_kwargs = vllm_server_kwargs
+ self._dynamo_frontend_kwargs = dynamo_frontend_kwargs or {}
self._server_started = False
self._server_process = None
+ self._dynamo_process = None
+ self._etcd_process = None
+ self._managed_etcd_endpoint = None
self._server_port: int = -1
self._server_process_lock = threading.RLock()
+ self._use_dynamo = use_dynamo
Review Comment:
Done in 329dd286e — `self._etcd_data_dir: Optional[str] = None` added next
to the other process handles in `__init__`.
##########
sdks/python/apache_beam/ml/inference/vllm_inference.py:
##########
@@ -109,46 +111,214 @@ def getAsyncVLLMClient(port) -> AsyncOpenAI:
)
+# Embedded Dynamo runtime defaults proven on the smoke test: etcd discovery,
+# TCP request plane, ZMQ event plane, KV events disabled. KV-aware routing,
+# disaggregated prefill/decode, and the Planner are not active in this mode.
+_DYNAMO_FRONTEND_DEFAULT_KWARGS: dict[str, Optional[str]] = {
+ 'discovery-backend': 'etcd',
+ 'request-plane': 'tcp',
+ 'event-plane': 'zmq',
+ 'router-mode': 'round-robin',
+ 'no-router-kv-events': None,
+}
+
+_DYNAMO_ENGINE_DEFAULT_KWARGS: dict[str, Optional[str]] = {
+ 'discovery-backend': 'etcd',
+ 'request-plane': 'tcp',
+ 'event-plane': 'zmq',
+ 'kv-events-config': '{"enable_kv_cache_events": false}',
+}
+
+
+def _append_kwargs(cmd: list[str], kwargs: dict[str, Optional[str]]) -> None:
+ for k, v in kwargs.items():
+ cmd.append(f'--{k}')
+ # Only add values for commands with value part.
+ if v is not None:
+ cmd.append(v)
+
+
+def _uses_etcd_discovery(kwargs: dict[str, Optional[str]]) -> bool:
+ return kwargs.get('discovery-backend') == 'etcd'
+
+
class _VLLMModelServer():
- def __init__(self, model_name: str, vllm_server_kwargs: dict[str, str]):
+ def __init__(
+ self,
+ model_name: str,
+ vllm_server_kwargs: dict[str, Optional[str]],
+ dynamo_frontend_kwargs: Optional[dict[str, Optional[str]]] = None,
+ use_dynamo: bool = False):
self._model_name = model_name
self._vllm_server_kwargs = vllm_server_kwargs
+ self._dynamo_frontend_kwargs = dynamo_frontend_kwargs or {}
self._server_started = False
self._server_process = None
+ self._dynamo_process = None
+ self._etcd_process = None
+ self._managed_etcd_endpoint = None
self._server_port: int = -1
self._server_process_lock = threading.RLock()
+ self._use_dynamo = use_dynamo
self.start_server()
+ @staticmethod
+ def _stop_process(process: Optional[subprocess.Popen]) -> None:
+ if process is None or process.poll() is not None:
+ return
+ process.terminate()
+ try:
+ process.wait(timeout=10)
+ except subprocess.TimeoutExpired:
+ process.kill()
+ process.wait()
Review Comment:
Done in 329dd286e — the `terminate` / `wait` / `kill` calls are now wrapped
in a single `try/except OSError` so a `ProcessLookupError` (or any other
OS-level race) is swallowed instead of aborting the rest of `_stop_processes`.
##########
sdks/python/apache_beam/ml/inference/vllm_inference.py:
##########
@@ -109,46 +111,214 @@ def getAsyncVLLMClient(port) -> AsyncOpenAI:
)
+# Embedded Dynamo runtime defaults proven on the smoke test: etcd discovery,
+# TCP request plane, ZMQ event plane, KV events disabled. KV-aware routing,
+# disaggregated prefill/decode, and the Planner are not active in this mode.
+_DYNAMO_FRONTEND_DEFAULT_KWARGS: dict[str, Optional[str]] = {
+ 'discovery-backend': 'etcd',
+ 'request-plane': 'tcp',
+ 'event-plane': 'zmq',
+ 'router-mode': 'round-robin',
+ 'no-router-kv-events': None,
+}
+
+_DYNAMO_ENGINE_DEFAULT_KWARGS: dict[str, Optional[str]] = {
+ 'discovery-backend': 'etcd',
+ 'request-plane': 'tcp',
+ 'event-plane': 'zmq',
+ 'kv-events-config': '{"enable_kv_cache_events": false}',
+}
+
+
+def _append_kwargs(cmd: list[str], kwargs: dict[str, Optional[str]]) -> None:
+ for k, v in kwargs.items():
+ cmd.append(f'--{k}')
+ # Only add values for commands with value part.
+ if v is not None:
+ cmd.append(v)
+
+
+def _uses_etcd_discovery(kwargs: dict[str, Optional[str]]) -> bool:
+ return kwargs.get('discovery-backend') == 'etcd'
+
+
class _VLLMModelServer():
- def __init__(self, model_name: str, vllm_server_kwargs: dict[str, str]):
+ def __init__(
+ self,
+ model_name: str,
+ vllm_server_kwargs: dict[str, Optional[str]],
+ dynamo_frontend_kwargs: Optional[dict[str, Optional[str]]] = None,
+ use_dynamo: bool = False):
self._model_name = model_name
self._vllm_server_kwargs = vllm_server_kwargs
+ self._dynamo_frontend_kwargs = dynamo_frontend_kwargs or {}
self._server_started = False
self._server_process = None
+ self._dynamo_process = None
+ self._etcd_process = None
+ self._managed_etcd_endpoint = None
self._server_port: int = -1
self._server_process_lock = threading.RLock()
+ self._use_dynamo = use_dynamo
self.start_server()
+ @staticmethod
+ def _stop_process(process: Optional[subprocess.Popen]) -> None:
+ if process is None or process.poll() is not None:
+ return
+ process.terminate()
+ try:
+ process.wait(timeout=10)
+ except subprocess.TimeoutExpired:
+ process.kill()
+ process.wait()
+
+ def _stop_processes(self) -> None:
+ self._stop_process(self._dynamo_process)
+ self._stop_process(self._server_process)
+ self._stop_process(self._etcd_process)
+ if (self._managed_etcd_endpoint is not None and
+ os.environ.get('ETCD_ENDPOINTS') == self._managed_etcd_endpoint):
+ del os.environ['ETCD_ENDPOINTS']
+ self._dynamo_process = None
+ self._server_process = None
+ self._etcd_process = None
+ self._managed_etcd_endpoint = None
+ self._server_started = False
+ self._server_port = -1
Review Comment:
Mostly accepted in 329dd286e:
* `shutil.rmtree(self._etcd_data_dir, ignore_errors=True)` is now called
from `_stop_processes`, so worker restarts no longer leak
`/tmp/beam-dynamo-etcd-*` dirs.
* `del os.environ['ETCD_ENDPOINTS']` was switched to
`os.environ.pop('ETCD_ENDPOINTS', None)` so the cleanup is idempotent.
I skipped the `local_os`/`local_shutil` rebinding inside `_stop_processes`.
The shutdown-safety concern is real, but `os`/`shutil` are already used
unguarded throughout the rest of this class (`_uses_embedded_etcd`,
`_ensure_etcd`, `_wait_for_etcd`), so applying the local-imports pattern
to `_stop_processes` alone would be inconsistent and harder to read. Instead,
I wrapped `__del__` itself in `try/except Exception`, which gives the same
finalizer-during-shutdown guarantee with a single guard and no churn
elsewhere.
##########
sdks/python/apache_beam/ml/inference/vllm_inference.py:
##########
@@ -109,46 +111,214 @@ def getAsyncVLLMClient(port) -> AsyncOpenAI:
)
+# Embedded Dynamo runtime defaults proven on the smoke test: etcd discovery,
+# TCP request plane, ZMQ event plane, KV events disabled. KV-aware routing,
+# disaggregated prefill/decode, and the Planner are not active in this mode.
+_DYNAMO_FRONTEND_DEFAULT_KWARGS: dict[str, Optional[str]] = {
+ 'discovery-backend': 'etcd',
+ 'request-plane': 'tcp',
+ 'event-plane': 'zmq',
+ 'router-mode': 'round-robin',
+ 'no-router-kv-events': None,
+}
+
+_DYNAMO_ENGINE_DEFAULT_KWARGS: dict[str, Optional[str]] = {
+ 'discovery-backend': 'etcd',
+ 'request-plane': 'tcp',
+ 'event-plane': 'zmq',
+ 'kv-events-config': '{"enable_kv_cache_events": false}',
+}
+
+
+def _append_kwargs(cmd: list[str], kwargs: dict[str, Optional[str]]) -> None:
+ for k, v in kwargs.items():
+ cmd.append(f'--{k}')
+ # Only add values for commands with value part.
+ if v is not None:
+ cmd.append(v)
+
+
+def _uses_etcd_discovery(kwargs: dict[str, Optional[str]]) -> bool:
+ return kwargs.get('discovery-backend') == 'etcd'
+
+
class _VLLMModelServer():
- def __init__(self, model_name: str, vllm_server_kwargs: dict[str, str]):
+ def __init__(
+ self,
+ model_name: str,
+ vllm_server_kwargs: dict[str, Optional[str]],
+ dynamo_frontend_kwargs: Optional[dict[str, Optional[str]]] = None,
+ use_dynamo: bool = False):
self._model_name = model_name
self._vllm_server_kwargs = vllm_server_kwargs
+ self._dynamo_frontend_kwargs = dynamo_frontend_kwargs or {}
self._server_started = False
self._server_process = None
+ self._dynamo_process = None
+ self._etcd_process = None
+ self._managed_etcd_endpoint = None
self._server_port: int = -1
self._server_process_lock = threading.RLock()
+ self._use_dynamo = use_dynamo
self.start_server()
+ @staticmethod
+ def _stop_process(process: Optional[subprocess.Popen]) -> None:
+ if process is None or process.poll() is not None:
+ return
+ process.terminate()
+ try:
+ process.wait(timeout=10)
+ except subprocess.TimeoutExpired:
+ process.kill()
+ process.wait()
+
+ def _stop_processes(self) -> None:
+ self._stop_process(self._dynamo_process)
+ self._stop_process(self._server_process)
+ self._stop_process(self._etcd_process)
+ if (self._managed_etcd_endpoint is not None and
+ os.environ.get('ETCD_ENDPOINTS') == self._managed_etcd_endpoint):
+ del os.environ['ETCD_ENDPOINTS']
+ self._dynamo_process = None
+ self._server_process = None
+ self._etcd_process = None
+ self._managed_etcd_endpoint = None
+ self._server_started = False
+ self._server_port = -1
+
+ def _process_status(self) -> str:
+ process_status = []
+ if self._server_process is not None:
+ process_status.append(
+ 'frontend/server exit code: %s' % self._server_process.poll())
+ if self._dynamo_process is not None:
+ process_status.append(
+ 'dynamo worker exit code: %s' % self._dynamo_process.poll())
+ if self._etcd_process is not None:
+ process_status.append('etcd exit code: %s' % self._etcd_process.poll())
+ return ', '.join(process_status) or 'no process status available'
+
+ def __del__(self):
+ self._stop_processes()
+
+ def _uses_embedded_etcd(self) -> bool:
+ return (
+ self._use_dynamo and
+ _uses_etcd_discovery(self._dynamo_frontend_kwargs) and
+ _uses_etcd_discovery(self._vllm_server_kwargs) and
+ 'ETCD_ENDPOINTS' not in os.environ)
+
+ def _wait_for_etcd(self, endpoint: str, timeout_secs=30) -> None:
+ deadline = time.time() + timeout_secs
+ health_url = endpoint.rstrip('/') + '/health'
+ while time.time() < deadline and self._etcd_process.poll() is None:
+ try:
+ with urllib.request.urlopen(health_url, timeout=2) as response:
+ if response.status < 500:
+ return
+ except Exception: # pylint: disable=broad-except
+ time.sleep(1)
+
+ process_status = self._process_status()
+ self._stop_processes()
+ raise RuntimeError(
+ "Failed to start embedded etcd for Dynamo. Process status: "
+ f"{process_status}. Install etcd in the worker container or set "
+ "ETCD_ENDPOINTS to an external etcd service.")
+
+ def _ensure_etcd(self) -> None:
+ if not self._uses_embedded_etcd():
+ return
+ if shutil.which('etcd') is None:
+ raise RuntimeError(
+ "Embedded Dynamo mode requires etcd when ETCD_ENDPOINTS is not "
+ "set. Install etcd in the worker container or set ETCD_ENDPOINTS "
+ "to an external etcd service.")
+
+ etcd_name = f'beam-dynamo-etcd-{uuid.uuid4().hex}'
+ etcd_data_dir = f'/tmp/{etcd_name}'
+ peer_port, = subprocess_server.pick_port(None)
+ etcd_cmd = [
+ 'etcd',
+ '--name',
+ etcd_name,
+ '--listen-client-urls',
+ 'http://127.0.0.1:{{PORT}}',
+ '--advertise-client-urls',
+ 'http://127.0.0.1:{{PORT}}',
+ '--listen-peer-urls',
+ f'http://127.0.0.1:{peer_port}',
+ '--initial-advertise-peer-urls',
+ f'http://127.0.0.1:{peer_port}',
+ '--initial-cluster',
+ f'{etcd_name}=http://127.0.0.1:{peer_port}',
+ '--data-dir',
+ etcd_data_dir,
+ '--log-level',
+ 'warn',
+ ]
Review Comment:
Done in 329dd286e — `_ensure_etcd` now writes the temp directory to
`self._etcd_data_dir` and `_stop_processes` is the one place that owns the
rmtree.
##########
sdks/python/apache_beam/ml/inference/vllm_inference.py:
##########
@@ -109,46 +111,214 @@ def getAsyncVLLMClient(port) -> AsyncOpenAI:
)
+# Embedded Dynamo runtime defaults proven on the smoke test: etcd discovery,
+# TCP request plane, ZMQ event plane, KV events disabled. KV-aware routing,
+# disaggregated prefill/decode, and the Planner are not active in this mode.
+_DYNAMO_FRONTEND_DEFAULT_KWARGS: dict[str, Optional[str]] = {
+ 'discovery-backend': 'etcd',
+ 'request-plane': 'tcp',
+ 'event-plane': 'zmq',
+ 'router-mode': 'round-robin',
+ 'no-router-kv-events': None,
+}
+
+_DYNAMO_ENGINE_DEFAULT_KWARGS: dict[str, Optional[str]] = {
+ 'discovery-backend': 'etcd',
+ 'request-plane': 'tcp',
+ 'event-plane': 'zmq',
+ 'kv-events-config': '{"enable_kv_cache_events": false}',
+}
+
+
+def _append_kwargs(cmd: list[str], kwargs: dict[str, Optional[str]]) -> None:
+ for k, v in kwargs.items():
+ cmd.append(f'--{k}')
+ # Only add values for commands with value part.
+ if v is not None:
+ cmd.append(v)
+
+
+def _uses_etcd_discovery(kwargs: dict[str, Optional[str]]) -> bool:
+ return kwargs.get('discovery-backend') == 'etcd'
+
+
class _VLLMModelServer():
- def __init__(self, model_name: str, vllm_server_kwargs: dict[str, str]):
+ def __init__(
+ self,
+ model_name: str,
+ vllm_server_kwargs: dict[str, Optional[str]],
+ dynamo_frontend_kwargs: Optional[dict[str, Optional[str]]] = None,
+ use_dynamo: bool = False):
self._model_name = model_name
self._vllm_server_kwargs = vllm_server_kwargs
+ self._dynamo_frontend_kwargs = dynamo_frontend_kwargs or {}
self._server_started = False
self._server_process = None
+ self._dynamo_process = None
+ self._etcd_process = None
+ self._managed_etcd_endpoint = None
self._server_port: int = -1
self._server_process_lock = threading.RLock()
+ self._use_dynamo = use_dynamo
self.start_server()
+ @staticmethod
+ def _stop_process(process: Optional[subprocess.Popen]) -> None:
+ if process is None or process.poll() is not None:
+ return
+ process.terminate()
+ try:
+ process.wait(timeout=10)
+ except subprocess.TimeoutExpired:
+ process.kill()
+ process.wait()
+
+ def _stop_processes(self) -> None:
+ self._stop_process(self._dynamo_process)
+ self._stop_process(self._server_process)
+ self._stop_process(self._etcd_process)
+ if (self._managed_etcd_endpoint is not None and
+ os.environ.get('ETCD_ENDPOINTS') == self._managed_etcd_endpoint):
+ del os.environ['ETCD_ENDPOINTS']
+ self._dynamo_process = None
+ self._server_process = None
+ self._etcd_process = None
+ self._managed_etcd_endpoint = None
+ self._server_started = False
+ self._server_port = -1
+
+ def _process_status(self) -> str:
+ process_status = []
+ if self._server_process is not None:
+ process_status.append(
+ 'frontend/server exit code: %s' % self._server_process.poll())
+ if self._dynamo_process is not None:
+ process_status.append(
+ 'dynamo worker exit code: %s' % self._dynamo_process.poll())
+ if self._etcd_process is not None:
+ process_status.append('etcd exit code: %s' % self._etcd_process.poll())
+ return ', '.join(process_status) or 'no process status available'
+
+ def __del__(self):
+ self._stop_processes()
+
+ def _uses_embedded_etcd(self) -> bool:
+ return (
+ self._use_dynamo and
+ _uses_etcd_discovery(self._dynamo_frontend_kwargs) and
+ _uses_etcd_discovery(self._vllm_server_kwargs) and
+ 'ETCD_ENDPOINTS' not in os.environ)
+
+ def _wait_for_etcd(self, endpoint: str, timeout_secs=30) -> None:
+ deadline = time.time() + timeout_secs
+ health_url = endpoint.rstrip('/') + '/health'
+ while time.time() < deadline and self._etcd_process.poll() is None:
+ try:
+ with urllib.request.urlopen(health_url, timeout=2) as response:
+ if response.status < 500:
+ return
+ except Exception: # pylint: disable=broad-except
+ time.sleep(1)
+
+ process_status = self._process_status()
+ self._stop_processes()
+ raise RuntimeError(
+ "Failed to start embedded etcd for Dynamo. Process status: "
+ f"{process_status}. Install etcd in the worker container or set "
+ "ETCD_ENDPOINTS to an external etcd service.")
+
+ def _ensure_etcd(self) -> None:
+ if not self._uses_embedded_etcd():
+ return
+ if shutil.which('etcd') is None:
+ raise RuntimeError(
+ "Embedded Dynamo mode requires etcd when ETCD_ENDPOINTS is not "
+ "set. Install etcd in the worker container or set ETCD_ENDPOINTS "
+ "to an external etcd service.")
+
+ etcd_name = f'beam-dynamo-etcd-{uuid.uuid4().hex}'
+ etcd_data_dir = f'/tmp/{etcd_name}'
+ peer_port, = subprocess_server.pick_port(None)
+ etcd_cmd = [
+ 'etcd',
+ '--name',
+ etcd_name,
+ '--listen-client-urls',
+ 'http://127.0.0.1:{{PORT}}',
+ '--advertise-client-urls',
+ 'http://127.0.0.1:{{PORT}}',
+ '--listen-peer-urls',
+ f'http://127.0.0.1:{peer_port}',
+ '--initial-advertise-peer-urls',
+ f'http://127.0.0.1:{peer_port}',
+ '--initial-cluster',
+ f'{etcd_name}=http://127.0.0.1:{peer_port}',
+ '--data-dir',
+ etcd_data_dir,
+ '--log-level',
+ 'warn',
+ ]
+ self._etcd_process, etcd_port = start_process(etcd_cmd)
+ endpoint = f'http://127.0.0.1:{etcd_port}'
+ os.environ['ETCD_ENDPOINTS'] = endpoint
+ self._managed_etcd_endpoint = endpoint
+ self._wait_for_etcd(endpoint)
+
def start_server(self, retries=3):
with self._server_process_lock:
if not self._server_started:
- server_cmd = [
- sys.executable,
- '-m',
- 'vllm.entrypoints.openai.api_server',
- '--model',
- self._model_name,
- '--port',
- '{{PORT}}',
- ]
- for k, v in self._vllm_server_kwargs.items():
- server_cmd.append(f'--{k}')
- # Only add values for commands with value part.
- if v is not None:
- server_cmd.append(v)
+ self._stop_processes()
+ self._ensure_etcd()
+ if self._use_dynamo:
+ # Dynamo embedded mode uses the frontend as its OpenAI-compatible
+ # local endpoint and a separate vLLM worker process.
+ server_cmd = [
+ sys.executable,
+ '-m',
+ 'dynamo.frontend',
+ '--http-port',
+ '{{PORT}}',
+ ]
+ _append_kwargs(server_cmd, self._dynamo_frontend_kwargs)
+ else:
+ server_cmd = [
+ sys.executable,
+ '-m',
+ 'vllm.entrypoints.openai.api_server',
+ '--model',
+ self._model_name,
+ '--port',
+ '{{PORT}}',
+ ]
+ _append_kwargs(server_cmd, self._vllm_server_kwargs)
self._server_process, self._server_port = start_process(server_cmd)
+ if self._use_dynamo:
+ server_cmd = [
+ sys.executable,
+ '-m',
+ 'dynamo.vllm',
+ '--model',
+ self._model_name,
+ ]
+ _append_kwargs(server_cmd, self._vllm_server_kwargs)
+ self._dynamo_process, _ = start_process(server_cmd)
+
self.check_connectivity(retries)
def get_server_port(self) -> int:
if not self._server_started:
self.start_server()
return self._server_port
- def check_connectivity(self, retries=3):
+ def check_connectivity(self, retries=3, timeout_secs=600):
+ start_time = time.time()
with getVLLMClient(self._server_port) as client:
- while self._server_process.poll() is None:
+ while (time.time() - start_time < timeout_secs and
+ self._server_process.poll() is None and
+ (self._dynamo_process is None or
+ self._dynamo_process.poll() is None)):
Review Comment:
Done in 329dd286e — the `check_connectivity` loop now also exits early if
`self._etcd_process.poll() is not None`, so a dead embedded etcd fails fast
instead of waiting out the 10-minute `timeout_secs`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]